PlaylistFetcher.cpp revision 7dbbc7ec95c3040668388162a0ffbc45b68af6f1
1/*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "PlaylistFetcher"
19#include <utils/Log.h>
20
21#include "PlaylistFetcher.h"
22
23#include "LiveDataSource.h"
24#include "LiveSession.h"
25#include "M3UParser.h"
26
27#include "include/avc_utils.h"
28#include "include/HTTPBase.h"
29#include "include/ID3.h"
30#include "mpeg2ts/AnotherPacketSource.h"
31
32#include <media/IStreamSource.h>
33#include <media/stagefright/foundation/ABitReader.h>
34#include <media/stagefright/foundation/ABuffer.h>
35#include <media/stagefright/foundation/ADebug.h>
36#include <media/stagefright/foundation/hexdump.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaDefs.h>
39#include <media/stagefright/MetaData.h>
40#include <media/stagefright/Utils.h>
41
42#include <ctype.h>
43#include <inttypes.h>
44#include <openssl/aes.h>
45#include <openssl/md5.h>
46
47namespace android {
48
49// static
50const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll;
51const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
52// LCM of 188 (size of a TS packet) & 1k works well
53const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024;
54const int32_t PlaylistFetcher::kNumSkipFrames = 5;
55
56PlaylistFetcher::PlaylistFetcher(
57        const sp<AMessage> &notify,
58        const sp<LiveSession> &session,
59        const char *uri,
60        int32_t subtitleGeneration)
61    : mNotify(notify),
62      mStartTimeUsNotify(notify->dup()),
63      mSession(session),
64      mURI(uri),
65      mStreamTypeMask(0),
66      mStartTimeUs(-1ll),
67      mSegmentStartTimeUs(-1ll),
68      mDiscontinuitySeq(-1ll),
69      mStartTimeUsRelative(false),
70      mLastPlaylistFetchTimeUs(-1ll),
71      mSeqNumber(-1),
72      mNumRetries(0),
73      mStartup(true),
74      mAdaptive(false),
75      mPrepared(false),
76      mNextPTSTimeUs(-1ll),
77      mMonitorQueueGeneration(0),
78      mSubtitleGeneration(subtitleGeneration),
79      mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
80      mFirstPTSValid(false),
81      mAbsoluteTimeAnchorUs(0ll),
82      mVideoBuffer(new AnotherPacketSource(NULL)) {
83    memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
84    mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
85    mStartTimeUsNotify->setInt32("streamMask", 0);
86}
87
88PlaylistFetcher::~PlaylistFetcher() {
89}
90
91int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
92    CHECK(mPlaylist != NULL);
93
94    int32_t firstSeqNumberInPlaylist;
95    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
96                "media-sequence", &firstSeqNumberInPlaylist)) {
97        firstSeqNumberInPlaylist = 0;
98    }
99
100    int32_t lastSeqNumberInPlaylist =
101        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
102
103    CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
104    CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
105
106    int64_t segmentStartUs = 0ll;
107    for (int32_t index = 0;
108            index < seqNumber - firstSeqNumberInPlaylist; ++index) {
109        sp<AMessage> itemMeta;
110        CHECK(mPlaylist->itemAt(
111                    index, NULL /* uri */, &itemMeta));
112
113        int64_t itemDurationUs;
114        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
115
116        segmentStartUs += itemDurationUs;
117    }
118
119    return segmentStartUs;
120}
121
122int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
123    int64_t nowUs = ALooper::GetNowUs();
124
125    if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
126        CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
127        return 0ll;
128    }
129
130    if (mPlaylist->isComplete()) {
131        return (~0llu >> 1);
132    }
133
134    int32_t targetDurationSecs;
135    CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
136
137    int64_t targetDurationUs = targetDurationSecs * 1000000ll;
138
139    int64_t minPlaylistAgeUs;
140
141    switch (mRefreshState) {
142        case INITIAL_MINIMUM_RELOAD_DELAY:
143        {
144            size_t n = mPlaylist->size();
145            if (n > 0) {
146                sp<AMessage> itemMeta;
147                CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
148
149                int64_t itemDurationUs;
150                CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
151
152                minPlaylistAgeUs = itemDurationUs;
153                break;
154            }
155
156            // fall through
157        }
158
159        case FIRST_UNCHANGED_RELOAD_ATTEMPT:
160        {
161            minPlaylistAgeUs = targetDurationUs / 2;
162            break;
163        }
164
165        case SECOND_UNCHANGED_RELOAD_ATTEMPT:
166        {
167            minPlaylistAgeUs = (targetDurationUs * 3) / 2;
168            break;
169        }
170
171        case THIRD_UNCHANGED_RELOAD_ATTEMPT:
172        {
173            minPlaylistAgeUs = targetDurationUs * 3;
174            break;
175        }
176
177        default:
178            TRESPASS();
179            break;
180    }
181
182    int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
183    return delayUs > 0ll ? delayUs : 0ll;
184}
185
186status_t PlaylistFetcher::decryptBuffer(
187        size_t playlistIndex, const sp<ABuffer> &buffer,
188        bool first) {
189    sp<AMessage> itemMeta;
190    bool found = false;
191    AString method;
192
193    for (ssize_t i = playlistIndex; i >= 0; --i) {
194        AString uri;
195        CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
196
197        if (itemMeta->findString("cipher-method", &method)) {
198            found = true;
199            break;
200        }
201    }
202
203    if (!found) {
204        method = "NONE";
205    }
206    buffer->meta()->setString("cipher-method", method.c_str());
207
208    if (method == "NONE") {
209        return OK;
210    } else if (!(method == "AES-128")) {
211        ALOGE("Unsupported cipher method '%s'", method.c_str());
212        return ERROR_UNSUPPORTED;
213    }
214
215    AString keyURI;
216    if (!itemMeta->findString("cipher-uri", &keyURI)) {
217        ALOGE("Missing key uri");
218        return ERROR_MALFORMED;
219    }
220
221    ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
222
223    sp<ABuffer> key;
224    if (index >= 0) {
225        key = mAESKeyForURI.valueAt(index);
226    } else {
227        ssize_t err = mSession->fetchFile(keyURI.c_str(), &key);
228
229        if (err < 0) {
230            ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
231            return ERROR_IO;
232        } else if (key->size() != 16) {
233            ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
234            return ERROR_MALFORMED;
235        }
236
237        mAESKeyForURI.add(keyURI, key);
238    }
239
240    AES_KEY aes_key;
241    if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
242        ALOGE("failed to set AES decryption key.");
243        return UNKNOWN_ERROR;
244    }
245
246    size_t n = buffer->size();
247    if (!n) {
248        return OK;
249    }
250    CHECK(n % 16 == 0);
251
252    if (first) {
253        // If decrypting the first block in a file, read the iv from the manifest
254        // or derive the iv from the file's sequence number.
255
256        AString iv;
257        if (itemMeta->findString("cipher-iv", &iv)) {
258            if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
259                    || iv.size() != 16 * 2 + 2) {
260                ALOGE("malformed cipher IV '%s'.", iv.c_str());
261                return ERROR_MALFORMED;
262            }
263
264            memset(mAESInitVec, 0, sizeof(mAESInitVec));
265            for (size_t i = 0; i < 16; ++i) {
266                char c1 = tolower(iv.c_str()[2 + 2 * i]);
267                char c2 = tolower(iv.c_str()[3 + 2 * i]);
268                if (!isxdigit(c1) || !isxdigit(c2)) {
269                    ALOGE("malformed cipher IV '%s'.", iv.c_str());
270                    return ERROR_MALFORMED;
271                }
272                uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
273                uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
274
275                mAESInitVec[i] = nibble1 << 4 | nibble2;
276            }
277        } else {
278            memset(mAESInitVec, 0, sizeof(mAESInitVec));
279            mAESInitVec[15] = mSeqNumber & 0xff;
280            mAESInitVec[14] = (mSeqNumber >> 8) & 0xff;
281            mAESInitVec[13] = (mSeqNumber >> 16) & 0xff;
282            mAESInitVec[12] = (mSeqNumber >> 24) & 0xff;
283        }
284    }
285
286    AES_cbc_encrypt(
287            buffer->data(), buffer->data(), buffer->size(),
288            &aes_key, mAESInitVec, AES_DECRYPT);
289
290    return OK;
291}
292
293status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
294    AString method;
295    CHECK(buffer->meta()->findString("cipher-method", &method));
296    if (method == "NONE") {
297        return OK;
298    }
299
300    uint8_t padding = 0;
301    if (buffer->size() > 0) {
302        padding = buffer->data()[buffer->size() - 1];
303    }
304
305    if (padding > 16) {
306        return ERROR_MALFORMED;
307    }
308
309    for (size_t i = buffer->size() - padding; i < padding; i++) {
310        if (buffer->data()[i] != padding) {
311            return ERROR_MALFORMED;
312        }
313    }
314
315    buffer->setRange(buffer->offset(), buffer->size() - padding);
316    return OK;
317}
318
319void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
320    int64_t maxDelayUs = delayUsToRefreshPlaylist();
321    if (maxDelayUs < minDelayUs) {
322        maxDelayUs = minDelayUs;
323    }
324    if (delayUs > maxDelayUs) {
325        ALOGV("Need to refresh playlist in %" PRId64 , maxDelayUs);
326        delayUs = maxDelayUs;
327    }
328    sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id());
329    msg->setInt32("generation", mMonitorQueueGeneration);
330    msg->post(delayUs);
331}
332
333void PlaylistFetcher::cancelMonitorQueue() {
334    ++mMonitorQueueGeneration;
335}
336
337void PlaylistFetcher::startAsync(
338        const sp<AnotherPacketSource> &audioSource,
339        const sp<AnotherPacketSource> &videoSource,
340        const sp<AnotherPacketSource> &subtitleSource,
341        int64_t startTimeUs,
342        int64_t segmentStartTimeUs,
343        int32_t startDiscontinuitySeq,
344        bool adaptive) {
345    sp<AMessage> msg = new AMessage(kWhatStart, id());
346
347    uint32_t streamTypeMask = 0ul;
348
349    if (audioSource != NULL) {
350        msg->setPointer("audioSource", audioSource.get());
351        streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
352    }
353
354    if (videoSource != NULL) {
355        msg->setPointer("videoSource", videoSource.get());
356        streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
357    }
358
359    if (subtitleSource != NULL) {
360        msg->setPointer("subtitleSource", subtitleSource.get());
361        streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
362    }
363
364    msg->setInt32("streamTypeMask", streamTypeMask);
365    msg->setInt64("startTimeUs", startTimeUs);
366    msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
367    msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
368    msg->setInt32("adaptive", adaptive);
369    msg->post();
370}
371
372void PlaylistFetcher::pauseAsync() {
373    (new AMessage(kWhatPause, id()))->post();
374}
375
376void PlaylistFetcher::stopAsync(bool clear) {
377    sp<AMessage> msg = new AMessage(kWhatStop, id());
378    msg->setInt32("clear", clear);
379    msg->post();
380}
381
382void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
383    AMessage* msg = new AMessage(kWhatResumeUntil, id());
384    msg->setMessage("params", params);
385    msg->post();
386}
387
388void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
389    switch (msg->what()) {
390        case kWhatStart:
391        {
392            status_t err = onStart(msg);
393
394            sp<AMessage> notify = mNotify->dup();
395            notify->setInt32("what", kWhatStarted);
396            notify->setInt32("err", err);
397            notify->post();
398            break;
399        }
400
401        case kWhatPause:
402        {
403            onPause();
404
405            sp<AMessage> notify = mNotify->dup();
406            notify->setInt32("what", kWhatPaused);
407            notify->post();
408            break;
409        }
410
411        case kWhatStop:
412        {
413            onStop(msg);
414
415            sp<AMessage> notify = mNotify->dup();
416            notify->setInt32("what", kWhatStopped);
417            notify->post();
418            break;
419        }
420
421        case kWhatMonitorQueue:
422        case kWhatDownloadNext:
423        {
424            int32_t generation;
425            CHECK(msg->findInt32("generation", &generation));
426
427            if (generation != mMonitorQueueGeneration) {
428                // Stale event
429                break;
430            }
431
432            if (msg->what() == kWhatMonitorQueue) {
433                onMonitorQueue();
434            } else {
435                onDownloadNext();
436            }
437            break;
438        }
439
440        case kWhatResumeUntil:
441        {
442            onResumeUntil(msg);
443            break;
444        }
445
446        default:
447            TRESPASS();
448    }
449}
450
451status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
452    mPacketSources.clear();
453
454    uint32_t streamTypeMask;
455    CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
456
457    int64_t startTimeUs;
458    int64_t segmentStartTimeUs;
459    int32_t startDiscontinuitySeq;
460    int32_t adaptive;
461    CHECK(msg->findInt64("startTimeUs", &startTimeUs));
462    CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
463    CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
464    CHECK(msg->findInt32("adaptive", &adaptive));
465
466    if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
467        void *ptr;
468        CHECK(msg->findPointer("audioSource", &ptr));
469
470        mPacketSources.add(
471                LiveSession::STREAMTYPE_AUDIO,
472                static_cast<AnotherPacketSource *>(ptr));
473    }
474
475    if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
476        void *ptr;
477        CHECK(msg->findPointer("videoSource", &ptr));
478
479        mPacketSources.add(
480                LiveSession::STREAMTYPE_VIDEO,
481                static_cast<AnotherPacketSource *>(ptr));
482    }
483
484    if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
485        void *ptr;
486        CHECK(msg->findPointer("subtitleSource", &ptr));
487
488        mPacketSources.add(
489                LiveSession::STREAMTYPE_SUBTITLES,
490                static_cast<AnotherPacketSource *>(ptr));
491    }
492
493    mStreamTypeMask = streamTypeMask;
494
495    mSegmentStartTimeUs = segmentStartTimeUs;
496    mDiscontinuitySeq = startDiscontinuitySeq;
497
498    if (startTimeUs >= 0) {
499        mStartTimeUs = startTimeUs;
500        mSeqNumber = -1;
501        mStartup = true;
502        mPrepared = false;
503        mAdaptive = adaptive;
504    }
505
506    postMonitorQueue();
507
508    return OK;
509}
510
511void PlaylistFetcher::onPause() {
512    cancelMonitorQueue();
513}
514
515void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
516    cancelMonitorQueue();
517
518    int32_t clear;
519    CHECK(msg->findInt32("clear", &clear));
520    if (clear) {
521        for (size_t i = 0; i < mPacketSources.size(); i++) {
522            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
523            packetSource->clear();
524        }
525    }
526
527    mPacketSources.clear();
528    mStreamTypeMask = 0;
529}
530
531// Resume until we have reached the boundary timestamps listed in `msg`; when
532// the remaining time is too short (within a resume threshold) stop immediately
533// instead.
534status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
535    sp<AMessage> params;
536    CHECK(msg->findMessage("params", &params));
537
538    bool stop = false;
539    for (size_t i = 0; i < mPacketSources.size(); i++) {
540        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
541
542        const char *stopKey;
543        int streamType = mPacketSources.keyAt(i);
544        switch (streamType) {
545        case LiveSession::STREAMTYPE_VIDEO:
546            stopKey = "timeUsVideo";
547            break;
548
549        case LiveSession::STREAMTYPE_AUDIO:
550            stopKey = "timeUsAudio";
551            break;
552
553        case LiveSession::STREAMTYPE_SUBTITLES:
554            stopKey = "timeUsSubtitle";
555            break;
556
557        default:
558            TRESPASS();
559        }
560
561        // Don't resume if we would stop within a resume threshold.
562        int32_t discontinuitySeq;
563        int64_t latestTimeUs = 0, stopTimeUs = 0;
564        sp<AMessage> latestMeta = packetSource->getLatestEnqueuedMeta();
565        if (latestMeta != NULL
566                && latestMeta->findInt32("discontinuitySeq", &discontinuitySeq)
567                && discontinuitySeq == mDiscontinuitySeq
568                && latestMeta->findInt64("timeUs", &latestTimeUs)
569                && params->findInt64(stopKey, &stopTimeUs)
570                && stopTimeUs - latestTimeUs < resumeThreshold(latestMeta)) {
571            stop = true;
572        }
573    }
574
575    if (stop) {
576        for (size_t i = 0; i < mPacketSources.size(); i++) {
577            mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer());
578        }
579        stopAsync(/* clear = */ false);
580        return OK;
581    }
582
583    mStopParams = params;
584    postMonitorQueue();
585
586    return OK;
587}
588
589void PlaylistFetcher::notifyError(status_t err) {
590    sp<AMessage> notify = mNotify->dup();
591    notify->setInt32("what", kWhatError);
592    notify->setInt32("err", err);
593    notify->post();
594}
595
596void PlaylistFetcher::queueDiscontinuity(
597        ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
598    for (size_t i = 0; i < mPacketSources.size(); ++i) {
599        // do not discard buffer upon #EXT-X-DISCONTINUITY tag
600        // (seek will discard buffer by abandoning old fetchers)
601        mPacketSources.valueAt(i)->queueDiscontinuity(
602                type, extra, false /* discard */);
603    }
604}
605
606void PlaylistFetcher::onMonitorQueue() {
607    bool downloadMore = false;
608    refreshPlaylist();
609
610    int32_t targetDurationSecs;
611    int64_t targetDurationUs = kMinBufferedDurationUs;
612    if (mPlaylist != NULL) {
613        if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
614                "target-duration", &targetDurationSecs)) {
615            ALOGE("Playlist is missing required EXT-X-TARGETDURATION tag");
616            notifyError(ERROR_MALFORMED);
617            return;
618        }
619        targetDurationUs = targetDurationSecs * 1000000ll;
620    }
621
622    // buffer at least 3 times the target duration, or up to 10 seconds
623    int64_t durationToBufferUs = targetDurationUs * 3;
624    if (durationToBufferUs > kMinBufferedDurationUs)  {
625        durationToBufferUs = kMinBufferedDurationUs;
626    }
627
628    int64_t bufferedDurationUs = 0ll;
629    status_t finalResult = NOT_ENOUGH_DATA;
630    if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
631        sp<AnotherPacketSource> packetSource =
632            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
633
634        bufferedDurationUs =
635                packetSource->getBufferedDurationUs(&finalResult);
636        finalResult = OK;
637    } else {
638        // Use max stream duration to prevent us from waiting on a non-existent stream;
639        // when we cannot make out from the manifest what streams are included in a playlist
640        // we might assume extra streams.
641        for (size_t i = 0; i < mPacketSources.size(); ++i) {
642            if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
643                continue;
644            }
645
646            int64_t bufferedStreamDurationUs =
647                mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
648            ALOGV("buffered %" PRId64 " for stream %d",
649                    bufferedStreamDurationUs, mPacketSources.keyAt(i));
650            if (bufferedStreamDurationUs > bufferedDurationUs) {
651                bufferedDurationUs = bufferedStreamDurationUs;
652            }
653        }
654    }
655    downloadMore = (bufferedDurationUs < durationToBufferUs);
656
657    // signal start if buffered up at least the target size
658    if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) {
659        mPrepared = true;
660
661        ALOGV("prepared, buffered=%" PRId64 " > %" PRId64 "",
662                bufferedDurationUs, targetDurationUs);
663        sp<AMessage> msg = mNotify->dup();
664        msg->setInt32("what", kWhatTemporarilyDoneFetching);
665        msg->post();
666    }
667
668    if (finalResult == OK && downloadMore) {
669        ALOGV("monitoring, buffered=%" PRId64 " < %" PRId64 "",
670                bufferedDurationUs, durationToBufferUs);
671        // delay the next download slightly; hopefully this gives other concurrent fetchers
672        // a better chance to run.
673        // onDownloadNext();
674        sp<AMessage> msg = new AMessage(kWhatDownloadNext, id());
675        msg->setInt32("generation", mMonitorQueueGeneration);
676        msg->post(1000l);
677    } else {
678        // Nothing to do yet, try again in a second.
679
680        sp<AMessage> msg = mNotify->dup();
681        msg->setInt32("what", kWhatTemporarilyDoneFetching);
682        msg->post();
683
684        int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2;
685        ALOGV("pausing for %" PRId64 ", buffered=%" PRId64 " > %" PRId64 "",
686                delayUs, bufferedDurationUs, durationToBufferUs);
687        // :TRICKY: need to enforce minimum delay because the delay to
688        // refresh the playlist will become 0
689        postMonitorQueue(delayUs, mPrepared ? targetDurationUs * 2 : 0);
690    }
691}
692
693status_t PlaylistFetcher::refreshPlaylist() {
694    if (delayUsToRefreshPlaylist() <= 0) {
695        bool unchanged;
696        sp<M3UParser> playlist = mSession->fetchPlaylist(
697                mURI.c_str(), mPlaylistHash, &unchanged);
698
699        if (playlist == NULL) {
700            if (unchanged) {
701                // We succeeded in fetching the playlist, but it was
702                // unchanged from the last time we tried.
703
704                if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
705                    mRefreshState = (RefreshState)(mRefreshState + 1);
706                }
707            } else {
708                ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str());
709                return ERROR_IO;
710            }
711        } else {
712            mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
713            mPlaylist = playlist;
714
715            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
716                updateDuration();
717            }
718        }
719
720        mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
721    }
722    return OK;
723}
724
725// static
726bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
727    return buffer->size() > 0 && buffer->data()[0] == 0x47;
728}
729
730void PlaylistFetcher::onDownloadNext() {
731    status_t err = refreshPlaylist();
732    int32_t firstSeqNumberInPlaylist = 0;
733    int32_t lastSeqNumberInPlaylist = 0;
734    bool discontinuity = false;
735
736    if (mPlaylist != NULL) {
737        if (mPlaylist->meta() != NULL) {
738            mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist);
739        }
740
741        lastSeqNumberInPlaylist =
742                firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
743
744        if (mDiscontinuitySeq < 0) {
745            mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
746        }
747    }
748
749    if (mPlaylist != NULL && mSeqNumber < 0) {
750        CHECK_GE(mStartTimeUs, 0ll);
751
752        if (mSegmentStartTimeUs < 0) {
753            if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
754                // If this is a live session, start 3 segments from the end on connect
755                mSeqNumber = lastSeqNumberInPlaylist - 3;
756                if (mSeqNumber < firstSeqNumberInPlaylist) {
757                    mSeqNumber = firstSeqNumberInPlaylist;
758                }
759            } else {
760                // When seeking mSegmentStartTimeUs is unavailable (< 0), we
761                // use mStartTimeUs (client supplied timestamp) to determine both start segment
762                // and relative position inside a segment
763                mSeqNumber = getSeqNumberForTime(mStartTimeUs);
764                mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
765            }
766            mStartTimeUsRelative = true;
767            ALOGV("Initial sequence number for time %" PRId64 " is %d from (%d .. %d)",
768                    mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
769                    lastSeqNumberInPlaylist);
770        } else {
771            // When adapting or track switching, mSegmentStartTimeUs (relative
772            // to media time 0) is used to determine the start segment; mStartTimeUs (absolute
773            // timestamps coming from the media container) is used to determine the position
774            // inside a segments.
775            mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
776            if (mAdaptive) {
777                // avoid double fetch/decode
778                mSeqNumber += 1;
779            }
780            ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
781            if (mSeqNumber < minSeq) {
782                mSeqNumber = minSeq;
783            }
784
785            if (mSeqNumber < firstSeqNumberInPlaylist) {
786                mSeqNumber = firstSeqNumberInPlaylist;
787            }
788
789            if (mSeqNumber > lastSeqNumberInPlaylist) {
790                mSeqNumber = lastSeqNumberInPlaylist;
791            }
792            ALOGV("Initial sequence number for live event %d from (%d .. %d)",
793                    mSeqNumber, firstSeqNumberInPlaylist,
794                    lastSeqNumberInPlaylist);
795        }
796    }
797
798    // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true
799    if (mSeqNumber < firstSeqNumberInPlaylist
800            || mSeqNumber > lastSeqNumberInPlaylist
801            || err != OK) {
802        if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) {
803            ++mNumRetries;
804
805            if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) {
806                // make sure we reach this retry logic on refresh failures
807                // by adding an err != OK clause to all enclosing if's.
808
809                // refresh in increasing fraction (1/2, 1/3, ...) of the
810                // playlist's target duration or 3 seconds, whichever is less
811                int64_t delayUs = kMaxMonitorDelayUs;
812                if (mPlaylist != NULL && mPlaylist->meta() != NULL) {
813                    int32_t targetDurationSecs;
814                    CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
815                    delayUs = mPlaylist->size() * targetDurationSecs *
816                            1000000ll / (1 + mNumRetries);
817                }
818                if (delayUs > kMaxMonitorDelayUs) {
819                    delayUs = kMaxMonitorDelayUs;
820                }
821                ALOGV("sequence number high: %d from (%d .. %d), "
822                      "monitor in %" PRId64 " (retry=%d)",
823                        mSeqNumber, firstSeqNumberInPlaylist,
824                        lastSeqNumberInPlaylist, delayUs, mNumRetries);
825                postMonitorQueue(delayUs);
826                return;
827            }
828
829            if (err != OK) {
830                notifyError(err);
831                return;
832            }
833
834            // we've missed the boat, let's start 3 segments prior to the latest sequence
835            // number available and signal a discontinuity.
836
837            ALOGI("We've missed the boat, restarting playback."
838                  "  mStartup=%d, was  looking for %d in %d-%d",
839                    mStartup, mSeqNumber, firstSeqNumberInPlaylist,
840                    lastSeqNumberInPlaylist);
841            if (mStopParams != NULL) {
842                // we should have kept on fetching until we hit the boundaries in mStopParams,
843                // but since the segments we are supposed to fetch have already rolled off
844                // the playlist, i.e. we have already missed the boat, we inevitably have to
845                // skip.
846                for (size_t i = 0; i < mPacketSources.size(); i++) {
847                    sp<ABuffer> formatChange = mSession->createFormatChangeBuffer();
848                    mPacketSources.valueAt(i)->queueAccessUnit(formatChange);
849                }
850                stopAsync(/* clear = */ false);
851                return;
852            }
853            mSeqNumber = lastSeqNumberInPlaylist - 3;
854            if (mSeqNumber < firstSeqNumberInPlaylist) {
855                mSeqNumber = firstSeqNumberInPlaylist;
856            }
857            discontinuity = true;
858
859            // fall through
860        } else {
861            ALOGE("Cannot find sequence number %d in playlist "
862                 "(contains %d - %d)",
863                 mSeqNumber, firstSeqNumberInPlaylist,
864                  firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
865
866            notifyError(ERROR_END_OF_STREAM);
867            return;
868        }
869    }
870
871    mNumRetries = 0;
872
873    AString uri;
874    sp<AMessage> itemMeta;
875    CHECK(mPlaylist->itemAt(
876                mSeqNumber - firstSeqNumberInPlaylist,
877                &uri,
878                &itemMeta));
879
880    int32_t val;
881    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
882        mDiscontinuitySeq++;
883        discontinuity = true;
884    }
885
886    int64_t range_offset, range_length;
887    if (!itemMeta->findInt64("range-offset", &range_offset)
888            || !itemMeta->findInt64("range-length", &range_length)) {
889        range_offset = 0;
890        range_length = -1;
891    }
892
893    ALOGV("fetching segment %d from (%d .. %d)",
894          mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
895
896    ALOGV("fetching '%s'", uri.c_str());
897
898    sp<DataSource> source;
899    sp<ABuffer> buffer, tsBuffer;
900    // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
901    // this avoids interleaved connections to the key and segment file.
902    {
903        sp<ABuffer> junk = new ABuffer(16);
904        junk->setRange(0, 16);
905        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
906                true /* first */);
907        if (err != OK) {
908            notifyError(err);
909            return;
910        }
911    }
912
913    // block-wise download
914    bool startup = mStartup;
915    ssize_t bytesRead;
916    do {
917        bytesRead = mSession->fetchFile(
918                uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source);
919
920        if (bytesRead < 0) {
921            status_t err = bytesRead;
922            ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
923            notifyError(err);
924            return;
925        }
926
927        CHECK(buffer != NULL);
928
929        size_t size = buffer->size();
930        // Set decryption range.
931        buffer->setRange(size - bytesRead, bytesRead);
932        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
933                buffer->offset() == 0 /* first */);
934        // Unset decryption range.
935        buffer->setRange(0, size);
936
937        if (err != OK) {
938            ALOGE("decryptBuffer failed w/ error %d", err);
939
940            notifyError(err);
941            return;
942        }
943
944        if (startup || discontinuity) {
945            // Signal discontinuity.
946
947            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
948                // If this was a live event this made no sense since
949                // we don't have access to all the segment before the current
950                // one.
951                mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
952            }
953
954            if (discontinuity) {
955                ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
956
957                queueDiscontinuity(
958                        ATSParser::DISCONTINUITY_FORMATCHANGE,
959                        NULL /* extra */);
960
961                discontinuity = false;
962            }
963
964            startup = false;
965        }
966
967        err = OK;
968        if (bufferStartsWithTsSyncByte(buffer)) {
969            // Incremental extraction is only supported for MPEG2 transport streams.
970            if (tsBuffer == NULL) {
971                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
972                tsBuffer->setRange(0, 0);
973            } else if (tsBuffer->capacity() != buffer->capacity()) {
974                size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
975                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
976                tsBuffer->setRange(tsOff, tsSize);
977            }
978            tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
979
980            err = extractAndQueueAccessUnitsFromTs(tsBuffer);
981        }
982
983        if (err == -EAGAIN) {
984            // starting sequence number too low/high
985            mTSParser.clear();
986            for (size_t i = 0; i < mPacketSources.size(); i++) {
987                sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
988                packetSource->clear();
989            }
990            postMonitorQueue();
991            return;
992        } else if (err == ERROR_OUT_OF_RANGE) {
993            // reached stopping point
994            stopAsync(/* clear = */ false);
995            return;
996        } else if (err != OK) {
997            notifyError(err);
998            return;
999        }
1000
1001    } while (bytesRead != 0);
1002
1003    if (bufferStartsWithTsSyncByte(buffer)) {
1004        // If we don't see a stream in the program table after fetching a full ts segment
1005        // mark it as nonexistent.
1006        const size_t kNumTypes = ATSParser::NUM_SOURCE_TYPES;
1007        ATSParser::SourceType srcTypes[kNumTypes] =
1008                { ATSParser::VIDEO, ATSParser::AUDIO };
1009        LiveSession::StreamType streamTypes[kNumTypes] =
1010                { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
1011
1012        for (size_t i = 0; i < kNumTypes; i++) {
1013            ATSParser::SourceType srcType = srcTypes[i];
1014            LiveSession::StreamType streamType = streamTypes[i];
1015
1016            sp<AnotherPacketSource> source =
1017                static_cast<AnotherPacketSource *>(
1018                    mTSParser->getSource(srcType).get());
1019
1020            if (!mTSParser->hasSource(srcType)) {
1021                ALOGW("MPEG2 Transport stream does not contain %s data.",
1022                      srcType == ATSParser::VIDEO ? "video" : "audio");
1023
1024                mStreamTypeMask &= ~streamType;
1025                mPacketSources.removeItem(streamType);
1026            }
1027        }
1028
1029    }
1030
1031    if (checkDecryptPadding(buffer) != OK) {
1032        ALOGE("Incorrect padding bytes after decryption.");
1033        notifyError(ERROR_MALFORMED);
1034        return;
1035    }
1036
1037    err = OK;
1038    if (tsBuffer != NULL) {
1039        AString method;
1040        CHECK(buffer->meta()->findString("cipher-method", &method));
1041        if ((tsBuffer->size() > 0 && method == "NONE")
1042                || tsBuffer->size() > 16) {
1043            ALOGE("MPEG2 transport stream is not an even multiple of 188 "
1044                    "bytes in length.");
1045            notifyError(ERROR_MALFORMED);
1046            return;
1047        }
1048    }
1049
1050    // bulk extract non-ts files
1051    if (tsBuffer == NULL) {
1052        err = extractAndQueueAccessUnits(buffer, itemMeta);
1053        if (err == -EAGAIN) {
1054            // starting sequence number too low/high
1055            postMonitorQueue();
1056            return;
1057        } else if (err == ERROR_OUT_OF_RANGE) {
1058            // reached stopping point
1059            stopAsync(/* clear = */false);
1060            return;
1061        }
1062    }
1063
1064    if (err != OK) {
1065        notifyError(err);
1066        return;
1067    }
1068
1069    ++mSeqNumber;
1070
1071    postMonitorQueue();
1072}
1073
1074int32_t PlaylistFetcher::getSeqNumberWithAnchorTime(int64_t anchorTimeUs) const {
1075    int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
1076    if (mPlaylist->meta() == NULL
1077            || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) {
1078        firstSeqNumberInPlaylist = 0;
1079    }
1080    lastSeqNumberInPlaylist = firstSeqNumberInPlaylist + mPlaylist->size() - 1;
1081
1082    int32_t index = mSeqNumber - firstSeqNumberInPlaylist - 1;
1083    while (index >= 0 && anchorTimeUs > mStartTimeUs) {
1084        sp<AMessage> itemMeta;
1085        CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1086
1087        int64_t itemDurationUs;
1088        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1089
1090        anchorTimeUs -= itemDurationUs;
1091        --index;
1092    }
1093
1094    int32_t newSeqNumber = firstSeqNumberInPlaylist + index + 1;
1095    if (newSeqNumber <= lastSeqNumberInPlaylist) {
1096        return newSeqNumber;
1097    } else {
1098        return lastSeqNumberInPlaylist;
1099    }
1100}
1101
1102int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1103    int32_t firstSeqNumberInPlaylist;
1104    if (mPlaylist->meta() == NULL
1105            || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) {
1106        firstSeqNumberInPlaylist = 0;
1107    }
1108
1109    size_t curDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
1110    if (discontinuitySeq < curDiscontinuitySeq) {
1111        return firstSeqNumberInPlaylist <= 0 ? 0 : (firstSeqNumberInPlaylist - 1);
1112    }
1113
1114    size_t index = 0;
1115    while (index < mPlaylist->size()) {
1116        sp<AMessage> itemMeta;
1117        CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1118
1119        int64_t discontinuity;
1120        if (itemMeta->findInt64("discontinuity", &discontinuity)) {
1121            curDiscontinuitySeq++;
1122        }
1123
1124        if (curDiscontinuitySeq == discontinuitySeq) {
1125            return firstSeqNumberInPlaylist + index;
1126        }
1127
1128        ++index;
1129    }
1130
1131    return firstSeqNumberInPlaylist + mPlaylist->size();
1132}
1133
1134int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1135    int32_t firstSeqNumberInPlaylist;
1136    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
1137                "media-sequence", &firstSeqNumberInPlaylist)) {
1138        firstSeqNumberInPlaylist = 0;
1139    }
1140
1141    size_t index = 0;
1142    int64_t segmentStartUs = 0;
1143    while (index < mPlaylist->size()) {
1144        sp<AMessage> itemMeta;
1145        CHECK(mPlaylist->itemAt(
1146                    index, NULL /* uri */, &itemMeta));
1147
1148        int64_t itemDurationUs;
1149        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1150
1151        if (timeUs < segmentStartUs + itemDurationUs) {
1152            break;
1153        }
1154
1155        segmentStartUs += itemDurationUs;
1156        ++index;
1157    }
1158
1159    if (index >= mPlaylist->size()) {
1160        index = mPlaylist->size() - 1;
1161    }
1162
1163    return firstSeqNumberInPlaylist + index;
1164}
1165
1166const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
1167        const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
1168    sp<MetaData> format = source->getFormat();
1169    if (format != NULL) {
1170        // for simplicity, store a reference to the format in each unit
1171        accessUnit->meta()->setObject("format", format);
1172    }
1173
1174    if (discard) {
1175        accessUnit->meta()->setInt32("discard", discard);
1176    }
1177
1178    int32_t targetDurationSecs;
1179    if (mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)) {
1180        accessUnit->meta()->setInt32("targetDuration", targetDurationSecs);
1181    }
1182
1183    accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1184    accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1185    return accessUnit;
1186}
1187
// Feeds the bytes in |buffer| to mTSParser as 188-byte transport stream packets,
// then drains the parser's video and audio sources into the matching entries of
// mPacketSources.
//
// On a successful feed, |buffer|'s range is advanced past the packets that were
// consumed; any trailing partial packet stays in the range.
//
// Returns:
//   OK                 - units (if any) were queued
//   feedTSPacket error - returned as-is; |buffer|'s range is left untouched
//   ERROR_MALFORMED    - mPacketSources contains a subtitle entry
//   -EAGAIN            - mSeqNumber was moved backwards during startup
//   ERROR_OUT_OF_RANGE - mStreamTypeMask is zero after processing
1188status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
1189    if (mTSParser == NULL) {
1190        // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
1191        mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
1192    }
1193
1194    if (mNextPTSTimeUs >= 0ll) {
1195        sp<AMessage> extra = new AMessage;
1196        // Since we are using absolute timestamps, signal an offset of 0 to prevent
1197        // ATSParser from skewing the timestamps of access units.
1198        extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
1199
1200        mTSParser->signalDiscontinuity(
1201                ATSParser::DISCONTINUITY_TIME, extra);
1202
1203        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
1204        mNextPTSTimeUs = -1ll;
1205        mFirstPTSValid = false;
1206    }
1207
        // Feed only whole 188-byte packets; stop at the first parser error.
