PlaylistFetcher.cpp revision 964adb17885185808398507d2de88665fe193ee2
1/*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "PlaylistFetcher"
19#include <utils/Log.h>
20
21#include "PlaylistFetcher.h"
22
23#include "LiveDataSource.h"
24#include "LiveSession.h"
25#include "M3UParser.h"
26
27#include "include/avc_utils.h"
28#include "include/HTTPBase.h"
29#include "include/ID3.h"
30#include "mpeg2ts/AnotherPacketSource.h"
31
32#include <media/IStreamSource.h>
33#include <media/stagefright/foundation/ABitReader.h>
34#include <media/stagefright/foundation/ABuffer.h>
35#include <media/stagefright/foundation/ADebug.h>
36#include <media/stagefright/foundation/hexdump.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaDefs.h>
39#include <media/stagefright/MetaData.h>
40#include <media/stagefright/Utils.h>
41
42#include <ctype.h>
43#include <inttypes.h>
44#include <openssl/aes.h>
45#include <openssl/md5.h>
46
47namespace android {
48
49// static
50const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll;
51const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
52const int64_t PlaylistFetcher::kFetcherResumeThreshold = 100000ll;
53// LCM of 188 (size of a TS packet) & 1k works well
54const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024;
55const int32_t PlaylistFetcher::kNumSkipFrames = 5;
56
57PlaylistFetcher::PlaylistFetcher(
58        const sp<AMessage> &notify,
59        const sp<LiveSession> &session,
60        const char *uri,
61        int32_t subtitleGeneration)
62    : mNotify(notify),
63      mStartTimeUsNotify(notify->dup()),
64      mSession(session),
65      mURI(uri),
66      mStreamTypeMask(0),
67      mStartTimeUs(-1ll),
68      mSegmentStartTimeUs(-1ll),
69      mDiscontinuitySeq(-1ll),
70      mStartTimeUsRelative(false),
71      mLastPlaylistFetchTimeUs(-1ll),
72      mSeqNumber(-1),
73      mNumRetries(0),
74      mStartup(true),
75      mAdaptive(false),
76      mPrepared(false),
77      mNextPTSTimeUs(-1ll),
78      mMonitorQueueGeneration(0),
79      mSubtitleGeneration(subtitleGeneration),
80      mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
81      mFirstPTSValid(false),
82      mAbsoluteTimeAnchorUs(0ll),
83      mVideoBuffer(new AnotherPacketSource(NULL)) {
84    memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
85    mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
86    mStartTimeUsNotify->setInt32("streamMask", 0);
87}
88
89PlaylistFetcher::~PlaylistFetcher() {
90}
91
92int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
93    CHECK(mPlaylist != NULL);
94
95    int32_t firstSeqNumberInPlaylist;
96    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
97                "media-sequence", &firstSeqNumberInPlaylist)) {
98        firstSeqNumberInPlaylist = 0;
99    }
100
101    int32_t lastSeqNumberInPlaylist =
102        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
103
104    CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
105    CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
106
107    int64_t segmentStartUs = 0ll;
108    for (int32_t index = 0;
109            index < seqNumber - firstSeqNumberInPlaylist; ++index) {
110        sp<AMessage> itemMeta;
111        CHECK(mPlaylist->itemAt(
112                    index, NULL /* uri */, &itemMeta));
113
114        int64_t itemDurationUs;
115        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
116
117        segmentStartUs += itemDurationUs;
118    }
119
120    return segmentStartUs;
121}
122
123int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
124    int64_t nowUs = ALooper::GetNowUs();
125
126    if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
127        CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
128        return 0ll;
129    }
130
131    if (mPlaylist->isComplete()) {
132        return (~0llu >> 1);
133    }
134
135    int32_t targetDurationSecs;
136    CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
137
138    int64_t targetDurationUs = targetDurationSecs * 1000000ll;
139
140    int64_t minPlaylistAgeUs;
141
142    switch (mRefreshState) {
143        case INITIAL_MINIMUM_RELOAD_DELAY:
144        {
145            size_t n = mPlaylist->size();
146            if (n > 0) {
147                sp<AMessage> itemMeta;
148                CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
149
150                int64_t itemDurationUs;
151                CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
152
153                minPlaylistAgeUs = itemDurationUs;
154                break;
155            }
156
157            // fall through
158        }
159
160        case FIRST_UNCHANGED_RELOAD_ATTEMPT:
161        {
162            minPlaylistAgeUs = targetDurationUs / 2;
163            break;
164        }
165
166        case SECOND_UNCHANGED_RELOAD_ATTEMPT:
167        {
168            minPlaylistAgeUs = (targetDurationUs * 3) / 2;
169            break;
170        }
171
172        case THIRD_UNCHANGED_RELOAD_ATTEMPT:
173        {
174            minPlaylistAgeUs = targetDurationUs * 3;
175            break;
176        }
177
178        default:
179            TRESPASS();
180            break;
181    }
182
183    int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
184    return delayUs > 0ll ? delayUs : 0ll;
185}
186
187status_t PlaylistFetcher::decryptBuffer(
188        size_t playlistIndex, const sp<ABuffer> &buffer,
189        bool first) {
190    sp<AMessage> itemMeta;
191    bool found = false;
192    AString method;
193
194    for (ssize_t i = playlistIndex; i >= 0; --i) {
195        AString uri;
196        CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
197
198        if (itemMeta->findString("cipher-method", &method)) {
199            found = true;
200            break;
201        }
202    }
203
204    if (!found) {
205        method = "NONE";
206    }
207    buffer->meta()->setString("cipher-method", method.c_str());
208
209    if (method == "NONE") {
210        return OK;
211    } else if (!(method == "AES-128")) {
212        ALOGE("Unsupported cipher method '%s'", method.c_str());
213        return ERROR_UNSUPPORTED;
214    }
215
216    AString keyURI;
217    if (!itemMeta->findString("cipher-uri", &keyURI)) {
218        ALOGE("Missing key uri");
219        return ERROR_MALFORMED;
220    }
221
222    ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
223
224    sp<ABuffer> key;
225    if (index >= 0) {
226        key = mAESKeyForURI.valueAt(index);
227    } else {
228        ssize_t err = mSession->fetchFile(keyURI.c_str(), &key);
229
230        if (err < 0) {
231            ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
232            return ERROR_IO;
233        } else if (key->size() != 16) {
234            ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
235            return ERROR_MALFORMED;
236        }
237
238        mAESKeyForURI.add(keyURI, key);
239    }
240
241    AES_KEY aes_key;
242    if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
243        ALOGE("failed to set AES decryption key.");
244        return UNKNOWN_ERROR;
245    }
246
247    size_t n = buffer->size();
248    if (!n) {
249        return OK;
250    }
251    CHECK(n % 16 == 0);
252
253    if (first) {
254        // If decrypting the first block in a file, read the iv from the manifest
255        // or derive the iv from the file's sequence number.
256
257        AString iv;
258        if (itemMeta->findString("cipher-iv", &iv)) {
259            if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
260                    || iv.size() != 16 * 2 + 2) {
261                ALOGE("malformed cipher IV '%s'.", iv.c_str());
262                return ERROR_MALFORMED;
263            }
264
265            memset(mAESInitVec, 0, sizeof(mAESInitVec));
266            for (size_t i = 0; i < 16; ++i) {
267                char c1 = tolower(iv.c_str()[2 + 2 * i]);
268                char c2 = tolower(iv.c_str()[3 + 2 * i]);
269                if (!isxdigit(c1) || !isxdigit(c2)) {
270                    ALOGE("malformed cipher IV '%s'.", iv.c_str());
271                    return ERROR_MALFORMED;
272                }
273                uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
274                uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
275
276                mAESInitVec[i] = nibble1 << 4 | nibble2;
277            }
278        } else {
279            memset(mAESInitVec, 0, sizeof(mAESInitVec));
280            mAESInitVec[15] = mSeqNumber & 0xff;
281            mAESInitVec[14] = (mSeqNumber >> 8) & 0xff;
282            mAESInitVec[13] = (mSeqNumber >> 16) & 0xff;
283            mAESInitVec[12] = (mSeqNumber >> 24) & 0xff;
284        }
285    }
286
287    AES_cbc_encrypt(
288            buffer->data(), buffer->data(), buffer->size(),
289            &aes_key, mAESInitVec, AES_DECRYPT);
290
291    return OK;
292}
293
294status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
295    AString method;
296    CHECK(buffer->meta()->findString("cipher-method", &method));
297    if (method == "NONE") {
298        return OK;
299    }
300
301    uint8_t padding = 0;
302    if (buffer->size() > 0) {
303        padding = buffer->data()[buffer->size() - 1];
304    }
305
306    if (padding > 16) {
307        return ERROR_MALFORMED;
308    }
309
310    for (size_t i = buffer->size() - padding; i < padding; i++) {
311        if (buffer->data()[i] != padding) {
312            return ERROR_MALFORMED;
313        }
314    }
315
316    buffer->setRange(buffer->offset(), buffer->size() - padding);
317    return OK;
318}
319
320void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
321    int64_t maxDelayUs = delayUsToRefreshPlaylist();
322    if (maxDelayUs < minDelayUs) {
323        maxDelayUs = minDelayUs;
324    }
325    if (delayUs > maxDelayUs) {
326        ALOGV("Need to refresh playlist in %" PRId64 , maxDelayUs);
327        delayUs = maxDelayUs;
328    }
329    sp<AMessage> msg = new AMessage(kWhatMonitorQueue, this);
330    msg->setInt32("generation", mMonitorQueueGeneration);
331    msg->post(delayUs);
332}
333
334void PlaylistFetcher::cancelMonitorQueue() {
335    ++mMonitorQueueGeneration;
336}
337
338void PlaylistFetcher::startAsync(
339        const sp<AnotherPacketSource> &audioSource,
340        const sp<AnotherPacketSource> &videoSource,
341        const sp<AnotherPacketSource> &subtitleSource,
342        int64_t startTimeUs,
343        int64_t segmentStartTimeUs,
344        int32_t startDiscontinuitySeq,
345        bool adaptive) {
346    sp<AMessage> msg = new AMessage(kWhatStart, this);
347
348    uint32_t streamTypeMask = 0ul;
349
350    if (audioSource != NULL) {
351        msg->setPointer("audioSource", audioSource.get());
352        streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
353    }
354
355    if (videoSource != NULL) {
356        msg->setPointer("videoSource", videoSource.get());
357        streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
358    }
359
360    if (subtitleSource != NULL) {
361        msg->setPointer("subtitleSource", subtitleSource.get());
362        streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
363    }
364
365    msg->setInt32("streamTypeMask", streamTypeMask);
366    msg->setInt64("startTimeUs", startTimeUs);
367    msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
368    msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
369    msg->setInt32("adaptive", adaptive);
370    msg->post();
371}
372
373void PlaylistFetcher::pauseAsync() {
374    (new AMessage(kWhatPause, this))->post();
375}
376
377void PlaylistFetcher::stopAsync(bool clear) {
378    sp<AMessage> msg = new AMessage(kWhatStop, this);
379    msg->setInt32("clear", clear);
380    msg->post();
381}
382
383void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
384    AMessage* msg = new AMessage(kWhatResumeUntil, this);
385    msg->setMessage("params", params);
386    msg->post();
387}
388
389void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
390    switch (msg->what()) {
391        case kWhatStart:
392        {
393            status_t err = onStart(msg);
394
395            sp<AMessage> notify = mNotify->dup();
396            notify->setInt32("what", kWhatStarted);
397            notify->setInt32("err", err);
398            notify->post();
399            break;
400        }
401
402        case kWhatPause:
403        {
404            onPause();
405
406            sp<AMessage> notify = mNotify->dup();
407            notify->setInt32("what", kWhatPaused);
408            notify->post();
409            break;
410        }
411
412        case kWhatStop:
413        {
414            onStop(msg);
415
416            sp<AMessage> notify = mNotify->dup();
417            notify->setInt32("what", kWhatStopped);
418            notify->post();
419            break;
420        }
421
422        case kWhatMonitorQueue:
423        case kWhatDownloadNext:
424        {
425            int32_t generation;
426            CHECK(msg->findInt32("generation", &generation));
427
428            if (generation != mMonitorQueueGeneration) {
429                // Stale event
430                break;
431            }
432
433            if (msg->what() == kWhatMonitorQueue) {
434                onMonitorQueue();
435            } else {
436                onDownloadNext();
437            }
438            break;
439        }
440
441        case kWhatResumeUntil:
442        {
443            onResumeUntil(msg);
444            break;
445        }
446
447        default:
448            TRESPASS();
449    }
450}
451
452status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
453    mPacketSources.clear();
454
455    uint32_t streamTypeMask;
456    CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
457
458    int64_t startTimeUs;
459    int64_t segmentStartTimeUs;
460    int32_t startDiscontinuitySeq;
461    int32_t adaptive;
462    CHECK(msg->findInt64("startTimeUs", &startTimeUs));
463    CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
464    CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
465    CHECK(msg->findInt32("adaptive", &adaptive));
466
467    if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
468        void *ptr;
469        CHECK(msg->findPointer("audioSource", &ptr));
470
471        mPacketSources.add(
472                LiveSession::STREAMTYPE_AUDIO,
473                static_cast<AnotherPacketSource *>(ptr));
474    }
475
476    if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
477        void *ptr;
478        CHECK(msg->findPointer("videoSource", &ptr));
479
480        mPacketSources.add(
481                LiveSession::STREAMTYPE_VIDEO,
482                static_cast<AnotherPacketSource *>(ptr));
483    }
484
485    if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
486        void *ptr;
487        CHECK(msg->findPointer("subtitleSource", &ptr));
488
489        mPacketSources.add(
490                LiveSession::STREAMTYPE_SUBTITLES,
491                static_cast<AnotherPacketSource *>(ptr));
492    }
493
494    mStreamTypeMask = streamTypeMask;
495
496    mSegmentStartTimeUs = segmentStartTimeUs;
497    mDiscontinuitySeq = startDiscontinuitySeq;
498
499    if (startTimeUs >= 0) {
500        mStartTimeUs = startTimeUs;
501        mSeqNumber = -1;
502        mStartup = true;
503        mPrepared = false;
504        mAdaptive = adaptive;
505    }
506
507    postMonitorQueue();
508
509    return OK;
510}
511
512void PlaylistFetcher::onPause() {
513    cancelMonitorQueue();
514}
515
516void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
517    cancelMonitorQueue();
518
519    int32_t clear;
520    CHECK(msg->findInt32("clear", &clear));
521    if (clear) {
522        for (size_t i = 0; i < mPacketSources.size(); i++) {
523            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
524            packetSource->clear();
525        }
526    }
527
528    mPacketSources.clear();
529    mStreamTypeMask = 0;
530}
531
532// Resume until we have reached the boundary timestamps listed in `msg`; when
533// the remaining time is too short (within a resume threshold) stop immediately
534// instead.
535status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
536    sp<AMessage> params;
537    CHECK(msg->findMessage("params", &params));
538
539    size_t stopCount = 0;
540    for (size_t i = 0; i < mPacketSources.size(); i++) {
541        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
542
543        const char *stopKey;
544        int streamType = mPacketSources.keyAt(i);
545
546        if (streamType == LiveSession::STREAMTYPE_SUBTITLES) {
547            // the subtitle track can always be stopped
548            ++stopCount;
549            continue;
550        }
551
552        switch (streamType) {
553        case LiveSession::STREAMTYPE_VIDEO:
554            stopKey = "timeUsVideo";
555            break;
556
557        case LiveSession::STREAMTYPE_AUDIO:
558            stopKey = "timeUsAudio";
559            break;
560
561        default:
562            TRESPASS();
563        }
564
565        // check if this stream has too little data left to be resumed
566        int32_t discontinuitySeq;
567        int64_t latestTimeUs = 0, stopTimeUs = 0;
568        sp<AMessage> latestMeta = packetSource->getLatestEnqueuedMeta();
569        if (latestMeta != NULL
570                && latestMeta->findInt32("discontinuitySeq", &discontinuitySeq)
571                && discontinuitySeq == mDiscontinuitySeq
572                && latestMeta->findInt64("timeUs", &latestTimeUs)
573                && params->findInt64(stopKey, &stopTimeUs)
574                && stopTimeUs - latestTimeUs < kFetcherResumeThreshold) {
575            ++stopCount;
576        }
577    }
578
579    // Don't resume if all streams are within a resume threshold
580    if (stopCount == mPacketSources.size()) {
581        for (size_t i = 0; i < mPacketSources.size(); i++) {
582            mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer());
583        }
584        stopAsync(/* clear = */ false);
585        return OK;
586    }
587
588    mStopParams = params;
589    onDownloadNext();
590
591    return OK;
592}
593
594void PlaylistFetcher::notifyError(status_t err) {
595    sp<AMessage> notify = mNotify->dup();
596    notify->setInt32("what", kWhatError);
597    notify->setInt32("err", err);
598    notify->post();
599}
600
601void PlaylistFetcher::queueDiscontinuity(
602        ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
603    for (size_t i = 0; i < mPacketSources.size(); ++i) {
604        // do not discard buffer upon #EXT-X-DISCONTINUITY tag
605        // (seek will discard buffer by abandoning old fetchers)
606        mPacketSources.valueAt(i)->queueDiscontinuity(
607                type, extra, false /* discard */);
608    }
609}
610
611void PlaylistFetcher::onMonitorQueue() {
612    bool downloadMore = false;
613    refreshPlaylist();
614
615    int32_t targetDurationSecs;
616    int64_t targetDurationUs = kMinBufferedDurationUs;
617    if (mPlaylist != NULL) {
618        if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
619                "target-duration", &targetDurationSecs)) {
620            ALOGE("Playlist is missing required EXT-X-TARGETDURATION tag");
621            notifyError(ERROR_MALFORMED);
622            return;
623        }
624        targetDurationUs = targetDurationSecs * 1000000ll;
625    }
626
627    // buffer at least 3 times the target duration, or up to 10 seconds
628    int64_t durationToBufferUs = targetDurationUs * 3;
629    if (durationToBufferUs > kMinBufferedDurationUs)  {
630        durationToBufferUs = kMinBufferedDurationUs;
631    }
632
633    int64_t bufferedDurationUs = 0ll;
634    status_t finalResult = NOT_ENOUGH_DATA;
635    if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
636        sp<AnotherPacketSource> packetSource =
637            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
638
639        bufferedDurationUs =
640                packetSource->getBufferedDurationUs(&finalResult);
641        finalResult = OK;
642    } else {
643        // Use max stream duration to prevent us from waiting on a non-existent stream;
644        // when we cannot make out from the manifest what streams are included in a playlist
645        // we might assume extra streams.
646        for (size_t i = 0; i < mPacketSources.size(); ++i) {
647            if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
648                continue;
649            }
650
651            int64_t bufferedStreamDurationUs =
652                mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
653            ALOGV("buffered %" PRId64 " for stream %d",
654                    bufferedStreamDurationUs, mPacketSources.keyAt(i));
655            if (bufferedStreamDurationUs > bufferedDurationUs) {
656                bufferedDurationUs = bufferedStreamDurationUs;
657            }
658        }
659    }
660    downloadMore = (bufferedDurationUs < durationToBufferUs);
661
662    // signal start if buffered up at least the target size
663    if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) {
664        mPrepared = true;
665
666        ALOGV("prepared, buffered=%" PRId64 " > %" PRId64 "",
667                bufferedDurationUs, targetDurationUs);
668    }
669
670    if (finalResult == OK && downloadMore) {
671        ALOGV("monitoring, buffered=%" PRId64 " < %" PRId64 "",
672                bufferedDurationUs, durationToBufferUs);
673        // delay the next download slightly; hopefully this gives other concurrent fetchers
674        // a better chance to run.
675        // onDownloadNext();
676        sp<AMessage> msg = new AMessage(kWhatDownloadNext, this);
677        msg->setInt32("generation", mMonitorQueueGeneration);
678        msg->post(1000l);
679    } else {
680        // Nothing to do yet, try again in a second.
681        int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2;
682        ALOGV("pausing for %" PRId64 ", buffered=%" PRId64 " > %" PRId64 "",
683                delayUs, bufferedDurationUs, durationToBufferUs);
684        // :TRICKY: need to enforce minimum delay because the delay to
685        // refresh the playlist will become 0
686        postMonitorQueue(delayUs, mPrepared ? targetDurationUs * 2 : 0);
687    }
688}
689
690status_t PlaylistFetcher::refreshPlaylist() {
691    if (delayUsToRefreshPlaylist() <= 0) {
692        bool unchanged;
693        sp<M3UParser> playlist = mSession->fetchPlaylist(
694                mURI.c_str(), mPlaylistHash, &unchanged);
695
696        if (playlist == NULL) {
697            if (unchanged) {
698                // We succeeded in fetching the playlist, but it was
699                // unchanged from the last time we tried.
700
701                if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
702                    mRefreshState = (RefreshState)(mRefreshState + 1);
703                }
704            } else {
705                ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str());
706                return ERROR_IO;
707            }
708        } else {
709            mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
710            mPlaylist = playlist;
711
712            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
713                updateDuration();
714            }
715        }
716
717        mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
718    }
719    return OK;
720}
721
722// static
723bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
724    return buffer->size() > 0 && buffer->data()[0] == 0x47;
725}
726
727void PlaylistFetcher::onDownloadNext() {
728    status_t err = refreshPlaylist();
729    int32_t firstSeqNumberInPlaylist = 0;
730    int32_t lastSeqNumberInPlaylist = 0;
731    bool discontinuity = false;
732
733    if (mPlaylist != NULL) {
734        if (mPlaylist->meta() != NULL) {
735            mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist);
736        }
737
738        lastSeqNumberInPlaylist =
739                firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
740
741        if (mDiscontinuitySeq < 0) {
742            mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
743        }
744    }
745
746    if (mPlaylist != NULL && mSeqNumber < 0) {
747        CHECK_GE(mStartTimeUs, 0ll);
748
749        if (mSegmentStartTimeUs < 0) {
750            if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
751                // If this is a live session, start 3 segments from the end on connect
752                mSeqNumber = lastSeqNumberInPlaylist - 3;
753                if (mSeqNumber < firstSeqNumberInPlaylist) {
754                    mSeqNumber = firstSeqNumberInPlaylist;
755                }
756            } else {
757                // When seeking mSegmentStartTimeUs is unavailable (< 0), we
758                // use mStartTimeUs (client supplied timestamp) to determine both start segment
759                // and relative position inside a segment
760                mSeqNumber = getSeqNumberForTime(mStartTimeUs);
761                mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
762            }
763            mStartTimeUsRelative = true;
764            ALOGV("Initial sequence number for time %" PRId64 " is %d from (%d .. %d)",
765                    mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
766                    lastSeqNumberInPlaylist);
767        } else {
768            // When adapting or track switching, mSegmentStartTimeUs (relative
769            // to media time 0) is used to determine the start segment; mStartTimeUs (absolute
770            // timestamps coming from the media container) is used to determine the position
771            // inside a segments.
772            mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
773            if (mAdaptive) {
774                // avoid double fetch/decode
775                mSeqNumber += 1;
776            }
777            ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
778            if (mSeqNumber < minSeq) {
779                mSeqNumber = minSeq;
780            }
781
782            if (mSeqNumber < firstSeqNumberInPlaylist) {
783                mSeqNumber = firstSeqNumberInPlaylist;
784            }
785
786            if (mSeqNumber > lastSeqNumberInPlaylist) {
787                mSeqNumber = lastSeqNumberInPlaylist;
788            }
789            ALOGV("Initial sequence number for live event %d from (%d .. %d)",
790                    mSeqNumber, firstSeqNumberInPlaylist,
791                    lastSeqNumberInPlaylist);
792        }
793    }
794
795    // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true
796    if (mSeqNumber < firstSeqNumberInPlaylist
797            || mSeqNumber > lastSeqNumberInPlaylist
798            || err != OK) {
799        if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) {
800            ++mNumRetries;
801
802            if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) {
803                // make sure we reach this retry logic on refresh failures
804                // by adding an err != OK clause to all enclosing if's.
805
806                // refresh in increasing fraction (1/2, 1/3, ...) of the
807                // playlist's target duration or 3 seconds, whichever is less
808                int64_t delayUs = kMaxMonitorDelayUs;
809                if (mPlaylist != NULL && mPlaylist->meta() != NULL) {
810                    int32_t targetDurationSecs;
811                    CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
812                    delayUs = mPlaylist->size() * targetDurationSecs *
813                            1000000ll / (1 + mNumRetries);
814                }
815                if (delayUs > kMaxMonitorDelayUs) {
816                    delayUs = kMaxMonitorDelayUs;
817                }
818                ALOGV("sequence number high: %d from (%d .. %d), "
819                      "monitor in %" PRId64 " (retry=%d)",
820                        mSeqNumber, firstSeqNumberInPlaylist,
821                        lastSeqNumberInPlaylist, delayUs, mNumRetries);
822                postMonitorQueue(delayUs);
823                return;
824            }
825
826            if (err != OK) {
827                notifyError(err);
828                return;
829            }
830
831            // we've missed the boat, let's start 3 segments prior to the latest sequence
832            // number available and signal a discontinuity.
833
834            ALOGI("We've missed the boat, restarting playback."
835                  "  mStartup=%d, was  looking for %d in %d-%d",
836                    mStartup, mSeqNumber, firstSeqNumberInPlaylist,
837                    lastSeqNumberInPlaylist);
838            if (mStopParams != NULL) {
839                // we should have kept on fetching until we hit the boundaries in mStopParams,
840                // but since the segments we are supposed to fetch have already rolled off
841                // the playlist, i.e. we have already missed the boat, we inevitably have to
842                // skip.
843                for (size_t i = 0; i < mPacketSources.size(); i++) {
844                    sp<ABuffer> formatChange = mSession->createFormatChangeBuffer();
845                    mPacketSources.valueAt(i)->queueAccessUnit(formatChange);
846                }
847                stopAsync(/* clear = */ false);
848                return;
849            }
850            mSeqNumber = lastSeqNumberInPlaylist - 3;
851            if (mSeqNumber < firstSeqNumberInPlaylist) {
852                mSeqNumber = firstSeqNumberInPlaylist;
853            }
854            discontinuity = true;
855
856            // fall through
857        } else {
858            ALOGE("Cannot find sequence number %d in playlist "
859                 "(contains %d - %d)",
860                 mSeqNumber, firstSeqNumberInPlaylist,
861                  firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
862
863            notifyError(ERROR_END_OF_STREAM);
864            return;
865        }
866    }
867
868    mNumRetries = 0;
869
870    AString uri;
871    sp<AMessage> itemMeta;
872    CHECK(mPlaylist->itemAt(
873                mSeqNumber - firstSeqNumberInPlaylist,
874                &uri,
875                &itemMeta));
876
877    int32_t val;
878    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
879        mDiscontinuitySeq++;
880        discontinuity = true;
881    }
882
883    int64_t range_offset, range_length;
884    if (!itemMeta->findInt64("range-offset", &range_offset)
885            || !itemMeta->findInt64("range-length", &range_length)) {
886        range_offset = 0;
887        range_length = -1;
888    }
889
890    ALOGV("fetching segment %d from (%d .. %d)",
891          mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
892
893    ALOGV("fetching '%s'", uri.c_str());
894
895    sp<DataSource> source;
896    sp<ABuffer> buffer, tsBuffer;
897    // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
898    // this avoids interleaved connections to the key and segment file.
899    {
900        sp<ABuffer> junk = new ABuffer(16);
901        junk->setRange(0, 16);
902        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
903                true /* first */);
904        if (err != OK) {
905            notifyError(err);
906            return;
907        }
908    }
909
910    // block-wise download
911    bool startup = mStartup;
912    ssize_t bytesRead;
913    do {
914        bytesRead = mSession->fetchFile(
915                uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source);
916
917        if (bytesRead < 0) {
918            status_t err = bytesRead;
919            ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
920            notifyError(err);
921            return;
922        }
923
924        CHECK(buffer != NULL);
925
926        size_t size = buffer->size();
927        // Set decryption range.
928        buffer->setRange(size - bytesRead, bytesRead);
929        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
930                buffer->offset() == 0 /* first */);
931        // Unset decryption range.
932        buffer->setRange(0, size);
933
934        if (err != OK) {
935            ALOGE("decryptBuffer failed w/ error %d", err);
936
937            notifyError(err);
938            return;
939        }
940
941        if (startup || discontinuity) {
942            // Signal discontinuity.
943
944            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
945                // If this was a live event this made no sense since
946                // we don't have access to all the segment before the current
947                // one.
948                mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
949            }
950
951            if (discontinuity) {
952                ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
953
954                queueDiscontinuity(
955                        ATSParser::DISCONTINUITY_FORMATCHANGE,
956                        NULL /* extra */);
957
958                discontinuity = false;
959            }
960
961            startup = false;
962        }
963
964        err = OK;
965        if (bufferStartsWithTsSyncByte(buffer)) {
966            // Incremental extraction is only supported for MPEG2 transport streams.
967            if (tsBuffer == NULL) {
968                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
969                tsBuffer->setRange(0, 0);
970            } else if (tsBuffer->capacity() != buffer->capacity()) {
971                size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
972                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
973                tsBuffer->setRange(tsOff, tsSize);
974            }
975            tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
976
977            err = extractAndQueueAccessUnitsFromTs(tsBuffer);
978        }
979
980        if (err == -EAGAIN) {
981            // starting sequence number too low/high
982            mTSParser.clear();
983            for (size_t i = 0; i < mPacketSources.size(); i++) {
984                sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
985                packetSource->clear();
986            }
987            postMonitorQueue();
988            return;
989        } else if (err == ERROR_OUT_OF_RANGE) {
990            // reached stopping point
991            stopAsync(/* clear = */ false);
992            return;
993        } else if (err != OK) {
994            notifyError(err);
995            return;
996        }
997
998    } while (bytesRead != 0);
999
1000    if (bufferStartsWithTsSyncByte(buffer)) {
1001        // If we don't see a stream in the program table after fetching a full ts segment
1002        // mark it as nonexistent.
1003        const size_t kNumTypes = ATSParser::NUM_SOURCE_TYPES;
1004        ATSParser::SourceType srcTypes[kNumTypes] =
1005                { ATSParser::VIDEO, ATSParser::AUDIO };
1006        LiveSession::StreamType streamTypes[kNumTypes] =
1007                { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
1008
1009        for (size_t i = 0; i < kNumTypes; i++) {
1010            ATSParser::SourceType srcType = srcTypes[i];
1011            LiveSession::StreamType streamType = streamTypes[i];
1012
1013            sp<AnotherPacketSource> source =
1014                static_cast<AnotherPacketSource *>(
1015                    mTSParser->getSource(srcType).get());
1016
1017            if (!mTSParser->hasSource(srcType)) {
1018                ALOGW("MPEG2 Transport stream does not contain %s data.",
1019                      srcType == ATSParser::VIDEO ? "video" : "audio");
1020
1021                mStreamTypeMask &= ~streamType;
1022                mPacketSources.removeItem(streamType);
1023            }
1024        }
1025
1026    }
1027
1028    if (checkDecryptPadding(buffer) != OK) {
1029        ALOGE("Incorrect padding bytes after decryption.");
1030        notifyError(ERROR_MALFORMED);
1031        return;
1032    }
1033
1034    err = OK;
1035    if (tsBuffer != NULL) {
1036        AString method;
1037        CHECK(buffer->meta()->findString("cipher-method", &method));
1038        if ((tsBuffer->size() > 0 && method == "NONE")
1039                || tsBuffer->size() > 16) {
1040            ALOGE("MPEG2 transport stream is not an even multiple of 188 "
1041                    "bytes in length.");
1042            notifyError(ERROR_MALFORMED);
1043            return;
1044        }
1045    }
1046
1047    // bulk extract non-ts files
1048    if (tsBuffer == NULL) {
1049        err = extractAndQueueAccessUnits(buffer, itemMeta);
1050        if (err == -EAGAIN) {
1051            // starting sequence number too low/high
1052            postMonitorQueue();
1053            return;
1054        } else if (err == ERROR_OUT_OF_RANGE) {
1055            // reached stopping point
1056            stopAsync(/* clear = */false);
1057            return;
1058        }
1059    }
1060
1061    if (err != OK) {
1062        notifyError(err);
1063        return;
1064    }
1065
1066    ++mSeqNumber;
1067
1068    postMonitorQueue();
1069}
1070
1071int32_t PlaylistFetcher::getSeqNumberWithAnchorTime(int64_t anchorTimeUs) const {
1072    int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
1073    if (mPlaylist->meta() == NULL
1074            || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) {
1075        firstSeqNumberInPlaylist = 0;
1076    }
1077    lastSeqNumberInPlaylist = firstSeqNumberInPlaylist + mPlaylist->size() - 1;
1078
1079    int32_t index = mSeqNumber - firstSeqNumberInPlaylist - 1;
1080    while (index >= 0 && anchorTimeUs > mStartTimeUs) {
1081        sp<AMessage> itemMeta;
1082        CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1083
1084        int64_t itemDurationUs;
1085        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1086
1087        anchorTimeUs -= itemDurationUs;
1088        --index;
1089    }
1090
1091    int32_t newSeqNumber = firstSeqNumberInPlaylist + index + 1;
1092    if (newSeqNumber <= lastSeqNumberInPlaylist) {
1093        return newSeqNumber;
1094    } else {
1095        return lastSeqNumberInPlaylist;
1096    }
1097}
1098
1099int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1100    int32_t firstSeqNumberInPlaylist;
1101    if (mPlaylist->meta() == NULL
1102            || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) {
1103        firstSeqNumberInPlaylist = 0;
1104    }
1105
1106    size_t curDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
1107    if (discontinuitySeq < curDiscontinuitySeq) {
1108        return firstSeqNumberInPlaylist <= 0 ? 0 : (firstSeqNumberInPlaylist - 1);
1109    }
1110
1111    size_t index = 0;
1112    while (index < mPlaylist->size()) {
1113        sp<AMessage> itemMeta;
1114        CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1115
1116        int64_t discontinuity;
1117        if (itemMeta->findInt64("discontinuity", &discontinuity)) {
1118            curDiscontinuitySeq++;
1119        }
1120
1121        if (curDiscontinuitySeq == discontinuitySeq) {
1122            return firstSeqNumberInPlaylist + index;
1123        }
1124
1125        ++index;
1126    }
1127
1128    return firstSeqNumberInPlaylist + mPlaylist->size();
1129}
1130
1131int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1132    int32_t firstSeqNumberInPlaylist;
1133    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
1134                "media-sequence", &firstSeqNumberInPlaylist)) {
1135        firstSeqNumberInPlaylist = 0;
1136    }
1137
1138    size_t index = 0;
1139    int64_t segmentStartUs = 0;
1140    while (index < mPlaylist->size()) {
1141        sp<AMessage> itemMeta;
1142        CHECK(mPlaylist->itemAt(
1143                    index, NULL /* uri */, &itemMeta));
1144
1145        int64_t itemDurationUs;
1146        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1147
1148        if (timeUs < segmentStartUs + itemDurationUs) {
1149            break;
1150        }
1151
1152        segmentStartUs += itemDurationUs;
1153        ++index;
1154    }
1155
1156    if (index >= mPlaylist->size()) {
1157        index = mPlaylist->size() - 1;
1158    }
1159
1160    return firstSeqNumberInPlaylist + index;
1161}
1162
1163const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
1164        const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
1165    sp<MetaData> format = source->getFormat();
1166    if (format != NULL) {
1167        // for simplicity, store a reference to the format in each unit
1168        accessUnit->meta()->setObject("format", format);
1169    }
1170
1171    if (discard) {
1172        accessUnit->meta()->setInt32("discard", discard);
1173    }
1174
1175    int32_t targetDurationSecs;
1176    if (mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)) {
1177        accessUnit->meta()->setInt32("targetDuration", targetDurationSecs);
1178    }
1179
1180    accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1181    accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1182    return accessUnit;
1183}
1184
1185status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
1186    if (mTSParser == NULL) {
1187        // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
1188        mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
1189    }
1190
1191    if (mNextPTSTimeUs >= 0ll) {
1192        sp<AMessage> extra = new AMessage;
1193        // Since we are using absolute timestamps, signal an offset of 0 to prevent
1194        // ATSParser from skewing the timestamps of access units.
1195        extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
1196
1197        mTSParser->signalDiscontinuity(
1198                ATSParser::DISCONTINUITY_TIME, extra);
1199
1200        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
1201        mNextPTSTimeUs = -1ll;
1202        mFirstPTSValid = false;
1203    }
1204
1205    size_t offset = 0;
1206    while (offset + 188 <= buffer->size()) {
1207        status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
1208
1209        if (err != OK) {
1210            return err;
1211        }
1212
1213        offset += 188;
1214    }
1215    // setRange to indicate consumed bytes.
1216    buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
1217
1218    status_t err = OK;
1219    for (size_t i = mPacketSources.size(); i-- > 0;) {
1220        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1221
1222        const char *key;
1223        ATSParser::SourceType type;
1224        const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1225        switch (stream) {
1226            case LiveSession::STREAMTYPE_VIDEO:
1227                type = ATSParser::VIDEO;
1228                key = "timeUsVideo";
1229                break;
1230
1231            case LiveSession::STREAMTYPE_AUDIO:
1232                type = ATSParser::AUDIO;
1233                key = "timeUsAudio";
1234                break;
1235
1236            case LiveSession::STREAMTYPE_SUBTITLES:
1237            {
1238                ALOGE("MPEG2 Transport streams do not contain subtitles.");
1239                return ERROR_MALFORMED;
1240                break;
1241            }
1242
1243            default:
1244                TRESPASS();
1245        }
1246
1247        sp<AnotherPacketSource> source =
1248            static_cast<AnotherPacketSource *>(
1249                    mTSParser->getSource(type).get());
1250
1251        if (source == NULL) {
1252            continue;
1253        }
1254
1255        int64_t timeUs;
1256        sp<ABuffer> accessUnit;
1257        status_t finalResult;
1258        while (source->hasBufferAvailable(&finalResult)
1259                && source->dequeueAccessUnit(&accessUnit) == OK) {
1260
1261            CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1262
1263            if (mStartup) {
1264                if (!mFirstPTSValid) {
1265                    mFirstTimeUs = timeUs;
1266                    mFirstPTSValid = true;
1267                }
1268                if (mStartTimeUsRelative) {
1269                    timeUs -= mFirstTimeUs;
1270                    if (timeUs < 0) {
1271                        timeUs = 0;
1272                    }
1273                }
1274
1275                if (timeUs < mStartTimeUs) {
1276                    // buffer up to the closest preceding IDR frame
1277                    ALOGV("timeUs %" PRId64 " us < mStartTimeUs %" PRId64 " us",
1278                            timeUs, mStartTimeUs);
1279                    const char *mime;
1280                    sp<MetaData> format  = source->getFormat();
1281                    bool isAvc = false;
1282                    if (format != NULL && format->findCString(kKeyMIMEType, &mime)
1283                            && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC)) {
1284                        isAvc = true;
1285                    }
1286                    if (isAvc && IsIDR(accessUnit)) {
1287                        mVideoBuffer->clear();
1288                    }
1289                    if (isAvc) {
1290                        mVideoBuffer->queueAccessUnit(accessUnit);
1291                    }
1292
1293                    continue;
1294                }
1295            }
1296
1297            CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1298            if (mStartTimeUsNotify != NULL && timeUs > mStartTimeUs) {
1299                int32_t firstSeqNumberInPlaylist;
1300                if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
1301                            "media-sequence", &firstSeqNumberInPlaylist)) {
1302                    firstSeqNumberInPlaylist = 0;
1303                }
1304
1305                int32_t targetDurationSecs;
1306                CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
1307                int64_t targetDurationUs = targetDurationSecs * 1000000ll;
1308                // mStartup
1309                //   mStartup is true until we have queued a packet for all the streams
1310                //   we are fetching. We queue packets whose timestamps are greater than
1311                //   mStartTimeUs.
1312                // mSegmentStartTimeUs >= 0
1313                //   mSegmentStartTimeUs is non-negative when adapting or switching tracks
1314                // mSeqNumber > firstSeqNumberInPlaylist
1315                //   don't decrement mSeqNumber if it already points to the 1st segment
1316                // timeUs - mStartTimeUs > targetDurationUs:
1317                //   This and the 2 above conditions should only happen when adapting in a live
1318                //   stream; the old fetcher has already fetched to mStartTimeUs; the new fetcher
1319                //   would start fetching after timeUs, which should be greater than mStartTimeUs;
1320                //   the old fetcher would then continue fetching data until timeUs. We don't want
1321                //   timeUs to be too far ahead of mStartTimeUs because we want the old fetcher to
1322                //   stop as early as possible. The definition of being "too far ahead" is
1323                //   arbitrary; here we use targetDurationUs as threshold.
1324                if (mStartup && mSegmentStartTimeUs >= 0
1325                        && mSeqNumber > firstSeqNumberInPlaylist
1326                        && timeUs - mStartTimeUs > targetDurationUs) {
1327                    // we just guessed a starting timestamp that is too high when adapting in a
1328                    // live stream; re-adjust based on the actual timestamp extracted from the
1329                    // media segment; if we didn't move backward after the re-adjustment
1330                    // (newSeqNumber), start at least 1 segment prior.
1331                    int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs);
1332                    if (newSeqNumber >= mSeqNumber) {
1333                        --mSeqNumber;
1334                    } else {
1335                        mSeqNumber = newSeqNumber;
1336                    }
1337                    mStartTimeUsNotify = mNotify->dup();
1338                    mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
1339                    return -EAGAIN;
1340                }
1341
1342                int32_t seq;
1343                if (!mStartTimeUsNotify->findInt32("discontinuitySeq", &seq)) {
1344                    mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq);
1345                }
1346                int64_t startTimeUs;
1347                if (!mStartTimeUsNotify->findInt64(key, &startTimeUs)) {
1348                    mStartTimeUsNotify->setInt64(key, timeUs);
1349
1350                    uint32_t streamMask = 0;
1351                    mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1352                    streamMask |= mPacketSources.keyAt(i);
1353                    mStartTimeUsNotify->setInt32("streamMask", streamMask);
1354
1355                    if (streamMask == mStreamTypeMask) {
1356                        mStartup = false;
1357                        mStartTimeUsNotify->post();
1358                        mStartTimeUsNotify.clear();
1359                    }
1360                }
1361            }
1362
1363            if (mStopParams != NULL) {
1364                // Queue discontinuity in original stream.
1365                int32_t discontinuitySeq;
1366                int64_t stopTimeUs;
1367                if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1368                        || discontinuitySeq > mDiscontinuitySeq
1369                        || !mStopParams->findInt64(key, &stopTimeUs)
1370                        || (discontinuitySeq == mDiscontinuitySeq
1371                                && timeUs >= stopTimeUs)) {
1372                    packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
1373                    mStreamTypeMask &= ~stream;
1374                    mPacketSources.removeItemsAt(i);
1375                    break;
1376                }
1377            }
1378
1379            // Note that we do NOT dequeue any discontinuities except for format change.
1380            if (stream == LiveSession::STREAMTYPE_VIDEO) {
1381                const bool discard = true;
1382                status_t status;
1383                while (mVideoBuffer->hasBufferAvailable(&status)) {
1384                    sp<ABuffer> videoBuffer;
1385                    mVideoBuffer->dequeueAccessUnit(&videoBuffer);
1386                    setAccessUnitProperties(videoBuffer, source, discard);
1387                    packetSource->queueAccessUnit(videoBuffer);
1388                }
1389            }
1390
1391            setAccessUnitProperties(accessUnit, source);
1392            packetSource->queueAccessUnit(accessUnit);
1393        }
1394
1395        if (err != OK) {
1396            break;
1397        }
1398    }
1399
1400    if (err != OK) {
1401        for (size_t i = mPacketSources.size(); i-- > 0;) {
1402            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1403            packetSource->clear();
1404        }
1405        return err;
1406    }
1407
1408    if (!mStreamTypeMask) {
1409        // Signal gap is filled between original and new stream.
1410        ALOGV("ERROR OUT OF RANGE");
1411        return ERROR_OUT_OF_RANGE;
1412    }
1413
1414    return OK;
1415}
1416
1417/* static */
1418bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1419        const sp<ABuffer> &buffer) {
1420    size_t pos = 0;
1421
1422    // skip possible BOM
1423    if (buffer->size() >= pos + 3 &&
1424            !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1425        pos += 3;
1426    }
1427
1428    // accept WEBVTT followed by SPACE, TAB or (CR) LF
1429    if (buffer->size() < pos + 6 ||
1430            memcmp("WEBVTT", buffer->data() + pos, 6)) {
1431        return false;
1432    }
1433    pos += 6;
1434
1435    if (buffer->size() == pos) {
1436        return true;
1437    }
1438
1439    uint8_t sep = buffer->data()[pos];
1440    return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1441}
1442
1443status_t PlaylistFetcher::extractAndQueueAccessUnits(
1444        const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1445    if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1446        if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1447            ALOGE("This stream only contains subtitles.");
1448            return ERROR_MALFORMED;
1449        }
1450
1451        const sp<AnotherPacketSource> packetSource =
1452            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1453
1454        int64_t durationUs;
1455        CHECK(itemMeta->findInt64("durationUs", &durationUs));
1456        buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1457        buffer->meta()->setInt64("durationUs", durationUs);
1458        buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1459        buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1460        buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration);
1461
1462        packetSource->queueAccessUnit(buffer);
1463        return OK;
1464    }
1465
1466    if (mNextPTSTimeUs >= 0ll) {
1467        mFirstPTSValid = false;
1468        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
1469        mNextPTSTimeUs = -1ll;
1470    }
1471
1472    // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
1473    // stream prefixed by an ID3 tag.
1474
1475    bool firstID3Tag = true;
1476    uint64_t PTS = 0;
1477
1478    for (;;) {
1479        // Make sure to skip all ID3 tags preceding the audio data.
1480        // At least one must be present to provide the PTS timestamp.
1481
1482        ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
1483        if (!id3.isValid()) {
1484            if (firstID3Tag) {
1485                ALOGE("Unable to parse ID3 tag.");
1486                return ERROR_MALFORMED;
1487            } else {
1488                break;
1489            }
1490        }
1491
1492        if (firstID3Tag) {
1493            bool found = false;
1494
1495            ID3::Iterator it(id3, "PRIV");
1496            while (!it.done()) {
1497                size_t length;
1498                const uint8_t *data = it.getData(&length);
1499
1500                static const char *kMatchName =
1501                    "com.apple.streaming.transportStreamTimestamp";
1502                static const size_t kMatchNameLen = strlen(kMatchName);
1503
1504                if (length == kMatchNameLen + 1 + 8
1505                        && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
1506                    found = true;
1507                    PTS = U64_AT(&data[kMatchNameLen + 1]);
1508                }
1509
1510                it.next();
1511            }
1512
1513            if (!found) {
1514                ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
1515                return ERROR_MALFORMED;
1516            }
1517        }
1518
1519        // skip the ID3 tag
1520        buffer->setRange(
1521                buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
1522
1523        firstID3Tag = false;
1524    }
1525
1526    if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
1527        ALOGW("This stream only contains audio data!");
1528
1529        mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
1530
1531        if (mStreamTypeMask == 0) {
1532            return OK;
1533        }
1534    }
1535
1536    sp<AnotherPacketSource> packetSource =
1537        mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
1538
1539    if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
1540        ABitReader bits(buffer->data(), buffer->size());
1541
1542        // adts_fixed_header
1543
1544        CHECK_EQ(bits.getBits(12), 0xfffu);
1545        bits.skipBits(3);  // ID, layer
1546        bool protection_absent __unused = bits.getBits(1) != 0;
1547
1548        unsigned profile = bits.getBits(2);
1549        CHECK_NE(profile, 3u);
1550        unsigned sampling_freq_index = bits.getBits(4);
1551        bits.getBits(1);  // private_bit
1552        unsigned channel_configuration = bits.getBits(3);
1553        CHECK_NE(channel_configuration, 0u);
1554        bits.skipBits(2);  // original_copy, home
1555
1556        sp<MetaData> meta = MakeAACCodecSpecificData(
1557                profile, sampling_freq_index, channel_configuration);
1558
1559        meta->setInt32(kKeyIsADTS, true);
1560
1561        packetSource->setFormat(meta);
1562    }
1563
1564    int64_t numSamples = 0ll;
1565    int32_t sampleRate;
1566    CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
1567
1568    int64_t timeUs = (PTS * 100ll) / 9ll;
1569    if (!mFirstPTSValid) {
1570        mFirstPTSValid = true;
1571        mFirstTimeUs = timeUs;
1572    }
1573
1574    size_t offset = 0;
1575    while (offset < buffer->size()) {
1576        const uint8_t *adtsHeader = buffer->data() + offset;
1577        CHECK_LT(offset + 5, buffer->size());
1578
1579        unsigned aac_frame_length =
1580            ((adtsHeader[3] & 3) << 11)
1581            | (adtsHeader[4] << 3)
1582            | (adtsHeader[5] >> 5);
1583
1584        if (aac_frame_length == 0) {
1585            const uint8_t *id3Header = adtsHeader;
1586            if (!memcmp(id3Header, "ID3", 3)) {
1587                ID3 id3(id3Header, buffer->size() - offset, true);
1588                if (id3.isValid()) {
1589                    offset += id3.rawSize();
1590                    continue;
1591                };
1592            }
1593            return ERROR_MALFORMED;
1594        }
1595
1596        CHECK_LE(offset + aac_frame_length, buffer->size());
1597
1598        int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
1599        offset += aac_frame_length;
1600
1601        // Each AAC frame encodes 1024 samples.
1602        numSamples += 1024;
1603
1604        if (mStartup) {
1605            int64_t startTimeUs = unitTimeUs;
1606            if (mStartTimeUsRelative) {
1607                startTimeUs -= mFirstTimeUs;
1608                if (startTimeUs  < 0) {
1609                    startTimeUs = 0;
1610                }
1611            }
1612            if (startTimeUs < mStartTimeUs) {
1613                continue;
1614            }
1615
1616            if (mStartTimeUsNotify != NULL) {
1617                int32_t targetDurationSecs;
1618                CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
1619                int64_t targetDurationUs = targetDurationSecs * 1000000ll;
1620
1621                // Duplicated logic from how we handle .ts playlists.
1622                if (mStartup && mSegmentStartTimeUs >= 0
1623                        && timeUs - mStartTimeUs > targetDurationUs) {
1624                    int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs);
1625                    if (newSeqNumber >= mSeqNumber) {
1626                        --mSeqNumber;
1627                    } else {
1628                        mSeqNumber = newSeqNumber;
1629                    }
1630                    return -EAGAIN;
1631                }
1632
1633                mStartTimeUsNotify->setInt64("timeUsAudio", timeUs);
1634                mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq);
1635                mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO);
1636                mStartTimeUsNotify->post();
1637                mStartTimeUsNotify.clear();
1638                mStartup = false;
1639            }
1640        }
1641
1642        if (mStopParams != NULL) {
1643            // Queue discontinuity in original stream.
1644            int32_t discontinuitySeq;
1645            int64_t stopTimeUs;
1646            if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1647                    || discontinuitySeq > mDiscontinuitySeq
1648                    || !mStopParams->findInt64("timeUsAudio", &stopTimeUs)
1649                    || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) {
1650                packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
1651                mStreamTypeMask = 0;
1652                mPacketSources.clear();
1653                return ERROR_OUT_OF_RANGE;
1654            }
1655        }
1656
1657        sp<ABuffer> unit = new ABuffer(aac_frame_length);
1658        memcpy(unit->data(), adtsHeader, aac_frame_length);
1659
1660        unit->meta()->setInt64("timeUs", unitTimeUs);
1661        setAccessUnitProperties(unit, packetSource);
1662        packetSource->queueAccessUnit(unit);
1663    }
1664
1665    return OK;
1666}
1667
1668void PlaylistFetcher::updateDuration() {
1669    int64_t durationUs = 0ll;
1670    for (size_t index = 0; index < mPlaylist->size(); ++index) {
1671        sp<AMessage> itemMeta;
1672        CHECK(mPlaylist->itemAt(
1673                    index, NULL /* uri */, &itemMeta));
1674
1675        int64_t itemDurationUs;
1676        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1677
1678        durationUs += itemDurationUs;
1679    }
1680
1681    sp<AMessage> msg = mNotify->dup();
1682    msg->setInt32("what", kWhatDurationUpdate);
1683    msg->setInt64("durationUs", durationUs);
1684    msg->post();
1685}
1686
1687}  // namespace android
1688