PlaylistFetcher.cpp revision 309aa8bf5e4cd66fe988adf2654cac3fadc2a1c3
1/*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "PlaylistFetcher"
19#include <utils/Log.h>
20
21#include "PlaylistFetcher.h"
22
23#include "LiveDataSource.h"
24#include "LiveSession.h"
25#include "M3UParser.h"
26
27#include "include/avc_utils.h"
28#include "include/HTTPBase.h"
29#include "include/ID3.h"
30#include "mpeg2ts/AnotherPacketSource.h"
31
32#include <media/IStreamSource.h>
33#include <media/stagefright/foundation/ABitReader.h>
34#include <media/stagefright/foundation/ABuffer.h>
35#include <media/stagefright/foundation/ADebug.h>
36#include <media/stagefright/foundation/hexdump.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaDefs.h>
39#include <media/stagefright/MetaData.h>
40#include <media/stagefright/Utils.h>
41
42#include <ctype.h>
43#include <inttypes.h>
44#include <openssl/aes.h>
45#include <openssl/md5.h>
46
47namespace android {
48
49// static
50const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll;
51const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
52const int32_t PlaylistFetcher::kDownloadBlockSize = 2048;
53const int32_t PlaylistFetcher::kNumSkipFrames = 10;
54
55PlaylistFetcher::PlaylistFetcher(
56        const sp<AMessage> &notify,
57        const sp<LiveSession> &session,
58        const char *uri)
59    : mNotify(notify),
60      mStartTimeUsNotify(notify->dup()),
61      mSession(session),
62      mURI(uri),
63      mStreamTypeMask(0),
64      mStartTimeUs(-1ll),
65      mSegmentStartTimeUs(-1ll),
66      mDiscontinuitySeq(-1ll),
67      mStartTimeUsRelative(false),
68      mLastPlaylistFetchTimeUs(-1ll),
69      mSeqNumber(-1),
70      mNumRetries(0),
71      mStartup(true),
72      mAdaptive(false),
73      mPrepared(false),
74      mNextPTSTimeUs(-1ll),
75      mMonitorQueueGeneration(0),
76      mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
77      mFirstPTSValid(false),
78      mAbsoluteTimeAnchorUs(0ll),
79      mVideoBuffer(new AnotherPacketSource(NULL)) {
80    memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
81    mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
82    mStartTimeUsNotify->setInt32("streamMask", 0);
83}
84
85PlaylistFetcher::~PlaylistFetcher() {
86}
87
88int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
89    CHECK(mPlaylist != NULL);
90
91    int32_t firstSeqNumberInPlaylist;
92    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
93                "media-sequence", &firstSeqNumberInPlaylist)) {
94        firstSeqNumberInPlaylist = 0;
95    }
96
97    int32_t lastSeqNumberInPlaylist =
98        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
99
100    CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
101    CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
102
103    int64_t segmentStartUs = 0ll;
104    for (int32_t index = 0;
105            index < seqNumber - firstSeqNumberInPlaylist; ++index) {
106        sp<AMessage> itemMeta;
107        CHECK(mPlaylist->itemAt(
108                    index, NULL /* uri */, &itemMeta));
109
110        int64_t itemDurationUs;
111        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
112
113        segmentStartUs += itemDurationUs;
114    }
115
116    return segmentStartUs;
117}
118
119int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
120    int64_t nowUs = ALooper::GetNowUs();
121
122    if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
123        CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
124        return 0ll;
125    }
126
127    if (mPlaylist->isComplete()) {
128        return (~0llu >> 1);
129    }
130
131    int32_t targetDurationSecs;
132    CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
133
134    int64_t targetDurationUs = targetDurationSecs * 1000000ll;
135
136    int64_t minPlaylistAgeUs;
137
138    switch (mRefreshState) {
139        case INITIAL_MINIMUM_RELOAD_DELAY:
140        {
141            size_t n = mPlaylist->size();
142            if (n > 0) {
143                sp<AMessage> itemMeta;
144                CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
145
146                int64_t itemDurationUs;
147                CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
148
149                minPlaylistAgeUs = itemDurationUs;
150                break;
151            }
152
153            // fall through
154        }
155
156        case FIRST_UNCHANGED_RELOAD_ATTEMPT:
157        {
158            minPlaylistAgeUs = targetDurationUs / 2;
159            break;
160        }
161
162        case SECOND_UNCHANGED_RELOAD_ATTEMPT:
163        {
164            minPlaylistAgeUs = (targetDurationUs * 3) / 2;
165            break;
166        }
167
168        case THIRD_UNCHANGED_RELOAD_ATTEMPT:
169        {
170            minPlaylistAgeUs = targetDurationUs * 3;
171            break;
172        }
173
174        default:
175            TRESPASS();
176            break;
177    }
178
179    int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
180    return delayUs > 0ll ? delayUs : 0ll;
181}
182
183status_t PlaylistFetcher::decryptBuffer(
184        size_t playlistIndex, const sp<ABuffer> &buffer,
185        bool first) {
186    sp<AMessage> itemMeta;
187    bool found = false;
188    AString method;
189
190    for (ssize_t i = playlistIndex; i >= 0; --i) {
191        AString uri;
192        CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
193
194        if (itemMeta->findString("cipher-method", &method)) {
195            found = true;
196            break;
197        }
198    }
199
200    if (!found) {
201        method = "NONE";
202    }
203    buffer->meta()->setString("cipher-method", method.c_str());
204
205    if (method == "NONE") {
206        return OK;
207    } else if (!(method == "AES-128")) {
208        ALOGE("Unsupported cipher method '%s'", method.c_str());
209        return ERROR_UNSUPPORTED;
210    }
211
212    AString keyURI;
213    if (!itemMeta->findString("cipher-uri", &keyURI)) {
214        ALOGE("Missing key uri");
215        return ERROR_MALFORMED;
216    }
217
218    ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
219
220    sp<ABuffer> key;
221    if (index >= 0) {
222        key = mAESKeyForURI.valueAt(index);
223    } else {
224        ssize_t err = mSession->fetchFile(keyURI.c_str(), &key);
225
226        if (err < 0) {
227            ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
228            return ERROR_IO;
229        } else if (key->size() != 16) {
230            ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
231            return ERROR_MALFORMED;
232        }
233
234        mAESKeyForURI.add(keyURI, key);
235    }
236
237    AES_KEY aes_key;
238    if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
239        ALOGE("failed to set AES decryption key.");
240        return UNKNOWN_ERROR;
241    }
242
243    size_t n = buffer->size();
244    if (!n) {
245        return OK;
246    }
247    CHECK(n % 16 == 0);
248
249    if (first) {
250        // If decrypting the first block in a file, read the iv from the manifest
251        // or derive the iv from the file's sequence number.
252
253        AString iv;
254        if (itemMeta->findString("cipher-iv", &iv)) {
255            if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
256                    || iv.size() != 16 * 2 + 2) {
257                ALOGE("malformed cipher IV '%s'.", iv.c_str());
258                return ERROR_MALFORMED;
259            }
260
261            memset(mAESInitVec, 0, sizeof(mAESInitVec));
262            for (size_t i = 0; i < 16; ++i) {
263                char c1 = tolower(iv.c_str()[2 + 2 * i]);
264                char c2 = tolower(iv.c_str()[3 + 2 * i]);
265                if (!isxdigit(c1) || !isxdigit(c2)) {
266                    ALOGE("malformed cipher IV '%s'.", iv.c_str());
267                    return ERROR_MALFORMED;
268                }
269                uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
270                uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
271
272                mAESInitVec[i] = nibble1 << 4 | nibble2;
273            }
274        } else {
275            memset(mAESInitVec, 0, sizeof(mAESInitVec));
276            mAESInitVec[15] = mSeqNumber & 0xff;
277            mAESInitVec[14] = (mSeqNumber >> 8) & 0xff;
278            mAESInitVec[13] = (mSeqNumber >> 16) & 0xff;
279            mAESInitVec[12] = (mSeqNumber >> 24) & 0xff;
280        }
281    }
282
283    AES_cbc_encrypt(
284            buffer->data(), buffer->data(), buffer->size(),
285            &aes_key, mAESInitVec, AES_DECRYPT);
286
287    return OK;
288}
289
290status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
291    status_t err;
292    AString method;
293    CHECK(buffer->meta()->findString("cipher-method", &method));
294    if (method == "NONE") {
295        return OK;
296    }
297
298    uint8_t padding = 0;
299    if (buffer->size() > 0) {
300        padding = buffer->data()[buffer->size() - 1];
301    }
302
303    if (padding > 16) {
304        return ERROR_MALFORMED;
305    }
306
307    for (size_t i = buffer->size() - padding; i < padding; i++) {
308        if (buffer->data()[i] != padding) {
309            return ERROR_MALFORMED;
310        }
311    }
312
313    buffer->setRange(buffer->offset(), buffer->size() - padding);
314    return OK;
315}
316
317void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
318    int64_t maxDelayUs = delayUsToRefreshPlaylist();
319    if (maxDelayUs < minDelayUs) {
320        maxDelayUs = minDelayUs;
321    }
322    if (delayUs > maxDelayUs) {
323        ALOGV("Need to refresh playlist in %" PRId64 , maxDelayUs);
324        delayUs = maxDelayUs;
325    }
326    sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id());
327    msg->setInt32("generation", mMonitorQueueGeneration);
328    msg->post(delayUs);
329}
330
331void PlaylistFetcher::cancelMonitorQueue() {
332    ++mMonitorQueueGeneration;
333}
334
335void PlaylistFetcher::startAsync(
336        const sp<AnotherPacketSource> &audioSource,
337        const sp<AnotherPacketSource> &videoSource,
338        const sp<AnotherPacketSource> &subtitleSource,
339        int64_t startTimeUs,
340        int64_t segmentStartTimeUs,
341        int32_t startDiscontinuitySeq,
342        bool adaptive) {
343    sp<AMessage> msg = new AMessage(kWhatStart, id());
344
345    uint32_t streamTypeMask = 0ul;
346
347    if (audioSource != NULL) {
348        msg->setPointer("audioSource", audioSource.get());
349        streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
350    }
351
352    if (videoSource != NULL) {
353        msg->setPointer("videoSource", videoSource.get());
354        streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
355    }
356
357    if (subtitleSource != NULL) {
358        msg->setPointer("subtitleSource", subtitleSource.get());
359        streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
360    }
361
362    msg->setInt32("streamTypeMask", streamTypeMask);
363    msg->setInt64("startTimeUs", startTimeUs);
364    msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
365    msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
366    msg->setInt32("adaptive", adaptive);
367    msg->post();
368}
369
370void PlaylistFetcher::pauseAsync() {
371    (new AMessage(kWhatPause, id()))->post();
372}
373
374void PlaylistFetcher::stopAsync(bool clear) {
375    sp<AMessage> msg = new AMessage(kWhatStop, id());
376    msg->setInt32("clear", clear);
377    msg->post();
378}
379
380void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
381    AMessage* msg = new AMessage(kWhatResumeUntil, id());
382    msg->setMessage("params", params);
383    msg->post();
384}
385
386void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
387    switch (msg->what()) {
388        case kWhatStart:
389        {
390            status_t err = onStart(msg);
391
392            sp<AMessage> notify = mNotify->dup();
393            notify->setInt32("what", kWhatStarted);
394            notify->setInt32("err", err);
395            notify->post();
396            break;
397        }
398
399        case kWhatPause:
400        {
401            onPause();
402
403            sp<AMessage> notify = mNotify->dup();
404            notify->setInt32("what", kWhatPaused);
405            notify->post();
406            break;
407        }
408
409        case kWhatStop:
410        {
411            onStop(msg);
412
413            sp<AMessage> notify = mNotify->dup();
414            notify->setInt32("what", kWhatStopped);
415            notify->post();
416            break;
417        }
418
419        case kWhatMonitorQueue:
420        case kWhatDownloadNext:
421        {
422            int32_t generation;
423            CHECK(msg->findInt32("generation", &generation));
424
425            if (generation != mMonitorQueueGeneration) {
426                // Stale event
427                break;
428            }
429
430            if (msg->what() == kWhatMonitorQueue) {
431                onMonitorQueue();
432            } else {
433                onDownloadNext();
434            }
435            break;
436        }
437
438        case kWhatResumeUntil:
439        {
440            onResumeUntil(msg);
441            break;
442        }
443
444        default:
445            TRESPASS();
446    }
447}
448
449status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
450    mPacketSources.clear();
451
452    uint32_t streamTypeMask;
453    CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
454
455    int64_t startTimeUs;
456    int64_t segmentStartTimeUs;
457    int32_t startDiscontinuitySeq;
458    int32_t adaptive;
459    CHECK(msg->findInt64("startTimeUs", &startTimeUs));
460    CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
461    CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
462    CHECK(msg->findInt32("adaptive", &adaptive));
463
464    if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
465        void *ptr;
466        CHECK(msg->findPointer("audioSource", &ptr));
467
468        mPacketSources.add(
469                LiveSession::STREAMTYPE_AUDIO,
470                static_cast<AnotherPacketSource *>(ptr));
471    }
472
473    if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
474        void *ptr;
475        CHECK(msg->findPointer("videoSource", &ptr));
476
477        mPacketSources.add(
478                LiveSession::STREAMTYPE_VIDEO,
479                static_cast<AnotherPacketSource *>(ptr));
480    }
481
482    if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
483        void *ptr;
484        CHECK(msg->findPointer("subtitleSource", &ptr));
485
486        mPacketSources.add(
487                LiveSession::STREAMTYPE_SUBTITLES,
488                static_cast<AnotherPacketSource *>(ptr));
489    }
490
491    mStreamTypeMask = streamTypeMask;
492
493    mStartTimeUs = startTimeUs;
494    mSegmentStartTimeUs = segmentStartTimeUs;
495    mDiscontinuitySeq = startDiscontinuitySeq;
496
497    if (mStartTimeUs >= 0ll) {
498        mSeqNumber = -1;
499        mStartup = true;
500        mPrepared = false;
501        mAdaptive = adaptive;
502    }
503
504    postMonitorQueue();
505
506    return OK;
507}
508
509void PlaylistFetcher::onPause() {
510    cancelMonitorQueue();
511}
512
513void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
514    cancelMonitorQueue();
515
516    int32_t clear;
517    CHECK(msg->findInt32("clear", &clear));
518    if (clear) {
519        for (size_t i = 0; i < mPacketSources.size(); i++) {
520            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
521            packetSource->clear();
522        }
523    }
524
525    mPacketSources.clear();
526    mStreamTypeMask = 0;
527}
528
529// Resume until we have reached the boundary timestamps listed in `msg`; when
530// the remaining time is too short (within a resume threshold) stop immediately
531// instead.
532status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
533    sp<AMessage> params;
534    CHECK(msg->findMessage("params", &params));
535
536    bool stop = false;
537    for (size_t i = 0; i < mPacketSources.size(); i++) {
538        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
539
540        const char *stopKey;
541        int streamType = mPacketSources.keyAt(i);
542        switch (streamType) {
543        case LiveSession::STREAMTYPE_VIDEO:
544            stopKey = "timeUsVideo";
545            break;
546
547        case LiveSession::STREAMTYPE_AUDIO:
548            stopKey = "timeUsAudio";
549            break;
550
551        case LiveSession::STREAMTYPE_SUBTITLES:
552            stopKey = "timeUsSubtitle";
553            break;
554
555        default:
556            TRESPASS();
557        }
558
559        // Don't resume if we would stop within a resume threshold.
560        int32_t discontinuitySeq;
561        int64_t latestTimeUs = 0, stopTimeUs = 0;
562        sp<AMessage> latestMeta = packetSource->getLatestDequeuedMeta();
563        if (latestMeta != NULL
564                && latestMeta->findInt32("discontinuitySeq", &discontinuitySeq)
565                && discontinuitySeq == mDiscontinuitySeq
566                && latestMeta->findInt64("timeUs", &latestTimeUs)
567                && params->findInt64(stopKey, &stopTimeUs)
568                && stopTimeUs - latestTimeUs < resumeThreshold(latestMeta)) {
569            stop = true;
570        }
571    }
572
573    if (stop) {
574        for (size_t i = 0; i < mPacketSources.size(); i++) {
575            mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer());
576        }
577        stopAsync(/* clear = */ false);
578        return OK;
579    }
580
581    mStopParams = params;
582    postMonitorQueue();
583
584    return OK;
585}
586
587void PlaylistFetcher::notifyError(status_t err) {
588    sp<AMessage> notify = mNotify->dup();
589    notify->setInt32("what", kWhatError);
590    notify->setInt32("err", err);
591    notify->post();
592}
593
594void PlaylistFetcher::queueDiscontinuity(
595        ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
596    for (size_t i = 0; i < mPacketSources.size(); ++i) {
597        // do not discard buffer upon #EXT-X-DISCONTINUITY tag
598        // (seek will discard buffer by abandoning old fetchers)
599        mPacketSources.valueAt(i)->queueDiscontinuity(
600                type, extra, false /* discard */);
601    }
602}
603
604void PlaylistFetcher::onMonitorQueue() {
605    bool downloadMore = false;
606    refreshPlaylist();
607
608    int32_t targetDurationSecs;
609    int64_t targetDurationUs = kMinBufferedDurationUs;
610    if (mPlaylist != NULL) {
611        CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
612        targetDurationUs = targetDurationSecs * 1000000ll;
613    }
614
615    // buffer at least 3 times the target duration, or up to 10 seconds
616    int64_t durationToBufferUs = targetDurationUs * 3;
617    if (durationToBufferUs > kMinBufferedDurationUs)  {
618        durationToBufferUs = kMinBufferedDurationUs;
619    }
620
621    int64_t bufferedDurationUs = 0ll;
622    status_t finalResult = NOT_ENOUGH_DATA;
623    if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
624        sp<AnotherPacketSource> packetSource =
625            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
626
627        bufferedDurationUs =
628                packetSource->getBufferedDurationUs(&finalResult);
629        finalResult = OK;
630    } else {
631        // Use max stream duration to prevent us from waiting on a non-existent stream;
632        // when we cannot make out from the manifest what streams are included in a playlist
633        // we might assume extra streams.
634        for (size_t i = 0; i < mPacketSources.size(); ++i) {
635            if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
636                continue;
637            }
638
639            int64_t bufferedStreamDurationUs =
640                mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
641            ALOGV("buffered %" PRId64 " for stream %d",
642                    bufferedStreamDurationUs, mPacketSources.keyAt(i));
643            if (bufferedStreamDurationUs > bufferedDurationUs) {
644                bufferedDurationUs = bufferedStreamDurationUs;
645            }
646        }
647    }
648    downloadMore = (bufferedDurationUs < durationToBufferUs);
649
650    // signal start if buffered up at least the target size
651    if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) {
652        mPrepared = true;
653
654        ALOGV("prepared, buffered=%" PRId64 " > %" PRId64 "",
655                bufferedDurationUs, targetDurationUs);
656        sp<AMessage> msg = mNotify->dup();
657        msg->setInt32("what", kWhatTemporarilyDoneFetching);
658        msg->post();
659    }
660
661    if (finalResult == OK && downloadMore) {
662        ALOGV("monitoring, buffered=%" PRId64 " < %" PRId64 "",
663                bufferedDurationUs, durationToBufferUs);
664        // delay the next download slightly; hopefully this gives other concurrent fetchers
665        // a better chance to run.
666        // onDownloadNext();
667        sp<AMessage> msg = new AMessage(kWhatDownloadNext, id());
668        msg->setInt32("generation", mMonitorQueueGeneration);
669        msg->post(1000l);
670    } else {
671        // Nothing to do yet, try again in a second.
672
673        sp<AMessage> msg = mNotify->dup();
674        msg->setInt32("what", kWhatTemporarilyDoneFetching);
675        msg->post();
676
677        int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2;
678        ALOGV("pausing for %" PRId64 ", buffered=%" PRId64 " > %" PRId64 "",
679                delayUs, bufferedDurationUs, durationToBufferUs);
680        // :TRICKY: need to enforce minimum delay because the delay to
681        // refresh the playlist will become 0
682        postMonitorQueue(delayUs, mPrepared ? targetDurationUs * 2 : 0);
683    }
684}
685
686status_t PlaylistFetcher::refreshPlaylist() {
687    if (delayUsToRefreshPlaylist() <= 0) {
688        bool unchanged;
689        sp<M3UParser> playlist = mSession->fetchPlaylist(
690                mURI.c_str(), mPlaylistHash, &unchanged);
691
692        if (playlist == NULL) {
693            if (unchanged) {
694                // We succeeded in fetching the playlist, but it was
695                // unchanged from the last time we tried.
696
697                if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
698                    mRefreshState = (RefreshState)(mRefreshState + 1);
699                }
700            } else {
701                ALOGE("failed to load playlist at url '%s'", mURI.c_str());
702                notifyError(ERROR_IO);
703                return ERROR_IO;
704            }
705        } else {
706            mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
707            mPlaylist = playlist;
708
709            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
710                updateDuration();
711            }
712        }
713
714        mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
715    }
716    return OK;
717}
718
719// static
720bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
721    return buffer->size() > 0 && buffer->data()[0] == 0x47;
722}
723
724void PlaylistFetcher::onDownloadNext() {
725    if (refreshPlaylist() != OK) {
726        return;
727    }
728
729    int32_t firstSeqNumberInPlaylist;
730    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
731                "media-sequence", &firstSeqNumberInPlaylist)) {
732        firstSeqNumberInPlaylist = 0;
733    }
734
735    bool discontinuity = false;
736
737    const int32_t lastSeqNumberInPlaylist =
738        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
739
740    if (mStartup && mSeqNumber >= 0
741            && (mSeqNumber < firstSeqNumberInPlaylist || mSeqNumber > lastSeqNumberInPlaylist)) {
742        // in case we guessed wrong during reconfiguration, try fetching the latest content.
743        mSeqNumber = lastSeqNumberInPlaylist;
744    }
745
746    if (mDiscontinuitySeq < 0) {
747        mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
748    }
749
750    if (mSeqNumber < 0) {
751        CHECK_GE(mStartTimeUs, 0ll);
752
753        if (mSegmentStartTimeUs < 0) {
754            if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
755                // If this is a live session, start 3 segments from the end on connect
756                mSeqNumber = lastSeqNumberInPlaylist - 3;
757            } else {
758                mSeqNumber = getSeqNumberForTime(mStartTimeUs);
759                mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
760            }
761            mStartTimeUsRelative = true;
762            ALOGV("Initial sequence number for time %" PRId64 " is %d from (%d .. %d)",
763                    mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
764                    lastSeqNumberInPlaylist);
765        } else {
766            mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
767            if (mAdaptive) {
768                // avoid double fetch/decode
769                mSeqNumber += 1;
770            }
771            ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
772            if (mSeqNumber < minSeq) {
773                mSeqNumber = minSeq;
774            }
775
776            if (mSeqNumber < firstSeqNumberInPlaylist) {
777                mSeqNumber = firstSeqNumberInPlaylist;
778            }
779
780            if (mSeqNumber > lastSeqNumberInPlaylist) {
781                mSeqNumber = lastSeqNumberInPlaylist;
782            }
783            ALOGV("Initial sequence number for live event %d from (%d .. %d)",
784                    mSeqNumber, firstSeqNumberInPlaylist,
785                    lastSeqNumberInPlaylist);
786        }
787    }
788
789    if (mSeqNumber < firstSeqNumberInPlaylist
790            || mSeqNumber > lastSeqNumberInPlaylist) {
791        if (!mPlaylist->isComplete() && mNumRetries < kMaxNumRetries) {
792            ++mNumRetries;
793
794            if (mSeqNumber > lastSeqNumberInPlaylist) {
795                // refresh in increasing fraction (1/2, 1/3, ...) of the
796                // playlist's target duration or 3 seconds, whichever is less
797                int32_t targetDurationSecs;
798                CHECK(mPlaylist->meta()->findInt32(
799                        "target-duration", &targetDurationSecs));
800                int64_t delayUs = mPlaylist->size() * targetDurationSecs *
801                        1000000ll / (1 + mNumRetries);
802                if (delayUs > kMaxMonitorDelayUs) {
803                    delayUs = kMaxMonitorDelayUs;
804                }
805                ALOGV("sequence number high: %d from (%d .. %d), "
806                      "monitor in %" PRId64 " (retry=%d)",
807                        mSeqNumber, firstSeqNumberInPlaylist,
808                        lastSeqNumberInPlaylist, delayUs, mNumRetries);
809                postMonitorQueue(delayUs);
810                return;
811            }
812
813            // we've missed the boat, let's start from the lowest sequence
814            // number available and signal a discontinuity.
815
816            ALOGI("We've missed the boat, restarting playback."
817                  "  mStartup=%d, was  looking for %d in %d-%d",
818                    mStartup, mSeqNumber, firstSeqNumberInPlaylist,
819                    lastSeqNumberInPlaylist);
820            mSeqNumber = lastSeqNumberInPlaylist - 3;
821            if (mSeqNumber < firstSeqNumberInPlaylist) {
822                mSeqNumber = firstSeqNumberInPlaylist;
823            }
824            discontinuity = true;
825
826            // fall through
827        } else {
828            ALOGE("Cannot find sequence number %d in playlist "
829                 "(contains %d - %d)",
830                 mSeqNumber, firstSeqNumberInPlaylist,
831                  firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
832
833            notifyError(ERROR_END_OF_STREAM);
834            return;
835        }
836    }
837
838    mNumRetries = 0;
839
840    AString uri;
841    sp<AMessage> itemMeta;
842    CHECK(mPlaylist->itemAt(
843                mSeqNumber - firstSeqNumberInPlaylist,
844                &uri,
845                &itemMeta));
846
847    int32_t val;
848    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
849        mDiscontinuitySeq++;
850        discontinuity = true;
851    }
852
853    int64_t range_offset, range_length;
854    if (!itemMeta->findInt64("range-offset", &range_offset)
855            || !itemMeta->findInt64("range-length", &range_length)) {
856        range_offset = 0;
857        range_length = -1;
858    }
859
860    ALOGV("fetching segment %d from (%d .. %d)",
861          mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
862
863    ALOGV("fetching '%s'", uri.c_str());
864
865    sp<DataSource> source;
866    sp<ABuffer> buffer, tsBuffer;
867    // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
868    // this avoids interleaved connections to the key and segment file.
869    {
870        sp<ABuffer> junk = new ABuffer(16);
871        junk->setRange(0, 16);
872        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
873                true /* first */);
874        if (err != OK) {
875            notifyError(err);
876            return;
877        }
878    }
879
880    // block-wise download
881    bool startup = mStartup;
882    ssize_t bytesRead;
883    do {
884        bytesRead = mSession->fetchFile(
885                uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source);
886
887        if (bytesRead < 0) {
888            status_t err = bytesRead;
889            ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
890            notifyError(err);
891            return;
892        }
893
894        CHECK(buffer != NULL);
895
896        size_t size = buffer->size();
897        // Set decryption range.
898        buffer->setRange(size - bytesRead, bytesRead);
899        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
900                buffer->offset() == 0 /* first */);
901        // Unset decryption range.
902        buffer->setRange(0, size);
903
904        if (err != OK) {
905            ALOGE("decryptBuffer failed w/ error %d", err);
906
907            notifyError(err);
908            return;
909        }
910
911        if (startup || discontinuity) {
912            // Signal discontinuity.
913
914            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
915                // If this was a live event this made no sense since
916                // we don't have access to all the segment before the current
917                // one.
918                mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
919            }
920
921            if (discontinuity) {
922                ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
923
924                queueDiscontinuity(
925                        ATSParser::DISCONTINUITY_FORMATCHANGE,
926                        NULL /* extra */);
927
928                discontinuity = false;
929            }
930
931            startup = false;
932        }
933
934        err = OK;
935        if (bufferStartsWithTsSyncByte(buffer)) {
936            // Incremental extraction is only supported for MPEG2 transport streams.
937            if (tsBuffer == NULL) {
938                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
939                tsBuffer->setRange(0, 0);
940            } else if (tsBuffer->capacity() != buffer->capacity()) {
941                size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
942                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
943                tsBuffer->setRange(tsOff, tsSize);
944            }
945            tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
946
947            err = extractAndQueueAccessUnitsFromTs(tsBuffer);
948        }
949
950        if (err == -EAGAIN) {
951            // starting sequence number too low
952            mTSParser.clear();
953            postMonitorQueue();
954            return;
955        } else if (err == ERROR_OUT_OF_RANGE) {
956            // reached stopping point
957            stopAsync(/* clear = */ false);
958            return;
959        } else if (err != OK) {
960            notifyError(err);
961            return;
962        }
963
964    } while (bytesRead != 0);
965
966    if (bufferStartsWithTsSyncByte(buffer)) {
967        // If we still don't see a stream after fetching a full ts segment mark it as
968        // nonexistent.
969        const size_t kNumTypes = ATSParser::NUM_SOURCE_TYPES;
970        ATSParser::SourceType srcTypes[kNumTypes] =
971                { ATSParser::VIDEO, ATSParser::AUDIO };
972        LiveSession::StreamType streamTypes[kNumTypes] =
973                { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
974
975        for (size_t i = 0; i < kNumTypes; i++) {
976            ATSParser::SourceType srcType = srcTypes[i];
977            LiveSession::StreamType streamType = streamTypes[i];
978
979            sp<AnotherPacketSource> source =
980                static_cast<AnotherPacketSource *>(
981                    mTSParser->getSource(srcType).get());
982
983            if (source == NULL) {
984                ALOGW("MPEG2 Transport stream does not contain %s data.",
985                      srcType == ATSParser::VIDEO ? "video" : "audio");
986
987                mStreamTypeMask &= ~streamType;
988                mPacketSources.removeItem(streamType);
989            }
990        }
991
992    }
993
994    if (checkDecryptPadding(buffer) != OK) {
995        ALOGE("Incorrect padding bytes after decryption.");
996        notifyError(ERROR_MALFORMED);
997        return;
998    }
999
1000    status_t err = OK;
1001    if (tsBuffer != NULL) {
1002        AString method;
1003        CHECK(buffer->meta()->findString("cipher-method", &method));
1004        if ((tsBuffer->size() > 0 && method == "NONE")
1005                || tsBuffer->size() > 16) {
1006            ALOGE("MPEG2 transport stream is not an even multiple of 188 "
1007                    "bytes in length.");
1008            notifyError(ERROR_MALFORMED);
1009            return;
1010        }
1011    }
1012
1013    // bulk extract non-ts files
1014    if (tsBuffer == NULL) {
1015      err = extractAndQueueAccessUnits(buffer, itemMeta);
1016    }
1017
1018    if (err != OK) {
1019        notifyError(err);
1020        return;
1021    }
1022
1023    mStartup = false;
1024    ++mSeqNumber;
1025
1026    postMonitorQueue();
1027}
1028
1029int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1030    int32_t firstSeqNumberInPlaylist;
1031    if (mPlaylist->meta() == NULL
1032            || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) {
1033        firstSeqNumberInPlaylist = 0;
1034    }
1035
1036    size_t curDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
1037    if (discontinuitySeq < curDiscontinuitySeq) {
1038        return firstSeqNumberInPlaylist <= 0 ? 0 : (firstSeqNumberInPlaylist - 1);
1039    }
1040
1041    size_t index = 0;
1042    while (index < mPlaylist->size()) {
1043        sp<AMessage> itemMeta;
1044        CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1045
1046        int64_t discontinuity;
1047        if (itemMeta->findInt64("discontinuity", &discontinuity)) {
1048            curDiscontinuitySeq++;
1049        }
1050
1051        if (curDiscontinuitySeq == discontinuitySeq) {
1052            return firstSeqNumberInPlaylist + index;
1053        }
1054
1055        ++index;
1056    }
1057
1058    return firstSeqNumberInPlaylist + mPlaylist->size();
1059}
1060
1061int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1062    int32_t firstSeqNumberInPlaylist;
1063    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
1064                "media-sequence", &firstSeqNumberInPlaylist)) {
1065        firstSeqNumberInPlaylist = 0;
1066    }
1067
1068    size_t index = 0;
1069    int64_t segmentStartUs = 0;
1070    while (index < mPlaylist->size()) {
1071        sp<AMessage> itemMeta;
1072        CHECK(mPlaylist->itemAt(
1073                    index, NULL /* uri */, &itemMeta));
1074
1075        int64_t itemDurationUs;
1076        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1077
1078        if (timeUs < segmentStartUs + itemDurationUs) {
1079            break;
1080        }
1081
1082        segmentStartUs += itemDurationUs;
1083        ++index;
1084    }
1085
1086    if (index >= mPlaylist->size()) {
1087        index = mPlaylist->size() - 1;
1088    }
1089
1090    return firstSeqNumberInPlaylist + index;
1091}
1092
1093const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
1094        const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
1095    sp<MetaData> format = source->getFormat();
1096    if (format != NULL) {
1097        // for simplicity, store a reference to the format in each unit
1098        accessUnit->meta()->setObject("format", format);
1099    }
1100
1101    if (discard) {
1102        accessUnit->meta()->setInt32("discard", discard);
1103    }
1104
1105    accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1106    accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1107    return accessUnit;
1108}
1109
1110status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
1111    if (mTSParser == NULL) {
1112        // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
1113        mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
1114    }
1115
1116    if (mNextPTSTimeUs >= 0ll) {
1117        sp<AMessage> extra = new AMessage;
1118        // Since we are using absolute timestamps, signal an offset of 0 to prevent
1119        // ATSParser from skewing the timestamps of access units.
1120        extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
1121
1122        mTSParser->signalDiscontinuity(
1123                ATSParser::DISCONTINUITY_SEEK, extra);
1124
1125        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
1126        mNextPTSTimeUs = -1ll;
1127        mFirstPTSValid = false;
1128    }
1129
1130    size_t offset = 0;
1131    while (offset + 188 <= buffer->size()) {
1132        status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
1133
1134        if (err != OK) {
1135            return err;
1136        }
1137
1138        offset += 188;
1139    }
1140    // setRange to indicate consumed bytes.
1141    buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
1142
1143    status_t err = OK;
1144    for (size_t i = mPacketSources.size(); i-- > 0;) {
1145        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1146
1147        const char *key;
1148        ATSParser::SourceType type;
1149        const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1150        switch (stream) {
1151            case LiveSession::STREAMTYPE_VIDEO:
1152                type = ATSParser::VIDEO;
1153                key = "timeUsVideo";
1154                break;
1155
1156            case LiveSession::STREAMTYPE_AUDIO:
1157                type = ATSParser::AUDIO;
1158                key = "timeUsAudio";
1159                break;
1160
1161            case LiveSession::STREAMTYPE_SUBTITLES:
1162            {
1163                ALOGE("MPEG2 Transport streams do not contain subtitles.");
1164                return ERROR_MALFORMED;
1165                break;
1166            }
1167
1168            default:
1169                TRESPASS();
1170        }
1171
1172        sp<AnotherPacketSource> source =
1173            static_cast<AnotherPacketSource *>(
1174                    mTSParser->getSource(type).get());
1175
1176        if (source == NULL) {
1177            continue;
1178        }
1179
1180        int64_t timeUs;
1181        sp<ABuffer> accessUnit;
1182        status_t finalResult;
1183        while (source->hasBufferAvailable(&finalResult)
1184                && source->dequeueAccessUnit(&accessUnit) == OK) {
1185
1186            CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1187
1188            if (mStartup) {
1189                if (!mFirstPTSValid) {
1190                    mFirstTimeUs = timeUs;
1191                    mFirstPTSValid = true;
1192                }
1193                if (mStartTimeUsRelative) {
1194                    timeUs -= mFirstTimeUs;
1195                    if (timeUs < 0) {
1196                        timeUs = 0;
1197                    }
1198                } else if (mAdaptive && timeUs > mStartTimeUs) {
1199                    int32_t seq;
1200                    if (mStartTimeUsNotify != NULL
1201                            && !mStartTimeUsNotify->findInt32("discontinuitySeq", &seq)) {
1202                        mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq);
1203                    }
1204                    int64_t startTimeUs;
1205                    if (mStartTimeUsNotify != NULL
1206                            && !mStartTimeUsNotify->findInt64(key, &startTimeUs)) {
1207                        mStartTimeUsNotify->setInt64(key, timeUs);
1208
1209                        uint32_t streamMask = 0;
1210                        mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1211                        streamMask |= mPacketSources.keyAt(i);
1212                        mStartTimeUsNotify->setInt32("streamMask", streamMask);
1213
1214                        if (streamMask == mStreamTypeMask) {
1215                            mStartTimeUsNotify->post();
1216                            mStartTimeUsNotify.clear();
1217                        }
1218                    }
1219                }
1220
1221                if (timeUs < mStartTimeUs) {
1222                    if (mAdaptive) {
1223                        int32_t targetDuration;
1224                        mPlaylist->meta()->findInt32("target-duration", &targetDuration);
1225                        int32_t incr = (mStartTimeUs - timeUs) / 1000000 / targetDuration;
1226                        if (incr == 0) {
1227                            // increment mSeqNumber by at least one
1228                            incr = 1;
1229                        }
1230                        mSeqNumber += incr;
1231                        err = -EAGAIN;
1232                        break;
1233                    } else {
1234                        // buffer up to the closest preceding IDR frame
1235                        ALOGV("timeUs %" PRId64 " us < mStartTimeUs %" PRId64 " us",
1236                                timeUs, mStartTimeUs);
1237                        const char *mime;
1238                        sp<MetaData> format  = source->getFormat();
1239                        bool isAvc = false;
1240                        if (format != NULL && format->findCString(kKeyMIMEType, &mime)
1241                                && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC)) {
1242                            isAvc = true;
1243                        }
1244                        if (isAvc && IsIDR(accessUnit)) {
1245                            mVideoBuffer->clear();
1246                        }
1247                        if (isAvc) {
1248                            mVideoBuffer->queueAccessUnit(accessUnit);
1249                        }
1250
1251                        continue;
1252                    }
1253                }
1254            }
1255
1256            if (mStopParams != NULL) {
1257                // Queue discontinuity in original stream.
1258                int32_t discontinuitySeq;
1259                int64_t stopTimeUs;
1260                if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1261                        || discontinuitySeq > mDiscontinuitySeq
1262                        || !mStopParams->findInt64(key, &stopTimeUs)
1263                        || (discontinuitySeq == mDiscontinuitySeq
1264                                && timeUs >= stopTimeUs)) {
1265                    packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
1266                    mStreamTypeMask &= ~stream;
1267                    mPacketSources.removeItemsAt(i);
1268                    break;
1269                }
1270            }
1271
1272            // Note that we do NOT dequeue any discontinuities except for format change.
1273            if (stream == LiveSession::STREAMTYPE_VIDEO) {
1274                const bool discard = true;
1275                status_t status;
1276                while (mVideoBuffer->hasBufferAvailable(&status)) {
1277                    sp<ABuffer> videoBuffer;
1278                    mVideoBuffer->dequeueAccessUnit(&videoBuffer);
1279                    setAccessUnitProperties(videoBuffer, source, discard);
1280                    packetSource->queueAccessUnit(videoBuffer);
1281                }
1282            }
1283
1284            setAccessUnitProperties(accessUnit, source);
1285            packetSource->queueAccessUnit(accessUnit);
1286        }
1287
1288        if (err != OK) {
1289            break;
1290        }
1291    }
1292
1293    if (err != OK) {
1294        for (size_t i = mPacketSources.size(); i-- > 0;) {
1295            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1296            packetSource->clear();
1297        }
1298        return err;
1299    }
1300
1301    if (!mStreamTypeMask) {
1302        // Signal gap is filled between original and new stream.
1303        ALOGV("ERROR OUT OF RANGE");
1304        return ERROR_OUT_OF_RANGE;
1305    }
1306
1307    return OK;
1308}
1309
1310/* static */
1311bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1312        const sp<ABuffer> &buffer) {
1313    size_t pos = 0;
1314
1315    // skip possible BOM
1316    if (buffer->size() >= pos + 3 &&
1317            !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1318        pos += 3;
1319    }
1320
1321    // accept WEBVTT followed by SPACE, TAB or (CR) LF
1322    if (buffer->size() < pos + 6 ||
1323            memcmp("WEBVTT", buffer->data() + pos, 6)) {
1324        return false;
1325    }
1326    pos += 6;
1327
1328    if (buffer->size() == pos) {
1329        return true;
1330    }
1331
1332    uint8_t sep = buffer->data()[pos];
1333    return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1334}
1335
1336status_t PlaylistFetcher::extractAndQueueAccessUnits(
1337        const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1338    if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1339        if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1340            ALOGE("This stream only contains subtitles.");
1341            return ERROR_MALFORMED;
1342        }
1343
1344        const sp<AnotherPacketSource> packetSource =
1345            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1346
1347        int64_t durationUs;
1348        CHECK(itemMeta->findInt64("durationUs", &durationUs));
1349        buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1350        buffer->meta()->setInt64("durationUs", durationUs);
1351        buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1352        buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1353
1354        packetSource->queueAccessUnit(buffer);
1355        return OK;
1356    }
1357
1358    if (mNextPTSTimeUs >= 0ll) {
1359        mFirstPTSValid = false;
1360        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
1361        mNextPTSTimeUs = -1ll;
1362    }
1363
1364    // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
1365    // stream prefixed by an ID3 tag.
1366
1367    bool firstID3Tag = true;
1368    uint64_t PTS = 0;
1369
1370    for (;;) {
1371        // Make sure to skip all ID3 tags preceding the audio data.
1372        // At least one must be present to provide the PTS timestamp.
1373
1374        ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
1375        if (!id3.isValid()) {
1376            if (firstID3Tag) {
1377                ALOGE("Unable to parse ID3 tag.");
1378                return ERROR_MALFORMED;
1379            } else {
1380                break;
1381            }
1382        }
1383
1384        if (firstID3Tag) {
1385            bool found = false;
1386
1387            ID3::Iterator it(id3, "PRIV");
1388            while (!it.done()) {
1389                size_t length;
1390                const uint8_t *data = it.getData(&length);
1391
1392                static const char *kMatchName =
1393                    "com.apple.streaming.transportStreamTimestamp";
1394                static const size_t kMatchNameLen = strlen(kMatchName);
1395
1396                if (length == kMatchNameLen + 1 + 8
1397                        && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
1398                    found = true;
1399                    PTS = U64_AT(&data[kMatchNameLen + 1]);
1400                }
1401
1402                it.next();
1403            }
1404
1405            if (!found) {
1406                ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
1407                return ERROR_MALFORMED;
1408            }
1409        }
1410
1411        // skip the ID3 tag
1412        buffer->setRange(
1413                buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
1414
1415        firstID3Tag = false;
1416    }
1417
1418    if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
1419        ALOGW("This stream only contains audio data!");
1420
1421        mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
1422
1423        if (mStreamTypeMask == 0) {
1424            return OK;
1425        }
1426    }
1427
1428    sp<AnotherPacketSource> packetSource =
1429        mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
1430
1431    if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
1432        ABitReader bits(buffer->data(), buffer->size());
1433
1434        // adts_fixed_header
1435
1436        CHECK_EQ(bits.getBits(12), 0xfffu);
1437        bits.skipBits(3);  // ID, layer
1438        bool protection_absent = bits.getBits(1) != 0;
1439
1440        unsigned profile = bits.getBits(2);
1441        CHECK_NE(profile, 3u);
1442        unsigned sampling_freq_index = bits.getBits(4);
1443        bits.getBits(1);  // private_bit
1444        unsigned channel_configuration = bits.getBits(3);
1445        CHECK_NE(channel_configuration, 0u);
1446        bits.skipBits(2);  // original_copy, home
1447
1448        sp<MetaData> meta = MakeAACCodecSpecificData(
1449                profile, sampling_freq_index, channel_configuration);
1450
1451        meta->setInt32(kKeyIsADTS, true);
1452
1453        packetSource->setFormat(meta);
1454    }
1455
1456    int64_t numSamples = 0ll;
1457    int32_t sampleRate;
1458    CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
1459
1460    int64_t timeUs = (PTS * 100ll) / 9ll;
1461    if (!mFirstPTSValid) {
1462        mFirstPTSValid = true;
1463        mFirstTimeUs = timeUs;
1464    }
1465
1466    size_t offset = 0;
1467    while (offset < buffer->size()) {
1468        const uint8_t *adtsHeader = buffer->data() + offset;
1469        CHECK_LT(offset + 5, buffer->size());
1470
1471        unsigned aac_frame_length =
1472            ((adtsHeader[3] & 3) << 11)
1473            | (adtsHeader[4] << 3)
1474            | (adtsHeader[5] >> 5);
1475
1476        if (aac_frame_length == 0) {
1477            const uint8_t *id3Header = adtsHeader;
1478            if (!memcmp(id3Header, "ID3", 3)) {
1479                ID3 id3(id3Header, buffer->size() - offset, true);
1480                if (id3.isValid()) {
1481                    offset += id3.rawSize();
1482                    continue;
1483                };
1484            }
1485            return ERROR_MALFORMED;
1486        }
1487
1488        CHECK_LE(offset + aac_frame_length, buffer->size());
1489
1490        int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
1491        offset += aac_frame_length;
1492
1493        // Each AAC frame encodes 1024 samples.
1494        numSamples += 1024;
1495
1496        if (mStartup) {
1497            int64_t startTimeUs = unitTimeUs;
1498            if (mStartTimeUsRelative) {
1499                startTimeUs -= mFirstTimeUs;
1500                if (startTimeUs  < 0) {
1501                    startTimeUs = 0;
1502                }
1503            }
1504            if (startTimeUs < mStartTimeUs) {
1505                continue;
1506            }
1507        }
1508
1509        sp<ABuffer> unit = new ABuffer(aac_frame_length);
1510        memcpy(unit->data(), adtsHeader, aac_frame_length);
1511
1512        unit->meta()->setInt64("timeUs", unitTimeUs);
1513        unit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1514        unit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1515        packetSource->queueAccessUnit(unit);
1516    }
1517
1518    return OK;
1519}
1520
1521void PlaylistFetcher::updateDuration() {
1522    int64_t durationUs = 0ll;
1523    for (size_t index = 0; index < mPlaylist->size(); ++index) {
1524        sp<AMessage> itemMeta;
1525        CHECK(mPlaylist->itemAt(
1526                    index, NULL /* uri */, &itemMeta));
1527
1528        int64_t itemDurationUs;
1529        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1530
1531        durationUs += itemDurationUs;
1532    }
1533
1534    sp<AMessage> msg = mNotify->dup();
1535    msg->setInt32("what", kWhatDurationUpdate);
1536    msg->setInt64("durationUs", durationUs);
1537    msg->post();
1538}
1539
1540int64_t PlaylistFetcher::resumeThreshold(const sp<AMessage> &msg) {
1541    int64_t durationUs, threshold;
1542    if (msg->findInt64("durationUs", &durationUs)) {
1543        return kNumSkipFrames * durationUs;
1544    }
1545
1546    sp<RefBase> obj;
1547    msg->findObject("format", &obj);
1548    MetaData *format = static_cast<MetaData *>(obj.get());
1549
1550    const char *mime;
1551    CHECK(format->findCString(kKeyMIMEType, &mime));
1552    bool audio = !strncasecmp(mime, "audio/", 6);
1553    if (audio) {
1554        // Assumes 1000 samples per frame.
1555        int32_t sampleRate;
1556        CHECK(format->findInt32(kKeySampleRate, &sampleRate));
1557        return kNumSkipFrames  /* frames */ * 1000 /* samples */
1558                * (1000000 / sampleRate) /* sample duration (us) */;
1559    } else {
1560        int32_t frameRate;
1561        if (format->findInt32(kKeyFrameRate, &frameRate) && frameRate > 0) {
1562            return kNumSkipFrames * (1000000 / frameRate);
1563        }
1564    }
1565
1566    return 500000ll;
1567}
1568
1569}  // namespace android
1570