1208    size_t offset = 0;
1209    while (offset + 188 <= buffer->size()) {
1210        status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
1211
1212        if (err != OK) {
1213            return err;
1214        }
1215
1216        offset += 188;
1217    }
1218    // setRange to indicate consumed bytes.
1219    buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
1220
        // NOTE(review): |err| is never assigned again in this function, so the
        // two `err != OK` branches further down look unreachable as written.
1221    status_t err = OK;
        // Walk the packet sources from last to first; entries may be removed at
        // index i inside the loop (see removeItemsAt below), presumably the
        // reason for iterating in reverse.
1222    for (size_t i = mPacketSources.size(); i-- > 0;) {
1223        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1224
            // |key| names the per-stream timestamp entry used in
            // mStartTimeUsNotify and mStopParams.
1225        const char *key;
1226        ATSParser::SourceType type;
1227        const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1228        switch (stream) {
1229            case LiveSession::STREAMTYPE_VIDEO:
1230                type = ATSParser::VIDEO;
1231                key = "timeUsVideo";
1232                break;
1233
1234            case LiveSession::STREAMTYPE_AUDIO:
1235                type = ATSParser::AUDIO;
1236                key = "timeUsAudio";
1237                break;
1238
1239            case LiveSession::STREAMTYPE_SUBTITLES:
1240            {
1241                ALOGE("MPEG2 Transport streams do not contain subtitles.");
1242                return ERROR_MALFORMED;
1243                break;
1244            }
1245
1246            default:
1247                TRESPASS();
1248        }
1249
            // |source| is the parser-side queue for this stream type;
            // |packetSource| above is the destination queue.
1250        sp<AnotherPacketSource> source =
1251            static_cast<AnotherPacketSource *>(
1252                    mTSParser->getSource(type).get());
1253
1254        if (source == NULL) {
1255            continue;
1256        }
1257
1258        int64_t timeUs;
1259        sp<ABuffer> accessUnit;
1260        status_t finalResult;
1261        while (source->hasBufferAvailable(&finalResult)
1262                && source->dequeueAccessUnit(&accessUnit) == OK) {
1263
1264            CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1265
1266            if (mStartup) {
1267                if (!mFirstPTSValid) {
1268                    mFirstTimeUs = timeUs;
1269                    mFirstPTSValid = true;
1270                }
                    // When the start time is relative, compare against time
                    // elapsed since the first unit seen, clamped at 0.
1271                if (mStartTimeUsRelative) {
1272                    timeUs -= mFirstTimeUs;
1273                    if (timeUs < 0) {
1274                        timeUs = 0;
1275                    }
1276                }
1277
1278                if (timeUs < mStartTimeUs) {
1279                    // buffer up to the closest preceding IDR frame
1280                    ALOGV("timeUs %" PRId64 " us < mStartTimeUs %" PRId64 " us",
1281                            timeUs, mStartTimeUs);
1282                    const char *mime;
1283                    sp<MetaData> format  = source->getFormat();
1284                    bool isAvc = false;
1285                    if (format != NULL && format->findCString(kKeyMIMEType, &mime)
1286                            && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC)) {
1287                        isAvc = true;
1288                    }
                        // An IDR frame restarts the hold-back buffer; non-AVC
                        // units before the start time are simply dropped.
1289                    if (isAvc && IsIDR(accessUnit)) {
1290                        mVideoBuffer->clear();
1291                    }
1292                    if (isAvc) {
1293                        mVideoBuffer->queueAccessUnit(accessUnit);
1294                    }
1295
1296                    continue;
1297                }
1298            }
1299
                // Re-read the unit's own timestamp: the startup branch above may
                // have rebased and clamped the local |timeUs|.
1300            CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1301            if (mStartTimeUsNotify != NULL && timeUs > mStartTimeUs) {
1302                int32_t firstSeqNumberInPlaylist;
1303                if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
1304                            "media-sequence", &firstSeqNumberInPlaylist)) {
1305                    firstSeqNumberInPlaylist = 0;
1306                }
1307
1308                int32_t targetDurationSecs;
1309                CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
1310                int64_t targetDurationUs = targetDurationSecs * 1000000ll;
1311                // mStartup
1312                //   mStartup is true until we have queued a packet for all the streams
1313                //   we are fetching. We queue packets whose timestamps are greater than
1314                //   mStartTimeUs.
1315                // mSegmentStartTimeUs >= 0
1316                //   mSegmentStartTimeUs is non-negative when adapting or switching tracks
1317                // mSeqNumber > firstSeqNumberInPlaylist
1318                //   don't decrement mSeqNumber if it already points to the 1st segment
1319                // timeUs - mStartTimeUs > targetDurationUs:
1320                //   This and the 2 above conditions should only happen when adapting in a live
1321                //   stream; the old fetcher has already fetched to mStartTimeUs; the new fetcher
1322                //   would start fetching after timeUs, which should be greater than mStartTimeUs;
1323                //   the old fetcher would then continue fetching data until timeUs. We don't want
1324                //   timeUs to be too far ahead of mStartTimeUs because we want the old fetcher to
1325                //   stop as early as possible. The definition of being "too far ahead" is
1326                //   arbitrary; here we use targetDurationUs as threshold.
1327                if (mStartup && mSegmentStartTimeUs >= 0
1328                        && mSeqNumber > firstSeqNumberInPlaylist
1329                        && timeUs - mStartTimeUs > targetDurationUs) {
1330                    // we just guessed a starting timestamp that is too high when adapting in a
1331                    // live stream; re-adjust based on the actual timestamp extracted from the
1332                    // media segment; if we didn't move backward after the re-adjustment
1333                    // (newSeqNumber), start at least 1 segment prior.
1334                    int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs);
1335                    if (newSeqNumber >= mSeqNumber) {
1336                        --mSeqNumber;
1337                    } else {
1338                        mSeqNumber = newSeqNumber;
1339                    }
                        // Start over with a fresh notification message so values
                        // recorded for the abandoned attempt are not carried over.
