PlaylistFetcher.cpp revision f78f62bd6b0a99747db53828d281a50b9270a646
1/*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "PlaylistFetcher"
19#include <utils/Log.h>
20
21#include "PlaylistFetcher.h"
22
23#include "LiveDataSource.h"
24#include "LiveSession.h"
25#include "M3UParser.h"
26
27#include "include/avc_utils.h"
28#include "include/HTTPBase.h"
29#include "include/ID3.h"
30#include "mpeg2ts/AnotherPacketSource.h"
31
32#include <media/IStreamSource.h>
33#include <media/stagefright/foundation/ABitReader.h>
34#include <media/stagefright/foundation/ABuffer.h>
35#include <media/stagefright/foundation/ADebug.h>
36#include <media/stagefright/foundation/hexdump.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaDefs.h>
39#include <media/stagefright/MetaData.h>
40#include <media/stagefright/Utils.h>
41
42#include <ctype.h>
43#include <inttypes.h>
44#include <openssl/aes.h>
45#include <openssl/md5.h>
46
47namespace android {
48
49// static
50const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll;
51const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
52const int32_t PlaylistFetcher::kDownloadBlockSize = 2048;
53const int32_t PlaylistFetcher::kNumSkipFrames = 10;
54
55PlaylistFetcher::PlaylistFetcher(
56        const sp<AMessage> &notify,
57        const sp<LiveSession> &session,
58        const char *uri)
59    : mNotify(notify),
60      mStartTimeUsNotify(notify->dup()),
61      mSession(session),
62      mURI(uri),
63      mStreamTypeMask(0),
64      mStartTimeUs(-1ll),
65      mSegmentStartTimeUs(-1ll),
66      mDiscontinuitySeq(-1ll),
67      mStartTimeUsRelative(false),
68      mLastPlaylistFetchTimeUs(-1ll),
69      mSeqNumber(-1),
70      mNumRetries(0),
71      mStartup(true),
72      mAdaptive(false),
73      mPrepared(false),
74      mNextPTSTimeUs(-1ll),
75      mMonitorQueueGeneration(0),
76      mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
77      mFirstPTSValid(false),
78      mAbsoluteTimeAnchorUs(0ll),
79      mVideoBuffer(new AnotherPacketSource(NULL)) {
80    memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
81    mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
82    mStartTimeUsNotify->setInt32("streamMask", 0);
83}
84
85PlaylistFetcher::~PlaylistFetcher() {
86}
87
88int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
89    CHECK(mPlaylist != NULL);
90
91    int32_t firstSeqNumberInPlaylist;
92    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
93                "media-sequence", &firstSeqNumberInPlaylist)) {
94        firstSeqNumberInPlaylist = 0;
95    }
96
97    int32_t lastSeqNumberInPlaylist =
98        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
99
100    CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
101    CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
102
103    int64_t segmentStartUs = 0ll;
104    for (int32_t index = 0;
105            index < seqNumber - firstSeqNumberInPlaylist; ++index) {
106        sp<AMessage> itemMeta;
107        CHECK(mPlaylist->itemAt(
108                    index, NULL /* uri */, &itemMeta));
109
110        int64_t itemDurationUs;
111        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
112
113        segmentStartUs += itemDurationUs;
114    }
115
116    return segmentStartUs;
117}
118
119int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
120    int64_t nowUs = ALooper::GetNowUs();
121
122    if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
123        CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
124        return 0ll;
125    }
126
127    if (mPlaylist->isComplete()) {
128        return (~0llu >> 1);
129    }
130
131    int32_t targetDurationSecs;
132    CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
133
134    int64_t targetDurationUs = targetDurationSecs * 1000000ll;
135
136    int64_t minPlaylistAgeUs;
137
138    switch (mRefreshState) {
139        case INITIAL_MINIMUM_RELOAD_DELAY:
140        {
141            size_t n = mPlaylist->size();
142            if (n > 0) {
143                sp<AMessage> itemMeta;
144                CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
145
146                int64_t itemDurationUs;
147                CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
148
149                minPlaylistAgeUs = itemDurationUs;
150                break;
151            }
152
153            // fall through
154        }
155
156        case FIRST_UNCHANGED_RELOAD_ATTEMPT:
157        {
158            minPlaylistAgeUs = targetDurationUs / 2;
159            break;
160        }
161
162        case SECOND_UNCHANGED_RELOAD_ATTEMPT:
163        {
164            minPlaylistAgeUs = (targetDurationUs * 3) / 2;
165            break;
166        }
167
168        case THIRD_UNCHANGED_RELOAD_ATTEMPT:
169        {
170            minPlaylistAgeUs = targetDurationUs * 3;
171            break;
172        }
173
174        default:
175            TRESPASS();
176            break;
177    }
178
179    int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
180    return delayUs > 0ll ? delayUs : 0ll;
181}
182
183status_t PlaylistFetcher::decryptBuffer(
184        size_t playlistIndex, const sp<ABuffer> &buffer,
185        bool first) {
186    sp<AMessage> itemMeta;
187    bool found = false;
188    AString method;
189
190    for (ssize_t i = playlistIndex; i >= 0; --i) {
191        AString uri;
192        CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
193
194        if (itemMeta->findString("cipher-method", &method)) {
195            found = true;
196            break;
197        }
198    }
199
200    if (!found) {
201        method = "NONE";
202    }
203    buffer->meta()->setString("cipher-method", method.c_str());
204
205    if (method == "NONE") {
206        return OK;
207    } else if (!(method == "AES-128")) {
208        ALOGE("Unsupported cipher method '%s'", method.c_str());
209        return ERROR_UNSUPPORTED;
210    }
211
212    AString keyURI;
213    if (!itemMeta->findString("cipher-uri", &keyURI)) {
214        ALOGE("Missing key uri");
215        return ERROR_MALFORMED;
216    }
217
218    ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
219
220    sp<ABuffer> key;
221    if (index >= 0) {
222        key = mAESKeyForURI.valueAt(index);
223    } else {
224        ssize_t err = mSession->fetchFile(keyURI.c_str(), &key);
225
226        if (err < 0) {
227            ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
228            return ERROR_IO;
229        } else if (key->size() != 16) {
230            ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
231            return ERROR_MALFORMED;
232        }
233
234        mAESKeyForURI.add(keyURI, key);
235    }
236
237    AES_KEY aes_key;
238    if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
239        ALOGE("failed to set AES decryption key.");
240        return UNKNOWN_ERROR;
241    }
242
243    size_t n = buffer->size();
244    if (!n) {
245        return OK;
246    }
247    CHECK(n % 16 == 0);
248
249    if (first) {
250        // If decrypting the first block in a file, read the iv from the manifest
251        // or derive the iv from the file's sequence number.
252
253        AString iv;
254        if (itemMeta->findString("cipher-iv", &iv)) {
255            if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
256                    || iv.size() != 16 * 2 + 2) {
257                ALOGE("malformed cipher IV '%s'.", iv.c_str());
258                return ERROR_MALFORMED;
259            }
260
261            memset(mAESInitVec, 0, sizeof(mAESInitVec));
262            for (size_t i = 0; i < 16; ++i) {
263                char c1 = tolower(iv.c_str()[2 + 2 * i]);
264                char c2 = tolower(iv.c_str()[3 + 2 * i]);
265                if (!isxdigit(c1) || !isxdigit(c2)) {
266                    ALOGE("malformed cipher IV '%s'.", iv.c_str());
267                    return ERROR_MALFORMED;
268                }
269                uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
270                uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
271
272                mAESInitVec[i] = nibble1 << 4 | nibble2;
273            }
274        } else {
275            memset(mAESInitVec, 0, sizeof(mAESInitVec));
276            mAESInitVec[15] = mSeqNumber & 0xff;
277            mAESInitVec[14] = (mSeqNumber >> 8) & 0xff;
278            mAESInitVec[13] = (mSeqNumber >> 16) & 0xff;
279            mAESInitVec[12] = (mSeqNumber >> 24) & 0xff;
280        }
281    }
282
283    AES_cbc_encrypt(
284            buffer->data(), buffer->data(), buffer->size(),
285            &aes_key, mAESInitVec, AES_DECRYPT);
286
287    return OK;
288}
289
290status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
291    status_t err;
292    AString method;
293    CHECK(buffer->meta()->findString("cipher-method", &method));
294    if (method == "NONE") {
295        return OK;
296    }
297
298    uint8_t padding = 0;
299    if (buffer->size() > 0) {
300        padding = buffer->data()[buffer->size() - 1];
301    }
302
303    if (padding > 16) {
304        return ERROR_MALFORMED;
305    }
306
307    for (size_t i = buffer->size() - padding; i < padding; i++) {
308        if (buffer->data()[i] != padding) {
309            return ERROR_MALFORMED;
310        }
311    }
312
313    buffer->setRange(buffer->offset(), buffer->size() - padding);
314    return OK;
315}
316
317void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
318    int64_t maxDelayUs = delayUsToRefreshPlaylist();
319    if (maxDelayUs < minDelayUs) {
320        maxDelayUs = minDelayUs;
321    }
322    if (delayUs > maxDelayUs) {
323        ALOGV("Need to refresh playlist in %" PRId64 , maxDelayUs);
324        delayUs = maxDelayUs;
325    }
326    sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id());
327    msg->setInt32("generation", mMonitorQueueGeneration);
328    msg->post(delayUs);
329}
330
331void PlaylistFetcher::cancelMonitorQueue() {
332    ++mMonitorQueueGeneration;
333}
334
335void PlaylistFetcher::startAsync(
336        const sp<AnotherPacketSource> &audioSource,
337        const sp<AnotherPacketSource> &videoSource,
338        const sp<AnotherPacketSource> &subtitleSource,
339        int64_t startTimeUs,
340        int64_t segmentStartTimeUs,
341        int32_t startDiscontinuitySeq,
342        bool adaptive) {
343    sp<AMessage> msg = new AMessage(kWhatStart, id());
344
345    uint32_t streamTypeMask = 0ul;
346
347    if (audioSource != NULL) {
348        msg->setPointer("audioSource", audioSource.get());
349        streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
350    }
351
352    if (videoSource != NULL) {
353        msg->setPointer("videoSource", videoSource.get());
354        streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
355    }
356
357    if (subtitleSource != NULL) {
358        msg->setPointer("subtitleSource", subtitleSource.get());
359        streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
360    }
361
362    msg->setInt32("streamTypeMask", streamTypeMask);
363    msg->setInt64("startTimeUs", startTimeUs);
364    msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
365    msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
366    msg->setInt32("adaptive", adaptive);
367    msg->post();
368}
369
370void PlaylistFetcher::pauseAsync() {
371    (new AMessage(kWhatPause, id()))->post();
372}
373
374void PlaylistFetcher::stopAsync(bool clear) {
375    sp<AMessage> msg = new AMessage(kWhatStop, id());
376    msg->setInt32("clear", clear);
377    msg->post();
378}
379
380void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
381    AMessage* msg = new AMessage(kWhatResumeUntil, id());
382    msg->setMessage("params", params);
383    msg->post();
384}
385
386void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
387    switch (msg->what()) {
388        case kWhatStart:
389        {
390            status_t err = onStart(msg);
391
392            sp<AMessage> notify = mNotify->dup();
393            notify->setInt32("what", kWhatStarted);
394            notify->setInt32("err", err);
395            notify->post();
396            break;
397        }
398
399        case kWhatPause:
400        {
401            onPause();
402
403            sp<AMessage> notify = mNotify->dup();
404            notify->setInt32("what", kWhatPaused);
405            notify->post();
406            break;
407        }
408
409        case kWhatStop:
410        {
411            onStop(msg);
412
413            sp<AMessage> notify = mNotify->dup();
414            notify->setInt32("what", kWhatStopped);
415            notify->post();
416            break;
417        }
418
419        case kWhatMonitorQueue:
420        case kWhatDownloadNext:
421        {
422            int32_t generation;
423            CHECK(msg->findInt32("generation", &generation));
424
425            if (generation != mMonitorQueueGeneration) {
426                // Stale event
427                break;
428            }
429
430            if (msg->what() == kWhatMonitorQueue) {
431                onMonitorQueue();
432            } else {
433                onDownloadNext();
434            }
435            break;
436        }
437
438        case kWhatResumeUntil:
439        {
440            onResumeUntil(msg);
441            break;
442        }
443
444        default:
445            TRESPASS();
446    }
447}
448
449status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
450    mPacketSources.clear();
451
452    uint32_t streamTypeMask;
453    CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
454
455    int64_t startTimeUs;
456    int64_t segmentStartTimeUs;
457    int32_t startDiscontinuitySeq;
458    int32_t adaptive;
459    CHECK(msg->findInt64("startTimeUs", &startTimeUs));
460    CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
461    CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
462    CHECK(msg->findInt32("adaptive", &adaptive));
463
464    if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
465        void *ptr;
466        CHECK(msg->findPointer("audioSource", &ptr));
467
468        mPacketSources.add(
469                LiveSession::STREAMTYPE_AUDIO,
470                static_cast<AnotherPacketSource *>(ptr));
471    }
472
473    if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
474        void *ptr;
475        CHECK(msg->findPointer("videoSource", &ptr));
476
477        mPacketSources.add(
478                LiveSession::STREAMTYPE_VIDEO,
479                static_cast<AnotherPacketSource *>(ptr));
480    }
481
482    if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
483        void *ptr;
484        CHECK(msg->findPointer("subtitleSource", &ptr));
485
486        mPacketSources.add(
487                LiveSession::STREAMTYPE_SUBTITLES,
488                static_cast<AnotherPacketSource *>(ptr));
489    }
490
491    mStreamTypeMask = streamTypeMask;
492
493    mSegmentStartTimeUs = segmentStartTimeUs;
494    mDiscontinuitySeq = startDiscontinuitySeq;
495
496    if (startTimeUs >= 0) {
497        mStartTimeUs = startTimeUs;
498        mSeqNumber = -1;
499        mStartup = true;
500        mPrepared = false;
501        mAdaptive = adaptive;
502    }
503
504    postMonitorQueue();
505
506    return OK;
507}
508
509void PlaylistFetcher::onPause() {
510    cancelMonitorQueue();
511}
512
513void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
514    cancelMonitorQueue();
515
516    int32_t clear;
517    CHECK(msg->findInt32("clear", &clear));
518    if (clear) {
519        for (size_t i = 0; i < mPacketSources.size(); i++) {
520            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
521            packetSource->clear();
522        }
523    }
524
525    mPacketSources.clear();
526    mStreamTypeMask = 0;
527}
528
529// Resume until we have reached the boundary timestamps listed in `msg`; when
530// the remaining time is too short (within a resume threshold) stop immediately
531// instead.
532status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
533    sp<AMessage> params;
534    CHECK(msg->findMessage("params", &params));
535
536    bool stop = false;
537    for (size_t i = 0; i < mPacketSources.size(); i++) {
538        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
539
540        const char *stopKey;
541        int streamType = mPacketSources.keyAt(i);
542        switch (streamType) {
543        case LiveSession::STREAMTYPE_VIDEO:
544            stopKey = "timeUsVideo";
545            break;
546
547        case LiveSession::STREAMTYPE_AUDIO:
548            stopKey = "timeUsAudio";
549            break;
550
551        case LiveSession::STREAMTYPE_SUBTITLES:
552            stopKey = "timeUsSubtitle";
553            break;
554
555        default:
556            TRESPASS();
557        }
558
559        // Don't resume if we would stop within a resume threshold.
560        int32_t discontinuitySeq;
561        int64_t latestTimeUs = 0, stopTimeUs = 0;
562        sp<AMessage> latestMeta = packetSource->getLatestDequeuedMeta();
563        if (latestMeta != NULL
564                && latestMeta->findInt32("discontinuitySeq", &discontinuitySeq)
565                && discontinuitySeq == mDiscontinuitySeq
566                && latestMeta->findInt64("timeUs", &latestTimeUs)
567                && params->findInt64(stopKey, &stopTimeUs)
568                && stopTimeUs - latestTimeUs < resumeThreshold(latestMeta)) {
569            stop = true;
570        }
571    }
572
573    if (stop) {
574        for (size_t i = 0; i < mPacketSources.size(); i++) {
575            mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer());
576        }
577        stopAsync(/* clear = */ false);
578        return OK;
579    }
580
581    mStopParams = params;
582    postMonitorQueue();
583
584    return OK;
585}
586
587void PlaylistFetcher::notifyError(status_t err) {
588    sp<AMessage> notify = mNotify->dup();
589    notify->setInt32("what", kWhatError);
590    notify->setInt32("err", err);
591    notify->post();
592}
593
594void PlaylistFetcher::queueDiscontinuity(
595        ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
596    for (size_t i = 0; i < mPacketSources.size(); ++i) {
597        // do not discard buffer upon #EXT-X-DISCONTINUITY tag
598        // (seek will discard buffer by abandoning old fetchers)
599        mPacketSources.valueAt(i)->queueDiscontinuity(
600                type, extra, false /* discard */);
601    }
602}
603
604void PlaylistFetcher::onMonitorQueue() {
605    bool downloadMore = false;
606    refreshPlaylist();
607
608    int32_t targetDurationSecs;
609    int64_t targetDurationUs = kMinBufferedDurationUs;
610    if (mPlaylist != NULL) {
611        CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
612        targetDurationUs = targetDurationSecs * 1000000ll;
613    }
614
615    // buffer at least 3 times the target duration, or up to 10 seconds
616    int64_t durationToBufferUs = targetDurationUs * 3;
617    if (durationToBufferUs > kMinBufferedDurationUs)  {
618        durationToBufferUs = kMinBufferedDurationUs;
619    }
620
621    int64_t bufferedDurationUs = 0ll;
622    status_t finalResult = NOT_ENOUGH_DATA;
623    if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
624        sp<AnotherPacketSource> packetSource =
625            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
626
627        bufferedDurationUs =
628                packetSource->getBufferedDurationUs(&finalResult);
629        finalResult = OK;
630    } else {
631        // Use max stream duration to prevent us from waiting on a non-existent stream;
632        // when we cannot make out from the manifest what streams are included in a playlist
633        // we might assume extra streams.
634        for (size_t i = 0; i < mPacketSources.size(); ++i) {
635            if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
636                continue;
637            }
638
639            int64_t bufferedStreamDurationUs =
640                mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
641            ALOGV("buffered %" PRId64 " for stream %d",
642                    bufferedStreamDurationUs, mPacketSources.keyAt(i));
643            if (bufferedStreamDurationUs > bufferedDurationUs) {
644                bufferedDurationUs = bufferedStreamDurationUs;
645            }
646        }
647    }
648    downloadMore = (bufferedDurationUs < durationToBufferUs);
649
650    // signal start if buffered up at least the target size
651    if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) {
652        mPrepared = true;
653
654        ALOGV("prepared, buffered=%" PRId64 " > %" PRId64 "",
655                bufferedDurationUs, targetDurationUs);
656        sp<AMessage> msg = mNotify->dup();
657        msg->setInt32("what", kWhatTemporarilyDoneFetching);
658        msg->post();
659    }
660
661    if (finalResult == OK && downloadMore) {
662        ALOGV("monitoring, buffered=%" PRId64 " < %" PRId64 "",
663                bufferedDurationUs, durationToBufferUs);
664        // delay the next download slightly; hopefully this gives other concurrent fetchers
665        // a better chance to run.
666        // onDownloadNext();
667        sp<AMessage> msg = new AMessage(kWhatDownloadNext, id());
668        msg->setInt32("generation", mMonitorQueueGeneration);
669        msg->post(1000l);
670    } else {
671        // Nothing to do yet, try again in a second.
672
673        sp<AMessage> msg = mNotify->dup();
674        msg->setInt32("what", kWhatTemporarilyDoneFetching);
675        msg->post();
676
677        int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2;
678        ALOGV("pausing for %" PRId64 ", buffered=%" PRId64 " > %" PRId64 "",
679                delayUs, bufferedDurationUs, durationToBufferUs);
680        // :TRICKY: need to enforce minimum delay because the delay to
681        // refresh the playlist will become 0
682        postMonitorQueue(delayUs, mPrepared ? targetDurationUs * 2 : 0);
683    }
684}
685
686status_t PlaylistFetcher::refreshPlaylist() {
687    if (delayUsToRefreshPlaylist() <= 0) {
688        bool unchanged;
689        sp<M3UParser> playlist = mSession->fetchPlaylist(
690                mURI.c_str(), mPlaylistHash, &unchanged);
691
692        if (playlist == NULL) {
693            if (unchanged) {
694                // We succeeded in fetching the playlist, but it was
695                // unchanged from the last time we tried.
696
697                if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
698                    mRefreshState = (RefreshState)(mRefreshState + 1);
699                }
700            } else {
701                ALOGE("failed to load playlist at url '%s'", mURI.c_str());
702                notifyError(ERROR_IO);
703                return ERROR_IO;
704            }
705        } else {
706            mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
707            mPlaylist = playlist;
708
709            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
710                updateDuration();
711            }
712        }
713
714        mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
715    }
716    return OK;
717}
718
719// static
720bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
721    return buffer->size() > 0 && buffer->data()[0] == 0x47;
722}
723
724void PlaylistFetcher::onDownloadNext() {
725    if (refreshPlaylist() != OK) {
726        return;
727    }
728
729    int32_t firstSeqNumberInPlaylist;
730    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
731                "media-sequence", &firstSeqNumberInPlaylist)) {
732        firstSeqNumberInPlaylist = 0;
733    }
734
735    bool discontinuity = false;
736
737    const int32_t lastSeqNumberInPlaylist =
738        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
739
740    if (mStartup && mSeqNumber >= 0
741            && (mSeqNumber < firstSeqNumberInPlaylist || mSeqNumber > lastSeqNumberInPlaylist)) {
742        // in case we guessed wrong during reconfiguration, try fetching the latest content.
743        mSeqNumber = lastSeqNumberInPlaylist;
744    }
745
746    if (mDiscontinuitySeq < 0) {
747        mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
748    }
749
750    if (mSeqNumber < 0) {
751        CHECK_GE(mStartTimeUs, 0ll);
752
753        if (mSegmentStartTimeUs < 0) {
754            if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
755                // If this is a live session, start 3 segments from the end on connect
756                mSeqNumber = lastSeqNumberInPlaylist - 3;
757                if (mSeqNumber < firstSeqNumberInPlaylist) {
758                    mSeqNumber = firstSeqNumberInPlaylist;
759                }
760            } else {
761                mSeqNumber = getSeqNumberForTime(mStartTimeUs);
762                mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
763            }
764            mStartTimeUsRelative = true;
765            ALOGV("Initial sequence number for time %" PRId64 " is %d from (%d .. %d)",
766                    mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
767                    lastSeqNumberInPlaylist);
768        } else {
769            mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
770            if (mAdaptive) {
771                // avoid double fetch/decode
772                mSeqNumber += 1;
773            }
774            ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
775            if (mSeqNumber < minSeq) {
776                mSeqNumber = minSeq;
777            }
778
779            if (mSeqNumber < firstSeqNumberInPlaylist) {
780                mSeqNumber = firstSeqNumberInPlaylist;
781            }
782
783            if (mSeqNumber > lastSeqNumberInPlaylist) {
784                mSeqNumber = lastSeqNumberInPlaylist;
785            }
786            ALOGV("Initial sequence number for live event %d from (%d .. %d)",
787                    mSeqNumber, firstSeqNumberInPlaylist,
788                    lastSeqNumberInPlaylist);
789        }
790    }
791
792    if (mSeqNumber < firstSeqNumberInPlaylist
793            || mSeqNumber > lastSeqNumberInPlaylist) {
794        if (!mPlaylist->isComplete() && mNumRetries < kMaxNumRetries) {
795            ++mNumRetries;
796
797            if (mSeqNumber > lastSeqNumberInPlaylist) {
798                // refresh in increasing fraction (1/2, 1/3, ...) of the
799                // playlist's target duration or 3 seconds, whichever is less
800                int32_t targetDurationSecs;
801                CHECK(mPlaylist->meta()->findInt32(
802                        "target-duration", &targetDurationSecs));
803                int64_t delayUs = mPlaylist->size() * targetDurationSecs *
804                        1000000ll / (1 + mNumRetries);
805                if (delayUs > kMaxMonitorDelayUs) {
806                    delayUs = kMaxMonitorDelayUs;
807                }
808                ALOGV("sequence number high: %d from (%d .. %d), "
809                      "monitor in %" PRId64 " (retry=%d)",
810                        mSeqNumber, firstSeqNumberInPlaylist,
811                        lastSeqNumberInPlaylist, delayUs, mNumRetries);
812                postMonitorQueue(delayUs);
813                return;
814            }
815
816            // we've missed the boat, let's start from the lowest sequence
817            // number available and signal a discontinuity.
818
819            ALOGI("We've missed the boat, restarting playback."
820                  "  mStartup=%d, was  looking for %d in %d-%d",
821                    mStartup, mSeqNumber, firstSeqNumberInPlaylist,
822                    lastSeqNumberInPlaylist);
823            mSeqNumber = lastSeqNumberInPlaylist - 3;
824            if (mSeqNumber < firstSeqNumberInPlaylist) {
825                mSeqNumber = firstSeqNumberInPlaylist;
826            }
827            discontinuity = true;
828
829            // fall through
830        } else {
831            ALOGE("Cannot find sequence number %d in playlist "
832                 "(contains %d - %d)",
833                 mSeqNumber, firstSeqNumberInPlaylist,
834                  firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
835
836            notifyError(ERROR_END_OF_STREAM);
837            return;
838        }
839    }
840
841    mNumRetries = 0;
842
843    AString uri;
844    sp<AMessage> itemMeta;
845    CHECK(mPlaylist->itemAt(
846                mSeqNumber - firstSeqNumberInPlaylist,
847                &uri,
848                &itemMeta));
849
850    int32_t val;
851    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
852        mDiscontinuitySeq++;
853        discontinuity = true;
854    }
855
856    int64_t range_offset, range_length;
857    if (!itemMeta->findInt64("range-offset", &range_offset)
858            || !itemMeta->findInt64("range-length", &range_length)) {
859        range_offset = 0;
860        range_length = -1;
861    }
862
863    ALOGV("fetching segment %d from (%d .. %d)",
864          mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
865
866    ALOGV("fetching '%s'", uri.c_str());
867
868    sp<DataSource> source;
869    sp<ABuffer> buffer, tsBuffer;
870    // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
871    // this avoids interleaved connections to the key and segment file.
872    {
873        sp<ABuffer> junk = new ABuffer(16);
874        junk->setRange(0, 16);
875        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
876                true /* first */);
877        if (err != OK) {
878            notifyError(err);
879            return;
880        }
881    }
882
883    // block-wise download
884    bool startup = mStartup;
885    ssize_t bytesRead;
886    do {
887        bytesRead = mSession->fetchFile(
888                uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source);
889
890        if (bytesRead < 0) {
891            status_t err = bytesRead;
892            ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
893            notifyError(err);
894            return;
895        }
896
897        CHECK(buffer != NULL);
898
899        size_t size = buffer->size();
900        // Set decryption range.
901        buffer->setRange(size - bytesRead, bytesRead);
902        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
903                buffer->offset() == 0 /* first */);
904        // Unset decryption range.
905        buffer->setRange(0, size);
906
907        if (err != OK) {
908            ALOGE("decryptBuffer failed w/ error %d", err);
909
910            notifyError(err);
911            return;
912        }
913
914        if (startup || discontinuity) {
915            // Signal discontinuity.
916
917            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
918                // If this was a live event this made no sense since
919                // we don't have access to all the segment before the current
920                // one.
921                mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
922            }
923
924            if (discontinuity) {
925                ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
926
927                queueDiscontinuity(
928                        ATSParser::DISCONTINUITY_FORMATCHANGE,
929                        NULL /* extra */);
930
931                discontinuity = false;
932            }
933
934            startup = false;
935        }
936
937        err = OK;
938        if (bufferStartsWithTsSyncByte(buffer)) {
939            // Incremental extraction is only supported for MPEG2 transport streams.
940            if (tsBuffer == NULL) {
941                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
942                tsBuffer->setRange(0, 0);
943            } else if (tsBuffer->capacity() != buffer->capacity()) {
944                size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
945                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
946                tsBuffer->setRange(tsOff, tsSize);
947            }
948            tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
949
950            err = extractAndQueueAccessUnitsFromTs(tsBuffer);
951        }
952
953        if (err == -EAGAIN) {
954            // starting sequence number too low/high
955            mTSParser.clear();
956            postMonitorQueue();
957            return;
958        } else if (err == ERROR_OUT_OF_RANGE) {
959            // reached stopping point
960            stopAsync(/* clear = */ false);
961            return;
962        } else if (err != OK) {
963            notifyError(err);
964            return;
965        }
966
967    } while (bytesRead != 0);
968
969    if (bufferStartsWithTsSyncByte(buffer)) {
970        // If we still don't see a stream after fetching a full ts segment mark it as
971        // nonexistent.
972        const size_t kNumTypes = ATSParser::NUM_SOURCE_TYPES;
973        ATSParser::SourceType srcTypes[kNumTypes] =
974                { ATSParser::VIDEO, ATSParser::AUDIO };
975        LiveSession::StreamType streamTypes[kNumTypes] =
976                { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
977
978        for (size_t i = 0; i < kNumTypes; i++) {
979            ATSParser::SourceType srcType = srcTypes[i];
980            LiveSession::StreamType streamType = streamTypes[i];
981
982            sp<AnotherPacketSource> source =
983                static_cast<AnotherPacketSource *>(
984                    mTSParser->getSource(srcType).get());
985
986            if (source == NULL) {
987                ALOGW("MPEG2 Transport stream does not contain %s data.",
988                      srcType == ATSParser::VIDEO ? "video" : "audio");
989
990                mStreamTypeMask &= ~streamType;
991                mPacketSources.removeItem(streamType);
992            }
993        }
994
995    }
996
997    if (checkDecryptPadding(buffer) != OK) {
998        ALOGE("Incorrect padding bytes after decryption.");
999        notifyError(ERROR_MALFORMED);
1000        return;
1001    }
1002
1003    status_t err = OK;
1004    if (tsBuffer != NULL) {
1005        AString method;
1006        CHECK(buffer->meta()->findString("cipher-method", &method));
1007        if ((tsBuffer->size() > 0 && method == "NONE")
1008                || tsBuffer->size() > 16) {
1009            ALOGE("MPEG2 transport stream is not an even multiple of 188 "
1010                    "bytes in length.");
1011            notifyError(ERROR_MALFORMED);
1012            return;
1013        }
1014    }
1015
1016    // bulk extract non-ts files
1017    if (tsBuffer == NULL) {
1018      err = extractAndQueueAccessUnits(buffer, itemMeta);
1019    }
1020
1021    if (err != OK) {
1022        notifyError(err);
1023        return;
1024    }
1025
1026    ++mSeqNumber;
1027
1028    postMonitorQueue();
1029}
1030
1031int32_t PlaylistFetcher::getSeqNumberWithAnchorTime(int64_t anchorTimeUs) const {
1032    int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
1033    if (mPlaylist->meta() == NULL
1034            || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) {
1035        firstSeqNumberInPlaylist = 0;
1036    }
1037    lastSeqNumberInPlaylist = firstSeqNumberInPlaylist + mPlaylist->size() - 1;
1038
1039    int32_t index = mSeqNumber - firstSeqNumberInPlaylist - 1;
1040    while (index >= 0 && anchorTimeUs > mStartTimeUs) {
1041        sp<AMessage> itemMeta;
1042        CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1043
1044        int64_t itemDurationUs;
1045        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1046
1047        anchorTimeUs -= itemDurationUs;
1048        --index;
1049    }
1050
1051    int32_t newSeqNumber = firstSeqNumberInPlaylist + index + 1;
1052    if (newSeqNumber <= lastSeqNumberInPlaylist) {
1053        return newSeqNumber;
1054    } else {
1055        return lastSeqNumberInPlaylist;
1056    }
1057}
1058
1059int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1060    int32_t firstSeqNumberInPlaylist;
1061    if (mPlaylist->meta() == NULL
1062            || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) {
1063        firstSeqNumberInPlaylist = 0;
1064    }
1065
1066    size_t curDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
1067    if (discontinuitySeq < curDiscontinuitySeq) {
1068        return firstSeqNumberInPlaylist <= 0 ? 0 : (firstSeqNumberInPlaylist - 1);
1069    }
1070
1071    size_t index = 0;
1072    while (index < mPlaylist->size()) {
1073        sp<AMessage> itemMeta;
1074        CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1075
1076        int64_t discontinuity;
1077        if (itemMeta->findInt64("discontinuity", &discontinuity)) {
1078            curDiscontinuitySeq++;
1079        }
1080
1081        if (curDiscontinuitySeq == discontinuitySeq) {
1082            return firstSeqNumberInPlaylist + index;
1083        }
1084
1085        ++index;
1086    }
1087
1088    return firstSeqNumberInPlaylist + mPlaylist->size();
1089}
1090
1091int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1092    int32_t firstSeqNumberInPlaylist;
1093    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
1094                "media-sequence", &firstSeqNumberInPlaylist)) {
1095        firstSeqNumberInPlaylist = 0;
1096    }
1097
1098    size_t index = 0;
1099    int64_t segmentStartUs = 0;
1100    while (index < mPlaylist->size()) {
1101        sp<AMessage> itemMeta;
1102        CHECK(mPlaylist->itemAt(
1103                    index, NULL /* uri */, &itemMeta));
1104
1105        int64_t itemDurationUs;
1106        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1107
1108        if (timeUs < segmentStartUs + itemDurationUs) {
1109            break;
1110        }
1111
1112        segmentStartUs += itemDurationUs;
1113        ++index;
1114    }
1115
1116    if (index >= mPlaylist->size()) {
1117        index = mPlaylist->size() - 1;
1118    }
1119
1120    return firstSeqNumberInPlaylist + index;
1121}
1122
1123const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
1124        const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
1125    sp<MetaData> format = source->getFormat();
1126    if (format != NULL) {
1127        // for simplicity, store a reference to the format in each unit
1128        accessUnit->meta()->setObject("format", format);
1129    }
1130
1131    if (discard) {
1132        accessUnit->meta()->setInt32("discard", discard);
1133    }
1134
1135    accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1136    accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1137    return accessUnit;
1138}
1139
1140status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
1141    if (mTSParser == NULL) {
1142        // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
1143        mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
1144    }
1145
1146    if (mNextPTSTimeUs >= 0ll) {
1147        sp<AMessage> extra = new AMessage;
1148        // Since we are using absolute timestamps, signal an offset of 0 to prevent
1149        // ATSParser from skewing the timestamps of access units.
1150        extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
1151
1152        mTSParser->signalDiscontinuity(
1153                ATSParser::DISCONTINUITY_SEEK, extra);
1154
1155        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
1156        mNextPTSTimeUs = -1ll;
1157        mFirstPTSValid = false;
1158    }
1159
1160    size_t offset = 0;
1161    while (offset + 188 <= buffer->size()) {
1162        status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
1163
1164        if (err != OK) {
1165            return err;
1166        }
1167
1168        offset += 188;
1169    }
1170    // setRange to indicate consumed bytes.
1171    buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
1172
1173    status_t err = OK;
1174    for (size_t i = mPacketSources.size(); i-- > 0;) {
1175        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1176
1177        const char *key;
1178        ATSParser::SourceType type;
1179        const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1180        switch (stream) {
1181            case LiveSession::STREAMTYPE_VIDEO:
1182                type = ATSParser::VIDEO;
1183                key = "timeUsVideo";
1184                break;
1185
1186            case LiveSession::STREAMTYPE_AUDIO:
1187                type = ATSParser::AUDIO;
1188                key = "timeUsAudio";
1189                break;
1190
1191            case LiveSession::STREAMTYPE_SUBTITLES:
1192            {
1193                ALOGE("MPEG2 Transport streams do not contain subtitles.");
1194                return ERROR_MALFORMED;
1195                break;
1196            }
1197
1198            default:
1199                TRESPASS();
1200        }
1201
1202        sp<AnotherPacketSource> source =
1203            static_cast<AnotherPacketSource *>(
1204                    mTSParser->getSource(type).get());
1205
1206        if (source == NULL) {
1207            continue;
1208        }
1209
1210        int64_t timeUs;
1211        sp<ABuffer> accessUnit;
1212        status_t finalResult;
1213        while (source->hasBufferAvailable(&finalResult)
1214                && source->dequeueAccessUnit(&accessUnit) == OK) {
1215
1216            CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1217
1218            if (mStartup) {
1219                if (!mFirstPTSValid) {
1220                    mFirstTimeUs = timeUs;
1221                    mFirstPTSValid = true;
1222                }
1223                if (mStartTimeUsRelative) {
1224                    timeUs -= mFirstTimeUs;
1225                    if (timeUs < 0) {
1226                        timeUs = 0;
1227                    }
1228                }
1229
1230                if (timeUs < mStartTimeUs) {
1231                    // buffer up to the closest preceding IDR frame
1232                    ALOGV("timeUs %" PRId64 " us < mStartTimeUs %" PRId64 " us",
1233                            timeUs, mStartTimeUs);
1234                    const char *mime;
1235                    sp<MetaData> format  = source->getFormat();
1236                    bool isAvc = false;
1237                    if (format != NULL && format->findCString(kKeyMIMEType, &mime)
1238                            && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC)) {
1239                        isAvc = true;
1240                    }
1241                    if (isAvc && IsIDR(accessUnit)) {
1242                        mVideoBuffer->clear();
1243                    }
1244                    if (isAvc) {
1245                        mVideoBuffer->queueAccessUnit(accessUnit);
1246                    }
1247
1248                    continue;
1249                }
1250            }
1251
1252            CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1253            if (mStartTimeUsNotify != NULL && timeUs > mStartTimeUs) {
1254
1255                int32_t targetDurationSecs;
1256                CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
1257                int64_t targetDurationUs = targetDurationSecs * 1000000ll;
1258                // mStartup
1259                //   mStartup is true until we have queued a packet for all the streams
1260                //   we are fetching. We queue packets whose timestamps are greater than
1261                //   mStartTimeUs.
1262                // mSegmentStartTimeUs >= 0
1263                //   mSegmentStartTimeUs is non-negative when adapting or switching tracks
1264                // timeUs - mStartTimeUs > targetDurationUs:
1265                //   This and the 2 above conditions should only happen when adapting in a live
1266                //   stream; the old fetcher has already fetched to mStartTimeUs; the new fetcher
1267                //   would start fetching after timeUs, which should be greater than mStartTimeUs;
1268                //   the old fetcher would then continue fetching data until timeUs. We don't want
1269                //   timeUs to be too far ahead of mStartTimeUs because we want the old fetcher to
1270                //   stop as early as possible. The definition of being "too far ahead" is
1271                //   arbitrary; here we use targetDurationUs as threshold.
1272                if (mStartup && mSegmentStartTimeUs >= 0
1273                        && timeUs - mStartTimeUs > targetDurationUs) {
1274                    // we just guessed a starting timestamp that is too high when adapting in a
1275                    // live stream; re-adjust based on the actual timestamp extracted from the
1276                    // media segment; if we didn't move backward after the re-adjustment
1277                    // (newSeqNumber), start at least 1 segment prior.
1278                    int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs);
1279                    if (newSeqNumber >= mSeqNumber) {
1280                        --mSeqNumber;
1281                    } else {
1282                        mSeqNumber = newSeqNumber;
1283                    }
1284                    mStartTimeUsNotify = mNotify->dup();
1285                    mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
1286                    return -EAGAIN;
1287                }
1288
1289                int32_t seq;
1290                if (!mStartTimeUsNotify->findInt32("discontinuitySeq", &seq)) {
1291                    mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq);
1292                }
1293                int64_t startTimeUs;
1294                if (!mStartTimeUsNotify->findInt64(key, &startTimeUs)) {
1295                    mStartTimeUsNotify->setInt64(key, timeUs);
1296
1297                    uint32_t streamMask = 0;
1298                    mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1299                    streamMask |= mPacketSources.keyAt(i);
1300                    mStartTimeUsNotify->setInt32("streamMask", streamMask);
1301
1302                    if (streamMask == mStreamTypeMask) {
1303                        mStartup = false;
1304                        mStartTimeUsNotify->post();
1305                        mStartTimeUsNotify.clear();
1306                    }
1307                }
1308            }
1309
1310            if (mStopParams != NULL) {
1311                // Queue discontinuity in original stream.
1312                int32_t discontinuitySeq;
1313                int64_t stopTimeUs;
1314                if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1315                        || discontinuitySeq > mDiscontinuitySeq
1316                        || !mStopParams->findInt64(key, &stopTimeUs)
1317                        || (discontinuitySeq == mDiscontinuitySeq
1318                                && timeUs >= stopTimeUs)) {
1319                    packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
1320                    mStreamTypeMask &= ~stream;
1321                    mPacketSources.removeItemsAt(i);
1322                    break;
1323                }
1324            }
1325
1326            // Note that we do NOT dequeue any discontinuities except for format change.
1327            if (stream == LiveSession::STREAMTYPE_VIDEO) {
1328                const bool discard = true;
1329                status_t status;
1330                while (mVideoBuffer->hasBufferAvailable(&status)) {
1331                    sp<ABuffer> videoBuffer;
1332                    mVideoBuffer->dequeueAccessUnit(&videoBuffer);
1333                    setAccessUnitProperties(videoBuffer, source, discard);
1334                    packetSource->queueAccessUnit(videoBuffer);
1335                }
1336            }
1337
1338            setAccessUnitProperties(accessUnit, source);
1339            packetSource->queueAccessUnit(accessUnit);
1340        }
1341
1342        if (err != OK) {
1343            break;
1344        }
1345    }
1346
1347    if (err != OK) {
1348        for (size_t i = mPacketSources.size(); i-- > 0;) {
1349            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1350            packetSource->clear();
1351        }
1352        return err;
1353    }
1354
1355    if (!mStreamTypeMask) {
1356        // Signal gap is filled between original and new stream.
1357        ALOGV("ERROR OUT OF RANGE");
1358        return ERROR_OUT_OF_RANGE;
1359    }
1360
1361    return OK;
1362}
1363
1364/* static */
1365bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1366        const sp<ABuffer> &buffer) {
1367    size_t pos = 0;
1368
1369    // skip possible BOM
1370    if (buffer->size() >= pos + 3 &&
1371            !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1372        pos += 3;
1373    }
1374
1375    // accept WEBVTT followed by SPACE, TAB or (CR) LF
1376    if (buffer->size() < pos + 6 ||
1377            memcmp("WEBVTT", buffer->data() + pos, 6)) {
1378        return false;
1379    }
1380    pos += 6;
1381
1382    if (buffer->size() == pos) {
1383        return true;
1384    }
1385
1386    uint8_t sep = buffer->data()[pos];
1387    return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1388}
1389
1390status_t PlaylistFetcher::extractAndQueueAccessUnits(
1391        const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1392    if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1393        if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1394            ALOGE("This stream only contains subtitles.");
1395            return ERROR_MALFORMED;
1396        }
1397
1398        const sp<AnotherPacketSource> packetSource =
1399            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1400
1401        int64_t durationUs;
1402        CHECK(itemMeta->findInt64("durationUs", &durationUs));
1403        buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1404        buffer->meta()->setInt64("durationUs", durationUs);
1405        buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1406        buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1407
1408        packetSource->queueAccessUnit(buffer);
1409        return OK;
1410    }
1411
1412    if (mNextPTSTimeUs >= 0ll) {
1413        mFirstPTSValid = false;
1414        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
1415        mNextPTSTimeUs = -1ll;
1416    }
1417
1418    // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
1419    // stream prefixed by an ID3 tag.
1420
1421    bool firstID3Tag = true;
1422    uint64_t PTS = 0;
1423
1424    for (;;) {
1425        // Make sure to skip all ID3 tags preceding the audio data.
1426        // At least one must be present to provide the PTS timestamp.
1427
1428        ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
1429        if (!id3.isValid()) {
1430            if (firstID3Tag) {
1431                ALOGE("Unable to parse ID3 tag.");
1432                return ERROR_MALFORMED;
1433            } else {
1434                break;
1435            }
1436        }
1437
1438        if (firstID3Tag) {
1439            bool found = false;
1440
1441            ID3::Iterator it(id3, "PRIV");
1442            while (!it.done()) {
1443                size_t length;
1444                const uint8_t *data = it.getData(&length);
1445
1446                static const char *kMatchName =
1447                    "com.apple.streaming.transportStreamTimestamp";
1448                static const size_t kMatchNameLen = strlen(kMatchName);
1449
1450                if (length == kMatchNameLen + 1 + 8
1451                        && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
1452                    found = true;
1453                    PTS = U64_AT(&data[kMatchNameLen + 1]);
1454                }
1455
1456                it.next();
1457            }
1458
1459            if (!found) {
1460                ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
1461                return ERROR_MALFORMED;
1462            }
1463        }
1464
1465        // skip the ID3 tag
1466        buffer->setRange(
1467                buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
1468
1469        firstID3Tag = false;
1470    }
1471
1472    if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
1473        ALOGW("This stream only contains audio data!");
1474
1475        mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
1476
1477        if (mStreamTypeMask == 0) {
1478            return OK;
1479        }
1480    }
1481
1482    sp<AnotherPacketSource> packetSource =
1483        mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
1484
1485    if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
1486        ABitReader bits(buffer->data(), buffer->size());
1487
1488        // adts_fixed_header
1489
1490        CHECK_EQ(bits.getBits(12), 0xfffu);
1491        bits.skipBits(3);  // ID, layer
1492        bool protection_absent = bits.getBits(1) != 0;
1493
1494        unsigned profile = bits.getBits(2);
1495        CHECK_NE(profile, 3u);
1496        unsigned sampling_freq_index = bits.getBits(4);
1497        bits.getBits(1);  // private_bit
1498        unsigned channel_configuration = bits.getBits(3);
1499        CHECK_NE(channel_configuration, 0u);
1500        bits.skipBits(2);  // original_copy, home
1501
1502        sp<MetaData> meta = MakeAACCodecSpecificData(
1503                profile, sampling_freq_index, channel_configuration);
1504
1505        meta->setInt32(kKeyIsADTS, true);
1506
1507        packetSource->setFormat(meta);
1508    }
1509
1510    int64_t numSamples = 0ll;
1511    int32_t sampleRate;
1512    CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
1513
1514    int64_t timeUs = (PTS * 100ll) / 9ll;
1515    if (!mFirstPTSValid) {
1516        mFirstPTSValid = true;
1517        mFirstTimeUs = timeUs;
1518    }
1519
1520    size_t offset = 0;
1521    while (offset < buffer->size()) {
1522        const uint8_t *adtsHeader = buffer->data() + offset;
1523        CHECK_LT(offset + 5, buffer->size());
1524
1525        unsigned aac_frame_length =
1526            ((adtsHeader[3] & 3) << 11)
1527            | (adtsHeader[4] << 3)
1528            | (adtsHeader[5] >> 5);
1529
1530        if (aac_frame_length == 0) {
1531            const uint8_t *id3Header = adtsHeader;
1532            if (!memcmp(id3Header, "ID3", 3)) {
1533                ID3 id3(id3Header, buffer->size() - offset, true);
1534                if (id3.isValid()) {
1535                    offset += id3.rawSize();
1536                    continue;
1537                };
1538            }
1539            return ERROR_MALFORMED;
1540        }
1541
1542        CHECK_LE(offset + aac_frame_length, buffer->size());
1543
1544        int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
1545        offset += aac_frame_length;
1546
1547        // Each AAC frame encodes 1024 samples.
1548        numSamples += 1024;
1549
1550        if (mStartup) {
1551            int64_t startTimeUs = unitTimeUs;
1552            if (mStartTimeUsRelative) {
1553                startTimeUs -= mFirstTimeUs;
1554                if (startTimeUs  < 0) {
1555                    startTimeUs = 0;
1556                }
1557            }
1558            if (startTimeUs < mStartTimeUs) {
1559                continue;
1560            }
1561        }
1562
1563        sp<ABuffer> unit = new ABuffer(aac_frame_length);
1564        memcpy(unit->data(), adtsHeader, aac_frame_length);
1565
1566        unit->meta()->setInt64("timeUs", unitTimeUs);
1567        unit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1568        unit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1569        packetSource->queueAccessUnit(unit);
1570    }
1571
1572    return OK;
1573}
1574
1575void PlaylistFetcher::updateDuration() {
1576    int64_t durationUs = 0ll;
1577    for (size_t index = 0; index < mPlaylist->size(); ++index) {
1578        sp<AMessage> itemMeta;
1579        CHECK(mPlaylist->itemAt(
1580                    index, NULL /* uri */, &itemMeta));
1581
1582        int64_t itemDurationUs;
1583        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1584
1585        durationUs += itemDurationUs;
1586    }
1587
1588    sp<AMessage> msg = mNotify->dup();
1589    msg->setInt32("what", kWhatDurationUpdate);
1590    msg->setInt64("durationUs", durationUs);
1591    msg->post();
1592}
1593
1594int64_t PlaylistFetcher::resumeThreshold(const sp<AMessage> &msg) {
1595    int64_t durationUs, threshold;
1596    if (msg->findInt64("durationUs", &durationUs)) {
1597        return kNumSkipFrames * durationUs;
1598    }
1599
1600    sp<RefBase> obj;
1601    msg->findObject("format", &obj);
1602    MetaData *format = static_cast<MetaData *>(obj.get());
1603
1604    const char *mime;
1605    CHECK(format->findCString(kKeyMIMEType, &mime));
1606    bool audio = !strncasecmp(mime, "audio/", 6);
1607    if (audio) {
1608        // Assumes 1000 samples per frame.
1609        int32_t sampleRate;
1610        CHECK(format->findInt32(kKeySampleRate, &sampleRate));
1611        return kNumSkipFrames  /* frames */ * 1000 /* samples */
1612                * (1000000 / sampleRate) /* sample duration (us) */;
1613    } else {
1614        int32_t frameRate;
1615        if (format->findInt32(kKeyFrameRate, &frameRate) && frameRate > 0) {
1616            return kNumSkipFrames * (1000000 / frameRate);
1617        }
1618    }
1619
1620    return 500000ll;
1621}
1622
1623}  // namespace android
1624