PlaylistFetcher.cpp revision 73d2847af14cdd5fdf8bd1ac80fb7ddf9ae7d9a7
1/*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "PlaylistFetcher"
19#include <utils/Log.h>
20
21#include "PlaylistFetcher.h"
22
23#include "LiveDataSource.h"
24#include "LiveSession.h"
25#include "M3UParser.h"
26
27#include "include/avc_utils.h"
28#include "include/HTTPBase.h"
29#include "include/ID3.h"
30#include "mpeg2ts/AnotherPacketSource.h"
31
32#include <media/IStreamSource.h>
33#include <media/stagefright/foundation/ABitReader.h>
34#include <media/stagefright/foundation/ABuffer.h>
35#include <media/stagefright/foundation/ADebug.h>
36#include <media/stagefright/foundation/hexdump.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaDefs.h>
39#include <media/stagefright/MetaData.h>
40#include <media/stagefright/Utils.h>
41
42#include <ctype.h>
43#include <inttypes.h>
44#include <openssl/aes.h>
45#include <openssl/md5.h>
46
47namespace android {
48
49// static
50const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll;
51const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
52const int32_t PlaylistFetcher::kDownloadBlockSize = 2048;
53const int32_t PlaylistFetcher::kNumSkipFrames = 10;
54
55PlaylistFetcher::PlaylistFetcher(
56        const sp<AMessage> &notify,
57        const sp<LiveSession> &session,
58        const char *uri)
59    : mNotify(notify),
60      mStartTimeUsNotify(notify->dup()),
61      mSession(session),
62      mURI(uri),
63      mStreamTypeMask(0),
64      mStartTimeUs(-1ll),
65      mSegmentStartTimeUs(-1ll),
66      mDiscontinuitySeq(-1ll),
67      mStartTimeUsRelative(false),
68      mLastPlaylistFetchTimeUs(-1ll),
69      mSeqNumber(-1),
70      mNumRetries(0),
71      mStartup(true),
72      mAdaptive(false),
73      mPrepared(false),
74      mNextPTSTimeUs(-1ll),
75      mMonitorQueueGeneration(0),
76      mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
77      mFirstPTSValid(false),
78      mAbsoluteTimeAnchorUs(0ll),
79      mVideoBuffer(new AnotherPacketSource(NULL)) {
80    memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
81    mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
82    mStartTimeUsNotify->setInt32("streamMask", 0);
83}
84
85PlaylistFetcher::~PlaylistFetcher() {
86}
87
88int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
89    CHECK(mPlaylist != NULL);
90
91    int32_t firstSeqNumberInPlaylist;
92    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
93                "media-sequence", &firstSeqNumberInPlaylist)) {
94        firstSeqNumberInPlaylist = 0;
95    }
96
97    int32_t lastSeqNumberInPlaylist =
98        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
99
100    CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
101    CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
102
103    int64_t segmentStartUs = 0ll;
104    for (int32_t index = 0;
105            index < seqNumber - firstSeqNumberInPlaylist; ++index) {
106        sp<AMessage> itemMeta;
107        CHECK(mPlaylist->itemAt(
108                    index, NULL /* uri */, &itemMeta));
109
110        int64_t itemDurationUs;
111        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
112
113        segmentStartUs += itemDurationUs;
114    }
115
116    return segmentStartUs;
117}
118
119int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
120    int64_t nowUs = ALooper::GetNowUs();
121
122    if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
123        CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
124        return 0ll;
125    }
126
127    if (mPlaylist->isComplete()) {
128        return (~0llu >> 1);
129    }
130
131    int32_t targetDurationSecs;
132    CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
133
134    int64_t targetDurationUs = targetDurationSecs * 1000000ll;
135
136    int64_t minPlaylistAgeUs;
137
138    switch (mRefreshState) {
139        case INITIAL_MINIMUM_RELOAD_DELAY:
140        {
141            size_t n = mPlaylist->size();
142            if (n > 0) {
143                sp<AMessage> itemMeta;
144                CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
145
146                int64_t itemDurationUs;
147                CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
148
149                minPlaylistAgeUs = itemDurationUs;
150                break;
151            }
152
153            // fall through
154        }
155
156        case FIRST_UNCHANGED_RELOAD_ATTEMPT:
157        {
158            minPlaylistAgeUs = targetDurationUs / 2;
159            break;
160        }
161
162        case SECOND_UNCHANGED_RELOAD_ATTEMPT:
163        {
164            minPlaylistAgeUs = (targetDurationUs * 3) / 2;
165            break;
166        }
167
168        case THIRD_UNCHANGED_RELOAD_ATTEMPT:
169        {
170            minPlaylistAgeUs = targetDurationUs * 3;
171            break;
172        }
173
174        default:
175            TRESPASS();
176            break;
177    }
178
179    int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
180    return delayUs > 0ll ? delayUs : 0ll;
181}
182
183status_t PlaylistFetcher::decryptBuffer(
184        size_t playlistIndex, const sp<ABuffer> &buffer,
185        bool first) {
186    sp<AMessage> itemMeta;
187    bool found = false;
188    AString method;
189
190    for (ssize_t i = playlistIndex; i >= 0; --i) {
191        AString uri;
192        CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
193
194        if (itemMeta->findString("cipher-method", &method)) {
195            found = true;
196            break;
197        }
198    }
199
200    if (!found) {
201        method = "NONE";
202    }
203    buffer->meta()->setString("cipher-method", method.c_str());
204
205    if (method == "NONE") {
206        return OK;
207    } else if (!(method == "AES-128")) {
208        ALOGE("Unsupported cipher method '%s'", method.c_str());
209        return ERROR_UNSUPPORTED;
210    }
211
212    AString keyURI;
213    if (!itemMeta->findString("cipher-uri", &keyURI)) {
214        ALOGE("Missing key uri");
215        return ERROR_MALFORMED;
216    }
217
218    ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
219
220    sp<ABuffer> key;
221    if (index >= 0) {
222        key = mAESKeyForURI.valueAt(index);
223    } else {
224        ssize_t err = mSession->fetchFile(keyURI.c_str(), &key);
225
226        if (err < 0) {
227            ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
228            return ERROR_IO;
229        } else if (key->size() != 16) {
230            ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
231            return ERROR_MALFORMED;
232        }
233
234        mAESKeyForURI.add(keyURI, key);
235    }
236
237    AES_KEY aes_key;
238    if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
239        ALOGE("failed to set AES decryption key.");
240        return UNKNOWN_ERROR;
241    }
242
243    size_t n = buffer->size();
244    if (!n) {
245        return OK;
246    }
247    CHECK(n % 16 == 0);
248
249    if (first) {
250        // If decrypting the first block in a file, read the iv from the manifest
251        // or derive the iv from the file's sequence number.
252
253        AString iv;
254        if (itemMeta->findString("cipher-iv", &iv)) {
255            if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
256                    || iv.size() != 16 * 2 + 2) {
257                ALOGE("malformed cipher IV '%s'.", iv.c_str());
258                return ERROR_MALFORMED;
259            }
260
261            memset(mAESInitVec, 0, sizeof(mAESInitVec));
262            for (size_t i = 0; i < 16; ++i) {
263                char c1 = tolower(iv.c_str()[2 + 2 * i]);
264                char c2 = tolower(iv.c_str()[3 + 2 * i]);
265                if (!isxdigit(c1) || !isxdigit(c2)) {
266                    ALOGE("malformed cipher IV '%s'.", iv.c_str());
267                    return ERROR_MALFORMED;
268                }
269                uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
270                uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
271
272                mAESInitVec[i] = nibble1 << 4 | nibble2;
273            }
274        } else {
275            memset(mAESInitVec, 0, sizeof(mAESInitVec));
276            mAESInitVec[15] = mSeqNumber & 0xff;
277            mAESInitVec[14] = (mSeqNumber >> 8) & 0xff;
278            mAESInitVec[13] = (mSeqNumber >> 16) & 0xff;
279            mAESInitVec[12] = (mSeqNumber >> 24) & 0xff;
280        }
281    }
282
283    AES_cbc_encrypt(
284            buffer->data(), buffer->data(), buffer->size(),
285            &aes_key, mAESInitVec, AES_DECRYPT);
286
287    return OK;
288}
289
290status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
291    status_t err;
292    AString method;
293    CHECK(buffer->meta()->findString("cipher-method", &method));
294    if (method == "NONE") {
295        return OK;
296    }
297
298    uint8_t padding = 0;
299    if (buffer->size() > 0) {
300        padding = buffer->data()[buffer->size() - 1];
301    }
302
303    if (padding > 16) {
304        return ERROR_MALFORMED;
305    }
306
307    for (size_t i = buffer->size() - padding; i < padding; i++) {
308        if (buffer->data()[i] != padding) {
309            return ERROR_MALFORMED;
310        }
311    }
312
313    buffer->setRange(buffer->offset(), buffer->size() - padding);
314    return OK;
315}
316
317void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
318    int64_t maxDelayUs = delayUsToRefreshPlaylist();
319    if (maxDelayUs < minDelayUs) {
320        maxDelayUs = minDelayUs;
321    }
322    if (delayUs > maxDelayUs) {
323        ALOGV("Need to refresh playlist in %" PRId64 , maxDelayUs);
324        delayUs = maxDelayUs;
325    }
326    sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id());
327    msg->setInt32("generation", mMonitorQueueGeneration);
328    msg->post(delayUs);
329}
330
331void PlaylistFetcher::cancelMonitorQueue() {
332    ++mMonitorQueueGeneration;
333}
334
335void PlaylistFetcher::startAsync(
336        const sp<AnotherPacketSource> &audioSource,
337        const sp<AnotherPacketSource> &videoSource,
338        const sp<AnotherPacketSource> &subtitleSource,
339        int64_t startTimeUs,
340        int64_t segmentStartTimeUs,
341        int32_t startDiscontinuitySeq,
342        bool adaptive) {
343    sp<AMessage> msg = new AMessage(kWhatStart, id());
344
345    uint32_t streamTypeMask = 0ul;
346
347    if (audioSource != NULL) {
348        msg->setPointer("audioSource", audioSource.get());
349        streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
350    }
351
352    if (videoSource != NULL) {
353        msg->setPointer("videoSource", videoSource.get());
354        streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
355    }
356
357    if (subtitleSource != NULL) {
358        msg->setPointer("subtitleSource", subtitleSource.get());
359        streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
360    }
361
362    msg->setInt32("streamTypeMask", streamTypeMask);
363    msg->setInt64("startTimeUs", startTimeUs);
364    msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
365    msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
366    msg->setInt32("adaptive", adaptive);
367    msg->post();
368}
369
370void PlaylistFetcher::pauseAsync() {
371    (new AMessage(kWhatPause, id()))->post();
372}
373
374void PlaylistFetcher::stopAsync(bool clear) {
375    sp<AMessage> msg = new AMessage(kWhatStop, id());
376    msg->setInt32("clear", clear);
377    msg->post();
378}
379
380void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
381    AMessage* msg = new AMessage(kWhatResumeUntil, id());
382    msg->setMessage("params", params);
383    msg->post();
384}
385
386void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
387    switch (msg->what()) {
388        case kWhatStart:
389        {
390            status_t err = onStart(msg);
391
392            sp<AMessage> notify = mNotify->dup();
393            notify->setInt32("what", kWhatStarted);
394            notify->setInt32("err", err);
395            notify->post();
396            break;
397        }
398
399        case kWhatPause:
400        {
401            onPause();
402
403            sp<AMessage> notify = mNotify->dup();
404            notify->setInt32("what", kWhatPaused);
405            notify->post();
406            break;
407        }
408
409        case kWhatStop:
410        {
411            onStop(msg);
412
413            sp<AMessage> notify = mNotify->dup();
414            notify->setInt32("what", kWhatStopped);
415            notify->post();
416            break;
417        }
418
419        case kWhatMonitorQueue:
420        case kWhatDownloadNext:
421        {
422            int32_t generation;
423            CHECK(msg->findInt32("generation", &generation));
424
425            if (generation != mMonitorQueueGeneration) {
426                // Stale event
427                break;
428            }
429
430            if (msg->what() == kWhatMonitorQueue) {
431                onMonitorQueue();
432            } else {
433                onDownloadNext();
434            }
435            break;
436        }
437
438        case kWhatResumeUntil:
439        {
440            onResumeUntil(msg);
441            break;
442        }
443
444        default:
445            TRESPASS();
446    }
447}
448
449status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
450    mPacketSources.clear();
451
452    uint32_t streamTypeMask;
453    CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
454
455    int64_t startTimeUs;
456    int64_t segmentStartTimeUs;
457    int32_t startDiscontinuitySeq;
458    int32_t adaptive;
459    CHECK(msg->findInt64("startTimeUs", &startTimeUs));
460    CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
461    CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
462    CHECK(msg->findInt32("adaptive", &adaptive));
463
464    if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
465        void *ptr;
466        CHECK(msg->findPointer("audioSource", &ptr));
467
468        mPacketSources.add(
469                LiveSession::STREAMTYPE_AUDIO,
470                static_cast<AnotherPacketSource *>(ptr));
471    }
472
473    if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
474        void *ptr;
475        CHECK(msg->findPointer("videoSource", &ptr));
476
477        mPacketSources.add(
478                LiveSession::STREAMTYPE_VIDEO,
479                static_cast<AnotherPacketSource *>(ptr));
480    }
481
482    if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
483        void *ptr;
484        CHECK(msg->findPointer("subtitleSource", &ptr));
485
486        mPacketSources.add(
487                LiveSession::STREAMTYPE_SUBTITLES,
488                static_cast<AnotherPacketSource *>(ptr));
489    }
490
491    mStreamTypeMask = streamTypeMask;
492
493    mSegmentStartTimeUs = segmentStartTimeUs;
494    mDiscontinuitySeq = startDiscontinuitySeq;
495
496    if (startTimeUs >= 0) {
497        mStartTimeUs = startTimeUs;
498        mSeqNumber = -1;
499        mStartup = true;
500        mPrepared = false;
501        mAdaptive = adaptive;
502    }
503
504    postMonitorQueue();
505
506    return OK;
507}
508
509void PlaylistFetcher::onPause() {
510    cancelMonitorQueue();
511}
512
513void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
514    cancelMonitorQueue();
515
516    int32_t clear;
517    CHECK(msg->findInt32("clear", &clear));
518    if (clear) {
519        for (size_t i = 0; i < mPacketSources.size(); i++) {
520            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
521            packetSource->clear();
522        }
523    }
524
525    mPacketSources.clear();
526    mStreamTypeMask = 0;
527}
528
529// Resume until we have reached the boundary timestamps listed in `msg`; when
530// the remaining time is too short (within a resume threshold) stop immediately
531// instead.
532status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
533    sp<AMessage> params;
534    CHECK(msg->findMessage("params", &params));
535
536    bool stop = false;
537    for (size_t i = 0; i < mPacketSources.size(); i++) {
538        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
539
540        const char *stopKey;
541        int streamType = mPacketSources.keyAt(i);
542        switch (streamType) {
543        case LiveSession::STREAMTYPE_VIDEO:
544            stopKey = "timeUsVideo";
545            break;
546
547        case LiveSession::STREAMTYPE_AUDIO:
548            stopKey = "timeUsAudio";
549            break;
550
551        case LiveSession::STREAMTYPE_SUBTITLES:
552            stopKey = "timeUsSubtitle";
553            break;
554
555        default:
556            TRESPASS();
557        }
558
559        // Don't resume if we would stop within a resume threshold.
560        int32_t discontinuitySeq;
561        int64_t latestTimeUs = 0, stopTimeUs = 0;
562        sp<AMessage> latestMeta = packetSource->getLatestDequeuedMeta();
563        if (latestMeta != NULL
564                && latestMeta->findInt32("discontinuitySeq", &discontinuitySeq)
565                && discontinuitySeq == mDiscontinuitySeq
566                && latestMeta->findInt64("timeUs", &latestTimeUs)
567                && params->findInt64(stopKey, &stopTimeUs)
568                && stopTimeUs - latestTimeUs < resumeThreshold(latestMeta)) {
569            stop = true;
570        }
571    }
572
573    if (stop) {
574        for (size_t i = 0; i < mPacketSources.size(); i++) {
575            mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer());
576        }
577        stopAsync(/* clear = */ false);
578        return OK;
579    }
580
581    mStopParams = params;
582    postMonitorQueue();
583
584    return OK;
585}
586
587void PlaylistFetcher::notifyError(status_t err) {
588    sp<AMessage> notify = mNotify->dup();
589    notify->setInt32("what", kWhatError);
590    notify->setInt32("err", err);
591    notify->post();
592}
593
594void PlaylistFetcher::queueDiscontinuity(
595        ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
596    for (size_t i = 0; i < mPacketSources.size(); ++i) {
597        // do not discard buffer upon #EXT-X-DISCONTINUITY tag
598        // (seek will discard buffer by abandoning old fetchers)
599        mPacketSources.valueAt(i)->queueDiscontinuity(
600                type, extra, false /* discard */);
601    }
602}
603
604void PlaylistFetcher::onMonitorQueue() {
605    bool downloadMore = false;
606    refreshPlaylist();
607
608    int32_t targetDurationSecs;
609    int64_t targetDurationUs = kMinBufferedDurationUs;
610    if (mPlaylist != NULL) {
611        CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
612        targetDurationUs = targetDurationSecs * 1000000ll;
613    }
614
615    // buffer at least 3 times the target duration, or up to 10 seconds
616    int64_t durationToBufferUs = targetDurationUs * 3;
617    if (durationToBufferUs > kMinBufferedDurationUs)  {
618        durationToBufferUs = kMinBufferedDurationUs;
619    }
620
621    int64_t bufferedDurationUs = 0ll;
622    status_t finalResult = NOT_ENOUGH_DATA;
623    if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
624        sp<AnotherPacketSource> packetSource =
625            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
626
627        bufferedDurationUs =
628                packetSource->getBufferedDurationUs(&finalResult);
629        finalResult = OK;
630    } else {
631        // Use max stream duration to prevent us from waiting on a non-existent stream;
632        // when we cannot make out from the manifest what streams are included in a playlist
633        // we might assume extra streams.
634        for (size_t i = 0; i < mPacketSources.size(); ++i) {
635            if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
636                continue;
637            }
638
639            int64_t bufferedStreamDurationUs =
640                mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
641            ALOGV("buffered %" PRId64 " for stream %d",
642                    bufferedStreamDurationUs, mPacketSources.keyAt(i));
643            if (bufferedStreamDurationUs > bufferedDurationUs) {
644                bufferedDurationUs = bufferedStreamDurationUs;
645            }
646        }
647    }
648    downloadMore = (bufferedDurationUs < durationToBufferUs);
649
650    // signal start if buffered up at least the target size
651    if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) {
652        mPrepared = true;
653
654        ALOGV("prepared, buffered=%" PRId64 " > %" PRId64 "",
655                bufferedDurationUs, targetDurationUs);
656        sp<AMessage> msg = mNotify->dup();
657        msg->setInt32("what", kWhatTemporarilyDoneFetching);
658        msg->post();
659    }
660
661    if (finalResult == OK && downloadMore) {
662        ALOGV("monitoring, buffered=%" PRId64 " < %" PRId64 "",
663                bufferedDurationUs, durationToBufferUs);
664        // delay the next download slightly; hopefully this gives other concurrent fetchers
665        // a better chance to run.
666        // onDownloadNext();
667        sp<AMessage> msg = new AMessage(kWhatDownloadNext, id());
668        msg->setInt32("generation", mMonitorQueueGeneration);
669        msg->post(1000l);
670    } else {
671        // Nothing to do yet, try again in a second.
672
673        sp<AMessage> msg = mNotify->dup();
674        msg->setInt32("what", kWhatTemporarilyDoneFetching);
675        msg->post();
676
677        int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2;
678        ALOGV("pausing for %" PRId64 ", buffered=%" PRId64 " > %" PRId64 "",
679                delayUs, bufferedDurationUs, durationToBufferUs);
680        // :TRICKY: need to enforce minimum delay because the delay to
681        // refresh the playlist will become 0
682        postMonitorQueue(delayUs, mPrepared ? targetDurationUs * 2 : 0);
683    }
684}
685
686status_t PlaylistFetcher::refreshPlaylist() {
687    if (delayUsToRefreshPlaylist() <= 0) {
688        bool unchanged;
689        sp<M3UParser> playlist = mSession->fetchPlaylist(
690                mURI.c_str(), mPlaylistHash, &unchanged);
691
692        if (playlist == NULL) {
693            if (unchanged) {
694                // We succeeded in fetching the playlist, but it was
695                // unchanged from the last time we tried.
696
697                if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
698                    mRefreshState = (RefreshState)(mRefreshState + 1);
699                }
700            } else {
701                ALOGE("failed to load playlist at url '%s'", mURI.c_str());
702                notifyError(ERROR_IO);
703                return ERROR_IO;
704            }
705        } else {
706            mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
707            mPlaylist = playlist;
708
709            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
710                updateDuration();
711            }
712        }
713
714        mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
715    }
716    return OK;
717}
718
719// static
720bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
721    return buffer->size() > 0 && buffer->data()[0] == 0x47;
722}
723
724void PlaylistFetcher::onDownloadNext() {
725    if (refreshPlaylist() != OK) {
726        return;
727    }
728
729    int32_t firstSeqNumberInPlaylist;
730    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
731                "media-sequence", &firstSeqNumberInPlaylist)) {
732        firstSeqNumberInPlaylist = 0;
733    }
734
735    bool discontinuity = false;
736
737    const int32_t lastSeqNumberInPlaylist =
738        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
739
740    if (mDiscontinuitySeq < 0) {
741        mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
742    }
743
744    if (mSeqNumber < 0) {
745        CHECK_GE(mStartTimeUs, 0ll);
746
747        if (mSegmentStartTimeUs < 0) {
748            if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
749                // If this is a live session, start 3 segments from the end on connect
750                mSeqNumber = lastSeqNumberInPlaylist - 3;
751                if (mSeqNumber < firstSeqNumberInPlaylist) {
752                    mSeqNumber = firstSeqNumberInPlaylist;
753                }
754            } else {
755                mSeqNumber = getSeqNumberForTime(mStartTimeUs);
756                mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
757            }
758            mStartTimeUsRelative = true;
759            ALOGV("Initial sequence number for time %" PRId64 " is %d from (%d .. %d)",
760                    mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
761                    lastSeqNumberInPlaylist);
762        } else {
763            mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
764            if (mAdaptive) {
765                // avoid double fetch/decode
766                mSeqNumber += 1;
767            }
768            ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
769            if (mSeqNumber < minSeq) {
770                mSeqNumber = minSeq;
771            }
772
773            if (mSeqNumber < firstSeqNumberInPlaylist) {
774                mSeqNumber = firstSeqNumberInPlaylist;
775            }
776
777            if (mSeqNumber > lastSeqNumberInPlaylist) {
778                mSeqNumber = lastSeqNumberInPlaylist;
779            }
780            ALOGV("Initial sequence number for live event %d from (%d .. %d)",
781                    mSeqNumber, firstSeqNumberInPlaylist,
782                    lastSeqNumberInPlaylist);
783        }
784    }
785
786    if (mSeqNumber < firstSeqNumberInPlaylist
787            || mSeqNumber > lastSeqNumberInPlaylist) {
788        if (!mPlaylist->isComplete() && mNumRetries < kMaxNumRetries) {
789            ++mNumRetries;
790
791            if (mSeqNumber > lastSeqNumberInPlaylist) {
792                // refresh in increasing fraction (1/2, 1/3, ...) of the
793                // playlist's target duration or 3 seconds, whichever is less
794                int32_t targetDurationSecs;
795                CHECK(mPlaylist->meta()->findInt32(
796                        "target-duration", &targetDurationSecs));
797                int64_t delayUs = mPlaylist->size() * targetDurationSecs *
798                        1000000ll / (1 + mNumRetries);
799                if (delayUs > kMaxMonitorDelayUs) {
800                    delayUs = kMaxMonitorDelayUs;
801                }
802                ALOGV("sequence number high: %d from (%d .. %d), "
803                      "monitor in %" PRId64 " (retry=%d)",
804                        mSeqNumber, firstSeqNumberInPlaylist,
805                        lastSeqNumberInPlaylist, delayUs, mNumRetries);
806                postMonitorQueue(delayUs);
807                return;
808            }
809
810            // we've missed the boat, let's start from the lowest sequence
811            // number available and signal a discontinuity.
812
813            ALOGI("We've missed the boat, restarting playback."
814                  "  mStartup=%d, was  looking for %d in %d-%d",
815                    mStartup, mSeqNumber, firstSeqNumberInPlaylist,
816                    lastSeqNumberInPlaylist);
817            mSeqNumber = lastSeqNumberInPlaylist - 3;
818            if (mSeqNumber < firstSeqNumberInPlaylist) {
819                mSeqNumber = firstSeqNumberInPlaylist;
820            }
821            discontinuity = true;
822
823            // fall through
824        } else {
825            ALOGE("Cannot find sequence number %d in playlist "
826                 "(contains %d - %d)",
827                 mSeqNumber, firstSeqNumberInPlaylist,
828                  firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
829
830            notifyError(ERROR_END_OF_STREAM);
831            return;
832        }
833    }
834
835    mNumRetries = 0;
836
837    AString uri;
838    sp<AMessage> itemMeta;
839    CHECK(mPlaylist->itemAt(
840                mSeqNumber - firstSeqNumberInPlaylist,
841                &uri,
842                &itemMeta));
843
844    int32_t val;
845    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
846        mDiscontinuitySeq++;
847        discontinuity = true;
848    }
849
850    int64_t range_offset, range_length;
851    if (!itemMeta->findInt64("range-offset", &range_offset)
852            || !itemMeta->findInt64("range-length", &range_length)) {
853        range_offset = 0;
854        range_length = -1;
855    }
856
857    ALOGV("fetching segment %d from (%d .. %d)",
858          mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
859
860    ALOGV("fetching '%s'", uri.c_str());
861
862    sp<DataSource> source;
863    sp<ABuffer> buffer, tsBuffer;
864    // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
865    // this avoids interleaved connections to the key and segment file.
866    {
867        sp<ABuffer> junk = new ABuffer(16);
868        junk->setRange(0, 16);
869        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
870                true /* first */);
871        if (err != OK) {
872            notifyError(err);
873            return;
874        }
875    }
876
877    // block-wise download
878    bool startup = mStartup;
879    ssize_t bytesRead;
880    do {
881        bytesRead = mSession->fetchFile(
882                uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source);
883
884        if (bytesRead < 0) {
885            status_t err = bytesRead;
886            ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
887            notifyError(err);
888            return;
889        }
890
891        CHECK(buffer != NULL);
892
893        size_t size = buffer->size();
894        // Set decryption range.
895        buffer->setRange(size - bytesRead, bytesRead);
896        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
897                buffer->offset() == 0 /* first */);
898        // Unset decryption range.
899        buffer->setRange(0, size);
900
901        if (err != OK) {
902            ALOGE("decryptBuffer failed w/ error %d", err);
903
904            notifyError(err);
905            return;
906        }
907
908        if (startup || discontinuity) {
909            // Signal discontinuity.
910
911            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
912                // If this was a live event this made no sense since
913                // we don't have access to all the segment before the current
914                // one.
915                mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
916            }
917
918            if (discontinuity) {
919                ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
920
921                queueDiscontinuity(
922                        ATSParser::DISCONTINUITY_FORMATCHANGE,
923                        NULL /* extra */);
924
925                discontinuity = false;
926            }
927
928            startup = false;
929        }
930
931        err = OK;
932        if (bufferStartsWithTsSyncByte(buffer)) {
933            // Incremental extraction is only supported for MPEG2 transport streams.
934            if (tsBuffer == NULL) {
935                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
936                tsBuffer->setRange(0, 0);
937            } else if (tsBuffer->capacity() != buffer->capacity()) {
938                size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
939                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
940                tsBuffer->setRange(tsOff, tsSize);
941            }
942            tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
943
944            err = extractAndQueueAccessUnitsFromTs(tsBuffer);
945        }
946
947        if (err == -EAGAIN) {
948            // starting sequence number too low/high
949            mTSParser.clear();
950            postMonitorQueue();
951            return;
952        } else if (err == ERROR_OUT_OF_RANGE) {
953            // reached stopping point
954            stopAsync(/* clear = */ false);
955            return;
956        } else if (err != OK) {
957            notifyError(err);
958            return;
959        }
960
961    } while (bytesRead != 0);
962
963    if (bufferStartsWithTsSyncByte(buffer)) {
964        // If we still don't see a stream after fetching a full ts segment mark it as
965        // nonexistent.
966        const size_t kNumTypes = ATSParser::NUM_SOURCE_TYPES;
967        ATSParser::SourceType srcTypes[kNumTypes] =
968                { ATSParser::VIDEO, ATSParser::AUDIO };
969        LiveSession::StreamType streamTypes[kNumTypes] =
970                { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
971
972        for (size_t i = 0; i < kNumTypes; i++) {
973            ATSParser::SourceType srcType = srcTypes[i];
974            LiveSession::StreamType streamType = streamTypes[i];
975
976            sp<AnotherPacketSource> source =
977                static_cast<AnotherPacketSource *>(
978                    mTSParser->getSource(srcType).get());
979
980            if (source == NULL) {
981                ALOGW("MPEG2 Transport stream does not contain %s data.",
982                      srcType == ATSParser::VIDEO ? "video" : "audio");
983
984                mStreamTypeMask &= ~streamType;
985                mPacketSources.removeItem(streamType);
986            }
987        }
988
989    }
990
991    if (checkDecryptPadding(buffer) != OK) {
992        ALOGE("Incorrect padding bytes after decryption.");
993        notifyError(ERROR_MALFORMED);
994        return;
995    }
996
997    status_t err = OK;
998    if (tsBuffer != NULL) {
999        AString method;
1000        CHECK(buffer->meta()->findString("cipher-method", &method));
1001        if ((tsBuffer->size() > 0 && method == "NONE")
1002                || tsBuffer->size() > 16) {
1003            ALOGE("MPEG2 transport stream is not an even multiple of 188 "
1004                    "bytes in length.");
1005            notifyError(ERROR_MALFORMED);
1006            return;
1007        }
1008    }
1009
1010    // bulk extract non-ts files
1011    if (tsBuffer == NULL) {
1012        err = extractAndQueueAccessUnits(buffer, itemMeta);
1013        if (err == -EAGAIN) {
1014            // starting sequence number too low/high
1015            postMonitorQueue();
1016            return;
1017        } else if (err == ERROR_OUT_OF_RANGE) {
1018            // reached stopping point
1019            stopAsync(/* clear = */false);
1020            return;
1021        }
1022    }
1023
1024    if (err != OK) {
1025        notifyError(err);
1026        return;
1027    }
1028
1029    ++mSeqNumber;
1030
1031    postMonitorQueue();
1032}
1033
1034int32_t PlaylistFetcher::getSeqNumberWithAnchorTime(int64_t anchorTimeUs) const {
1035    int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
1036    if (mPlaylist->meta() == NULL
1037            || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) {
1038        firstSeqNumberInPlaylist = 0;
1039    }
1040    lastSeqNumberInPlaylist = firstSeqNumberInPlaylist + mPlaylist->size() - 1;
1041
1042    int32_t index = mSeqNumber - firstSeqNumberInPlaylist - 1;
1043    while (index >= 0 && anchorTimeUs > mStartTimeUs) {
1044        sp<AMessage> itemMeta;
1045        CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1046
1047        int64_t itemDurationUs;
1048        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1049
1050        anchorTimeUs -= itemDurationUs;
1051        --index;
1052    }
1053
1054    int32_t newSeqNumber = firstSeqNumberInPlaylist + index + 1;
1055    if (newSeqNumber <= lastSeqNumberInPlaylist) {
1056        return newSeqNumber;
1057    } else {
1058        return lastSeqNumberInPlaylist;
1059    }
1060}
1061
1062int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1063    int32_t firstSeqNumberInPlaylist;
1064    if (mPlaylist->meta() == NULL
1065            || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) {
1066        firstSeqNumberInPlaylist = 0;
1067    }
1068
1069    size_t curDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
1070    if (discontinuitySeq < curDiscontinuitySeq) {
1071        return firstSeqNumberInPlaylist <= 0 ? 0 : (firstSeqNumberInPlaylist - 1);
1072    }
1073
1074    size_t index = 0;
1075    while (index < mPlaylist->size()) {
1076        sp<AMessage> itemMeta;
1077        CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1078
1079        int64_t discontinuity;
1080        if (itemMeta->findInt64("discontinuity", &discontinuity)) {
1081            curDiscontinuitySeq++;
1082        }
1083
1084        if (curDiscontinuitySeq == discontinuitySeq) {
1085            return firstSeqNumberInPlaylist + index;
1086        }
1087
1088        ++index;
1089    }
1090
1091    return firstSeqNumberInPlaylist + mPlaylist->size();
1092}
1093
1094int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1095    int32_t firstSeqNumberInPlaylist;
1096    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
1097                "media-sequence", &firstSeqNumberInPlaylist)) {
1098        firstSeqNumberInPlaylist = 0;
1099    }
1100
1101    size_t index = 0;
1102    int64_t segmentStartUs = 0;
1103    while (index < mPlaylist->size()) {
1104        sp<AMessage> itemMeta;
1105        CHECK(mPlaylist->itemAt(
1106                    index, NULL /* uri */, &itemMeta));
1107
1108        int64_t itemDurationUs;
1109        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1110
1111        if (timeUs < segmentStartUs + itemDurationUs) {
1112            break;
1113        }
1114
1115        segmentStartUs += itemDurationUs;
1116        ++index;
1117    }
1118
1119    if (index >= mPlaylist->size()) {
1120        index = mPlaylist->size() - 1;
1121    }
1122
1123    return firstSeqNumberInPlaylist + index;
1124}
1125
1126const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
1127        const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
1128    sp<MetaData> format = source->getFormat();
1129    if (format != NULL) {
1130        // for simplicity, store a reference to the format in each unit
1131        accessUnit->meta()->setObject("format", format);
1132    }
1133
1134    if (discard) {
1135        accessUnit->meta()->setInt32("discard", discard);
1136    }
1137
1138    accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1139    accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1140    return accessUnit;
1141}
1142
1143status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
1144    if (mTSParser == NULL) {
1145        // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
1146        mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
1147    }
1148
1149    if (mNextPTSTimeUs >= 0ll) {
1150        sp<AMessage> extra = new AMessage;
1151        // Since we are using absolute timestamps, signal an offset of 0 to prevent
1152        // ATSParser from skewing the timestamps of access units.
1153        extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
1154
1155        mTSParser->signalDiscontinuity(
1156                ATSParser::DISCONTINUITY_SEEK, extra);
1157
1158        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
1159        mNextPTSTimeUs = -1ll;
1160        mFirstPTSValid = false;
1161    }
1162
1163    size_t offset = 0;
1164    while (offset + 188 <= buffer->size()) {
1165        status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
1166
1167        if (err != OK) {
1168            return err;
1169        }
1170
1171        offset += 188;
1172    }
1173    // setRange to indicate consumed bytes.
1174    buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
1175
1176    status_t err = OK;
1177    for (size_t i = mPacketSources.size(); i-- > 0;) {
1178        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1179
1180        const char *key;
1181        ATSParser::SourceType type;
1182        const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1183        switch (stream) {
1184            case LiveSession::STREAMTYPE_VIDEO:
1185                type = ATSParser::VIDEO;
1186                key = "timeUsVideo";
1187                break;
1188
1189            case LiveSession::STREAMTYPE_AUDIO:
1190                type = ATSParser::AUDIO;
1191                key = "timeUsAudio";
1192                break;
1193
1194            case LiveSession::STREAMTYPE_SUBTITLES:
1195            {
1196                ALOGE("MPEG2 Transport streams do not contain subtitles.");
1197                return ERROR_MALFORMED;
1198                break;
1199            }
1200
1201            default:
1202                TRESPASS();
1203        }
1204
1205        sp<AnotherPacketSource> source =
1206            static_cast<AnotherPacketSource *>(
1207                    mTSParser->getSource(type).get());
1208
1209        if (source == NULL) {
1210            continue;
1211        }
1212
1213        int64_t timeUs;
1214        sp<ABuffer> accessUnit;
1215        status_t finalResult;
1216        while (source->hasBufferAvailable(&finalResult)
1217                && source->dequeueAccessUnit(&accessUnit) == OK) {
1218
1219            CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1220
1221            if (mStartup) {
1222                if (!mFirstPTSValid) {
1223                    mFirstTimeUs = timeUs;
1224                    mFirstPTSValid = true;
1225                }
1226                if (mStartTimeUsRelative) {
1227                    timeUs -= mFirstTimeUs;
1228                    if (timeUs < 0) {
1229                        timeUs = 0;
1230                    }
1231                }
1232
1233                if (timeUs < mStartTimeUs) {
1234                    // buffer up to the closest preceding IDR frame
1235                    ALOGV("timeUs %" PRId64 " us < mStartTimeUs %" PRId64 " us",
1236                            timeUs, mStartTimeUs);
1237                    const char *mime;
1238                    sp<MetaData> format  = source->getFormat();
1239                    bool isAvc = false;
1240                    if (format != NULL && format->findCString(kKeyMIMEType, &mime)
1241                            && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC)) {
1242                        isAvc = true;
1243                    }
1244                    if (isAvc && IsIDR(accessUnit)) {
1245                        mVideoBuffer->clear();
1246                    }
1247                    if (isAvc) {
1248                        mVideoBuffer->queueAccessUnit(accessUnit);
1249                    }
1250
1251                    continue;
1252                }
1253            }
1254
1255            CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1256            if (mStartTimeUsNotify != NULL && timeUs > mStartTimeUs) {
1257
1258                int32_t targetDurationSecs;
1259                CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
1260                int64_t targetDurationUs = targetDurationSecs * 1000000ll;
1261                // mStartup
1262                //   mStartup is true until we have queued a packet for all the streams
1263                //   we are fetching. We queue packets whose timestamps are greater than
1264                //   mStartTimeUs.
1265                // mSegmentStartTimeUs >= 0
1266                //   mSegmentStartTimeUs is non-negative when adapting or switching tracks
1267                // timeUs - mStartTimeUs > targetDurationUs:
1268                //   This and the 2 above conditions should only happen when adapting in a live
1269                //   stream; the old fetcher has already fetched to mStartTimeUs; the new fetcher
1270                //   would start fetching after timeUs, which should be greater than mStartTimeUs;
1271                //   the old fetcher would then continue fetching data until timeUs. We don't want
1272                //   timeUs to be too far ahead of mStartTimeUs because we want the old fetcher to
1273                //   stop as early as possible. The definition of being "too far ahead" is
1274                //   arbitrary; here we use targetDurationUs as threshold.
1275                if (mStartup && mSegmentStartTimeUs >= 0
1276                        && timeUs - mStartTimeUs > targetDurationUs) {
1277                    // we just guessed a starting timestamp that is too high when adapting in a
1278                    // live stream; re-adjust based on the actual timestamp extracted from the
1279                    // media segment; if we didn't move backward after the re-adjustment
1280                    // (newSeqNumber), start at least 1 segment prior.
1281                    int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs);
1282                    if (newSeqNumber >= mSeqNumber) {
1283                        --mSeqNumber;
1284                    } else {
1285                        mSeqNumber = newSeqNumber;
1286                    }
1287                    mStartTimeUsNotify = mNotify->dup();
1288                    mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
1289                    return -EAGAIN;
1290                }
1291
1292                int32_t seq;
1293                if (!mStartTimeUsNotify->findInt32("discontinuitySeq", &seq)) {
1294                    mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq);
1295                }
1296                int64_t startTimeUs;
1297                if (!mStartTimeUsNotify->findInt64(key, &startTimeUs)) {
1298                    mStartTimeUsNotify->setInt64(key, timeUs);
1299
1300                    uint32_t streamMask = 0;
1301                    mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1302                    streamMask |= mPacketSources.keyAt(i);
1303                    mStartTimeUsNotify->setInt32("streamMask", streamMask);
1304
1305                    if (streamMask == mStreamTypeMask) {
1306                        mStartup = false;
1307                        mStartTimeUsNotify->post();
1308                        mStartTimeUsNotify.clear();
1309                    }
1310                }
1311            }
1312
1313            if (mStopParams != NULL) {
1314                // Queue discontinuity in original stream.
1315                int32_t discontinuitySeq;
1316                int64_t stopTimeUs;
1317                if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1318                        || discontinuitySeq > mDiscontinuitySeq
1319                        || !mStopParams->findInt64(key, &stopTimeUs)
1320                        || (discontinuitySeq == mDiscontinuitySeq
1321                                && timeUs >= stopTimeUs)) {
1322                    packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
1323                    mStreamTypeMask &= ~stream;
1324                    mPacketSources.removeItemsAt(i);
1325                    break;
1326                }
1327            }
1328
1329            // Note that we do NOT dequeue any discontinuities except for format change.
1330            if (stream == LiveSession::STREAMTYPE_VIDEO) {
1331                const bool discard = true;
1332                status_t status;
1333                while (mVideoBuffer->hasBufferAvailable(&status)) {
1334                    sp<ABuffer> videoBuffer;
1335                    mVideoBuffer->dequeueAccessUnit(&videoBuffer);
1336                    setAccessUnitProperties(videoBuffer, source, discard);
1337                    packetSource->queueAccessUnit(videoBuffer);
1338                }
1339            }
1340
1341            setAccessUnitProperties(accessUnit, source);
1342            packetSource->queueAccessUnit(accessUnit);
1343        }
1344
1345        if (err != OK) {
1346            break;
1347        }
1348    }
1349
1350    if (err != OK) {
1351        for (size_t i = mPacketSources.size(); i-- > 0;) {
1352            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1353            packetSource->clear();
1354        }
1355        return err;
1356    }
1357
1358    if (!mStreamTypeMask) {
1359        // Signal gap is filled between original and new stream.
1360        ALOGV("ERROR OUT OF RANGE");
1361        return ERROR_OUT_OF_RANGE;
1362    }
1363
1364    return OK;
1365}
1366
1367/* static */
1368bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1369        const sp<ABuffer> &buffer) {
1370    size_t pos = 0;
1371
1372    // skip possible BOM
1373    if (buffer->size() >= pos + 3 &&
1374            !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1375        pos += 3;
1376    }
1377
1378    // accept WEBVTT followed by SPACE, TAB or (CR) LF
1379    if (buffer->size() < pos + 6 ||
1380            memcmp("WEBVTT", buffer->data() + pos, 6)) {
1381        return false;
1382    }
1383    pos += 6;
1384
1385    if (buffer->size() == pos) {
1386        return true;
1387    }
1388
1389    uint8_t sep = buffer->data()[pos];
1390    return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1391}
1392
1393status_t PlaylistFetcher::extractAndQueueAccessUnits(
1394        const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1395    if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1396        if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1397            ALOGE("This stream only contains subtitles.");
1398            return ERROR_MALFORMED;
1399        }
1400
1401        const sp<AnotherPacketSource> packetSource =
1402            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1403
1404        int64_t durationUs;
1405        CHECK(itemMeta->findInt64("durationUs", &durationUs));
1406        buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1407        buffer->meta()->setInt64("durationUs", durationUs);
1408        buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1409        buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1410
1411        packetSource->queueAccessUnit(buffer);
1412        return OK;
1413    }
1414
1415    if (mNextPTSTimeUs >= 0ll) {
1416        mFirstPTSValid = false;
1417        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
1418        mNextPTSTimeUs = -1ll;
1419    }
1420
1421    // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
1422    // stream prefixed by an ID3 tag.
1423
1424    bool firstID3Tag = true;
1425    uint64_t PTS = 0;
1426
1427    for (;;) {
1428        // Make sure to skip all ID3 tags preceding the audio data.
1429        // At least one must be present to provide the PTS timestamp.
1430
1431        ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
1432        if (!id3.isValid()) {
1433            if (firstID3Tag) {
1434                ALOGE("Unable to parse ID3 tag.");
1435                return ERROR_MALFORMED;
1436            } else {
1437                break;
1438            }
1439        }
1440
1441        if (firstID3Tag) {
1442            bool found = false;
1443
1444            ID3::Iterator it(id3, "PRIV");
1445            while (!it.done()) {
1446                size_t length;
1447                const uint8_t *data = it.getData(&length);
1448
1449                static const char *kMatchName =
1450                    "com.apple.streaming.transportStreamTimestamp";
1451                static const size_t kMatchNameLen = strlen(kMatchName);
1452
1453                if (length == kMatchNameLen + 1 + 8
1454                        && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
1455                    found = true;
1456                    PTS = U64_AT(&data[kMatchNameLen + 1]);
1457                }
1458
1459                it.next();
1460            }
1461
1462            if (!found) {
1463                ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
1464                return ERROR_MALFORMED;
1465            }
1466        }
1467
1468        // skip the ID3 tag
1469        buffer->setRange(
1470                buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
1471
1472        firstID3Tag = false;
1473    }
1474
1475    if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
1476        ALOGW("This stream only contains audio data!");
1477
1478        mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
1479
1480        if (mStreamTypeMask == 0) {
1481            return OK;
1482        }
1483    }
1484
1485    sp<AnotherPacketSource> packetSource =
1486        mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
1487
1488    if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
1489        ABitReader bits(buffer->data(), buffer->size());
1490
1491        // adts_fixed_header
1492
1493        CHECK_EQ(bits.getBits(12), 0xfffu);
1494        bits.skipBits(3);  // ID, layer
1495        bool protection_absent = bits.getBits(1) != 0;
1496
1497        unsigned profile = bits.getBits(2);
1498        CHECK_NE(profile, 3u);
1499        unsigned sampling_freq_index = bits.getBits(4);
1500        bits.getBits(1);  // private_bit
1501        unsigned channel_configuration = bits.getBits(3);
1502        CHECK_NE(channel_configuration, 0u);
1503        bits.skipBits(2);  // original_copy, home
1504
1505        sp<MetaData> meta = MakeAACCodecSpecificData(
1506                profile, sampling_freq_index, channel_configuration);
1507
1508        meta->setInt32(kKeyIsADTS, true);
1509
1510        packetSource->setFormat(meta);
1511    }
1512
1513    int64_t numSamples = 0ll;
1514    int32_t sampleRate;
1515    CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
1516
1517    int64_t timeUs = (PTS * 100ll) / 9ll;
1518    if (!mFirstPTSValid) {
1519        mFirstPTSValid = true;
1520        mFirstTimeUs = timeUs;
1521    }
1522
1523    size_t offset = 0;
1524    while (offset < buffer->size()) {
1525        const uint8_t *adtsHeader = buffer->data() + offset;
1526        CHECK_LT(offset + 5, buffer->size());
1527
1528        unsigned aac_frame_length =
1529            ((adtsHeader[3] & 3) << 11)
1530            | (adtsHeader[4] << 3)
1531            | (adtsHeader[5] >> 5);
1532
1533        if (aac_frame_length == 0) {
1534            const uint8_t *id3Header = adtsHeader;
1535            if (!memcmp(id3Header, "ID3", 3)) {
1536                ID3 id3(id3Header, buffer->size() - offset, true);
1537                if (id3.isValid()) {
1538                    offset += id3.rawSize();
1539                    continue;
1540                };
1541            }
1542            return ERROR_MALFORMED;
1543        }
1544
1545        CHECK_LE(offset + aac_frame_length, buffer->size());
1546
1547        int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
1548        offset += aac_frame_length;
1549
1550        // Each AAC frame encodes 1024 samples.
1551        numSamples += 1024;
1552
1553        if (mStartup) {
1554            int64_t startTimeUs = unitTimeUs;
1555            if (mStartTimeUsRelative) {
1556                startTimeUs -= mFirstTimeUs;
1557                if (startTimeUs  < 0) {
1558                    startTimeUs = 0;
1559                }
1560            }
1561            if (startTimeUs < mStartTimeUs) {
1562                continue;
1563            }
1564
1565            if (mStartTimeUsNotify != NULL) {
1566                int32_t targetDurationSecs;
1567                CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
1568                int64_t targetDurationUs = targetDurationSecs * 1000000ll;
1569
1570                // Duplicated logic from how we handle .ts playlists.
1571                if (mStartup && mSegmentStartTimeUs >= 0
1572                        && timeUs - mStartTimeUs > targetDurationUs) {
1573                    int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs);
1574                    if (newSeqNumber >= mSeqNumber) {
1575                        --mSeqNumber;
1576                    } else {
1577                        mSeqNumber = newSeqNumber;
1578                    }
1579                    return -EAGAIN;
1580                }
1581
1582                mStartTimeUsNotify->setInt64("timeUsAudio", timeUs);
1583                mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq);
1584                mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO);
1585                mStartTimeUsNotify->post();
1586                mStartTimeUsNotify.clear();
1587            }
1588        }
1589
1590        if (mStopParams != NULL) {
1591            // Queue discontinuity in original stream.
1592            int32_t discontinuitySeq;
1593            int64_t stopTimeUs;
1594            if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1595                    || discontinuitySeq > mDiscontinuitySeq
1596                    || !mStopParams->findInt64("timeUsAudio", &stopTimeUs)
1597                    || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) {
1598                packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
1599                mStreamTypeMask = 0;
1600                mPacketSources.clear();
1601                return ERROR_OUT_OF_RANGE;
1602            }
1603        }
1604
1605        sp<ABuffer> unit = new ABuffer(aac_frame_length);
1606        memcpy(unit->data(), adtsHeader, aac_frame_length);
1607
1608        unit->meta()->setInt64("timeUs", unitTimeUs);
1609        setAccessUnitProperties(unit, packetSource);
1610        packetSource->queueAccessUnit(unit);
1611    }
1612
1613    return OK;
1614}
1615
1616void PlaylistFetcher::updateDuration() {
1617    int64_t durationUs = 0ll;
1618    for (size_t index = 0; index < mPlaylist->size(); ++index) {
1619        sp<AMessage> itemMeta;
1620        CHECK(mPlaylist->itemAt(
1621                    index, NULL /* uri */, &itemMeta));
1622
1623        int64_t itemDurationUs;
1624        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1625
1626        durationUs += itemDurationUs;
1627    }
1628
1629    sp<AMessage> msg = mNotify->dup();
1630    msg->setInt32("what", kWhatDurationUpdate);
1631    msg->setInt64("durationUs", durationUs);
1632    msg->post();
1633}
1634
1635int64_t PlaylistFetcher::resumeThreshold(const sp<AMessage> &msg) {
1636    int64_t durationUs, threshold;
1637    if (msg->findInt64("durationUs", &durationUs)) {
1638        return kNumSkipFrames * durationUs;
1639    }
1640
1641    sp<RefBase> obj;
1642    msg->findObject("format", &obj);
1643    MetaData *format = static_cast<MetaData *>(obj.get());
1644
1645    const char *mime;
1646    CHECK(format->findCString(kKeyMIMEType, &mime));
1647    bool audio = !strncasecmp(mime, "audio/", 6);
1648    if (audio) {
1649        // Assumes 1000 samples per frame.
1650        int32_t sampleRate;
1651        CHECK(format->findInt32(kKeySampleRate, &sampleRate));
1652        return kNumSkipFrames  /* frames */ * 1000 /* samples */
1653                * (1000000 / sampleRate) /* sample duration (us) */;
1654    } else {
1655        int32_t frameRate;
1656        if (format->findInt32(kKeyFrameRate, &frameRate) && frameRate > 0) {
1657            return kNumSkipFrames * (1000000 / frameRate);
1658        }
1659    }
1660
1661    return 500000ll;
1662}
1663
1664}  // namespace android
1665