1340                    mStartTimeUsNotify = mNotify->dup();
1341                    mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
1342                    return -EAGAIN;
1343                }
1344
                    // Record the discontinuity sequence and this stream's first
                    // timestamp only once each (first writer wins).
1345                int32_t seq;
1346                if (!mStartTimeUsNotify->findInt32("discontinuitySeq", &seq)) {
1347                    mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq);
1348                }
1349                int64_t startTimeUs;
1350                if (!mStartTimeUsNotify->findInt64(key, &startTimeUs)) {
1351                    mStartTimeUsNotify->setInt64(key, timeUs);
1352
1353                    uint32_t streamMask = 0;
1354                    mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1355                    streamMask |= mPacketSources.keyAt(i);
1356                    mStartTimeUsNotify->setInt32("streamMask", streamMask);
1357
                        // Once every stream in mStreamTypeMask has reported,
                        // startup is over and the notification is posted.
1358                    if (streamMask == mStreamTypeMask) {
1359                        mStartup = false;
1360                        mStartTimeUsNotify->post();
1361                        mStartTimeUsNotify.clear();
1362                    }
1363                }
1364            }
1365
1366            if (mStopParams != NULL) {
1367                // Queue discontinuity in original stream.
1368                int32_t discontinuitySeq;
1369                int64_t stopTimeUs;
1370                if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1371                        || discontinuitySeq > mDiscontinuitySeq
1372                        || !mStopParams->findInt64(key, &stopTimeUs)
1373                        || (discontinuitySeq == mDiscontinuitySeq
1374                                && timeUs >= stopTimeUs)) {
1375                    packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
1376                    mStreamTypeMask &= ~stream;
1377                    mPacketSources.removeItemsAt(i);
                        // Leaves the dequeue loop only; the current accessUnit is
                        // not queued and the outer loop moves on to the next stream.
1378                    break;
1379                }
1380            }
1381
1382            // Note that we do NOT dequeue any discontinuities except for format change.
1383            if (stream == LiveSession::STREAMTYPE_VIDEO) {
                    // Flush the units held back in mVideoBuffer during startup
                    // ahead of this one, each tagged with "discard".
1384                const bool discard = true;
1385                status_t status;
1386                while (mVideoBuffer->hasBufferAvailable(&status)) {
1387                    sp<ABuffer> videoBuffer;
1388                    mVideoBuffer->dequeueAccessUnit(&videoBuffer);
1389                    setAccessUnitProperties(videoBuffer, source, discard);
1390                    packetSource->queueAccessUnit(videoBuffer);
1391                }
1392            }
1393
1394            setAccessUnitProperties(accessUnit, source);
1395            packetSource->queueAccessUnit(accessUnit);
1396        }
1397
1398        if (err != OK) {
1399            break;
1400        }
1401    }
1402
1403    if (err != OK) {
1404        for (size_t i = mPacketSources.size(); i-- > 0;) {
1405            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1406            packetSource->clear();
1407        }
1408        return err;
1409    }
1410
        // Every stream bit has been cleared by the stop handling above.
