PlaylistFetcher.cpp revision 39f5874c4040bec6fdbf0c0912daffcb10010df8
1/*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "PlaylistFetcher"
19#include <utils/Log.h>
20
21#include "PlaylistFetcher.h"
22
23#include "LiveDataSource.h"
24#include "LiveSession.h"
25#include "M3UParser.h"
26
27#include "include/avc_utils.h"
28#include "include/HTTPBase.h"
29#include "include/ID3.h"
30#include "mpeg2ts/AnotherPacketSource.h"
31
32#include <media/IStreamSource.h>
33#include <media/stagefright/foundation/ABitReader.h>
34#include <media/stagefright/foundation/ABuffer.h>
35#include <media/stagefright/foundation/ADebug.h>
36#include <media/stagefright/foundation/hexdump.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaDefs.h>
39#include <media/stagefright/MetaData.h>
40#include <media/stagefright/Utils.h>
41
42#include <ctype.h>
43#include <inttypes.h>
44#include <openssl/aes.h>
45#include <openssl/md5.h>
46
47namespace android {
48
49// static
50const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll;
51const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
52const int32_t PlaylistFetcher::kDownloadBlockSize = 2048;
53const int32_t PlaylistFetcher::kNumSkipFrames = 10;
54
55PlaylistFetcher::PlaylistFetcher(
56        const sp<AMessage> &notify,
57        const sp<LiveSession> &session,
58        const char *uri)
59    : mNotify(notify),
60      mStartTimeUsNotify(notify->dup()),
61      mSession(session),
62      mURI(uri),
63      mStreamTypeMask(0),
64      mStartTimeUs(-1ll),
65      mSegmentStartTimeUs(-1ll),
66      mDiscontinuitySeq(-1ll),
67      mStartTimeUsRelative(false),
68      mLastPlaylistFetchTimeUs(-1ll),
69      mSeqNumber(-1),
70      mNumRetries(0),
71      mStartup(true),
72      mAdaptive(false),
73      mPrepared(false),
74      mNextPTSTimeUs(-1ll),
75      mMonitorQueueGeneration(0),
76      mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
77      mFirstPTSValid(false),
78      mAbsoluteTimeAnchorUs(0ll),
79      mVideoBuffer(new AnotherPacketSource(NULL)) {
80    memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
81    mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
82    mStartTimeUsNotify->setInt32("streamMask", 0);
83}
84
85PlaylistFetcher::~PlaylistFetcher() {
86}
87
88int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
89    CHECK(mPlaylist != NULL);
90
91    int32_t firstSeqNumberInPlaylist;
92    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
93                "media-sequence", &firstSeqNumberInPlaylist)) {
94        firstSeqNumberInPlaylist = 0;
95    }
96
97    int32_t lastSeqNumberInPlaylist =
98        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
99
100    CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
101    CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
102
103    int64_t segmentStartUs = 0ll;
104    for (int32_t index = 0;
105            index < seqNumber - firstSeqNumberInPlaylist; ++index) {
106        sp<AMessage> itemMeta;
107        CHECK(mPlaylist->itemAt(
108                    index, NULL /* uri */, &itemMeta));
109
110        int64_t itemDurationUs;
111        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
112
113        segmentStartUs += itemDurationUs;
114    }
115
116    return segmentStartUs;
117}
118
119int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
120    int64_t nowUs = ALooper::GetNowUs();
121
122    if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
123        CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
124        return 0ll;
125    }
126
127    if (mPlaylist->isComplete()) {
128        return (~0llu >> 1);
129    }
130
131    int32_t targetDurationSecs;
132    CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
133
134    int64_t targetDurationUs = targetDurationSecs * 1000000ll;
135
136    int64_t minPlaylistAgeUs;
137
138    switch (mRefreshState) {
139        case INITIAL_MINIMUM_RELOAD_DELAY:
140        {
141            size_t n = mPlaylist->size();
142            if (n > 0) {
143                sp<AMessage> itemMeta;
144                CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
145
146                int64_t itemDurationUs;
147                CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
148
149                minPlaylistAgeUs = itemDurationUs;
150                break;
151            }
152
153            // fall through
154        }
155
156        case FIRST_UNCHANGED_RELOAD_ATTEMPT:
157        {
158            minPlaylistAgeUs = targetDurationUs / 2;
159            break;
160        }
161
162        case SECOND_UNCHANGED_RELOAD_ATTEMPT:
163        {
164            minPlaylistAgeUs = (targetDurationUs * 3) / 2;
165            break;
166        }
167
168        case THIRD_UNCHANGED_RELOAD_ATTEMPT:
169        {
170            minPlaylistAgeUs = targetDurationUs * 3;
171            break;
172        }
173
174        default:
175            TRESPASS();
176            break;
177    }
178
179    int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
180    return delayUs > 0ll ? delayUs : 0ll;
181}
182
183status_t PlaylistFetcher::decryptBuffer(
184        size_t playlistIndex, const sp<ABuffer> &buffer,
185        bool first) {
186    sp<AMessage> itemMeta;
187    bool found = false;
188    AString method;
189
190    for (ssize_t i = playlistIndex; i >= 0; --i) {
191        AString uri;
192        CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
193
194        if (itemMeta->findString("cipher-method", &method)) {
195            found = true;
196            break;
197        }
198    }
199
200    if (!found) {
201        method = "NONE";
202    }
203    buffer->meta()->setString("cipher-method", method.c_str());
204
205    if (method == "NONE") {
206        return OK;
207    } else if (!(method == "AES-128")) {
208        ALOGE("Unsupported cipher method '%s'", method.c_str());
209        return ERROR_UNSUPPORTED;
210    }
211
212    AString keyURI;
213    if (!itemMeta->findString("cipher-uri", &keyURI)) {
214        ALOGE("Missing key uri");
215        return ERROR_MALFORMED;
216    }
217
218    ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
219
220    sp<ABuffer> key;
221    if (index >= 0) {
222        key = mAESKeyForURI.valueAt(index);
223    } else {
224        ssize_t err = mSession->fetchFile(keyURI.c_str(), &key);
225
226        if (err < 0) {
227            ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
228            return ERROR_IO;
229        } else if (key->size() != 16) {
230            ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
231            return ERROR_MALFORMED;
232        }
233
234        mAESKeyForURI.add(keyURI, key);
235    }
236
237    AES_KEY aes_key;
238    if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
239        ALOGE("failed to set AES decryption key.");
240        return UNKNOWN_ERROR;
241    }
242
243    size_t n = buffer->size();
244    if (!n) {
245        return OK;
246    }
247    CHECK(n % 16 == 0);
248
249    if (first) {
250        // If decrypting the first block in a file, read the iv from the manifest
251        // or derive the iv from the file's sequence number.
252
253        AString iv;
254        if (itemMeta->findString("cipher-iv", &iv)) {
255            if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
256                    || iv.size() != 16 * 2 + 2) {
257                ALOGE("malformed cipher IV '%s'.", iv.c_str());
258                return ERROR_MALFORMED;
259            }
260
261            memset(mAESInitVec, 0, sizeof(mAESInitVec));
262            for (size_t i = 0; i < 16; ++i) {
263                char c1 = tolower(iv.c_str()[2 + 2 * i]);
264                char c2 = tolower(iv.c_str()[3 + 2 * i]);
265                if (!isxdigit(c1) || !isxdigit(c2)) {
266                    ALOGE("malformed cipher IV '%s'.", iv.c_str());
267                    return ERROR_MALFORMED;
268                }
269                uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
270                uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
271
272                mAESInitVec[i] = nibble1 << 4 | nibble2;
273            }
274        } else {
275            memset(mAESInitVec, 0, sizeof(mAESInitVec));
276            mAESInitVec[15] = mSeqNumber & 0xff;
277            mAESInitVec[14] = (mSeqNumber >> 8) & 0xff;
278            mAESInitVec[13] = (mSeqNumber >> 16) & 0xff;
279            mAESInitVec[12] = (mSeqNumber >> 24) & 0xff;
280        }
281    }
282
283    AES_cbc_encrypt(
284            buffer->data(), buffer->data(), buffer->size(),
285            &aes_key, mAESInitVec, AES_DECRYPT);
286
287    return OK;
288}
289
290status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
291    status_t err;
292    AString method;
293    CHECK(buffer->meta()->findString("cipher-method", &method));
294    if (method == "NONE") {
295        return OK;
296    }
297
298    uint8_t padding = 0;
299    if (buffer->size() > 0) {
300        padding = buffer->data()[buffer->size() - 1];
301    }
302
303    if (padding > 16) {
304        return ERROR_MALFORMED;
305    }
306
307    for (size_t i = buffer->size() - padding; i < padding; i++) {
308        if (buffer->data()[i] != padding) {
309            return ERROR_MALFORMED;
310        }
311    }
312
313    buffer->setRange(buffer->offset(), buffer->size() - padding);
314    return OK;
315}
316
317void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
318    int64_t maxDelayUs = delayUsToRefreshPlaylist();
319    if (maxDelayUs < minDelayUs) {
320        maxDelayUs = minDelayUs;
321    }
322    if (delayUs > maxDelayUs) {
323        ALOGV("Need to refresh playlist in %" PRId64 , maxDelayUs);
324        delayUs = maxDelayUs;
325    }
326    sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id());
327    msg->setInt32("generation", mMonitorQueueGeneration);
328    msg->post(delayUs);
329}
330
331void PlaylistFetcher::cancelMonitorQueue() {
332    ++mMonitorQueueGeneration;
333}
334
335void PlaylistFetcher::startAsync(
336        const sp<AnotherPacketSource> &audioSource,
337        const sp<AnotherPacketSource> &videoSource,
338        const sp<AnotherPacketSource> &subtitleSource,
339        int64_t startTimeUs,
340        int64_t segmentStartTimeUs,
341        int32_t startDiscontinuitySeq,
342        bool adaptive) {
343    sp<AMessage> msg = new AMessage(kWhatStart, id());
344
345    uint32_t streamTypeMask = 0ul;
346
347    if (audioSource != NULL) {
348        msg->setPointer("audioSource", audioSource.get());
349        streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
350    }
351
352    if (videoSource != NULL) {
353        msg->setPointer("videoSource", videoSource.get());
354        streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
355    }
356
357    if (subtitleSource != NULL) {
358        msg->setPointer("subtitleSource", subtitleSource.get());
359        streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
360    }
361
362    msg->setInt32("streamTypeMask", streamTypeMask);
363    msg->setInt64("startTimeUs", startTimeUs);
364    msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
365    msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
366    msg->setInt32("adaptive", adaptive);
367    msg->post();
368}
369
370void PlaylistFetcher::pauseAsync() {
371    (new AMessage(kWhatPause, id()))->post();
372}
373
374void PlaylistFetcher::stopAsync(bool clear) {
375    sp<AMessage> msg = new AMessage(kWhatStop, id());
376    msg->setInt32("clear", clear);
377    msg->post();
378}
379
380void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
381    AMessage* msg = new AMessage(kWhatResumeUntil, id());
382    msg->setMessage("params", params);
383    msg->post();
384}
385
386void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
387    switch (msg->what()) {
388        case kWhatStart:
389        {
390            status_t err = onStart(msg);
391
392            sp<AMessage> notify = mNotify->dup();
393            notify->setInt32("what", kWhatStarted);
394            notify->setInt32("err", err);
395            notify->post();
396            break;
397        }
398
399        case kWhatPause:
400        {
401            onPause();
402
403            sp<AMessage> notify = mNotify->dup();
404            notify->setInt32("what", kWhatPaused);
405            notify->post();
406            break;
407        }
408
409        case kWhatStop:
410        {
411            onStop(msg);
412
413            sp<AMessage> notify = mNotify->dup();
414            notify->setInt32("what", kWhatStopped);
415            notify->post();
416            break;
417        }
418
419        case kWhatMonitorQueue:
420        case kWhatDownloadNext:
421        {
422            int32_t generation;
423            CHECK(msg->findInt32("generation", &generation));
424
425            if (generation != mMonitorQueueGeneration) {
426                // Stale event
427                break;
428            }
429
430            if (msg->what() == kWhatMonitorQueue) {
431                onMonitorQueue();
432            } else {
433                onDownloadNext();
434            }
435            break;
436        }
437
438        case kWhatResumeUntil:
439        {
440            onResumeUntil(msg);
441            break;
442        }
443
444        default:
445            TRESPASS();
446    }
447}
448
449status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
450    mPacketSources.clear();
451
452    uint32_t streamTypeMask;
453    CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
454
455    int64_t startTimeUs;
456    int64_t segmentStartTimeUs;
457    int32_t startDiscontinuitySeq;
458    int32_t adaptive;
459    CHECK(msg->findInt64("startTimeUs", &startTimeUs));
460    CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
461    CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
462    CHECK(msg->findInt32("adaptive", &adaptive));
463
464    if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
465        void *ptr;
466        CHECK(msg->findPointer("audioSource", &ptr));
467
468        mPacketSources.add(
469                LiveSession::STREAMTYPE_AUDIO,
470                static_cast<AnotherPacketSource *>(ptr));
471    }
472
473    if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
474        void *ptr;
475        CHECK(msg->findPointer("videoSource", &ptr));
476
477        mPacketSources.add(
478                LiveSession::STREAMTYPE_VIDEO,
479                static_cast<AnotherPacketSource *>(ptr));
480    }
481
482    if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
483        void *ptr;
484        CHECK(msg->findPointer("subtitleSource", &ptr));
485
486        mPacketSources.add(
487                LiveSession::STREAMTYPE_SUBTITLES,
488                static_cast<AnotherPacketSource *>(ptr));
489    }
490
491    mStreamTypeMask = streamTypeMask;
492
493    mStartTimeUs = startTimeUs;
494    mSegmentStartTimeUs = segmentStartTimeUs;
495    mDiscontinuitySeq = startDiscontinuitySeq;
496
497    if (mStartTimeUs >= 0ll) {
498        mSeqNumber = -1;
499        mStartup = true;
500        mPrepared = false;
501        mAdaptive = adaptive;
502    }
503
504    postMonitorQueue();
505
506    return OK;
507}
508
509void PlaylistFetcher::onPause() {
510    cancelMonitorQueue();
511}
512
513void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
514    cancelMonitorQueue();
515
516    int32_t clear;
517    CHECK(msg->findInt32("clear", &clear));
518    if (clear) {
519        for (size_t i = 0; i < mPacketSources.size(); i++) {
520            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
521            packetSource->clear();
522        }
523    }
524
525    mPacketSources.clear();
526    mStreamTypeMask = 0;
527}
528
529// Resume until we have reached the boundary timestamps listed in `msg`; when
530// the remaining time is too short (within a resume threshold) stop immediately
531// instead.
532status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
533    sp<AMessage> params;
534    CHECK(msg->findMessage("params", &params));
535
536    bool stop = false;
537    for (size_t i = 0; i < mPacketSources.size(); i++) {
538        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
539
540        const char *stopKey;
541        int streamType = mPacketSources.keyAt(i);
542        switch (streamType) {
543        case LiveSession::STREAMTYPE_VIDEO:
544            stopKey = "timeUsVideo";
545            break;
546
547        case LiveSession::STREAMTYPE_AUDIO:
548            stopKey = "timeUsAudio";
549            break;
550
551        case LiveSession::STREAMTYPE_SUBTITLES:
552            stopKey = "timeUsSubtitle";
553            break;
554
555        default:
556            TRESPASS();
557        }
558
559        // Don't resume if we would stop within a resume threshold.
560        int32_t discontinuitySeq;
561        int64_t latestTimeUs = 0, stopTimeUs = 0;
562        sp<AMessage> latestMeta = packetSource->getLatestDequeuedMeta();
563        if (latestMeta != NULL
564                && latestMeta->findInt32("discontinuitySeq", &discontinuitySeq)
565                && discontinuitySeq == mDiscontinuitySeq
566                && latestMeta->findInt64("timeUs", &latestTimeUs)
567                && params->findInt64(stopKey, &stopTimeUs)
568                && stopTimeUs - latestTimeUs < resumeThreshold(latestMeta)) {
569            stop = true;
570        }
571    }
572
573    if (stop) {
574        for (size_t i = 0; i < mPacketSources.size(); i++) {
575            mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer());
576        }
577        stopAsync(/* clear = */ false);
578        return OK;
579    }
580
581    mStopParams = params;
582    postMonitorQueue();
583
584    return OK;
585}
586
587void PlaylistFetcher::notifyError(status_t err) {
588    sp<AMessage> notify = mNotify->dup();
589    notify->setInt32("what", kWhatError);
590    notify->setInt32("err", err);
591    notify->post();
592}
593
594void PlaylistFetcher::queueDiscontinuity(
595        ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
596    for (size_t i = 0; i < mPacketSources.size(); ++i) {
597        // do not discard buffer upon #EXT-X-DISCONTINUITY tag
598        // (seek will discard buffer by abandoning old fetchers)
599        mPacketSources.valueAt(i)->queueDiscontinuity(
600                type, extra, false /* discard */);
601    }
602}
603
604void PlaylistFetcher::onMonitorQueue() {
605    bool downloadMore = false;
606    refreshPlaylist();
607
608    int32_t targetDurationSecs;
609    int64_t targetDurationUs = kMinBufferedDurationUs;
610    if (mPlaylist != NULL) {
611        CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
612        targetDurationUs = targetDurationSecs * 1000000ll;
613    }
614
615    // buffer at least 3 times the target duration, or up to 10 seconds
616    int64_t durationToBufferUs = targetDurationUs * 3;
617    if (durationToBufferUs > kMinBufferedDurationUs)  {
618        durationToBufferUs = kMinBufferedDurationUs;
619    }
620
621    int64_t bufferedDurationUs = 0ll;
622    status_t finalResult = NOT_ENOUGH_DATA;
623    if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
624        sp<AnotherPacketSource> packetSource =
625            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
626
627        bufferedDurationUs =
628                packetSource->getBufferedDurationUs(&finalResult);
629        finalResult = OK;
630    } else {
631        // Use max stream duration to prevent us from waiting on a non-existent stream;
632        // when we cannot make out from the manifest what streams are included in a playlist
633        // we might assume extra streams.
634        for (size_t i = 0; i < mPacketSources.size(); ++i) {
635            if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
636                continue;
637            }
638
639            int64_t bufferedStreamDurationUs =
640                mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
641            ALOGV("buffered %" PRId64 " for stream %d",
642                    bufferedStreamDurationUs, mPacketSources.keyAt(i));
643            if (bufferedStreamDurationUs > bufferedDurationUs) {
644                bufferedDurationUs = bufferedStreamDurationUs;
645            }
646        }
647    }
648    downloadMore = (bufferedDurationUs < durationToBufferUs);
649
650    // signal start if buffered up at least the target size
651    if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) {
652        mPrepared = true;
653
654        ALOGV("prepared, buffered=%" PRId64 " > %" PRId64 "",
655                bufferedDurationUs, targetDurationUs);
656        sp<AMessage> msg = mNotify->dup();
657        msg->setInt32("what", kWhatTemporarilyDoneFetching);
658        msg->post();
659    }
660
661    if (finalResult == OK && downloadMore) {
662        ALOGV("monitoring, buffered=%" PRId64 " < %" PRId64 "",
663                bufferedDurationUs, durationToBufferUs);
664        // delay the next download slightly; hopefully this gives other concurrent fetchers
665        // a better chance to run.
666        // onDownloadNext();
667        sp<AMessage> msg = new AMessage(kWhatDownloadNext, id());
668        msg->setInt32("generation", mMonitorQueueGeneration);
669        msg->post(1000l);
670    } else {
671        // Nothing to do yet, try again in a second.
672
673        sp<AMessage> msg = mNotify->dup();
674        msg->setInt32("what", kWhatTemporarilyDoneFetching);
675        msg->post();
676
677        int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2;
678        ALOGV("pausing for %" PRId64 ", buffered=%" PRId64 " > %" PRId64 "",
679                delayUs, bufferedDurationUs, durationToBufferUs);
680        // :TRICKY: need to enforce minimum delay because the delay to
681        // refresh the playlist will become 0
682        postMonitorQueue(delayUs, mPrepared ? targetDurationUs * 2 : 0);
683    }
684}
685
686status_t PlaylistFetcher::refreshPlaylist() {
687    if (delayUsToRefreshPlaylist() <= 0) {
688        bool unchanged;
689        sp<M3UParser> playlist = mSession->fetchPlaylist(
690                mURI.c_str(), mPlaylistHash, &unchanged);
691
692        if (playlist == NULL) {
693            if (unchanged) {
694                // We succeeded in fetching the playlist, but it was
695                // unchanged from the last time we tried.
696
697                if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
698                    mRefreshState = (RefreshState)(mRefreshState + 1);
699                }
700            } else {
701                ALOGE("failed to load playlist at url '%s'", mURI.c_str());
702                notifyError(ERROR_IO);
703                return ERROR_IO;
704            }
705        } else {
706            mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
707            mPlaylist = playlist;
708
709            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
710                updateDuration();
711            }
712        }
713
714        mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
715    }
716    return OK;
717}
718
719// static
720bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
721    return buffer->size() > 0 && buffer->data()[0] == 0x47;
722}
723
724void PlaylistFetcher::onDownloadNext() {
725    if (refreshPlaylist() != OK) {
726        return;
727    }
728
729    int32_t firstSeqNumberInPlaylist;
730    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
731                "media-sequence", &firstSeqNumberInPlaylist)) {
732        firstSeqNumberInPlaylist = 0;
733    }
734
735    bool discontinuity = false;
736
737    const int32_t lastSeqNumberInPlaylist =
738        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
739
740    if (mStartup && mSeqNumber >= 0
741            && (mSeqNumber < firstSeqNumberInPlaylist || mSeqNumber > lastSeqNumberInPlaylist)) {
742        // in case we guessed wrong during reconfiguration, try fetching the latest content.
743        mSeqNumber = lastSeqNumberInPlaylist;
744    }
745
746    if (mDiscontinuitySeq < 0) {
747        mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
748    }
749
750    if (mSeqNumber < 0) {
751        CHECK_GE(mStartTimeUs, 0ll);
752
753        if (mSegmentStartTimeUs < 0) {
754            if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
755                // If this is a live session, start 3 segments from the end on connect
756                mSeqNumber = lastSeqNumberInPlaylist - 3;
757                if (mSeqNumber < firstSeqNumberInPlaylist) {
758                    mSeqNumber = firstSeqNumberInPlaylist;
759                }
760            } else {
761                mSeqNumber = getSeqNumberForTime(mStartTimeUs);
762                mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
763            }
764            mStartTimeUsRelative = true;
765            ALOGV("Initial sequence number for time %" PRId64 " is %d from (%d .. %d)",
766                    mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
767                    lastSeqNumberInPlaylist);
768        } else {
769            mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
770            if (mAdaptive) {
771                // avoid double fetch/decode
772                mSeqNumber += 1;
773            }
774            ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
775            if (mSeqNumber < minSeq) {
776                mSeqNumber = minSeq;
777            }
778
779            if (mSeqNumber < firstSeqNumberInPlaylist) {
780                mSeqNumber = firstSeqNumberInPlaylist;
781            }
782
783            if (mSeqNumber > lastSeqNumberInPlaylist) {
784                mSeqNumber = lastSeqNumberInPlaylist;
785            }
786            ALOGV("Initial sequence number for live event %d from (%d .. %d)",
787                    mSeqNumber, firstSeqNumberInPlaylist,
788                    lastSeqNumberInPlaylist);
789        }
790    }
791
792    if (mSeqNumber < firstSeqNumberInPlaylist
793            || mSeqNumber > lastSeqNumberInPlaylist) {
794        if (!mPlaylist->isComplete() && mNumRetries < kMaxNumRetries) {
795            ++mNumRetries;
796
797            if (mSeqNumber > lastSeqNumberInPlaylist) {
798                // refresh in increasing fraction (1/2, 1/3, ...) of the
799                // playlist's target duration or 3 seconds, whichever is less
800                int32_t targetDurationSecs;
801                CHECK(mPlaylist->meta()->findInt32(
802                        "target-duration", &targetDurationSecs));
803                int64_t delayUs = mPlaylist->size() * targetDurationSecs *
804                        1000000ll / (1 + mNumRetries);
805                if (delayUs > kMaxMonitorDelayUs) {
806                    delayUs = kMaxMonitorDelayUs;
807                }
808                ALOGV("sequence number high: %d from (%d .. %d), "
809                      "monitor in %" PRId64 " (retry=%d)",
810                        mSeqNumber, firstSeqNumberInPlaylist,
811                        lastSeqNumberInPlaylist, delayUs, mNumRetries);
812                postMonitorQueue(delayUs);
813                return;
814            }
815
816            // we've missed the boat, let's start from the lowest sequence
817            // number available and signal a discontinuity.
818
819            ALOGI("We've missed the boat, restarting playback."
820                  "  mStartup=%d, was  looking for %d in %d-%d",
821                    mStartup, mSeqNumber, firstSeqNumberInPlaylist,
822                    lastSeqNumberInPlaylist);
823            mSeqNumber = lastSeqNumberInPlaylist - 3;
824            if (mSeqNumber < firstSeqNumberInPlaylist) {
825                mSeqNumber = firstSeqNumberInPlaylist;
826            }
827            discontinuity = true;
828
829            // fall through
830        } else {
831            ALOGE("Cannot find sequence number %d in playlist "
832                 "(contains %d - %d)",
833                 mSeqNumber, firstSeqNumberInPlaylist,
834                  firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
835
836            notifyError(ERROR_END_OF_STREAM);
837            return;
838        }
839    }
840
841    mNumRetries = 0;
842
843    AString uri;
844    sp<AMessage> itemMeta;
845    CHECK(mPlaylist->itemAt(
846                mSeqNumber - firstSeqNumberInPlaylist,
847                &uri,
848                &itemMeta));
849
850    int32_t val;
851    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
852        mDiscontinuitySeq++;
853        discontinuity = true;
854    }
855
856    int64_t range_offset, range_length;
857    if (!itemMeta->findInt64("range-offset", &range_offset)
858            || !itemMeta->findInt64("range-length", &range_length)) {
859        range_offset = 0;
860        range_length = -1;
861    }
862
863    ALOGV("fetching segment %d from (%d .. %d)",
864          mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
865
866    ALOGV("fetching '%s'", uri.c_str());
867
868    sp<DataSource> source;
869    sp<ABuffer> buffer, tsBuffer;
870    // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
871    // this avoids interleaved connections to the key and segment file.
872    {
873        sp<ABuffer> junk = new ABuffer(16);
874        junk->setRange(0, 16);
875        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
876                true /* first */);
877        if (err != OK) {
878            notifyError(err);
879            return;
880        }
881    }
882
883    // block-wise download
884    bool startup = mStartup;
885    ssize_t bytesRead;
886    do {
887        bytesRead = mSession->fetchFile(
888                uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source);
889
890        if (bytesRead < 0) {
891            status_t err = bytesRead;
892            ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
893            notifyError(err);
894            return;
895        }
896
897        CHECK(buffer != NULL);
898
899        size_t size = buffer->size();
900        // Set decryption range.
901        buffer->setRange(size - bytesRead, bytesRead);
902        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
903                buffer->offset() == 0 /* first */);
904        // Unset decryption range.
905        buffer->setRange(0, size);
906
907        if (err != OK) {
908            ALOGE("decryptBuffer failed w/ error %d", err);
909
910            notifyError(err);
911            return;
912        }
913
914        if (startup || discontinuity) {
915            // Signal discontinuity.
916
917            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
918                // If this was a live event this made no sense since
919                // we don't have access to all the segment before the current
920                // one.
921                mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
922            }
923
924            if (discontinuity) {
925                ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
926
927                queueDiscontinuity(
928                        ATSParser::DISCONTINUITY_FORMATCHANGE,
929                        NULL /* extra */);
930
931                discontinuity = false;
932            }
933
934            startup = false;
935        }
936
937        err = OK;
938        if (bufferStartsWithTsSyncByte(buffer)) {
939            // Incremental extraction is only supported for MPEG2 transport streams.
940            if (tsBuffer == NULL) {
941                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
942                tsBuffer->setRange(0, 0);
943            } else if (tsBuffer->capacity() != buffer->capacity()) {
944                size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
945                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
946                tsBuffer->setRange(tsOff, tsSize);
947            }
948            tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
949
950            err = extractAndQueueAccessUnitsFromTs(tsBuffer);
951        }
952
953        if (err == -EAGAIN) {
954            // starting sequence number too low
955            mTSParser.clear();
956            postMonitorQueue();
957            return;
958        } else if (err == ERROR_OUT_OF_RANGE) {
959            // reached stopping point
960            stopAsync(/* clear = */ false);
961            return;
962        } else if (err != OK) {
963            notifyError(err);
964            return;
965        }
966
967    } while (bytesRead != 0);
968
969    if (bufferStartsWithTsSyncByte(buffer)) {
970        // If we still don't see a stream after fetching a full ts segment mark it as
971        // nonexistent.
972        const size_t kNumTypes = ATSParser::NUM_SOURCE_TYPES;
973        ATSParser::SourceType srcTypes[kNumTypes] =
974                { ATSParser::VIDEO, ATSParser::AUDIO };
975        LiveSession::StreamType streamTypes[kNumTypes] =
976                { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
977
978        for (size_t i = 0; i < kNumTypes; i++) {
979            ATSParser::SourceType srcType = srcTypes[i];
980            LiveSession::StreamType streamType = streamTypes[i];
981
982            sp<AnotherPacketSource> source =
983                static_cast<AnotherPacketSource *>(
984                    mTSParser->getSource(srcType).get());
985
986            if (source == NULL) {
987                ALOGW("MPEG2 Transport stream does not contain %s data.",
988                      srcType == ATSParser::VIDEO ? "video" : "audio");
989
990                mStreamTypeMask &= ~streamType;
991                mPacketSources.removeItem(streamType);
992            }
993        }
994
995    }
996
997    if (checkDecryptPadding(buffer) != OK) {
998        ALOGE("Incorrect padding bytes after decryption.");
999        notifyError(ERROR_MALFORMED);
1000        return;
1001    }
1002
1003    status_t err = OK;
1004    if (tsBuffer != NULL) {
1005        AString method;
1006        CHECK(buffer->meta()->findString("cipher-method", &method));
1007        if ((tsBuffer->size() > 0 && method == "NONE")
1008                || tsBuffer->size() > 16) {
1009            ALOGE("MPEG2 transport stream is not an even multiple of 188 "
1010                    "bytes in length.");
1011            notifyError(ERROR_MALFORMED);
1012            return;
1013        }
1014    }
1015
1016    // bulk extract non-ts files
1017    if (tsBuffer == NULL) {
1018      err = extractAndQueueAccessUnits(buffer, itemMeta);
1019    }
1020
1021    if (err != OK) {
1022        notifyError(err);
1023        return;
1024    }
1025
1026    mStartup = false;
1027    ++mSeqNumber;
1028
1029    postMonitorQueue();
1030}
1031
1032int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1033    int32_t firstSeqNumberInPlaylist;
1034    if (mPlaylist->meta() == NULL
1035            || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) {
1036        firstSeqNumberInPlaylist = 0;
1037    }
1038
1039    size_t curDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
1040    if (discontinuitySeq < curDiscontinuitySeq) {
1041        return firstSeqNumberInPlaylist <= 0 ? 0 : (firstSeqNumberInPlaylist - 1);
1042    }
1043
1044    size_t index = 0;
1045    while (index < mPlaylist->size()) {
1046        sp<AMessage> itemMeta;
1047        CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1048
1049        int64_t discontinuity;
1050        if (itemMeta->findInt64("discontinuity", &discontinuity)) {
1051            curDiscontinuitySeq++;
1052        }
1053
1054        if (curDiscontinuitySeq == discontinuitySeq) {
1055            return firstSeqNumberInPlaylist + index;
1056        }
1057
1058        ++index;
1059    }
1060
1061    return firstSeqNumberInPlaylist + mPlaylist->size();
1062}
1063
1064int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1065    int32_t firstSeqNumberInPlaylist;
1066    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
1067                "media-sequence", &firstSeqNumberInPlaylist)) {
1068        firstSeqNumberInPlaylist = 0;
1069    }
1070
1071    size_t index = 0;
1072    int64_t segmentStartUs = 0;
1073    while (index < mPlaylist->size()) {
1074        sp<AMessage> itemMeta;
1075        CHECK(mPlaylist->itemAt(
1076                    index, NULL /* uri */, &itemMeta));
1077
1078        int64_t itemDurationUs;
1079        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1080
1081        if (timeUs < segmentStartUs + itemDurationUs) {
1082            break;
1083        }
1084
1085        segmentStartUs += itemDurationUs;
1086        ++index;
1087    }
1088
1089    if (index >= mPlaylist->size()) {
1090        index = mPlaylist->size() - 1;
1091    }
1092
1093    return firstSeqNumberInPlaylist + index;
1094}
1095
1096const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
1097        const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
1098    sp<MetaData> format = source->getFormat();
1099    if (format != NULL) {
1100        // for simplicity, store a reference to the format in each unit
1101        accessUnit->meta()->setObject("format", format);
1102    }
1103
1104    if (discard) {
1105        accessUnit->meta()->setInt32("discard", discard);
1106    }
1107
1108    accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1109    accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1110    return accessUnit;
1111}
1112
1113status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
1114    if (mTSParser == NULL) {
1115        // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
1116        mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
1117    }
1118
1119    if (mNextPTSTimeUs >= 0ll) {
1120        sp<AMessage> extra = new AMessage;
1121        // Since we are using absolute timestamps, signal an offset of 0 to prevent
1122        // ATSParser from skewing the timestamps of access units.
1123        extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
1124
1125        mTSParser->signalDiscontinuity(
1126                ATSParser::DISCONTINUITY_SEEK, extra);
1127
1128        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
1129        mNextPTSTimeUs = -1ll;
1130        mFirstPTSValid = false;
1131    }
1132
1133    size_t offset = 0;
1134    while (offset + 188 <= buffer->size()) {
1135        status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
1136
1137        if (err != OK) {
1138            return err;
1139        }
1140
1141        offset += 188;
1142    }
1143    // setRange to indicate consumed bytes.
1144    buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
1145
1146    status_t err = OK;
1147    for (size_t i = mPacketSources.size(); i-- > 0;) {
1148        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1149
1150        const char *key;
1151        ATSParser::SourceType type;
1152        const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1153        switch (stream) {
1154            case LiveSession::STREAMTYPE_VIDEO:
1155                type = ATSParser::VIDEO;
1156                key = "timeUsVideo";
1157                break;
1158
1159            case LiveSession::STREAMTYPE_AUDIO:
1160                type = ATSParser::AUDIO;
1161                key = "timeUsAudio";
1162                break;
1163
1164            case LiveSession::STREAMTYPE_SUBTITLES:
1165            {
1166                ALOGE("MPEG2 Transport streams do not contain subtitles.");
1167                return ERROR_MALFORMED;
1168                break;
1169            }
1170
1171            default:
1172                TRESPASS();
1173        }
1174
1175        sp<AnotherPacketSource> source =
1176            static_cast<AnotherPacketSource *>(
1177                    mTSParser->getSource(type).get());
1178
1179        if (source == NULL) {
1180            continue;
1181        }
1182
1183        int64_t timeUs;
1184        sp<ABuffer> accessUnit;
1185        status_t finalResult;
1186        while (source->hasBufferAvailable(&finalResult)
1187                && source->dequeueAccessUnit(&accessUnit) == OK) {
1188
1189            CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1190
1191            if (mStartup) {
1192                if (!mFirstPTSValid) {
1193                    mFirstTimeUs = timeUs;
1194                    mFirstPTSValid = true;
1195                }
1196                if (mStartTimeUsRelative) {
1197                    timeUs -= mFirstTimeUs;
1198                    if (timeUs < 0) {
1199                        timeUs = 0;
1200                    }
1201                } else if (mAdaptive && timeUs > mStartTimeUs) {
1202                    int32_t seq;
1203                    if (mStartTimeUsNotify != NULL
1204                            && !mStartTimeUsNotify->findInt32("discontinuitySeq", &seq)) {
1205                        mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq);
1206                    }
1207                    int64_t startTimeUs;
1208                    if (mStartTimeUsNotify != NULL
1209                            && !mStartTimeUsNotify->findInt64(key, &startTimeUs)) {
1210                        mStartTimeUsNotify->setInt64(key, timeUs);
1211
1212                        uint32_t streamMask = 0;
1213                        mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1214                        streamMask |= mPacketSources.keyAt(i);
1215                        mStartTimeUsNotify->setInt32("streamMask", streamMask);
1216
1217                        if (streamMask == mStreamTypeMask) {
1218                            mStartTimeUsNotify->post();
1219                            mStartTimeUsNotify.clear();
1220                        }
1221                    }
1222                }
1223
1224                if (timeUs < mStartTimeUs) {
1225                    if (mAdaptive) {
1226                        int32_t targetDuration;
1227                        mPlaylist->meta()->findInt32("target-duration", &targetDuration);
1228                        int32_t incr = (mStartTimeUs - timeUs) / 1000000 / targetDuration;
1229                        if (incr == 0) {
1230                            // increment mSeqNumber by at least one
1231                            incr = 1;
1232                        }
1233                        mSeqNumber += incr;
1234                        err = -EAGAIN;
1235                        break;
1236                    } else {
1237                        // buffer up to the closest preceding IDR frame
1238                        ALOGV("timeUs %" PRId64 " us < mStartTimeUs %" PRId64 " us",
1239                                timeUs, mStartTimeUs);
1240                        const char *mime;
1241                        sp<MetaData> format  = source->getFormat();
1242                        bool isAvc = false;
1243                        if (format != NULL && format->findCString(kKeyMIMEType, &mime)
1244                                && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC)) {
1245                            isAvc = true;
1246                        }
1247                        if (isAvc && IsIDR(accessUnit)) {
1248                            mVideoBuffer->clear();
1249                        }
1250                        if (isAvc) {
1251                            mVideoBuffer->queueAccessUnit(accessUnit);
1252                        }
1253
1254                        continue;
1255                    }
1256                }
1257            }
1258
1259            if (mStopParams != NULL) {
1260                // Queue discontinuity in original stream.
1261                int32_t discontinuitySeq;
1262                int64_t stopTimeUs;
1263                if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1264                        || discontinuitySeq > mDiscontinuitySeq
1265                        || !mStopParams->findInt64(key, &stopTimeUs)
1266                        || (discontinuitySeq == mDiscontinuitySeq
1267                                && timeUs >= stopTimeUs)) {
1268                    packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
1269                    mStreamTypeMask &= ~stream;
1270                    mPacketSources.removeItemsAt(i);
1271                    break;
1272                }
1273            }
1274
1275            // Note that we do NOT dequeue any discontinuities except for format change.
1276            if (stream == LiveSession::STREAMTYPE_VIDEO) {
1277                const bool discard = true;
1278                status_t status;
1279                while (mVideoBuffer->hasBufferAvailable(&status)) {
1280                    sp<ABuffer> videoBuffer;
1281                    mVideoBuffer->dequeueAccessUnit(&videoBuffer);
1282                    setAccessUnitProperties(videoBuffer, source, discard);
1283                    packetSource->queueAccessUnit(videoBuffer);
1284                }
1285            }
1286
1287            setAccessUnitProperties(accessUnit, source);
1288            packetSource->queueAccessUnit(accessUnit);
1289        }
1290
1291        if (err != OK) {
1292            break;
1293        }
1294    }
1295
1296    if (err != OK) {
1297        for (size_t i = mPacketSources.size(); i-- > 0;) {
1298            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1299            packetSource->clear();
1300        }
1301        return err;
1302    }
1303
1304    if (!mStreamTypeMask) {
1305        // Signal gap is filled between original and new stream.
1306        ALOGV("ERROR OUT OF RANGE");
1307        return ERROR_OUT_OF_RANGE;
1308    }
1309
1310    return OK;
1311}
1312
1313/* static */
1314bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1315        const sp<ABuffer> &buffer) {
1316    size_t pos = 0;
1317
1318    // skip possible BOM
1319    if (buffer->size() >= pos + 3 &&
1320            !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1321        pos += 3;
1322    }
1323
1324    // accept WEBVTT followed by SPACE, TAB or (CR) LF
1325    if (buffer->size() < pos + 6 ||
1326            memcmp("WEBVTT", buffer->data() + pos, 6)) {
1327        return false;
1328    }
1329    pos += 6;
1330
1331    if (buffer->size() == pos) {
1332        return true;
1333    }
1334
1335    uint8_t sep = buffer->data()[pos];
1336    return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1337}
1338
1339status_t PlaylistFetcher::extractAndQueueAccessUnits(
1340        const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1341    if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1342        if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1343            ALOGE("This stream only contains subtitles.");
1344            return ERROR_MALFORMED;
1345        }
1346
1347        const sp<AnotherPacketSource> packetSource =
1348            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1349
1350        int64_t durationUs;
1351        CHECK(itemMeta->findInt64("durationUs", &durationUs));
1352        buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1353        buffer->meta()->setInt64("durationUs", durationUs);
1354        buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1355        buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1356
1357        packetSource->queueAccessUnit(buffer);
1358        return OK;
1359    }
1360
1361    if (mNextPTSTimeUs >= 0ll) {
1362        mFirstPTSValid = false;
1363        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
1364        mNextPTSTimeUs = -1ll;
1365    }
1366
1367    // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
1368    // stream prefixed by an ID3 tag.
1369
1370    bool firstID3Tag = true;
1371    uint64_t PTS = 0;
1372
1373    for (;;) {
1374        // Make sure to skip all ID3 tags preceding the audio data.
1375        // At least one must be present to provide the PTS timestamp.
1376
1377        ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
1378        if (!id3.isValid()) {
1379            if (firstID3Tag) {
1380                ALOGE("Unable to parse ID3 tag.");
1381                return ERROR_MALFORMED;
1382            } else {
1383                break;
1384            }
1385        }
1386
1387        if (firstID3Tag) {
1388            bool found = false;
1389
1390            ID3::Iterator it(id3, "PRIV");
1391            while (!it.done()) {
1392                size_t length;
1393                const uint8_t *data = it.getData(&length);
1394
1395                static const char *kMatchName =
1396                    "com.apple.streaming.transportStreamTimestamp";
1397                static const size_t kMatchNameLen = strlen(kMatchName);
1398
1399                if (length == kMatchNameLen + 1 + 8
1400                        && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
1401                    found = true;
1402                    PTS = U64_AT(&data[kMatchNameLen + 1]);
1403                }
1404
1405                it.next();
1406            }
1407
1408            if (!found) {
1409                ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
1410                return ERROR_MALFORMED;
1411            }
1412        }
1413
1414        // skip the ID3 tag
1415        buffer->setRange(
1416                buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
1417
1418        firstID3Tag = false;
1419    }
1420
1421    if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
1422        ALOGW("This stream only contains audio data!");
1423
1424        mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
1425
1426        if (mStreamTypeMask == 0) {
1427            return OK;
1428        }
1429    }
1430
1431    sp<AnotherPacketSource> packetSource =
1432        mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
1433
1434    if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
1435        ABitReader bits(buffer->data(), buffer->size());
1436
1437        // adts_fixed_header
1438
1439        CHECK_EQ(bits.getBits(12), 0xfffu);
1440        bits.skipBits(3);  // ID, layer
1441        bool protection_absent = bits.getBits(1) != 0;
1442
1443        unsigned profile = bits.getBits(2);
1444        CHECK_NE(profile, 3u);
1445        unsigned sampling_freq_index = bits.getBits(4);
1446        bits.getBits(1);  // private_bit
1447        unsigned channel_configuration = bits.getBits(3);
1448        CHECK_NE(channel_configuration, 0u);
1449        bits.skipBits(2);  // original_copy, home
1450
1451        sp<MetaData> meta = MakeAACCodecSpecificData(
1452                profile, sampling_freq_index, channel_configuration);
1453
1454        meta->setInt32(kKeyIsADTS, true);
1455
1456        packetSource->setFormat(meta);
1457    }
1458
1459    int64_t numSamples = 0ll;
1460    int32_t sampleRate;
1461    CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
1462
1463    int64_t timeUs = (PTS * 100ll) / 9ll;
1464    if (!mFirstPTSValid) {
1465        mFirstPTSValid = true;
1466        mFirstTimeUs = timeUs;
1467    }
1468
1469    size_t offset = 0;
1470    while (offset < buffer->size()) {
1471        const uint8_t *adtsHeader = buffer->data() + offset;
1472        CHECK_LT(offset + 5, buffer->size());
1473
1474        unsigned aac_frame_length =
1475            ((adtsHeader[3] & 3) << 11)
1476            | (adtsHeader[4] << 3)
1477            | (adtsHeader[5] >> 5);
1478
1479        if (aac_frame_length == 0) {
1480            const uint8_t *id3Header = adtsHeader;
1481            if (!memcmp(id3Header, "ID3", 3)) {
1482                ID3 id3(id3Header, buffer->size() - offset, true);
1483                if (id3.isValid()) {
1484                    offset += id3.rawSize();
1485                    continue;
1486                };
1487            }
1488            return ERROR_MALFORMED;
1489        }
1490
1491        CHECK_LE(offset + aac_frame_length, buffer->size());
1492
1493        int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
1494        offset += aac_frame_length;
1495
1496        // Each AAC frame encodes 1024 samples.
1497        numSamples += 1024;
1498
1499        if (mStartup) {
1500            int64_t startTimeUs = unitTimeUs;
1501            if (mStartTimeUsRelative) {
1502                startTimeUs -= mFirstTimeUs;
1503                if (startTimeUs  < 0) {
1504                    startTimeUs = 0;
1505                }
1506            }
1507            if (startTimeUs < mStartTimeUs) {
1508                continue;
1509            }
1510        }
1511
1512        sp<ABuffer> unit = new ABuffer(aac_frame_length);
1513        memcpy(unit->data(), adtsHeader, aac_frame_length);
1514
1515        unit->meta()->setInt64("timeUs", unitTimeUs);
1516        unit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1517        unit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1518        packetSource->queueAccessUnit(unit);
1519    }
1520
1521    return OK;
1522}
1523
1524void PlaylistFetcher::updateDuration() {
1525    int64_t durationUs = 0ll;
1526    for (size_t index = 0; index < mPlaylist->size(); ++index) {
1527        sp<AMessage> itemMeta;
1528        CHECK(mPlaylist->itemAt(
1529                    index, NULL /* uri */, &itemMeta));
1530
1531        int64_t itemDurationUs;
1532        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1533
1534        durationUs += itemDurationUs;
1535    }
1536
1537    sp<AMessage> msg = mNotify->dup();
1538    msg->setInt32("what", kWhatDurationUpdate);
1539    msg->setInt64("durationUs", durationUs);
1540    msg->post();
1541}
1542
1543int64_t PlaylistFetcher::resumeThreshold(const sp<AMessage> &msg) {
1544    int64_t durationUs, threshold;
1545    if (msg->findInt64("durationUs", &durationUs)) {
1546        return kNumSkipFrames * durationUs;
1547    }
1548
1549    sp<RefBase> obj;
1550    msg->findObject("format", &obj);
1551    MetaData *format = static_cast<MetaData *>(obj.get());
1552
1553    const char *mime;
1554    CHECK(format->findCString(kKeyMIMEType, &mime));
1555    bool audio = !strncasecmp(mime, "audio/", 6);
1556    if (audio) {
1557        // Assumes 1000 samples per frame.
1558        int32_t sampleRate;
1559        CHECK(format->findInt32(kKeySampleRate, &sampleRate));
1560        return kNumSkipFrames  /* frames */ * 1000 /* samples */
1561                * (1000000 / sampleRate) /* sample duration (us) */;
1562    } else {
1563        int32_t frameRate;
1564        if (format->findInt32(kKeyFrameRate, &frameRate) && frameRate > 0) {
1565            return kNumSkipFrames * (1000000 / frameRate);
1566        }
1567    }
1568
1569    return 500000ll;
1570}
1571
1572}  // namespace android
1573