1/*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "PlaylistFetcher"
19#include <utils/Log.h>
20
21#include "PlaylistFetcher.h"
22
23#include "LiveDataSource.h"
24#include "LiveSession.h"
25#include "M3UParser.h"
26
27#include "include/avc_utils.h"
28#include "include/HTTPBase.h"
29#include "include/ID3.h"
30#include "mpeg2ts/AnotherPacketSource.h"
31
32#include <media/IStreamSource.h>
33#include <media/stagefright/foundation/ABitReader.h>
34#include <media/stagefright/foundation/ABuffer.h>
35#include <media/stagefright/foundation/ADebug.h>
36#include <media/stagefright/foundation/hexdump.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaDefs.h>
39#include <media/stagefright/MetaData.h>
40#include <media/stagefright/Utils.h>
41
42#include <ctype.h>
43#include <inttypes.h>
44#include <openssl/aes.h>
45#include <openssl/md5.h>
46
47namespace android {
48
49// static
50const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll;
51const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
52// LCM of 188 (size of a TS packet) & 1k works well
53const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024;
54const int32_t PlaylistFetcher::kNumSkipFrames = 5;
55
56PlaylistFetcher::PlaylistFetcher(
57        const sp<AMessage> &notify,
58        const sp<LiveSession> &session,
59        const char *uri,
60        int32_t subtitleGeneration)
61    : mNotify(notify),
62      mStartTimeUsNotify(notify->dup()),
63      mSession(session),
64      mURI(uri),
65      mStreamTypeMask(0),
66      mStartTimeUs(-1ll),
67      mSegmentStartTimeUs(-1ll),
68      mDiscontinuitySeq(-1ll),
69      mStartTimeUsRelative(false),
70      mLastPlaylistFetchTimeUs(-1ll),
71      mSeqNumber(-1),
72      mNumRetries(0),
73      mStartup(true),
74      mAdaptive(false),
75      mPrepared(false),
76      mNextPTSTimeUs(-1ll),
77      mMonitorQueueGeneration(0),
78      mSubtitleGeneration(subtitleGeneration),
79      mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
80      mFirstPTSValid(false),
81      mAbsoluteTimeAnchorUs(0ll),
82      mVideoBuffer(new AnotherPacketSource(NULL)) {
83    memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
84    mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
85    mStartTimeUsNotify->setInt32("streamMask", 0);
86}
87
88PlaylistFetcher::~PlaylistFetcher() {
89}
90
91int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
92    CHECK(mPlaylist != NULL);
93
94    int32_t firstSeqNumberInPlaylist;
95    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
96                "media-sequence", &firstSeqNumberInPlaylist)) {
97        firstSeqNumberInPlaylist = 0;
98    }
99
100    int32_t lastSeqNumberInPlaylist =
101        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
102
103    CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
104    CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
105
106    int64_t segmentStartUs = 0ll;
107    for (int32_t index = 0;
108            index < seqNumber - firstSeqNumberInPlaylist; ++index) {
109        sp<AMessage> itemMeta;
110        CHECK(mPlaylist->itemAt(
111                    index, NULL /* uri */, &itemMeta));
112
113        int64_t itemDurationUs;
114        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
115
116        segmentStartUs += itemDurationUs;
117    }
118
119    return segmentStartUs;
120}
121
122int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
123    int64_t nowUs = ALooper::GetNowUs();
124
125    if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
126        CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
127        return 0ll;
128    }
129
130    if (mPlaylist->isComplete()) {
131        return (~0llu >> 1);
132    }
133
134    int32_t targetDurationSecs;
135    CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
136
137    int64_t targetDurationUs = targetDurationSecs * 1000000ll;
138
139    int64_t minPlaylistAgeUs;
140
141    switch (mRefreshState) {
142        case INITIAL_MINIMUM_RELOAD_DELAY:
143        {
144            size_t n = mPlaylist->size();
145            if (n > 0) {
146                sp<AMessage> itemMeta;
147                CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
148
149                int64_t itemDurationUs;
150                CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
151
152                minPlaylistAgeUs = itemDurationUs;
153                break;
154            }
155
156            // fall through
157        }
158
159        case FIRST_UNCHANGED_RELOAD_ATTEMPT:
160        {
161            minPlaylistAgeUs = targetDurationUs / 2;
162            break;
163        }
164
165        case SECOND_UNCHANGED_RELOAD_ATTEMPT:
166        {
167            minPlaylistAgeUs = (targetDurationUs * 3) / 2;
168            break;
169        }
170
171        case THIRD_UNCHANGED_RELOAD_ATTEMPT:
172        {
173            minPlaylistAgeUs = targetDurationUs * 3;
174            break;
175        }
176
177        default:
178            TRESPASS();
179            break;
180    }
181
182    int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
183    return delayUs > 0ll ? delayUs : 0ll;
184}
185
186status_t PlaylistFetcher::decryptBuffer(
187        size_t playlistIndex, const sp<ABuffer> &buffer,
188        bool first) {
189    sp<AMessage> itemMeta;
190    bool found = false;
191    AString method;
192
193    for (ssize_t i = playlistIndex; i >= 0; --i) {
194        AString uri;
195        CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
196
197        if (itemMeta->findString("cipher-method", &method)) {
198            found = true;
199            break;
200        }
201    }
202
203    if (!found) {
204        method = "NONE";
205    }
206    buffer->meta()->setString("cipher-method", method.c_str());
207
208    if (method == "NONE") {
209        return OK;
210    } else if (!(method == "AES-128")) {
211        ALOGE("Unsupported cipher method '%s'", method.c_str());
212        return ERROR_UNSUPPORTED;
213    }
214
215    AString keyURI;
216    if (!itemMeta->findString("cipher-uri", &keyURI)) {
217        ALOGE("Missing key uri");
218        return ERROR_MALFORMED;
219    }
220
221    ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
222
223    sp<ABuffer> key;
224    if (index >= 0) {
225        key = mAESKeyForURI.valueAt(index);
226    } else {
227        ssize_t err = mSession->fetchFile(keyURI.c_str(), &key);
228
229        if (err < 0) {
230            ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
231            return ERROR_IO;
232        } else if (key->size() != 16) {
233            ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
234            return ERROR_MALFORMED;
235        }
236
237        mAESKeyForURI.add(keyURI, key);
238    }
239
240    AES_KEY aes_key;
241    if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
242        ALOGE("failed to set AES decryption key.");
243        return UNKNOWN_ERROR;
244    }
245
246    size_t n = buffer->size();
247    if (!n) {
248        return OK;
249    }
250    CHECK(n % 16 == 0);
251
252    if (first) {
253        // If decrypting the first block in a file, read the iv from the manifest
254        // or derive the iv from the file's sequence number.
255
256        AString iv;
257        if (itemMeta->findString("cipher-iv", &iv)) {
258            if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
259                    || iv.size() != 16 * 2 + 2) {
260                ALOGE("malformed cipher IV '%s'.", iv.c_str());
261                return ERROR_MALFORMED;
262            }
263
264            memset(mAESInitVec, 0, sizeof(mAESInitVec));
265            for (size_t i = 0; i < 16; ++i) {
266                char c1 = tolower(iv.c_str()[2 + 2 * i]);
267                char c2 = tolower(iv.c_str()[3 + 2 * i]);
268                if (!isxdigit(c1) || !isxdigit(c2)) {
269                    ALOGE("malformed cipher IV '%s'.", iv.c_str());
270                    return ERROR_MALFORMED;
271                }
272                uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
273                uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
274
275                mAESInitVec[i] = nibble1 << 4 | nibble2;
276            }
277        } else {
278            memset(mAESInitVec, 0, sizeof(mAESInitVec));
279            mAESInitVec[15] = mSeqNumber & 0xff;
280            mAESInitVec[14] = (mSeqNumber >> 8) & 0xff;
281            mAESInitVec[13] = (mSeqNumber >> 16) & 0xff;
282            mAESInitVec[12] = (mSeqNumber >> 24) & 0xff;
283        }
284    }
285
286    AES_cbc_encrypt(
287            buffer->data(), buffer->data(), buffer->size(),
288            &aes_key, mAESInitVec, AES_DECRYPT);
289
290    return OK;
291}
292
293status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
294    status_t err;
295    AString method;
296    CHECK(buffer->meta()->findString("cipher-method", &method));
297    if (method == "NONE") {
298        return OK;
299    }
300
301    uint8_t padding = 0;
302    if (buffer->size() > 0) {
303        padding = buffer->data()[buffer->size() - 1];
304    }
305
306    if (padding > 16) {
307        return ERROR_MALFORMED;
308    }
309
310    for (size_t i = buffer->size() - padding; i < padding; i++) {
311        if (buffer->data()[i] != padding) {
312            return ERROR_MALFORMED;
313        }
314    }
315
316    buffer->setRange(buffer->offset(), buffer->size() - padding);
317    return OK;
318}
319
320void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
321    int64_t maxDelayUs = delayUsToRefreshPlaylist();
322    if (maxDelayUs < minDelayUs) {
323        maxDelayUs = minDelayUs;
324    }
325    if (delayUs > maxDelayUs) {
326        ALOGV("Need to refresh playlist in %" PRId64 , maxDelayUs);
327        delayUs = maxDelayUs;
328    }
329    sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id());
330    msg->setInt32("generation", mMonitorQueueGeneration);
331    msg->post(delayUs);
332}
333
334void PlaylistFetcher::cancelMonitorQueue() {
335    ++mMonitorQueueGeneration;
336}
337
338void PlaylistFetcher::startAsync(
339        const sp<AnotherPacketSource> &audioSource,
340        const sp<AnotherPacketSource> &videoSource,
341        const sp<AnotherPacketSource> &subtitleSource,
342        int64_t startTimeUs,
343        int64_t segmentStartTimeUs,
344        int32_t startDiscontinuitySeq,
345        bool adaptive) {
346    sp<AMessage> msg = new AMessage(kWhatStart, id());
347
348    uint32_t streamTypeMask = 0ul;
349
350    if (audioSource != NULL) {
351        msg->setPointer("audioSource", audioSource.get());
352        streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
353    }
354
355    if (videoSource != NULL) {
356        msg->setPointer("videoSource", videoSource.get());
357        streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
358    }
359
360    if (subtitleSource != NULL) {
361        msg->setPointer("subtitleSource", subtitleSource.get());
362        streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
363    }
364
365    msg->setInt32("streamTypeMask", streamTypeMask);
366    msg->setInt64("startTimeUs", startTimeUs);
367    msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
368    msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
369    msg->setInt32("adaptive", adaptive);
370    msg->post();
371}
372
373void PlaylistFetcher::pauseAsync() {
374    (new AMessage(kWhatPause, id()))->post();
375}
376
377void PlaylistFetcher::stopAsync(bool clear) {
378    sp<AMessage> msg = new AMessage(kWhatStop, id());
379    msg->setInt32("clear", clear);
380    msg->post();
381}
382
383void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
384    AMessage* msg = new AMessage(kWhatResumeUntil, id());
385    msg->setMessage("params", params);
386    msg->post();
387}
388
389void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
390    switch (msg->what()) {
391        case kWhatStart:
392        {
393            status_t err = onStart(msg);
394
395            sp<AMessage> notify = mNotify->dup();
396            notify->setInt32("what", kWhatStarted);
397            notify->setInt32("err", err);
398            notify->post();
399            break;
400        }
401
402        case kWhatPause:
403        {
404            onPause();
405
406            sp<AMessage> notify = mNotify->dup();
407            notify->setInt32("what", kWhatPaused);
408            notify->post();
409            break;
410        }
411
412        case kWhatStop:
413        {
414            onStop(msg);
415
416            sp<AMessage> notify = mNotify->dup();
417            notify->setInt32("what", kWhatStopped);
418            notify->post();
419            break;
420        }
421
422        case kWhatMonitorQueue:
423        case kWhatDownloadNext:
424        {
425            int32_t generation;
426            CHECK(msg->findInt32("generation", &generation));
427
428            if (generation != mMonitorQueueGeneration) {
429                // Stale event
430                break;
431            }
432
433            if (msg->what() == kWhatMonitorQueue) {
434                onMonitorQueue();
435            } else {
436                onDownloadNext();
437            }
438            break;
439        }
440
441        case kWhatResumeUntil:
442        {
443            onResumeUntil(msg);
444            break;
445        }
446
447        default:
448            TRESPASS();
449    }
450}
451
452status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
453    mPacketSources.clear();
454
455    uint32_t streamTypeMask;
456    CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
457
458    int64_t startTimeUs;
459    int64_t segmentStartTimeUs;
460    int32_t startDiscontinuitySeq;
461    int32_t adaptive;
462    CHECK(msg->findInt64("startTimeUs", &startTimeUs));
463    CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
464    CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
465    CHECK(msg->findInt32("adaptive", &adaptive));
466
467    if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
468        void *ptr;
469        CHECK(msg->findPointer("audioSource", &ptr));
470
471        mPacketSources.add(
472                LiveSession::STREAMTYPE_AUDIO,
473                static_cast<AnotherPacketSource *>(ptr));
474    }
475
476    if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
477        void *ptr;
478        CHECK(msg->findPointer("videoSource", &ptr));
479
480        mPacketSources.add(
481                LiveSession::STREAMTYPE_VIDEO,
482                static_cast<AnotherPacketSource *>(ptr));
483    }
484
485    if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
486        void *ptr;
487        CHECK(msg->findPointer("subtitleSource", &ptr));
488
489        mPacketSources.add(
490                LiveSession::STREAMTYPE_SUBTITLES,
491                static_cast<AnotherPacketSource *>(ptr));
492    }
493
494    mStreamTypeMask = streamTypeMask;
495
496    mSegmentStartTimeUs = segmentStartTimeUs;
497    mDiscontinuitySeq = startDiscontinuitySeq;
498
499    if (startTimeUs >= 0) {
500        mStartTimeUs = startTimeUs;
501        mSeqNumber = -1;
502        mStartup = true;
503        mPrepared = false;
504        mAdaptive = adaptive;
505    }
506
507    postMonitorQueue();
508
509    return OK;
510}
511
512void PlaylistFetcher::onPause() {
513    cancelMonitorQueue();
514}
515
516void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
517    cancelMonitorQueue();
518
519    int32_t clear;
520    CHECK(msg->findInt32("clear", &clear));
521    if (clear) {
522        for (size_t i = 0; i < mPacketSources.size(); i++) {
523            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
524            packetSource->clear();
525        }
526    }
527
528    mPacketSources.clear();
529    mStreamTypeMask = 0;
530}
531
532// Resume until we have reached the boundary timestamps listed in `msg`; when
533// the remaining time is too short (within a resume threshold) stop immediately
534// instead.
535status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
536    sp<AMessage> params;
537    CHECK(msg->findMessage("params", &params));
538
539    bool stop = false;
540    for (size_t i = 0; i < mPacketSources.size(); i++) {
541        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
542
543        const char *stopKey;
544        int streamType = mPacketSources.keyAt(i);
545        switch (streamType) {
546        case LiveSession::STREAMTYPE_VIDEO:
547            stopKey = "timeUsVideo";
548            break;
549
550        case LiveSession::STREAMTYPE_AUDIO:
551            stopKey = "timeUsAudio";
552            break;
553
554        case LiveSession::STREAMTYPE_SUBTITLES:
555            stopKey = "timeUsSubtitle";
556            break;
557
558        default:
559            TRESPASS();
560        }
561
562        // Don't resume if we would stop within a resume threshold.
563        int32_t discontinuitySeq;
564        int64_t latestTimeUs = 0, stopTimeUs = 0;
565        sp<AMessage> latestMeta = packetSource->getLatestEnqueuedMeta();
566        if (latestMeta != NULL
567                && latestMeta->findInt32("discontinuitySeq", &discontinuitySeq)
568                && discontinuitySeq == mDiscontinuitySeq
569                && latestMeta->findInt64("timeUs", &latestTimeUs)
570                && params->findInt64(stopKey, &stopTimeUs)
571                && stopTimeUs - latestTimeUs < resumeThreshold(latestMeta)) {
572            stop = true;
573        }
574    }
575
576    if (stop) {
577        for (size_t i = 0; i < mPacketSources.size(); i++) {
578            mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer());
579        }
580        stopAsync(/* clear = */ false);
581        return OK;
582    }
583
584    mStopParams = params;
585    postMonitorQueue();
586
587    return OK;
588}
589
590void PlaylistFetcher::notifyError(status_t err) {
591    sp<AMessage> notify = mNotify->dup();
592    notify->setInt32("what", kWhatError);
593    notify->setInt32("err", err);
594    notify->post();
595}
596
597void PlaylistFetcher::queueDiscontinuity(
598        ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
599    for (size_t i = 0; i < mPacketSources.size(); ++i) {
600        // do not discard buffer upon #EXT-X-DISCONTINUITY tag
601        // (seek will discard buffer by abandoning old fetchers)
602        mPacketSources.valueAt(i)->queueDiscontinuity(
603                type, extra, false /* discard */);
604    }
605}
606
607void PlaylistFetcher::onMonitorQueue() {
608    bool downloadMore = false;
609    refreshPlaylist();
610
611    int32_t targetDurationSecs;
612    int64_t targetDurationUs = kMinBufferedDurationUs;
613    if (mPlaylist != NULL) {
614        if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
615                "target-duration", &targetDurationSecs)) {
616            ALOGE("Playlist is missing required EXT-X-TARGETDURATION tag");
617            notifyError(ERROR_MALFORMED);
618            return;
619        }
620        targetDurationUs = targetDurationSecs * 1000000ll;
621    }
622
623    // buffer at least 3 times the target duration, or up to 10 seconds
624    int64_t durationToBufferUs = targetDurationUs * 3;
625    if (durationToBufferUs > kMinBufferedDurationUs)  {
626        durationToBufferUs = kMinBufferedDurationUs;
627    }
628
629    int64_t bufferedDurationUs = 0ll;
630    status_t finalResult = NOT_ENOUGH_DATA;
631    if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
632        sp<AnotherPacketSource> packetSource =
633            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
634
635        bufferedDurationUs =
636                packetSource->getBufferedDurationUs(&finalResult);
637        finalResult = OK;
638    } else {
639        // Use max stream duration to prevent us from waiting on a non-existent stream;
640        // when we cannot make out from the manifest what streams are included in a playlist
641        // we might assume extra streams.
642        for (size_t i = 0; i < mPacketSources.size(); ++i) {
643            if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
644                continue;
645            }
646
647            int64_t bufferedStreamDurationUs =
648                mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
649            ALOGV("buffered %" PRId64 " for stream %d",
650                    bufferedStreamDurationUs, mPacketSources.keyAt(i));
651            if (bufferedStreamDurationUs > bufferedDurationUs) {
652                bufferedDurationUs = bufferedStreamDurationUs;
653            }
654        }
655    }
656    downloadMore = (bufferedDurationUs < durationToBufferUs);
657
658    // signal start if buffered up at least the target size
659    if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) {
660        mPrepared = true;
661
662        ALOGV("prepared, buffered=%" PRId64 " > %" PRId64 "",
663                bufferedDurationUs, targetDurationUs);
664        sp<AMessage> msg = mNotify->dup();
665        msg->setInt32("what", kWhatTemporarilyDoneFetching);
666        msg->post();
667    }
668
669    if (finalResult == OK && downloadMore) {
670        ALOGV("monitoring, buffered=%" PRId64 " < %" PRId64 "",
671                bufferedDurationUs, durationToBufferUs);
672        // delay the next download slightly; hopefully this gives other concurrent fetchers
673        // a better chance to run.
674        // onDownloadNext();
675        sp<AMessage> msg = new AMessage(kWhatDownloadNext, id());
676        msg->setInt32("generation", mMonitorQueueGeneration);
677        msg->post(1000l);
678    } else {
679        // Nothing to do yet, try again in a second.
680
681        sp<AMessage> msg = mNotify->dup();
682        msg->setInt32("what", kWhatTemporarilyDoneFetching);
683        msg->post();
684
685        int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2;
686        ALOGV("pausing for %" PRId64 ", buffered=%" PRId64 " > %" PRId64 "",
687                delayUs, bufferedDurationUs, durationToBufferUs);
688        // :TRICKY: need to enforce minimum delay because the delay to
689        // refresh the playlist will become 0
690        postMonitorQueue(delayUs, mPrepared ? targetDurationUs * 2 : 0);
691    }
692}
693
694status_t PlaylistFetcher::refreshPlaylist() {
695    if (delayUsToRefreshPlaylist() <= 0) {
696        bool unchanged;
697        sp<M3UParser> playlist = mSession->fetchPlaylist(
698                mURI.c_str(), mPlaylistHash, &unchanged);
699
700        if (playlist == NULL) {
701            if (unchanged) {
702                // We succeeded in fetching the playlist, but it was
703                // unchanged from the last time we tried.
704
705                if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
706                    mRefreshState = (RefreshState)(mRefreshState + 1);
707                }
708            } else {
709                ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str());
710                return ERROR_IO;
711            }
712        } else {
713            mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
714            mPlaylist = playlist;
715
716            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
717                updateDuration();
718            }
719        }
720
721        mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
722    }
723    return OK;
724}
725
726// static
727bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
728    return buffer->size() > 0 && buffer->data()[0] == 0x47;
729}
730
731void PlaylistFetcher::onDownloadNext() {
732    status_t err = refreshPlaylist();
733    int32_t firstSeqNumberInPlaylist = 0;
734    int32_t lastSeqNumberInPlaylist = 0;
735    bool discontinuity = false;
736
737    if (mPlaylist != NULL) {
738        if (mPlaylist->meta() != NULL) {
739            mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist);
740        }
741
742        lastSeqNumberInPlaylist =
743                firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
744
745        if (mDiscontinuitySeq < 0) {
746            mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
747        }
748    }
749
750    if (mPlaylist != NULL && mSeqNumber < 0) {
751        CHECK_GE(mStartTimeUs, 0ll);
752
753        if (mSegmentStartTimeUs < 0) {
754            if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
755                // If this is a live session, start 3 segments from the end on connect
756                mSeqNumber = lastSeqNumberInPlaylist - 3;
757                if (mSeqNumber < firstSeqNumberInPlaylist) {
758                    mSeqNumber = firstSeqNumberInPlaylist;
759                }
760            } else {
761                // When seeking mSegmentStartTimeUs is unavailable (< 0), we
762                // use mStartTimeUs (client supplied timestamp) to determine both start segment
763                // and relative position inside a segment
764                mSeqNumber = getSeqNumberForTime(mStartTimeUs);
765                mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
766            }
767            mStartTimeUsRelative = true;
768            ALOGV("Initial sequence number for time %" PRId64 " is %d from (%d .. %d)",
769                    mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
770                    lastSeqNumberInPlaylist);
771        } else {
772            // When adapting or track switching, mSegmentStartTimeUs (relative
773            // to media time 0) is used to determine the start segment; mStartTimeUs (absolute
774            // timestamps coming from the media container) is used to determine the position
775            // inside a segments.
776            mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
777            if (mAdaptive) {
778                // avoid double fetch/decode
779                mSeqNumber += 1;
780            }
781            ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
782            if (mSeqNumber < minSeq) {
783                mSeqNumber = minSeq;
784            }
785
786            if (mSeqNumber < firstSeqNumberInPlaylist) {
787                mSeqNumber = firstSeqNumberInPlaylist;
788            }
789
790            if (mSeqNumber > lastSeqNumberInPlaylist) {
791                mSeqNumber = lastSeqNumberInPlaylist;
792            }
793            ALOGV("Initial sequence number for live event %d from (%d .. %d)",
794                    mSeqNumber, firstSeqNumberInPlaylist,
795                    lastSeqNumberInPlaylist);
796        }
797    }
798
799    // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true
800    if (mSeqNumber < firstSeqNumberInPlaylist
801            || mSeqNumber > lastSeqNumberInPlaylist
802            || err != OK) {
803        if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) {
804            ++mNumRetries;
805
806            if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) {
807                // make sure we reach this retry logic on refresh failures
808                // by adding an err != OK clause to all enclosing if's.
809
810                // refresh in increasing fraction (1/2, 1/3, ...) of the
811                // playlist's target duration or 3 seconds, whichever is less
812                int64_t delayUs = kMaxMonitorDelayUs;
813                if (mPlaylist != NULL && mPlaylist->meta() != NULL) {
814                    int32_t targetDurationSecs;
815                    CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
816                    delayUs = mPlaylist->size() * targetDurationSecs *
817                            1000000ll / (1 + mNumRetries);
818                }
819                if (delayUs > kMaxMonitorDelayUs) {
820                    delayUs = kMaxMonitorDelayUs;
821                }
822                ALOGV("sequence number high: %d from (%d .. %d), "
823                      "monitor in %" PRId64 " (retry=%d)",
824                        mSeqNumber, firstSeqNumberInPlaylist,
825                        lastSeqNumberInPlaylist, delayUs, mNumRetries);
826                postMonitorQueue(delayUs);
827                return;
828            }
829
830            if (err != OK) {
831                notifyError(err);
832                return;
833            }
834
835            // we've missed the boat, let's start 3 segments prior to the latest sequence
836            // number available and signal a discontinuity.
837
838            ALOGI("We've missed the boat, restarting playback."
839                  "  mStartup=%d, was  looking for %d in %d-%d",
840                    mStartup, mSeqNumber, firstSeqNumberInPlaylist,
841                    lastSeqNumberInPlaylist);
842            if (mStopParams != NULL) {
843                // we should have kept on fetching until we hit the boundaries in mStopParams,
844                // but since the segments we are supposed to fetch have already rolled off
845                // the playlist, i.e. we have already missed the boat, we inevitably have to
846                // skip.
847                for (size_t i = 0; i < mPacketSources.size(); i++) {
848                    sp<ABuffer> formatChange = mSession->createFormatChangeBuffer();
849                    mPacketSources.valueAt(i)->queueAccessUnit(formatChange);
850                }
851                stopAsync(/* clear = */ false);
852                return;
853            }
854            mSeqNumber = lastSeqNumberInPlaylist - 3;
855            if (mSeqNumber < firstSeqNumberInPlaylist) {
856                mSeqNumber = firstSeqNumberInPlaylist;
857            }
858            discontinuity = true;
859
860            // fall through
861        } else {
862            ALOGE("Cannot find sequence number %d in playlist "
863                 "(contains %d - %d)",
864                 mSeqNumber, firstSeqNumberInPlaylist,
865                  firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
866
867            notifyError(ERROR_END_OF_STREAM);
868            return;
869        }
870    }
871
872    mNumRetries = 0;
873
874    AString uri;
875    sp<AMessage> itemMeta;
876    CHECK(mPlaylist->itemAt(
877                mSeqNumber - firstSeqNumberInPlaylist,
878                &uri,
879                &itemMeta));
880
881    int32_t val;
882    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
883        mDiscontinuitySeq++;
884        discontinuity = true;
885    }
886
887    int64_t range_offset, range_length;
888    if (!itemMeta->findInt64("range-offset", &range_offset)
889            || !itemMeta->findInt64("range-length", &range_length)) {
890        range_offset = 0;
891        range_length = -1;
892    }
893
894    ALOGV("fetching segment %d from (%d .. %d)",
895          mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
896
897    ALOGV("fetching '%s'", uri.c_str());
898
899    sp<DataSource> source;
900    sp<ABuffer> buffer, tsBuffer;
901    // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
902    // this avoids interleaved connections to the key and segment file.
903    {
904        sp<ABuffer> junk = new ABuffer(16);
905        junk->setRange(0, 16);
906        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
907                true /* first */);
908        if (err != OK) {
909            notifyError(err);
910            return;
911        }
912    }
913
914    // block-wise download
915    bool startup = mStartup;
916    ssize_t bytesRead;
917    do {
918        bytesRead = mSession->fetchFile(
919                uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source);
920
921        if (bytesRead < 0) {
922            status_t err = bytesRead;
923            ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
924            notifyError(err);
925            return;
926        }
927
928        CHECK(buffer != NULL);
929
930        size_t size = buffer->size();
931        // Set decryption range.
932        buffer->setRange(size - bytesRead, bytesRead);
933        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
934                buffer->offset() == 0 /* first */);
935        // Unset decryption range.
936        buffer->setRange(0, size);
937
938        if (err != OK) {
939            ALOGE("decryptBuffer failed w/ error %d", err);
940
941            notifyError(err);
942            return;
943        }
944
945        if (startup || discontinuity) {
946            // Signal discontinuity.
947
948            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
949                // If this was a live event this made no sense since
950                // we don't have access to all the segment before the current
951                // one.
952                mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
953            }
954
955            if (discontinuity) {
956                ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
957
958                queueDiscontinuity(
959                        ATSParser::DISCONTINUITY_FORMATCHANGE,
960                        NULL /* extra */);
961
962                discontinuity = false;
963            }
964
965            startup = false;
966        }
967
968        err = OK;
969        if (bufferStartsWithTsSyncByte(buffer)) {
970            // Incremental extraction is only supported for MPEG2 transport streams.
971            if (tsBuffer == NULL) {
972                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
973                tsBuffer->setRange(0, 0);
974            } else if (tsBuffer->capacity() != buffer->capacity()) {
975                size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
976                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
977                tsBuffer->setRange(tsOff, tsSize);
978            }
979            tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
980
981            err = extractAndQueueAccessUnitsFromTs(tsBuffer);
982        }
983
984        if (err == -EAGAIN) {
985            // starting sequence number too low/high
986            mTSParser.clear();
987            for (size_t i = 0; i < mPacketSources.size(); i++) {
988                sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
989                packetSource->clear();
990            }
991            postMonitorQueue();
992            return;
993        } else if (err == ERROR_OUT_OF_RANGE) {
994            // reached stopping point
995            stopAsync(/* clear = */ false);
996            return;
997        } else if (err != OK) {
998            notifyError(err);
999            return;
1000        }
1001
1002    } while (bytesRead != 0);
1003
1004    if (bufferStartsWithTsSyncByte(buffer)) {
1005        // If we don't see a stream in the program table after fetching a full ts segment
1006        // mark it as nonexistent.
1007        const size_t kNumTypes = ATSParser::NUM_SOURCE_TYPES;
1008        ATSParser::SourceType srcTypes[kNumTypes] =
1009                { ATSParser::VIDEO, ATSParser::AUDIO };
1010        LiveSession::StreamType streamTypes[kNumTypes] =
1011                { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
1012
1013        for (size_t i = 0; i < kNumTypes; i++) {
1014            ATSParser::SourceType srcType = srcTypes[i];
1015            LiveSession::StreamType streamType = streamTypes[i];
1016
1017            sp<AnotherPacketSource> source =
1018                static_cast<AnotherPacketSource *>(
1019                    mTSParser->getSource(srcType).get());
1020
1021            if (!mTSParser->hasSource(srcType)) {
1022                ALOGW("MPEG2 Transport stream does not contain %s data.",
1023                      srcType == ATSParser::VIDEO ? "video" : "audio");
1024
1025                mStreamTypeMask &= ~streamType;
1026                mPacketSources.removeItem(streamType);
1027            }
1028        }
1029
1030    }
1031
1032    if (checkDecryptPadding(buffer) != OK) {
1033        ALOGE("Incorrect padding bytes after decryption.");
1034        notifyError(ERROR_MALFORMED);
1035        return;
1036    }
1037
1038    err = OK;
1039    if (tsBuffer != NULL) {
1040        AString method;
1041        CHECK(buffer->meta()->findString("cipher-method", &method));
1042        if ((tsBuffer->size() > 0 && method == "NONE")
1043                || tsBuffer->size() > 16) {
1044            ALOGE("MPEG2 transport stream is not an even multiple of 188 "
1045                    "bytes in length.");
1046            notifyError(ERROR_MALFORMED);
1047            return;
1048        }
1049    }
1050
1051    // bulk extract non-ts files
1052    if (tsBuffer == NULL) {
1053        err = extractAndQueueAccessUnits(buffer, itemMeta);
1054        if (err == -EAGAIN) {
1055            // starting sequence number too low/high
1056            postMonitorQueue();
1057            return;
1058        } else if (err == ERROR_OUT_OF_RANGE) {
1059            // reached stopping point
1060            stopAsync(/* clear = */false);
1061            return;
1062        }
1063    }
1064
1065    if (err != OK) {
1066        notifyError(err);
1067        return;
1068    }
1069
1070    ++mSeqNumber;
1071
1072    postMonitorQueue();
1073}
1074
1075int32_t PlaylistFetcher::getSeqNumberWithAnchorTime(int64_t anchorTimeUs) const {
1076    int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
1077    if (mPlaylist->meta() == NULL
1078            || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) {
1079        firstSeqNumberInPlaylist = 0;
1080    }
1081    lastSeqNumberInPlaylist = firstSeqNumberInPlaylist + mPlaylist->size() - 1;
1082
1083    int32_t index = mSeqNumber - firstSeqNumberInPlaylist - 1;
1084    while (index >= 0 && anchorTimeUs > mStartTimeUs) {
1085        sp<AMessage> itemMeta;
1086        CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1087
1088        int64_t itemDurationUs;
1089        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1090
1091        anchorTimeUs -= itemDurationUs;
1092        --index;
1093    }
1094
1095    int32_t newSeqNumber = firstSeqNumberInPlaylist + index + 1;
1096    if (newSeqNumber <= lastSeqNumberInPlaylist) {
1097        return newSeqNumber;
1098    } else {
1099        return lastSeqNumberInPlaylist;
1100    }
1101}
1102
1103int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1104    int32_t firstSeqNumberInPlaylist;
1105    if (mPlaylist->meta() == NULL
1106            || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) {
1107        firstSeqNumberInPlaylist = 0;
1108    }
1109
1110    size_t curDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
1111    if (discontinuitySeq < curDiscontinuitySeq) {
1112        return firstSeqNumberInPlaylist <= 0 ? 0 : (firstSeqNumberInPlaylist - 1);
1113    }
1114
1115    size_t index = 0;
1116    while (index < mPlaylist->size()) {
1117        sp<AMessage> itemMeta;
1118        CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1119
1120        int64_t discontinuity;
1121        if (itemMeta->findInt64("discontinuity", &discontinuity)) {
1122            curDiscontinuitySeq++;
1123        }
1124
1125        if (curDiscontinuitySeq == discontinuitySeq) {
1126            return firstSeqNumberInPlaylist + index;
1127        }
1128
1129        ++index;
1130    }
1131
1132    return firstSeqNumberInPlaylist + mPlaylist->size();
1133}
1134
1135int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1136    int32_t firstSeqNumberInPlaylist;
1137    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
1138                "media-sequence", &firstSeqNumberInPlaylist)) {
1139        firstSeqNumberInPlaylist = 0;
1140    }
1141
1142    size_t index = 0;
1143    int64_t segmentStartUs = 0;
1144    while (index < mPlaylist->size()) {
1145        sp<AMessage> itemMeta;
1146        CHECK(mPlaylist->itemAt(
1147                    index, NULL /* uri */, &itemMeta));
1148
1149        int64_t itemDurationUs;
1150        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1151
1152        if (timeUs < segmentStartUs + itemDurationUs) {
1153            break;
1154        }
1155
1156        segmentStartUs += itemDurationUs;
1157        ++index;
1158    }
1159
1160    if (index >= mPlaylist->size()) {
1161        index = mPlaylist->size() - 1;
1162    }
1163
1164    return firstSeqNumberInPlaylist + index;
1165}
1166
1167const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
1168        const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
1169    sp<MetaData> format = source->getFormat();
1170    if (format != NULL) {
1171        // for simplicity, store a reference to the format in each unit
1172        accessUnit->meta()->setObject("format", format);
1173    }
1174
1175    if (discard) {
1176        accessUnit->meta()->setInt32("discard", discard);
1177    }
1178
1179    int32_t targetDurationSecs;
1180    if (mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)) {
1181        accessUnit->meta()->setInt32("targetDuration", targetDurationSecs);
1182    }
1183
1184    accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1185    accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1186    return accessUnit;
1187}
1188
1189status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
1190    if (mTSParser == NULL) {
1191        // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
1192        mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
1193    }
1194
1195    if (mNextPTSTimeUs >= 0ll) {
1196        sp<AMessage> extra = new AMessage;
1197        // Since we are using absolute timestamps, signal an offset of 0 to prevent
1198        // ATSParser from skewing the timestamps of access units.
1199        extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
1200
1201        mTSParser->signalDiscontinuity(
1202                ATSParser::DISCONTINUITY_TIME, extra);
1203
1204        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
1205        mNextPTSTimeUs = -1ll;
1206        mFirstPTSValid = false;
1207    }
1208
1209    size_t offset = 0;
1210    while (offset + 188 <= buffer->size()) {
1211        status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
1212
1213        if (err != OK) {
1214            return err;
1215        }
1216
1217        offset += 188;
1218    }
1219    // setRange to indicate consumed bytes.
1220    buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
1221
1222    status_t err = OK;
1223    for (size_t i = mPacketSources.size(); i-- > 0;) {
1224        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1225
1226        const char *key;
1227        ATSParser::SourceType type;
1228        const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1229        switch (stream) {
1230            case LiveSession::STREAMTYPE_VIDEO:
1231                type = ATSParser::VIDEO;
1232                key = "timeUsVideo";
1233                break;
1234
1235            case LiveSession::STREAMTYPE_AUDIO:
1236                type = ATSParser::AUDIO;
1237                key = "timeUsAudio";
1238                break;
1239
1240            case LiveSession::STREAMTYPE_SUBTITLES:
1241            {
1242                ALOGE("MPEG2 Transport streams do not contain subtitles.");
1243                return ERROR_MALFORMED;
1244                break;
1245            }
1246
1247            default:
1248                TRESPASS();
1249        }
1250
1251        sp<AnotherPacketSource> source =
1252            static_cast<AnotherPacketSource *>(
1253                    mTSParser->getSource(type).get());
1254
1255        if (source == NULL) {
1256            continue;
1257        }
1258
1259        int64_t timeUs;
1260        sp<ABuffer> accessUnit;
1261        status_t finalResult;
1262        while (source->hasBufferAvailable(&finalResult)
1263                && source->dequeueAccessUnit(&accessUnit) == OK) {
1264
1265            CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1266
1267            if (mStartup) {
1268                if (!mFirstPTSValid) {
1269                    mFirstTimeUs = timeUs;
1270                    mFirstPTSValid = true;
1271                }
1272                if (mStartTimeUsRelative) {
1273                    timeUs -= mFirstTimeUs;
1274                    if (timeUs < 0) {
1275                        timeUs = 0;
1276                    }
1277                }
1278
1279                if (timeUs < mStartTimeUs) {
1280                    // buffer up to the closest preceding IDR frame
1281                    ALOGV("timeUs %" PRId64 " us < mStartTimeUs %" PRId64 " us",
1282                            timeUs, mStartTimeUs);
1283                    const char *mime;
1284                    sp<MetaData> format  = source->getFormat();
1285                    bool isAvc = false;
1286                    if (format != NULL && format->findCString(kKeyMIMEType, &mime)
1287                            && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC)) {
1288                        isAvc = true;
1289                    }
1290                    if (isAvc && IsIDR(accessUnit)) {
1291                        mVideoBuffer->clear();
1292                    }
1293                    if (isAvc) {
1294                        mVideoBuffer->queueAccessUnit(accessUnit);
1295                    }
1296
1297                    continue;
1298                }
1299            }
1300
1301            CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1302            if (mStartTimeUsNotify != NULL && timeUs > mStartTimeUs) {
1303                int32_t firstSeqNumberInPlaylist;
1304                if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
1305                            "media-sequence", &firstSeqNumberInPlaylist)) {
1306                    firstSeqNumberInPlaylist = 0;
1307                }
1308
1309                int32_t targetDurationSecs;
1310                CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
1311                int64_t targetDurationUs = targetDurationSecs * 1000000ll;
1312                // mStartup
1313                //   mStartup is true until we have queued a packet for all the streams
1314                //   we are fetching. We queue packets whose timestamps are greater than
1315                //   mStartTimeUs.
1316                // mSegmentStartTimeUs >= 0
1317                //   mSegmentStartTimeUs is non-negative when adapting or switching tracks
1318                // mSeqNumber > firstSeqNumberInPlaylist
1319                //   don't decrement mSeqNumber if it already points to the 1st segment
1320                // timeUs - mStartTimeUs > targetDurationUs:
1321                //   This and the 2 above conditions should only happen when adapting in a live
1322                //   stream; the old fetcher has already fetched to mStartTimeUs; the new fetcher
1323                //   would start fetching after timeUs, which should be greater than mStartTimeUs;
1324                //   the old fetcher would then continue fetching data until timeUs. We don't want
1325                //   timeUs to be too far ahead of mStartTimeUs because we want the old fetcher to
1326                //   stop as early as possible. The definition of being "too far ahead" is
1327                //   arbitrary; here we use targetDurationUs as threshold.
1328                if (mStartup && mSegmentStartTimeUs >= 0
1329                        && mSeqNumber > firstSeqNumberInPlaylist
1330                        && timeUs - mStartTimeUs > targetDurationUs) {
1331                    // we just guessed a starting timestamp that is too high when adapting in a
1332                    // live stream; re-adjust based on the actual timestamp extracted from the
1333                    // media segment; if we didn't move backward after the re-adjustment
1334                    // (newSeqNumber), start at least 1 segment prior.
1335                    int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs);
1336                    if (newSeqNumber >= mSeqNumber) {
1337                        --mSeqNumber;
1338                    } else {
1339                        mSeqNumber = newSeqNumber;
1340                    }
1341                    mStartTimeUsNotify = mNotify->dup();
1342                    mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
1343                    return -EAGAIN;
1344                }
1345
1346                int32_t seq;
1347                if (!mStartTimeUsNotify->findInt32("discontinuitySeq", &seq)) {
1348                    mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq);
1349                }
1350                int64_t startTimeUs;
1351                if (!mStartTimeUsNotify->findInt64(key, &startTimeUs)) {
1352                    mStartTimeUsNotify->setInt64(key, timeUs);
1353
1354                    uint32_t streamMask = 0;
1355                    mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1356                    streamMask |= mPacketSources.keyAt(i);
1357                    mStartTimeUsNotify->setInt32("streamMask", streamMask);
1358
1359                    if (streamMask == mStreamTypeMask) {
1360                        mStartup = false;
1361                        mStartTimeUsNotify->post();
1362                        mStartTimeUsNotify.clear();
1363                    }
1364                }
1365            }
1366
1367            if (mStopParams != NULL) {
1368                // Queue discontinuity in original stream.
1369                int32_t discontinuitySeq;
1370                int64_t stopTimeUs;
1371                if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1372                        || discontinuitySeq > mDiscontinuitySeq
1373                        || !mStopParams->findInt64(key, &stopTimeUs)
1374                        || (discontinuitySeq == mDiscontinuitySeq
1375                                && timeUs >= stopTimeUs)) {
1376                    packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
1377                    mStreamTypeMask &= ~stream;
1378                    mPacketSources.removeItemsAt(i);
1379                    break;
1380                }
1381            }
1382
1383            // Note that we do NOT dequeue any discontinuities except for format change.
1384            if (stream == LiveSession::STREAMTYPE_VIDEO) {
1385                const bool discard = true;
1386                status_t status;
1387                while (mVideoBuffer->hasBufferAvailable(&status)) {
1388                    sp<ABuffer> videoBuffer;
1389                    mVideoBuffer->dequeueAccessUnit(&videoBuffer);
1390                    setAccessUnitProperties(videoBuffer, source, discard);
1391                    packetSource->queueAccessUnit(videoBuffer);
1392                }
1393            }
1394
1395            setAccessUnitProperties(accessUnit, source);
1396            packetSource->queueAccessUnit(accessUnit);
1397        }
1398
1399        if (err != OK) {
1400            break;
1401        }
1402    }
1403
1404    if (err != OK) {
1405        for (size_t i = mPacketSources.size(); i-- > 0;) {
1406            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1407            packetSource->clear();
1408        }
1409        return err;
1410    }
1411
1412    if (!mStreamTypeMask) {
1413        // Signal gap is filled between original and new stream.
1414        ALOGV("ERROR OUT OF RANGE");
1415        return ERROR_OUT_OF_RANGE;
1416    }
1417
1418    return OK;
1419}
1420
1421/* static */
1422bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1423        const sp<ABuffer> &buffer) {
1424    size_t pos = 0;
1425
1426    // skip possible BOM
1427    if (buffer->size() >= pos + 3 &&
1428            !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1429        pos += 3;
1430    }
1431
1432    // accept WEBVTT followed by SPACE, TAB or (CR) LF
1433    if (buffer->size() < pos + 6 ||
1434            memcmp("WEBVTT", buffer->data() + pos, 6)) {
1435        return false;
1436    }
1437    pos += 6;
1438
1439    if (buffer->size() == pos) {
1440        return true;
1441    }
1442
1443    uint8_t sep = buffer->data()[pos];
1444    return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1445}
1446
1447status_t PlaylistFetcher::extractAndQueueAccessUnits(
1448        const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1449    if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1450        if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1451            ALOGE("This stream only contains subtitles.");
1452            return ERROR_MALFORMED;
1453        }
1454
1455        const sp<AnotherPacketSource> packetSource =
1456            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1457
1458        int64_t durationUs;
1459        CHECK(itemMeta->findInt64("durationUs", &durationUs));
1460        buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1461        buffer->meta()->setInt64("durationUs", durationUs);
1462        buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1463        buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1464        buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration);
1465
1466        packetSource->queueAccessUnit(buffer);
1467        return OK;
1468    }
1469
1470    if (mNextPTSTimeUs >= 0ll) {
1471        mFirstPTSValid = false;
1472        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
1473        mNextPTSTimeUs = -1ll;
1474    }
1475
1476    // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
1477    // stream prefixed by an ID3 tag.
1478
1479    bool firstID3Tag = true;
1480    uint64_t PTS = 0;
1481
1482    for (;;) {
1483        // Make sure to skip all ID3 tags preceding the audio data.
1484        // At least one must be present to provide the PTS timestamp.
1485
1486        ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
1487        if (!id3.isValid()) {
1488            if (firstID3Tag) {
1489                ALOGE("Unable to parse ID3 tag.");
1490                return ERROR_MALFORMED;
1491            } else {
1492                break;
1493            }
1494        }
1495
1496        if (firstID3Tag) {
1497            bool found = false;
1498
1499            ID3::Iterator it(id3, "PRIV");
1500            while (!it.done()) {
1501                size_t length;
1502                const uint8_t *data = it.getData(&length);
1503
1504                static const char *kMatchName =
1505                    "com.apple.streaming.transportStreamTimestamp";
1506                static const size_t kMatchNameLen = strlen(kMatchName);
1507
1508                if (length == kMatchNameLen + 1 + 8
1509                        && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
1510                    found = true;
1511                    PTS = U64_AT(&data[kMatchNameLen + 1]);
1512                }
1513
1514                it.next();
1515            }
1516
1517            if (!found) {
1518                ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
1519                return ERROR_MALFORMED;
1520            }
1521        }
1522
1523        // skip the ID3 tag
1524        buffer->setRange(
1525                buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
1526
1527        firstID3Tag = false;
1528    }
1529
1530    if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
1531        ALOGW("This stream only contains audio data!");
1532
1533        mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
1534
1535        if (mStreamTypeMask == 0) {
1536            return OK;
1537        }
1538    }
1539
1540    sp<AnotherPacketSource> packetSource =
1541        mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
1542
1543    if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
1544        ABitReader bits(buffer->data(), buffer->size());
1545
1546        // adts_fixed_header
1547
1548        CHECK_EQ(bits.getBits(12), 0xfffu);
1549        bits.skipBits(3);  // ID, layer
1550        bool protection_absent = bits.getBits(1) != 0;
1551
1552        unsigned profile = bits.getBits(2);
1553        CHECK_NE(profile, 3u);
1554        unsigned sampling_freq_index = bits.getBits(4);
1555        bits.getBits(1);  // private_bit
1556        unsigned channel_configuration = bits.getBits(3);
1557        CHECK_NE(channel_configuration, 0u);
1558        bits.skipBits(2);  // original_copy, home
1559
1560        sp<MetaData> meta = MakeAACCodecSpecificData(
1561                profile, sampling_freq_index, channel_configuration);
1562
1563        meta->setInt32(kKeyIsADTS, true);
1564
1565        packetSource->setFormat(meta);
1566    }
1567
1568    int64_t numSamples = 0ll;
1569    int32_t sampleRate;
1570    CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
1571
1572    int64_t timeUs = (PTS * 100ll) / 9ll;
1573    if (!mFirstPTSValid) {
1574        mFirstPTSValid = true;
1575        mFirstTimeUs = timeUs;
1576    }
1577
1578    size_t offset = 0;
1579    while (offset < buffer->size()) {
1580        const uint8_t *adtsHeader = buffer->data() + offset;
1581        CHECK_LT(offset + 5, buffer->size());
1582
1583        unsigned aac_frame_length =
1584            ((adtsHeader[3] & 3) << 11)
1585            | (adtsHeader[4] << 3)
1586            | (adtsHeader[5] >> 5);
1587
1588        if (aac_frame_length == 0) {
1589            const uint8_t *id3Header = adtsHeader;
1590            if (!memcmp(id3Header, "ID3", 3)) {
1591                ID3 id3(id3Header, buffer->size() - offset, true);
1592                if (id3.isValid()) {
1593                    offset += id3.rawSize();
1594                    continue;
1595                };
1596            }
1597            return ERROR_MALFORMED;
1598        }
1599
1600        CHECK_LE(offset + aac_frame_length, buffer->size());
1601
1602        int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
1603        offset += aac_frame_length;
1604
1605        // Each AAC frame encodes 1024 samples.
1606        numSamples += 1024;
1607
1608        if (mStartup) {
1609            int64_t startTimeUs = unitTimeUs;
1610            if (mStartTimeUsRelative) {
1611                startTimeUs -= mFirstTimeUs;
1612                if (startTimeUs  < 0) {
1613                    startTimeUs = 0;
1614                }
1615            }
1616            if (startTimeUs < mStartTimeUs) {
1617                continue;
1618            }
1619
1620            if (mStartTimeUsNotify != NULL) {
1621                int32_t targetDurationSecs;
1622                CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
1623                int64_t targetDurationUs = targetDurationSecs * 1000000ll;
1624
1625                // Duplicated logic from how we handle .ts playlists.
1626                if (mStartup && mSegmentStartTimeUs >= 0
1627                        && timeUs - mStartTimeUs > targetDurationUs) {
1628                    int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs);
1629                    if (newSeqNumber >= mSeqNumber) {
1630                        --mSeqNumber;
1631                    } else {
1632                        mSeqNumber = newSeqNumber;
1633                    }
1634                    return -EAGAIN;
1635                }
1636
1637                mStartTimeUsNotify->setInt64("timeUsAudio", timeUs);
1638                mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq);
1639                mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO);
1640                mStartTimeUsNotify->post();
1641                mStartTimeUsNotify.clear();
1642                mStartup = false;
1643            }
1644        }
1645
1646        if (mStopParams != NULL) {
1647            // Queue discontinuity in original stream.
1648            int32_t discontinuitySeq;
1649            int64_t stopTimeUs;
1650            if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1651                    || discontinuitySeq > mDiscontinuitySeq
1652                    || !mStopParams->findInt64("timeUsAudio", &stopTimeUs)
1653                    || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) {
1654                packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
1655                mStreamTypeMask = 0;
1656                mPacketSources.clear();
1657                return ERROR_OUT_OF_RANGE;
1658            }
1659        }
1660
1661        sp<ABuffer> unit = new ABuffer(aac_frame_length);
1662        memcpy(unit->data(), adtsHeader, aac_frame_length);
1663
1664        unit->meta()->setInt64("timeUs", unitTimeUs);
1665        setAccessUnitProperties(unit, packetSource);
1666        packetSource->queueAccessUnit(unit);
1667    }
1668
1669    return OK;
1670}
1671
1672void PlaylistFetcher::updateDuration() {
1673    int64_t durationUs = 0ll;
1674    for (size_t index = 0; index < mPlaylist->size(); ++index) {
1675        sp<AMessage> itemMeta;
1676        CHECK(mPlaylist->itemAt(
1677                    index, NULL /* uri */, &itemMeta));
1678
1679        int64_t itemDurationUs;
1680        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1681
1682        durationUs += itemDurationUs;
1683    }
1684
1685    sp<AMessage> msg = mNotify->dup();
1686    msg->setInt32("what", kWhatDurationUpdate);
1687    msg->setInt64("durationUs", durationUs);
1688    msg->post();
1689}
1690
1691int64_t PlaylistFetcher::resumeThreshold(const sp<AMessage> &msg) {
1692    int64_t durationUs, threshold;
1693    if (msg->findInt64("durationUs", &durationUs) && durationUs > 0) {
1694        return kNumSkipFrames * durationUs;
1695    }
1696
1697    sp<RefBase> obj;
1698    msg->findObject("format", &obj);
1699    MetaData *format = static_cast<MetaData *>(obj.get());
1700
1701    const char *mime;
1702    CHECK(format->findCString(kKeyMIMEType, &mime));
1703    bool audio = !strncasecmp(mime, "audio/", 6);
1704    if (audio) {
1705        // Assumes 1000 samples per frame.
1706        int32_t sampleRate;
1707        CHECK(format->findInt32(kKeySampleRate, &sampleRate));
1708        return kNumSkipFrames  /* frames */ * 1000 /* samples */
1709                * (1000000 / sampleRate) /* sample duration (us) */;
1710    } else {
1711        int32_t frameRate;
1712        if (format->findInt32(kKeyFrameRate, &frameRate) && frameRate > 0) {
1713            return kNumSkipFrames * (1000000 / frameRate);
1714        }
1715    }
1716
1717    return 500000ll;
1718}
1719
1720}  // namespace android
1721