1411    if (!mStreamTypeMask) {
1412        // Signal gap is filled between original and new stream.
1413        ALOGV("ERROR OUT OF RANGE");
1414        return ERROR_OUT_OF_RANGE;
1415    }
1416
1417    return OK;
1418}
1419
1420/* static */
1421bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1422        const sp<ABuffer> &buffer) {
1423    size_t pos = 0;
1424
1425    // skip possible BOM
1426    if (buffer->size() >= pos + 3 &&
1427            !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1428        pos += 3;
1429    }
1430
1431    // accept WEBVTT followed by SPACE, TAB or (CR) LF
1432    if (buffer->size() < pos + 6 ||
1433            memcmp("WEBVTT", buffer->data() + pos, 6)) {
1434        return false;
1435    }
1436    pos += 6;
1437
1438    if (buffer->size() == pos) {
1439        return true;
1440    }
1441
1442    uint8_t sep = buffer->data()[pos];
1443    return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1444}
1445
// Bulk extraction for a fully downloaded segment, handling two payload kinds:
//
//  * WebVTT: the whole |buffer| is queued as one access unit on the subtitle
//    packet source, tagged with the segment's start time and duration.
//  * Anything else is treated as ID3-prefixed ADTS audio: leading ID3 tags are
//    skipped (the first must carry the transportStreamTimestamp PRIV frame), and
//    each ADTS frame is copied into its own access unit on the audio source.
//
// |itemMeta| is the playlist item's metadata; only "durationUs" is read from it.
//
// Returns:
//   OK                 - units queued (or the audio stream is not wanted)
//   ERROR_MALFORMED    - unexpected content type, bad/missing ID3 tag, or a
//                        zero-length frame that is not an ID3 tag
//   -EAGAIN            - mSeqNumber was moved backwards during startup
//   ERROR_OUT_OF_RANGE - the stop point in mStopParams was reached
//
// NOTE(review): several structural assumptions about the ADTS data are enforced
// with CHECK* macros rather than an error return; confirm that is acceptable for
// data that comes off the network.
1446status_t PlaylistFetcher::extractAndQueueAccessUnits(
1447        const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1448    if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1449        if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1450            ALOGE("This stream only contains subtitles.");
1451            return ERROR_MALFORMED;
1452        }
1453
1454        const sp<AnotherPacketSource> packetSource =
1455            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1456
1457        int64_t durationUs;
1458        CHECK(itemMeta->findInt64("durationUs", &durationUs));
1459        buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1460        buffer->meta()->setInt64("durationUs", durationUs);
1461        buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1462        buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1463        buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration);
1464
1465        packetSource->queueAccessUnit(buffer);
1466        return OK;
1467    }
1468
        // Consume a pending time anchor, if one was set.
