PlaylistFetcher.cpp revision 7d8e3ccfbf326b5e190b416590e956c2fc3021f7
1/*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "PlaylistFetcher"
19#include <utils/Log.h>
20
21#include "PlaylistFetcher.h"
22
23#include "LiveDataSource.h"
24#include "LiveSession.h"
25#include "M3UParser.h"
26
27#include "include/avc_utils.h"
28#include "include/HTTPBase.h"
29#include "include/ID3.h"
30#include "mpeg2ts/AnotherPacketSource.h"
31
32#include <media/IStreamSource.h>
33#include <media/stagefright/foundation/ABitReader.h>
34#include <media/stagefright/foundation/ABuffer.h>
35#include <media/stagefright/foundation/ADebug.h>
36#include <media/stagefright/foundation/hexdump.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaDefs.h>
39#include <media/stagefright/MetaData.h>
40#include <media/stagefright/Utils.h>
41
42#include <ctype.h>
43#include <inttypes.h>
44#include <openssl/aes.h>
45#include <openssl/md5.h>
46
47namespace android {
48
49// static
50const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll;
51const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
52const int32_t PlaylistFetcher::kDownloadBlockSize = 192;
53const int32_t PlaylistFetcher::kNumSkipFrames = 10;
54
55PlaylistFetcher::PlaylistFetcher(
56        const sp<AMessage> &notify,
57        const sp<LiveSession> &session,
58        const char *uri)
59    : mNotify(notify),
60      mStartTimeUsNotify(notify->dup()),
61      mSession(session),
62      mURI(uri),
63      mStreamTypeMask(0),
64      mStartTimeUs(-1ll),
65      mMinStartTimeUs(0ll),
66      mStopParams(NULL),
67      mLastPlaylistFetchTimeUs(-1ll),
68      mSeqNumber(-1),
69      mNumRetries(0),
70      mStartup(true),
71      mPrepared(false),
72      mNextPTSTimeUs(-1ll),
73      mMonitorQueueGeneration(0),
74      mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
75      mFirstPTSValid(false),
76      mAbsoluteTimeAnchorUs(0ll) {
77    memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
78    mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
79    mStartTimeUsNotify->setInt32("streamMask", 0);
80}
81
82PlaylistFetcher::~PlaylistFetcher() {
83}
84
85int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
86    CHECK(mPlaylist != NULL);
87
88    int32_t firstSeqNumberInPlaylist;
89    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
90                "media-sequence", &firstSeqNumberInPlaylist)) {
91        firstSeqNumberInPlaylist = 0;
92    }
93
94    int32_t lastSeqNumberInPlaylist =
95        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
96
97    CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
98    CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
99
100    int64_t segmentStartUs = 0ll;
101    for (int32_t index = 0;
102            index < seqNumber - firstSeqNumberInPlaylist; ++index) {
103        sp<AMessage> itemMeta;
104        CHECK(mPlaylist->itemAt(
105                    index, NULL /* uri */, &itemMeta));
106
107        int64_t itemDurationUs;
108        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
109
110        segmentStartUs += itemDurationUs;
111    }
112
113    return segmentStartUs;
114}
115
116int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
117    int64_t nowUs = ALooper::GetNowUs();
118
119    if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
120        CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
121        return 0ll;
122    }
123
124    if (mPlaylist->isComplete()) {
125        return (~0llu >> 1);
126    }
127
128    int32_t targetDurationSecs;
129    CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
130
131    int64_t targetDurationUs = targetDurationSecs * 1000000ll;
132
133    int64_t minPlaylistAgeUs;
134
135    switch (mRefreshState) {
136        case INITIAL_MINIMUM_RELOAD_DELAY:
137        {
138            size_t n = mPlaylist->size();
139            if (n > 0) {
140                sp<AMessage> itemMeta;
141                CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
142
143                int64_t itemDurationUs;
144                CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
145
146                minPlaylistAgeUs = itemDurationUs;
147                break;
148            }
149
150            // fall through
151        }
152
153        case FIRST_UNCHANGED_RELOAD_ATTEMPT:
154        {
155            minPlaylistAgeUs = targetDurationUs / 2;
156            break;
157        }
158
159        case SECOND_UNCHANGED_RELOAD_ATTEMPT:
160        {
161            minPlaylistAgeUs = (targetDurationUs * 3) / 2;
162            break;
163        }
164
165        case THIRD_UNCHANGED_RELOAD_ATTEMPT:
166        {
167            minPlaylistAgeUs = targetDurationUs * 3;
168            break;
169        }
170
171        default:
172            TRESPASS();
173            break;
174    }
175
176    int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
177    return delayUs > 0ll ? delayUs : 0ll;
178}
179
180status_t PlaylistFetcher::decryptBuffer(
181        size_t playlistIndex, const sp<ABuffer> &buffer,
182        bool first) {
183    sp<AMessage> itemMeta;
184    bool found = false;
185    AString method;
186
187    for (ssize_t i = playlistIndex; i >= 0; --i) {
188        AString uri;
189        CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
190
191        if (itemMeta->findString("cipher-method", &method)) {
192            found = true;
193            break;
194        }
195    }
196
197    if (!found) {
198        method = "NONE";
199    }
200    buffer->meta()->setString("cipher-method", method.c_str());
201
202    if (method == "NONE") {
203        return OK;
204    } else if (!(method == "AES-128")) {
205        ALOGE("Unsupported cipher method '%s'", method.c_str());
206        return ERROR_UNSUPPORTED;
207    }
208
209    AString keyURI;
210    if (!itemMeta->findString("cipher-uri", &keyURI)) {
211        ALOGE("Missing key uri");
212        return ERROR_MALFORMED;
213    }
214
215    ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
216
217    sp<ABuffer> key;
218    if (index >= 0) {
219        key = mAESKeyForURI.valueAt(index);
220    } else {
221        ssize_t err = mSession->fetchFile(keyURI.c_str(), &key);
222
223        if (err < 0) {
224            ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
225            return ERROR_IO;
226        } else if (key->size() != 16) {
227            ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
228            return ERROR_MALFORMED;
229        }
230
231        mAESKeyForURI.add(keyURI, key);
232    }
233
234    AES_KEY aes_key;
235    if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
236        ALOGE("failed to set AES decryption key.");
237        return UNKNOWN_ERROR;
238    }
239
240    size_t n = buffer->size();
241    if (!n) {
242        return OK;
243    }
244    CHECK(n % 16 == 0);
245
246    if (first) {
247        // If decrypting the first block in a file, read the iv from the manifest
248        // or derive the iv from the file's sequence number.
249
250        AString iv;
251        if (itemMeta->findString("cipher-iv", &iv)) {
252            if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
253                    || iv.size() != 16 * 2 + 2) {
254                ALOGE("malformed cipher IV '%s'.", iv.c_str());
255                return ERROR_MALFORMED;
256            }
257
258            memset(mAESInitVec, 0, sizeof(mAESInitVec));
259            for (size_t i = 0; i < 16; ++i) {
260                char c1 = tolower(iv.c_str()[2 + 2 * i]);
261                char c2 = tolower(iv.c_str()[3 + 2 * i]);
262                if (!isxdigit(c1) || !isxdigit(c2)) {
263                    ALOGE("malformed cipher IV '%s'.", iv.c_str());
264                    return ERROR_MALFORMED;
265                }
266                uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
267                uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
268
269                mAESInitVec[i] = nibble1 << 4 | nibble2;
270            }
271        } else {
272            memset(mAESInitVec, 0, sizeof(mAESInitVec));
273            mAESInitVec[15] = mSeqNumber & 0xff;
274            mAESInitVec[14] = (mSeqNumber >> 8) & 0xff;
275            mAESInitVec[13] = (mSeqNumber >> 16) & 0xff;
276            mAESInitVec[12] = (mSeqNumber >> 24) & 0xff;
277        }
278    }
279
280    AES_cbc_encrypt(
281            buffer->data(), buffer->data(), buffer->size(),
282            &aes_key, mAESInitVec, AES_DECRYPT);
283
284    return OK;
285}
286
287status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
288    status_t err;
289    AString method;
290    CHECK(buffer->meta()->findString("cipher-method", &method));
291    if (method == "NONE") {
292        return OK;
293    }
294
295    uint8_t padding = 0;
296    if (buffer->size() > 0) {
297        padding = buffer->data()[buffer->size() - 1];
298    }
299
300    if (padding > 16) {
301        return ERROR_MALFORMED;
302    }
303
304    for (size_t i = buffer->size() - padding; i < padding; i++) {
305        if (buffer->data()[i] != padding) {
306            return ERROR_MALFORMED;
307        }
308    }
309
310    buffer->setRange(buffer->offset(), buffer->size() - padding);
311    return OK;
312}
313
314void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
315    int64_t maxDelayUs = delayUsToRefreshPlaylist();
316    if (maxDelayUs < minDelayUs) {
317        maxDelayUs = minDelayUs;
318    }
319    if (delayUs > maxDelayUs) {
320        ALOGV("Need to refresh playlist in %" PRId64 , maxDelayUs);
321        delayUs = maxDelayUs;
322    }
323    sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id());
324    msg->setInt32("generation", mMonitorQueueGeneration);
325    msg->post(delayUs);
326}
327
328void PlaylistFetcher::cancelMonitorQueue() {
329    ++mMonitorQueueGeneration;
330}
331
332void PlaylistFetcher::startAsync(
333        const sp<AnotherPacketSource> &audioSource,
334        const sp<AnotherPacketSource> &videoSource,
335        const sp<AnotherPacketSource> &subtitleSource,
336        int64_t startTimeUs,
337        int64_t minStartTimeUs,
338        int32_t startSeqNumberHint) {
339    sp<AMessage> msg = new AMessage(kWhatStart, id());
340
341    uint32_t streamTypeMask = 0ul;
342
343    if (audioSource != NULL) {
344        msg->setPointer("audioSource", audioSource.get());
345        streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
346    }
347
348    if (videoSource != NULL) {
349        msg->setPointer("videoSource", videoSource.get());
350        streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
351    }
352
353    if (subtitleSource != NULL) {
354        msg->setPointer("subtitleSource", subtitleSource.get());
355        streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
356    }
357
358    msg->setInt32("streamTypeMask", streamTypeMask);
359    msg->setInt64("startTimeUs", startTimeUs);
360    msg->setInt64("minStartTimeUs", minStartTimeUs);
361    msg->setInt32("startSeqNumberHint", startSeqNumberHint);
362    msg->post();
363}
364
365void PlaylistFetcher::pauseAsync() {
366    (new AMessage(kWhatPause, id()))->post();
367}
368
369void PlaylistFetcher::stopAsync(bool selfTriggered) {
370    sp<AMessage> msg = new AMessage(kWhatStop, id());
371    msg->setInt32("selfTriggered", selfTriggered);
372    msg->post();
373}
374
375void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
376    AMessage* msg = new AMessage(kWhatResumeUntil, id());
377    msg->setMessage("params", params);
378    msg->post();
379}
380
381void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
382    switch (msg->what()) {
383        case kWhatStart:
384        {
385            status_t err = onStart(msg);
386
387            sp<AMessage> notify = mNotify->dup();
388            notify->setInt32("what", kWhatStarted);
389            notify->setInt32("err", err);
390            notify->post();
391            break;
392        }
393
394        case kWhatPause:
395        {
396            onPause();
397
398            sp<AMessage> notify = mNotify->dup();
399            notify->setInt32("what", kWhatPaused);
400            notify->post();
401            break;
402        }
403
404        case kWhatStop:
405        {
406            onStop(msg);
407
408            sp<AMessage> notify = mNotify->dup();
409            notify->setInt32("what", kWhatStopped);
410            notify->post();
411            break;
412        }
413
414        case kWhatMonitorQueue:
415        case kWhatDownloadNext:
416        {
417            int32_t generation;
418            CHECK(msg->findInt32("generation", &generation));
419
420            if (generation != mMonitorQueueGeneration) {
421                // Stale event
422                break;
423            }
424
425            if (msg->what() == kWhatMonitorQueue) {
426                onMonitorQueue();
427            } else {
428                onDownloadNext();
429            }
430            break;
431        }
432
433        case kWhatResumeUntil:
434        {
435            onResumeUntil(msg);
436            break;
437        }
438
439        default:
440            TRESPASS();
441    }
442}
443
444status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
445    mPacketSources.clear();
446
447    uint32_t streamTypeMask;
448    CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
449
450    int64_t startTimeUs;
451    int32_t startSeqNumberHint;
452    CHECK(msg->findInt64("startTimeUs", &startTimeUs));
453    CHECK(msg->findInt64("minStartTimeUs", (int64_t *) &mMinStartTimeUs));
454    CHECK(msg->findInt32("startSeqNumberHint", &startSeqNumberHint));
455
456    if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
457        void *ptr;
458        CHECK(msg->findPointer("audioSource", &ptr));
459
460        mPacketSources.add(
461                LiveSession::STREAMTYPE_AUDIO,
462                static_cast<AnotherPacketSource *>(ptr));
463    }
464
465    if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
466        void *ptr;
467        CHECK(msg->findPointer("videoSource", &ptr));
468
469        mPacketSources.add(
470                LiveSession::STREAMTYPE_VIDEO,
471                static_cast<AnotherPacketSource *>(ptr));
472    }
473
474    if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
475        void *ptr;
476        CHECK(msg->findPointer("subtitleSource", &ptr));
477
478        mPacketSources.add(
479                LiveSession::STREAMTYPE_SUBTITLES,
480                static_cast<AnotherPacketSource *>(ptr));
481    }
482
483    mStreamTypeMask = streamTypeMask;
484    mStartTimeUs = startTimeUs;
485
486    if (mStartTimeUs >= 0ll) {
487        mSeqNumber = -1;
488        mStartup = true;
489        mPrepared = false;
490    }
491
492    if (startSeqNumberHint >= 0) {
493        mSeqNumber = startSeqNumberHint;
494    }
495
496    postMonitorQueue();
497
498    return OK;
499}
500
501void PlaylistFetcher::onPause() {
502    cancelMonitorQueue();
503}
504
505void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
506    cancelMonitorQueue();
507
508    int32_t selfTriggered;
509    CHECK(msg->findInt32("selfTriggered", &selfTriggered));
510    if (!selfTriggered) {
511        // Self triggered stops only happen during switching, in which case we do not want
512        // to clear the discontinuities queued at the end of packet sources.
513        for (size_t i = 0; i < mPacketSources.size(); i++) {
514            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
515            packetSource->clear();
516        }
517    }
518
519    mPacketSources.clear();
520    mStreamTypeMask = 0;
521}
522
523// Resume until we have reached the boundary timestamps listed in `msg`; when
524// the remaining time is too short (within a resume threshold) stop immediately
525// instead.
526status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
527    sp<AMessage> params;
528    CHECK(msg->findMessage("params", &params));
529
530    bool stop = false;
531    for (size_t i = 0; i < mPacketSources.size(); i++) {
532        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
533
534        const char *stopKey;
535        int streamType = mPacketSources.keyAt(i);
536        switch (streamType) {
537        case LiveSession::STREAMTYPE_VIDEO:
538            stopKey = "timeUsVideo";
539            break;
540
541        case LiveSession::STREAMTYPE_AUDIO:
542            stopKey = "timeUsAudio";
543            break;
544
545        case LiveSession::STREAMTYPE_SUBTITLES:
546            stopKey = "timeUsSubtitle";
547            break;
548
549        default:
550            TRESPASS();
551        }
552
553        // Don't resume if we would stop within a resume threshold.
554        int64_t latestTimeUs = 0, stopTimeUs = 0;
555        sp<AMessage> latestMeta = packetSource->getLatestMeta();
556        if (latestMeta != NULL
557                && (latestMeta->findInt64("timeUs", &latestTimeUs)
558                && params->findInt64(stopKey, &stopTimeUs))) {
559            int64_t diffUs = stopTimeUs - latestTimeUs;
560            if (diffUs < resumeThreshold(latestMeta)) {
561                stop = true;
562            }
563        }
564    }
565
566    if (stop) {
567        for (size_t i = 0; i < mPacketSources.size(); i++) {
568            mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer());
569        }
570        stopAsync(/* selfTriggered = */ true);
571        return OK;
572    }
573
574    mStopParams = params;
575    postMonitorQueue();
576
577    return OK;
578}
579
580void PlaylistFetcher::notifyError(status_t err) {
581    sp<AMessage> notify = mNotify->dup();
582    notify->setInt32("what", kWhatError);
583    notify->setInt32("err", err);
584    notify->post();
585}
586
587void PlaylistFetcher::queueDiscontinuity(
588        ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
589    for (size_t i = 0; i < mPacketSources.size(); ++i) {
590        mPacketSources.valueAt(i)->queueDiscontinuity(type, extra);
591    }
592}
593
594void PlaylistFetcher::onMonitorQueue() {
595    bool downloadMore = false;
596    refreshPlaylist();
597
598    int32_t targetDurationSecs;
599    int64_t targetDurationUs = kMinBufferedDurationUs;
600    if (mPlaylist != NULL) {
601        CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
602        targetDurationUs = targetDurationSecs * 1000000ll;
603    }
604
605    // buffer at least 3 times the target duration, or up to 10 seconds
606    int64_t durationToBufferUs = targetDurationUs * 3;
607    if (durationToBufferUs > kMinBufferedDurationUs)  {
608        durationToBufferUs = kMinBufferedDurationUs;
609    }
610
611    int64_t bufferedDurationUs = 0ll;
612    status_t finalResult = NOT_ENOUGH_DATA;
613    if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
614        sp<AnotherPacketSource> packetSource =
615            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
616
617        bufferedDurationUs =
618                packetSource->getBufferedDurationUs(&finalResult);
619        finalResult = OK;
620    } else {
621        // Use max stream duration to prevent us from waiting on a non-existent stream;
622        // when we cannot make out from the manifest what streams are included in a playlist
623        // we might assume extra streams.
624        for (size_t i = 0; i < mPacketSources.size(); ++i) {
625            if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
626                continue;
627            }
628
629            int64_t bufferedStreamDurationUs =
630                mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
631            ALOGV("buffered %" PRId64 " for stream %d",
632                    bufferedStreamDurationUs, mPacketSources.keyAt(i));
633            if (bufferedStreamDurationUs > bufferedDurationUs) {
634                bufferedDurationUs = bufferedStreamDurationUs;
635            }
636        }
637    }
638    downloadMore = (bufferedDurationUs < durationToBufferUs);
639
640    // signal start if buffered up at least the target size
641    if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) {
642        mPrepared = true;
643
644        ALOGV("prepared, buffered=%" PRId64 " > %" PRId64 "",
645                bufferedDurationUs, targetDurationUs);
646        sp<AMessage> msg = mNotify->dup();
647        msg->setInt32("what", kWhatTemporarilyDoneFetching);
648        msg->post();
649    }
650
651    if (finalResult == OK && downloadMore) {
652        ALOGV("monitoring, buffered=%" PRId64 " < %" PRId64 "",
653                bufferedDurationUs, durationToBufferUs);
654        // delay the next download slightly; hopefully this gives other concurrent fetchers
655        // a better chance to run.
656        // onDownloadNext();
657        sp<AMessage> msg = new AMessage(kWhatDownloadNext, id());
658        msg->setInt32("generation", mMonitorQueueGeneration);
659        msg->post(1000l);
660    } else {
661        // Nothing to do yet, try again in a second.
662
663        sp<AMessage> msg = mNotify->dup();
664        msg->setInt32("what", kWhatTemporarilyDoneFetching);
665        msg->post();
666
667        int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2;
668        ALOGV("pausing for %" PRId64 ", buffered=%" PRId64 " > %" PRId64 "",
669                delayUs, bufferedDurationUs, durationToBufferUs);
670        // :TRICKY: need to enforce minimum delay because the delay to
671        // refresh the playlist will become 0
672        postMonitorQueue(delayUs, mPrepared ? targetDurationUs * 2 : 0);
673    }
674}
675
676status_t PlaylistFetcher::refreshPlaylist() {
677    if (delayUsToRefreshPlaylist() <= 0) {
678        bool unchanged;
679        sp<M3UParser> playlist = mSession->fetchPlaylist(
680                mURI.c_str(), mPlaylistHash, &unchanged);
681
682        if (playlist == NULL) {
683            if (unchanged) {
684                // We succeeded in fetching the playlist, but it was
685                // unchanged from the last time we tried.
686
687                if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
688                    mRefreshState = (RefreshState)(mRefreshState + 1);
689                }
690            } else {
691                ALOGE("failed to load playlist at url '%s'", mURI.c_str());
692                notifyError(ERROR_IO);
693                return ERROR_IO;
694            }
695        } else {
696            mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
697            mPlaylist = playlist;
698
699            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
700                updateDuration();
701            }
702        }
703
704        mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
705    }
706    return OK;
707}
708
709// static
710bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
711    return buffer->size() > 0 && buffer->data()[0] == 0x47;
712}
713
714void PlaylistFetcher::onDownloadNext() {
715    if (refreshPlaylist() != OK) {
716        return;
717    }
718
719    int32_t firstSeqNumberInPlaylist;
720    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
721                "media-sequence", &firstSeqNumberInPlaylist)) {
722        firstSeqNumberInPlaylist = 0;
723    }
724
725    bool seekDiscontinuity = false;
726    bool explicitDiscontinuity = false;
727
728    const int32_t lastSeqNumberInPlaylist =
729        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
730
731    if (mStartup && mSeqNumber >= 0
732            && (mSeqNumber < firstSeqNumberInPlaylist || mSeqNumber > lastSeqNumberInPlaylist)) {
733        // in case we guessed wrong during reconfiguration, try fetching the latest content.
734        mSeqNumber = lastSeqNumberInPlaylist;
735    }
736
737    if (mSeqNumber < 0) {
738        CHECK_GE(mStartTimeUs, 0ll);
739
740        if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
741            mSeqNumber = getSeqNumberForTime(mStartTimeUs);
742            ALOGV("Initial sequence number for time %" PRId64 " is %d from (%d .. %d)",
743                    mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
744                    lastSeqNumberInPlaylist);
745        } else {
746            // If this is a live session, start 3 segments from the end.
747            mSeqNumber = lastSeqNumberInPlaylist - 3;
748            if (mSeqNumber < firstSeqNumberInPlaylist) {
749                mSeqNumber = firstSeqNumberInPlaylist;
750            }
751            ALOGV("Initial sequence number for live event %d from (%d .. %d)",
752                    mSeqNumber, firstSeqNumberInPlaylist,
753                    lastSeqNumberInPlaylist);
754        }
755
756        mStartTimeUs = -1ll;
757    }
758
759    if (mSeqNumber < firstSeqNumberInPlaylist
760            || mSeqNumber > lastSeqNumberInPlaylist) {
761        if (!mPlaylist->isComplete() && mNumRetries < kMaxNumRetries) {
762            ++mNumRetries;
763
764            if (mSeqNumber > lastSeqNumberInPlaylist) {
765                // refresh in increasing fraction (1/2, 1/3, ...) of the
766                // playlist's target duration or 3 seconds, whichever is less
767                int32_t targetDurationSecs;
768                CHECK(mPlaylist->meta()->findInt32(
769                        "target-duration", &targetDurationSecs));
770                int64_t delayUs = mPlaylist->size() * targetDurationSecs *
771                        1000000ll / (1 + mNumRetries);
772                if (delayUs > kMaxMonitorDelayUs) {
773                    delayUs = kMaxMonitorDelayUs;
774                }
775                ALOGV("sequence number high: %d from (%d .. %d), "
776                      "monitor in %" PRId64 " (retry=%d)",
777                        mSeqNumber, firstSeqNumberInPlaylist,
778                        lastSeqNumberInPlaylist, delayUs, mNumRetries);
779                postMonitorQueue(delayUs);
780                return;
781            }
782
783            // we've missed the boat, let's start from the lowest sequence
784            // number available and signal a discontinuity.
785
786            ALOGI("We've missed the boat, restarting playback."
787                  "  mStartup=%d, was  looking for %d in %d-%d",
788                    mStartup, mSeqNumber, firstSeqNumberInPlaylist,
789                    lastSeqNumberInPlaylist);
790            mSeqNumber = lastSeqNumberInPlaylist - 3;
791            if (mSeqNumber < firstSeqNumberInPlaylist) {
792                mSeqNumber = firstSeqNumberInPlaylist;
793            }
794            explicitDiscontinuity = true;
795
796            // fall through
797        } else {
798            ALOGE("Cannot find sequence number %d in playlist "
799                 "(contains %d - %d)",
800                 mSeqNumber, firstSeqNumberInPlaylist,
801                  firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
802
803            notifyError(ERROR_END_OF_STREAM);
804            return;
805        }
806    }
807
808    mNumRetries = 0;
809
810    AString uri;
811    sp<AMessage> itemMeta;
812    CHECK(mPlaylist->itemAt(
813                mSeqNumber - firstSeqNumberInPlaylist,
814                &uri,
815                &itemMeta));
816
817    int32_t val;
818    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
819        explicitDiscontinuity = true;
820    }
821
822    int64_t range_offset, range_length;
823    if (!itemMeta->findInt64("range-offset", &range_offset)
824            || !itemMeta->findInt64("range-length", &range_length)) {
825        range_offset = 0;
826        range_length = -1;
827    }
828
829    ALOGV("fetching segment %d from (%d .. %d)",
830          mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
831
832    ALOGV("fetching '%s'", uri.c_str());
833
834    sp<DataSource> source;
835    sp<ABuffer> buffer, tsBuffer;
836    // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
837    // this avoids interleaved connections to the key and segment file.
838    {
839        sp<ABuffer> junk = new ABuffer(16);
840        junk->setRange(0, 16);
841        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
842                true /* first */);
843        if (err != OK) {
844            notifyError(err);
845            return;
846        }
847    }
848
849    // block-wise download
850    ssize_t bytesRead;
851    do {
852        bytesRead = mSession->fetchFile(
853                uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source);
854
855        if (bytesRead < 0) {
856            status_t err = bytesRead;
857            ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
858            notifyError(err);
859            return;
860        }
861
862        CHECK(buffer != NULL);
863
864        size_t size = buffer->size();
865        // Set decryption range.
866        buffer->setRange(size - bytesRead, bytesRead);
867        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
868                buffer->offset() == 0 /* first */);
869        // Unset decryption range.
870        buffer->setRange(0, size);
871
872        if (err != OK) {
873            ALOGE("decryptBuffer failed w/ error %d", err);
874
875            notifyError(err);
876            return;
877        }
878
879        if (mStartup || seekDiscontinuity || explicitDiscontinuity) {
880            // Signal discontinuity.
881
882            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
883                // If this was a live event this made no sense since
884                // we don't have access to all the segment before the current
885                // one.
886                mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
887            }
888
889            if (seekDiscontinuity || explicitDiscontinuity) {
890                ALOGI("queueing discontinuity (seek=%d, explicit=%d)",
891                     seekDiscontinuity, explicitDiscontinuity);
892
893                queueDiscontinuity(
894                        explicitDiscontinuity
895                            ? ATSParser::DISCONTINUITY_FORMATCHANGE
896                            : ATSParser::DISCONTINUITY_SEEK,
897                        NULL /* extra */);
898            }
899        }
900
901        err = OK;
902        if (bufferStartsWithTsSyncByte(buffer)) {
903            // Incremental extraction is only supported for MPEG2 transport streams.
904            if (tsBuffer == NULL) {
905                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
906                tsBuffer->setRange(0, 0);
907            } else if (tsBuffer->capacity() != buffer->capacity()) {
908                size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
909                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
910                tsBuffer->setRange(tsOff, tsSize);
911            }
912            tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
913
914            err = extractAndQueueAccessUnitsFromTs(tsBuffer);
915        }
916
917        if (err == -EAGAIN) {
918            // bad starting sequence number hint
919            mTSParser.clear();
920            postMonitorQueue();
921            return;
922        }
923
924        if (err == ERROR_OUT_OF_RANGE) {
925            // reached stopping point
926            stopAsync(/* selfTriggered = */ true);
927            return;
928        }
929
930        if (err != OK) {
931            notifyError(err);
932            return;
933        }
934
935        mStartup = false;
936    } while (bytesRead != 0);
937
938    if (bufferStartsWithTsSyncByte(buffer)) {
939        // If we still don't see a stream after fetching a full ts segment mark it as
940        // nonexistent.
941        const size_t kNumTypes = ATSParser::NUM_SOURCE_TYPES;
942        ATSParser::SourceType srcTypes[kNumTypes] =
943                { ATSParser::VIDEO, ATSParser::AUDIO };
944        LiveSession::StreamType streamTypes[kNumTypes] =
945                { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
946
947        for (size_t i = 0; i < kNumTypes; i++) {
948            ATSParser::SourceType srcType = srcTypes[i];
949            LiveSession::StreamType streamType = streamTypes[i];
950
951            sp<AnotherPacketSource> source =
952                static_cast<AnotherPacketSource *>(
953                    mTSParser->getSource(srcType).get());
954
955            if (source == NULL) {
956                ALOGW("MPEG2 Transport stream does not contain %s data.",
957                      srcType == ATSParser::VIDEO ? "video" : "audio");
958
959                mStreamTypeMask &= ~streamType;
960                mPacketSources.removeItem(streamType);
961            }
962        }
963
964    }
965
966    if (checkDecryptPadding(buffer) != OK) {
967        ALOGE("Incorrect padding bytes after decryption.");
968        notifyError(ERROR_MALFORMED);
969        return;
970    }
971
972    status_t err = OK;
973    if (tsBuffer != NULL) {
974        AString method;
975        CHECK(buffer->meta()->findString("cipher-method", &method));
976        if ((tsBuffer->size() > 0 && method == "NONE")
977                || tsBuffer->size() > 16) {
978            ALOGE("MPEG2 transport stream is not an even multiple of 188 "
979                    "bytes in length.");
980            notifyError(ERROR_MALFORMED);
981            return;
982        }
983    }
984
985    // bulk extract non-ts files
986    if (tsBuffer == NULL) {
987      err = extractAndQueueAccessUnits(buffer, itemMeta);
988    }
989
990    if (err != OK) {
991        notifyError(err);
992        return;
993    }
994
995    ++mSeqNumber;
996
997    postMonitorQueue();
998}
999
1000int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1001    int32_t firstSeqNumberInPlaylist;
1002    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
1003                "media-sequence", &firstSeqNumberInPlaylist)) {
1004        firstSeqNumberInPlaylist = 0;
1005    }
1006
1007    size_t index = 0;
1008    int64_t segmentStartUs = 0;
1009    while (index < mPlaylist->size()) {
1010        sp<AMessage> itemMeta;
1011        CHECK(mPlaylist->itemAt(
1012                    index, NULL /* uri */, &itemMeta));
1013
1014        int64_t itemDurationUs;
1015        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1016
1017        if (timeUs < segmentStartUs + itemDurationUs) {
1018            break;
1019        }
1020
1021        segmentStartUs += itemDurationUs;
1022        ++index;
1023    }
1024
1025    if (index >= mPlaylist->size()) {
1026        index = mPlaylist->size() - 1;
1027    }
1028
1029    return firstSeqNumberInPlaylist + index;
1030}
1031
1032status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
1033    if (mTSParser == NULL) {
1034        // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
1035        mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
1036    }
1037
1038    if (mNextPTSTimeUs >= 0ll) {
1039        sp<AMessage> extra = new AMessage;
1040        // Since we are using absolute timestamps, signal an offset of 0 to prevent
1041        // ATSParser from skewing the timestamps of access units.
1042        extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
1043
1044        mTSParser->signalDiscontinuity(
1045                ATSParser::DISCONTINUITY_SEEK, extra);
1046
1047        mNextPTSTimeUs = -1ll;
1048    }
1049
1050    size_t offset = 0;
1051    while (offset + 188 <= buffer->size()) {
1052        status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
1053
1054        if (err != OK) {
1055            return err;
1056        }
1057
1058        offset += 188;
1059    }
1060    // setRange to indicate consumed bytes.
1061    buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
1062
1063    status_t err = OK;
1064    for (size_t i = mPacketSources.size(); i-- > 0;) {
1065        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1066
1067        const char *key;
1068        ATSParser::SourceType type;
1069        const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1070        switch (stream) {
1071            case LiveSession::STREAMTYPE_VIDEO:
1072                type = ATSParser::VIDEO;
1073                key = "timeUsVideo";
1074                break;
1075
1076            case LiveSession::STREAMTYPE_AUDIO:
1077                type = ATSParser::AUDIO;
1078                key = "timeUsAudio";
1079                break;
1080
1081            case LiveSession::STREAMTYPE_SUBTITLES:
1082            {
1083                ALOGE("MPEG2 Transport streams do not contain subtitles.");
1084                return ERROR_MALFORMED;
1085                break;
1086            }
1087
1088            default:
1089                TRESPASS();
1090        }
1091
1092        sp<AnotherPacketSource> source =
1093            static_cast<AnotherPacketSource *>(
1094                    mTSParser->getSource(type).get());
1095
1096        if (source == NULL) {
1097            continue;
1098        }
1099
1100        int64_t timeUs;
1101        sp<ABuffer> accessUnit;
1102        status_t finalResult;
1103        while (source->hasBufferAvailable(&finalResult)
1104                && source->dequeueAccessUnit(&accessUnit) == OK) {
1105
1106            CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1107            if (mMinStartTimeUs > 0) {
1108                if (timeUs < mMinStartTimeUs) {
1109                    // TODO untested path
1110                    // try a later ts
1111                    int32_t targetDuration;
1112                    mPlaylist->meta()->findInt32("target-duration", &targetDuration);
1113                    int32_t incr = (mMinStartTimeUs - timeUs) / 1000000 / targetDuration;
1114                    if (incr == 0) {
1115                        // increment mSeqNumber by at least one
1116                        incr = 1;
1117                    }
1118                    mSeqNumber += incr;
1119                    err = -EAGAIN;
1120                    break;
1121                } else {
1122                    int64_t startTimeUs;
1123                    if (mStartTimeUsNotify != NULL
1124                            && !mStartTimeUsNotify->findInt64(key, &startTimeUs)) {
1125                        mStartTimeUsNotify->setInt64(key, timeUs);
1126
1127                        uint32_t streamMask = 0;
1128                        mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1129                        streamMask |= mPacketSources.keyAt(i);
1130                        mStartTimeUsNotify->setInt32("streamMask", streamMask);
1131
1132                        if (streamMask == mStreamTypeMask) {
1133                            mStartTimeUsNotify->post();
1134                            mStartTimeUsNotify.clear();
1135                        }
1136                    }
1137                }
1138            }
1139
1140            if (mStopParams != NULL) {
1141                // Queue discontinuity in original stream.
1142                int64_t stopTimeUs;
1143                if (!mStopParams->findInt64(key, &stopTimeUs) || timeUs >= stopTimeUs) {
1144                    packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
1145                    mStreamTypeMask &= ~stream;
1146                    mPacketSources.removeItemsAt(i);
1147                    break;
1148                }
1149            }
1150
1151            // Note that we do NOT dequeue any discontinuities except for format change.
1152
1153            // for simplicity, store a reference to the format in each unit
1154            sp<MetaData> format = source->getFormat();
1155            if (format != NULL) {
1156                accessUnit->meta()->setObject("format", format);
1157            }
1158
1159            // Stash the sequence number so we can hint future playlist where to start at.
1160            accessUnit->meta()->setInt32("seq", mSeqNumber);
1161            packetSource->queueAccessUnit(accessUnit);
1162        }
1163
1164        if (err != OK) {
1165            break;
1166        }
1167    }
1168
1169    if (err != OK) {
1170        for (size_t i = mPacketSources.size(); i-- > 0;) {
1171            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1172            packetSource->clear();
1173        }
1174        return err;
1175    }
1176
1177    if (!mStreamTypeMask) {
1178        // Signal gap is filled between original and new stream.
1179        ALOGV("ERROR OUT OF RANGE");
1180        return ERROR_OUT_OF_RANGE;
1181    }
1182
1183    return OK;
1184}
1185
1186/* static */
1187bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1188        const sp<ABuffer> &buffer) {
1189    size_t pos = 0;
1190
1191    // skip possible BOM
1192    if (buffer->size() >= pos + 3 &&
1193            !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1194        pos += 3;
1195    }
1196
1197    // accept WEBVTT followed by SPACE, TAB or (CR) LF
1198    if (buffer->size() < pos + 6 ||
1199            memcmp("WEBVTT", buffer->data() + pos, 6)) {
1200        return false;
1201    }
1202    pos += 6;
1203
1204    if (buffer->size() == pos) {
1205        return true;
1206    }
1207
1208    uint8_t sep = buffer->data()[pos];
1209    return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1210}
1211
1212status_t PlaylistFetcher::extractAndQueueAccessUnits(
1213        const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1214    if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1215        if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1216            ALOGE("This stream only contains subtitles.");
1217            return ERROR_MALFORMED;
1218        }
1219
1220        const sp<AnotherPacketSource> packetSource =
1221            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1222
1223        int64_t durationUs;
1224        CHECK(itemMeta->findInt64("durationUs", &durationUs));
1225        buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1226        buffer->meta()->setInt64("durationUs", durationUs);
1227        buffer->meta()->setInt32("seq", mSeqNumber);
1228
1229        packetSource->queueAccessUnit(buffer);
1230        return OK;
1231    }
1232
1233    if (mNextPTSTimeUs >= 0ll) {
1234        mFirstPTSValid = false;
1235        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
1236        mNextPTSTimeUs = -1ll;
1237    }
1238
1239    // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
1240    // stream prefixed by an ID3 tag.
1241
1242    bool firstID3Tag = true;
1243    uint64_t PTS = 0;
1244
1245    for (;;) {
1246        // Make sure to skip all ID3 tags preceding the audio data.
1247        // At least one must be present to provide the PTS timestamp.
1248
1249        ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
1250        if (!id3.isValid()) {
1251            if (firstID3Tag) {
1252                ALOGE("Unable to parse ID3 tag.");
1253                return ERROR_MALFORMED;
1254            } else {
1255                break;
1256            }
1257        }
1258
1259        if (firstID3Tag) {
1260            bool found = false;
1261
1262            ID3::Iterator it(id3, "PRIV");
1263            while (!it.done()) {
1264                size_t length;
1265                const uint8_t *data = it.getData(&length);
1266
1267                static const char *kMatchName =
1268                    "com.apple.streaming.transportStreamTimestamp";
1269                static const size_t kMatchNameLen = strlen(kMatchName);
1270
1271                if (length == kMatchNameLen + 1 + 8
1272                        && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
1273                    found = true;
1274                    PTS = U64_AT(&data[kMatchNameLen + 1]);
1275                }
1276
1277                it.next();
1278            }
1279
1280            if (!found) {
1281                ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
1282                return ERROR_MALFORMED;
1283            }
1284        }
1285
1286        // skip the ID3 tag
1287        buffer->setRange(
1288                buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
1289
1290        firstID3Tag = false;
1291    }
1292
1293    if (!mFirstPTSValid) {
1294        mFirstPTSValid = true;
1295        mFirstPTS = PTS;
1296    }
1297    PTS -= mFirstPTS;
1298
1299    int64_t timeUs = (PTS * 100ll) / 9ll + mAbsoluteTimeAnchorUs;
1300
1301    if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
1302        ALOGW("This stream only contains audio data!");
1303
1304        mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
1305
1306        if (mStreamTypeMask == 0) {
1307            return OK;
1308        }
1309    }
1310
1311    sp<AnotherPacketSource> packetSource =
1312        mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
1313
1314    if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
1315        ABitReader bits(buffer->data(), buffer->size());
1316
1317        // adts_fixed_header
1318
1319        CHECK_EQ(bits.getBits(12), 0xfffu);
1320        bits.skipBits(3);  // ID, layer
1321        bool protection_absent = bits.getBits(1) != 0;
1322
1323        unsigned profile = bits.getBits(2);
1324        CHECK_NE(profile, 3u);
1325        unsigned sampling_freq_index = bits.getBits(4);
1326        bits.getBits(1);  // private_bit
1327        unsigned channel_configuration = bits.getBits(3);
1328        CHECK_NE(channel_configuration, 0u);
1329        bits.skipBits(2);  // original_copy, home
1330
1331        sp<MetaData> meta = MakeAACCodecSpecificData(
1332                profile, sampling_freq_index, channel_configuration);
1333
1334        meta->setInt32(kKeyIsADTS, true);
1335
1336        packetSource->setFormat(meta);
1337    }
1338
1339    int64_t numSamples = 0ll;
1340    int32_t sampleRate;
1341    CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
1342
1343    size_t offset = 0;
1344    while (offset < buffer->size()) {
1345        const uint8_t *adtsHeader = buffer->data() + offset;
1346        CHECK_LT(offset + 5, buffer->size());
1347
1348        unsigned aac_frame_length =
1349            ((adtsHeader[3] & 3) << 11)
1350            | (adtsHeader[4] << 3)
1351            | (adtsHeader[5] >> 5);
1352
1353        if (aac_frame_length == 0) {
1354            const uint8_t *id3Header = adtsHeader;
1355            if (!memcmp(id3Header, "ID3", 3)) {
1356                ID3 id3(id3Header, buffer->size() - offset, true);
1357                if (id3.isValid()) {
1358                    offset += id3.rawSize();
1359                    continue;
1360                };
1361            }
1362            return ERROR_MALFORMED;
1363        }
1364
1365        CHECK_LE(offset + aac_frame_length, buffer->size());
1366
1367        sp<ABuffer> unit = new ABuffer(aac_frame_length);
1368        memcpy(unit->data(), adtsHeader, aac_frame_length);
1369
1370        int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
1371        unit->meta()->setInt64("timeUs", unitTimeUs);
1372
1373        // Each AAC frame encodes 1024 samples.
1374        numSamples += 1024;
1375
1376        unit->meta()->setInt32("seq", mSeqNumber);
1377        packetSource->queueAccessUnit(unit);
1378
1379        offset += aac_frame_length;
1380    }
1381
1382    return OK;
1383}
1384
1385void PlaylistFetcher::updateDuration() {
1386    int64_t durationUs = 0ll;
1387    for (size_t index = 0; index < mPlaylist->size(); ++index) {
1388        sp<AMessage> itemMeta;
1389        CHECK(mPlaylist->itemAt(
1390                    index, NULL /* uri */, &itemMeta));
1391
1392        int64_t itemDurationUs;
1393        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1394
1395        durationUs += itemDurationUs;
1396    }
1397
1398    sp<AMessage> msg = mNotify->dup();
1399    msg->setInt32("what", kWhatDurationUpdate);
1400    msg->setInt64("durationUs", durationUs);
1401    msg->post();
1402}
1403
1404int64_t PlaylistFetcher::resumeThreshold(const sp<AMessage> &msg) {
1405    int64_t durationUs, threshold;
1406    if (msg->findInt64("durationUs", &durationUs)) {
1407        return kNumSkipFrames * durationUs;
1408    }
1409
1410    sp<RefBase> obj;
1411    msg->findObject("format", &obj);
1412    MetaData *format = static_cast<MetaData *>(obj.get());
1413
1414    const char *mime;
1415    CHECK(format->findCString(kKeyMIMEType, &mime));
1416    bool audio = !strncasecmp(mime, "audio/", 6);
1417    if (audio) {
1418        // Assumes 1000 samples per frame.
1419        int32_t sampleRate;
1420        CHECK(format->findInt32(kKeySampleRate, &sampleRate));
1421        return kNumSkipFrames  /* frames */ * 1000 /* samples */
1422                * (1000000 / sampleRate) /* sample duration (us) */;
1423    } else {
1424        int32_t frameRate;
1425        if (format->findInt32(kKeyFrameRate, &frameRate) && frameRate > 0) {
1426            return kNumSkipFrames * (1000000 / frameRate);
1427        }
1428    }
1429
1430    return 500000ll;
1431}
1432
1433}  // namespace android
1434