1469    if (mNextPTSTimeUs >= 0ll) {
1470        mFirstPTSValid = false;
1471        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
1472        mNextPTSTimeUs = -1ll;
1473    }
1474
1475    // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
1476    // stream prefixed by an ID3 tag.
1477
1478    bool firstID3Tag = true;
1479    uint64_t PTS = 0;
1480
1481    for (;;) {
1482        // Make sure to skip all ID3 tags preceding the audio data.
1483        // At least one must be present to provide the PTS timestamp.
1484
1485        ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
1486        if (!id3.isValid()) {
1487            if (firstID3Tag) {
1488                ALOGE("Unable to parse ID3 tag.");
1489                return ERROR_MALFORMED;
1490            } else {
1491                break;
1492            }
1493        }
1494
1495        if (firstID3Tag) {
1496            bool found = false;
1497
                // Scan every PRIV frame; if several match, the last one wins.
1498            ID3::Iterator it(id3, "PRIV");
1499            while (!it.done()) {
1500                size_t length;
1501                const uint8_t *data = it.getData(&length);
1502
1503                static const char *kMatchName =
1504                    "com.apple.streaming.transportStreamTimestamp";
1505                static const size_t kMatchNameLen = strlen(kMatchName);
1506
                    // Expected payload: owner string, one separator byte, then an
                    // 8-byte timestamp read with U64_AT.
1507                if (length == kMatchNameLen + 1 + 8
1508                        && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
1509                    found = true;
1510                    PTS = U64_AT(&data[kMatchNameLen + 1]);
1511                }
1512
1513                it.next();
1514            }
1515
1516            if (!found) {
1517                ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
1518                return ERROR_MALFORMED;
1519            }
1520        }
1521
1522        // skip the ID3 tag
1523        buffer->setRange(
1524                buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
1525
1526        firstID3Tag = false;
1527    }
1528
1529    if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
1530        ALOGW("This stream only contains audio data!");
1531
            // Drop every requested stream type except audio; if audio was not
            // requested there is nothing left to queue.
1532        mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
1533
1534        if (mStreamTypeMask == 0) {
1535            return OK;
1536        }
1537    }
1538
1539    sp<AnotherPacketSource> packetSource =
1540        mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
1541
        // First audio data for this source: derive the format from the first
        // frame's ADTS header (needs at least 7 bytes).
1542    if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
1543        ABitReader bits(buffer->data(), buffer->size());
1544
1545        // adts_fixed_header
1546
1547        CHECK_EQ(bits.getBits(12), 0xfffu);
1548        bits.skipBits(3);  // ID, layer
1549        bool protection_absent __unused = bits.getBits(1) != 0;
1550
1551        unsigned profile = bits.getBits(2);
1552        CHECK_NE(profile, 3u);
1553        unsigned sampling_freq_index = bits.getBits(4);
1554        bits.getBits(1);  // private_bit
1555        unsigned channel_configuration = bits.getBits(3);
1556        CHECK_NE(channel_configuration, 0u);
1557        bits.skipBits(2);  // original_copy, home
1558
1559        sp<MetaData> meta = MakeAACCodecSpecificData(
1560                profile, sampling_freq_index, channel_configuration);
1561
1562        meta->setInt32(kKeyIsADTS, true);
1563
1564        packetSource->setFormat(meta);
1565    }
1566
1567    int64_t numSamples = 0ll;
1568    int32_t sampleRate;
        // NOTE(review): if the format was not set above (buffer shorter than 7
        // bytes) and none existed before, getFormat() is NULL here — confirm
        // callers cannot reach this with such a buffer.
1569    CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
1570
        // PTS * 100 / 9 equals PTS * 1000000 / 90000; presumably PTS is in
        // 90 kHz ticks and this converts it to microseconds — TODO confirm.
1571    int64_t timeUs = (PTS * 100ll) / 9ll;
1572    if (!mFirstPTSValid) {
1573        mFirstPTSValid = true;
1574        mFirstTimeUs = timeUs;
1575    }
1576
1577    size_t offset = 0;
1578    while (offset < buffer->size()) {
1579        const uint8_t *adtsHeader = buffer->data() + offset;
            // Header bytes 3..5 are read below, so at least 6 bytes must remain.
1580        CHECK_LT(offset + 5, buffer->size());
1581
            // 13-bit frame length: low 2 bits of byte 3, all of byte 4, top 3
            // bits of byte 5.
1582        unsigned aac_frame_length =
1583            ((adtsHeader[3] & 3) << 11)
1584            | (adtsHeader[4] << 3)
1585            | (adtsHeader[5] >> 5);
1586
            // A zero length is only tolerated when the bytes are actually an
            // embedded ID3 tag, which is then skipped.
1587        if (aac_frame_length == 0) {
1588            const uint8_t *id3Header = adtsHeader;
1589            if (!memcmp(id3Header, "ID3", 3)) {
1590                ID3 id3(id3Header, buffer->size() - offset, true);
1591                if (id3.isValid()) {
1592                    offset += id3.rawSize();
1593                    continue;
1594                };
1595            }
1596            return ERROR_MALFORMED;
1597        }
1598
1599        CHECK_LE(offset + aac_frame_length, buffer->size());
1600
            // Timestamp of this frame: segment PTS plus the duration of the
            // samples already consumed.
1601        int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
1602        offset += aac_frame_length;
1603
1604        // Each AAC frame encodes 1024 samples.
1605        numSamples += 1024;
1606
1607        if (mStartup) {
1608            int64_t startTimeUs = unitTimeUs;
1609            if (mStartTimeUsRelative) {
1610                startTimeUs -= mFirstTimeUs;
1611                if (startTimeUs  < 0) {
1612                    startTimeUs = 0;
1613                }
1614            }
                // Skip frames that precede the requested start time.
1615            if (startTimeUs < mStartTimeUs) {
1616                continue;
1617            }
1618
1619            if (mStartTimeUsNotify != NULL) {
1620                int32_t targetDurationSecs;
1621                CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
1622                int64_t targetDurationUs = targetDurationSecs * 1000000ll;
1623
1624                // Duplicated logic from how we handle .ts playlists.
                    // Note: the comparison and the notification below use the
                    // segment-level |timeUs|, not this frame's |unitTimeUs|.
1625                if (mStartup && mSegmentStartTimeUs >= 0
1626                        && timeUs - mStartTimeUs > targetDurationUs) {
1627                    int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs);
1628                    if (newSeqNumber >= mSeqNumber) {
1629                        --mSeqNumber;
1630                    } else {
1631                        mSeqNumber = newSeqNumber;
1632                    }
1633                    return -EAGAIN;
1634                }
1635
1636                mStartTimeUsNotify->setInt64("timeUsAudio", timeUs);
1637                mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq);
1638                mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO);
1639                mStartTimeUsNotify->post();
1640                mStartTimeUsNotify.clear();
1641                mStartup = false;
1642            }
1643        }
1644
1645        if (mStopParams != NULL) {
1646            // Queue discontinuity in original stream.
1647            int32_t discontinuitySeq;
1648            int64_t stopTimeUs;
1649            if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1650                    || discontinuitySeq > mDiscontinuitySeq
1651                    || !mStopParams->findInt64("timeUsAudio", &stopTimeUs)
1652                    || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) {
1653                packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
1654                mStreamTypeMask = 0;
1655                mPacketSources.clear();
1656                return ERROR_OUT_OF_RANGE;
1657            }
1658        }
1659
            // Copy the whole frame, ADTS header included, into its own unit.
1660        sp<ABuffer> unit = new ABuffer(aac_frame_length);
1661        memcpy(unit->data(), adtsHeader, aac_frame_length);
1662
1663        unit->meta()->setInt64("timeUs", unitTimeUs);
1664        setAccessUnitProperties(unit, packetSource);
1665        packetSource->queueAccessUnit(unit);
1666    }
1667
1668    return OK;
1669}
1670
1671void PlaylistFetcher::updateDuration() {
1672    int64_t durationUs = 0ll;
1673    for (size_t index = 0; index < mPlaylist->size(); ++index) {
1674        sp<AMessage> itemMeta;
1675        CHECK(mPlaylist->itemAt(
1676                    index, NULL /* uri */, &itemMeta));
1677
1678        int64_t itemDurationUs;
1679        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1680
1681        durationUs += itemDurationUs;
1682    }
1683
1684    sp<AMessage> msg = mNotify->dup();
1685    msg->setInt32("what", kWhatDurationUpdate);
1686    msg->setInt64("durationUs", durationUs);
1687    msg->post();
1688}
1689
1690int64_t PlaylistFetcher::resumeThreshold(const sp<AMessage> &msg) {
1691    int64_t durationUs;
1692    if (msg->findInt64("durationUs", &durationUs) && durationUs > 0) {
1693        return kNumSkipFrames * durationUs;
1694    }
1695
1696    sp<RefBase> obj;
1697    msg->findObject("format", &obj);
1698    MetaData *format = static_cast<MetaData *>(obj.get());
1699
1700    const char *mime;
1701    CHECK(format->findCString(kKeyMIMEType, &mime));
1702    bool audio = !strncasecmp(mime, "audio/", 6);
1703    if (audio) {
1704        // Assumes 1000 samples per frame.
1705        int32_t sampleRate;
1706        CHECK(format->findInt32(kKeySampleRate, &sampleRate));
1707        return kNumSkipFrames  /* frames */ * 1000 /* samples */
1708                * (1000000 / sampleRate) /* sample duration (us) */;
1709    } else {
1710        int32_t frameRate;
1711        if (format->findInt32(kKeyFrameRate, &frameRate) && frameRate > 0) {
1712            return kNumSkipFrames * (1000000 / frameRate);
1713        }
1714    }
1715
1716    return 500000ll;
1717}
1718
1719}  // namespace android
1720