PlaylistFetcher.cpp revision f0d689934e70d3e5b3784265e890377db04c7c1d
1/*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "PlaylistFetcher"
19#include <utils/Log.h>
20
21#include "PlaylistFetcher.h"
22
23#include "LiveDataSource.h"
24#include "LiveSession.h"
25#include "M3UParser.h"
26
27#include "include/avc_utils.h"
28#include "include/HTTPBase.h"
29#include "include/ID3.h"
30#include "mpeg2ts/AnotherPacketSource.h"
31
32#include <media/IStreamSource.h>
33#include <media/stagefright/foundation/ABitReader.h>
34#include <media/stagefright/foundation/ABuffer.h>
35#include <media/stagefright/foundation/ADebug.h>
36#include <media/stagefright/foundation/hexdump.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaDefs.h>
39#include <media/stagefright/MetaData.h>
40#include <media/stagefright/Utils.h>
41
42#include <ctype.h>
43#include <inttypes.h>
44#include <openssl/aes.h>
45#include <openssl/md5.h>
46
47namespace android {
48
49// static
50const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll;
51const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
52const int32_t PlaylistFetcher::kDownloadBlockSize = 2048;
53const int32_t PlaylistFetcher::kNumSkipFrames = 10;
54
55PlaylistFetcher::PlaylistFetcher(
56        const sp<AMessage> &notify,
57        const sp<LiveSession> &session,
58        const char *uri,
59        int32_t subtitleGeneration)
60    : mNotify(notify),
61      mStartTimeUsNotify(notify->dup()),
62      mSession(session),
63      mURI(uri),
64      mStreamTypeMask(0),
65      mStartTimeUs(-1ll),
66      mSegmentStartTimeUs(-1ll),
67      mDiscontinuitySeq(-1ll),
68      mStartTimeUsRelative(false),
69      mLastPlaylistFetchTimeUs(-1ll),
70      mSeqNumber(-1),
71      mNumRetries(0),
72      mStartup(true),
73      mAdaptive(false),
74      mPrepared(false),
75      mNextPTSTimeUs(-1ll),
76      mMonitorQueueGeneration(0),
77      mSubtitleGeneration(subtitleGeneration),
78      mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
79      mFirstPTSValid(false),
80      mAbsoluteTimeAnchorUs(0ll),
81      mVideoBuffer(new AnotherPacketSource(NULL)) {
82    memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
83    mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
84    mStartTimeUsNotify->setInt32("streamMask", 0);
85}
86
87PlaylistFetcher::~PlaylistFetcher() {
88}
89
90int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
91    CHECK(mPlaylist != NULL);
92
93    int32_t firstSeqNumberInPlaylist;
94    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
95                "media-sequence", &firstSeqNumberInPlaylist)) {
96        firstSeqNumberInPlaylist = 0;
97    }
98
99    int32_t lastSeqNumberInPlaylist =
100        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
101
102    CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
103    CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
104
105    int64_t segmentStartUs = 0ll;
106    for (int32_t index = 0;
107            index < seqNumber - firstSeqNumberInPlaylist; ++index) {
108        sp<AMessage> itemMeta;
109        CHECK(mPlaylist->itemAt(
110                    index, NULL /* uri */, &itemMeta));
111
112        int64_t itemDurationUs;
113        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
114
115        segmentStartUs += itemDurationUs;
116    }
117
118    return segmentStartUs;
119}
120
121int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
122    int64_t nowUs = ALooper::GetNowUs();
123
124    if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
125        CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
126        return 0ll;
127    }
128
129    if (mPlaylist->isComplete()) {
130        return (~0llu >> 1);
131    }
132
133    int32_t targetDurationSecs;
134    CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
135
136    int64_t targetDurationUs = targetDurationSecs * 1000000ll;
137
138    int64_t minPlaylistAgeUs;
139
140    switch (mRefreshState) {
141        case INITIAL_MINIMUM_RELOAD_DELAY:
142        {
143            size_t n = mPlaylist->size();
144            if (n > 0) {
145                sp<AMessage> itemMeta;
146                CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
147
148                int64_t itemDurationUs;
149                CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
150
151                minPlaylistAgeUs = itemDurationUs;
152                break;
153            }
154
155            // fall through
156        }
157
158        case FIRST_UNCHANGED_RELOAD_ATTEMPT:
159        {
160            minPlaylistAgeUs = targetDurationUs / 2;
161            break;
162        }
163
164        case SECOND_UNCHANGED_RELOAD_ATTEMPT:
165        {
166            minPlaylistAgeUs = (targetDurationUs * 3) / 2;
167            break;
168        }
169
170        case THIRD_UNCHANGED_RELOAD_ATTEMPT:
171        {
172            minPlaylistAgeUs = targetDurationUs * 3;
173            break;
174        }
175
176        default:
177            TRESPASS();
178            break;
179    }
180
181    int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
182    return delayUs > 0ll ? delayUs : 0ll;
183}
184
185status_t PlaylistFetcher::decryptBuffer(
186        size_t playlistIndex, const sp<ABuffer> &buffer,
187        bool first) {
188    sp<AMessage> itemMeta;
189    bool found = false;
190    AString method;
191
192    for (ssize_t i = playlistIndex; i >= 0; --i) {
193        AString uri;
194        CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
195
196        if (itemMeta->findString("cipher-method", &method)) {
197            found = true;
198            break;
199        }
200    }
201
202    if (!found) {
203        method = "NONE";
204    }
205    buffer->meta()->setString("cipher-method", method.c_str());
206
207    if (method == "NONE") {
208        return OK;
209    } else if (!(method == "AES-128")) {
210        ALOGE("Unsupported cipher method '%s'", method.c_str());
211        return ERROR_UNSUPPORTED;
212    }
213
214    AString keyURI;
215    if (!itemMeta->findString("cipher-uri", &keyURI)) {
216        ALOGE("Missing key uri");
217        return ERROR_MALFORMED;
218    }
219
220    ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
221
222    sp<ABuffer> key;
223    if (index >= 0) {
224        key = mAESKeyForURI.valueAt(index);
225    } else {
226        ssize_t err = mSession->fetchFile(keyURI.c_str(), &key);
227
228        if (err < 0) {
229            ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
230            return ERROR_IO;
231        } else if (key->size() != 16) {
232            ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
233            return ERROR_MALFORMED;
234        }
235
236        mAESKeyForURI.add(keyURI, key);
237    }
238
239    AES_KEY aes_key;
240    if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
241        ALOGE("failed to set AES decryption key.");
242        return UNKNOWN_ERROR;
243    }
244
245    size_t n = buffer->size();
246    if (!n) {
247        return OK;
248    }
249    CHECK(n % 16 == 0);
250
251    if (first) {
252        // If decrypting the first block in a file, read the iv from the manifest
253        // or derive the iv from the file's sequence number.
254
255        AString iv;
256        if (itemMeta->findString("cipher-iv", &iv)) {
257            if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
258                    || iv.size() != 16 * 2 + 2) {
259                ALOGE("malformed cipher IV '%s'.", iv.c_str());
260                return ERROR_MALFORMED;
261            }
262
263            memset(mAESInitVec, 0, sizeof(mAESInitVec));
264            for (size_t i = 0; i < 16; ++i) {
265                char c1 = tolower(iv.c_str()[2 + 2 * i]);
266                char c2 = tolower(iv.c_str()[3 + 2 * i]);
267                if (!isxdigit(c1) || !isxdigit(c2)) {
268                    ALOGE("malformed cipher IV '%s'.", iv.c_str());
269                    return ERROR_MALFORMED;
270                }
271                uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
272                uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
273
274                mAESInitVec[i] = nibble1 << 4 | nibble2;
275            }
276        } else {
277            memset(mAESInitVec, 0, sizeof(mAESInitVec));
278            mAESInitVec[15] = mSeqNumber & 0xff;
279            mAESInitVec[14] = (mSeqNumber >> 8) & 0xff;
280            mAESInitVec[13] = (mSeqNumber >> 16) & 0xff;
281            mAESInitVec[12] = (mSeqNumber >> 24) & 0xff;
282        }
283    }
284
285    AES_cbc_encrypt(
286            buffer->data(), buffer->data(), buffer->size(),
287            &aes_key, mAESInitVec, AES_DECRYPT);
288
289    return OK;
290}
291
292status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
293    status_t err;
294    AString method;
295    CHECK(buffer->meta()->findString("cipher-method", &method));
296    if (method == "NONE") {
297        return OK;
298    }
299
300    uint8_t padding = 0;
301    if (buffer->size() > 0) {
302        padding = buffer->data()[buffer->size() - 1];
303    }
304
305    if (padding > 16) {
306        return ERROR_MALFORMED;
307    }
308
309    for (size_t i = buffer->size() - padding; i < padding; i++) {
310        if (buffer->data()[i] != padding) {
311            return ERROR_MALFORMED;
312        }
313    }
314
315    buffer->setRange(buffer->offset(), buffer->size() - padding);
316    return OK;
317}
318
319void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
320    int64_t maxDelayUs = delayUsToRefreshPlaylist();
321    if (maxDelayUs < minDelayUs) {
322        maxDelayUs = minDelayUs;
323    }
324    if (delayUs > maxDelayUs) {
325        ALOGV("Need to refresh playlist in %" PRId64 , maxDelayUs);
326        delayUs = maxDelayUs;
327    }
328    sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id());
329    msg->setInt32("generation", mMonitorQueueGeneration);
330    msg->post(delayUs);
331}
332
333void PlaylistFetcher::cancelMonitorQueue() {
334    ++mMonitorQueueGeneration;
335}
336
337void PlaylistFetcher::startAsync(
338        const sp<AnotherPacketSource> &audioSource,
339        const sp<AnotherPacketSource> &videoSource,
340        const sp<AnotherPacketSource> &subtitleSource,
341        int64_t startTimeUs,
342        int64_t segmentStartTimeUs,
343        int32_t startDiscontinuitySeq,
344        bool adaptive) {
345    sp<AMessage> msg = new AMessage(kWhatStart, id());
346
347    uint32_t streamTypeMask = 0ul;
348
349    if (audioSource != NULL) {
350        msg->setPointer("audioSource", audioSource.get());
351        streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
352    }
353
354    if (videoSource != NULL) {
355        msg->setPointer("videoSource", videoSource.get());
356        streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
357    }
358
359    if (subtitleSource != NULL) {
360        msg->setPointer("subtitleSource", subtitleSource.get());
361        streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
362    }
363
364    msg->setInt32("streamTypeMask", streamTypeMask);
365    msg->setInt64("startTimeUs", startTimeUs);
366    msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
367    msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
368    msg->setInt32("adaptive", adaptive);
369    msg->post();
370}
371
372void PlaylistFetcher::pauseAsync() {
373    (new AMessage(kWhatPause, id()))->post();
374}
375
376void PlaylistFetcher::stopAsync(bool clear) {
377    sp<AMessage> msg = new AMessage(kWhatStop, id());
378    msg->setInt32("clear", clear);
379    msg->post();
380}
381
382void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
383    AMessage* msg = new AMessage(kWhatResumeUntil, id());
384    msg->setMessage("params", params);
385    msg->post();
386}
387
388void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
389    switch (msg->what()) {
390        case kWhatStart:
391        {
392            status_t err = onStart(msg);
393
394            sp<AMessage> notify = mNotify->dup();
395            notify->setInt32("what", kWhatStarted);
396            notify->setInt32("err", err);
397            notify->post();
398            break;
399        }
400
401        case kWhatPause:
402        {
403            onPause();
404
405            sp<AMessage> notify = mNotify->dup();
406            notify->setInt32("what", kWhatPaused);
407            notify->post();
408            break;
409        }
410
411        case kWhatStop:
412        {
413            onStop(msg);
414
415            sp<AMessage> notify = mNotify->dup();
416            notify->setInt32("what", kWhatStopped);
417            notify->post();
418            break;
419        }
420
421        case kWhatMonitorQueue:
422        case kWhatDownloadNext:
423        {
424            int32_t generation;
425            CHECK(msg->findInt32("generation", &generation));
426
427            if (generation != mMonitorQueueGeneration) {
428                // Stale event
429                break;
430            }
431
432            if (msg->what() == kWhatMonitorQueue) {
433                onMonitorQueue();
434            } else {
435                onDownloadNext();
436            }
437            break;
438        }
439
440        case kWhatResumeUntil:
441        {
442            onResumeUntil(msg);
443            break;
444        }
445
446        default:
447            TRESPASS();
448    }
449}
450
451status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
452    mPacketSources.clear();
453
454    uint32_t streamTypeMask;
455    CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
456
457    int64_t startTimeUs;
458    int64_t segmentStartTimeUs;
459    int32_t startDiscontinuitySeq;
460    int32_t adaptive;
461    CHECK(msg->findInt64("startTimeUs", &startTimeUs));
462    CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
463    CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
464    CHECK(msg->findInt32("adaptive", &adaptive));
465
466    if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
467        void *ptr;
468        CHECK(msg->findPointer("audioSource", &ptr));
469
470        mPacketSources.add(
471                LiveSession::STREAMTYPE_AUDIO,
472                static_cast<AnotherPacketSource *>(ptr));
473    }
474
475    if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
476        void *ptr;
477        CHECK(msg->findPointer("videoSource", &ptr));
478
479        mPacketSources.add(
480                LiveSession::STREAMTYPE_VIDEO,
481                static_cast<AnotherPacketSource *>(ptr));
482    }
483
484    if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
485        void *ptr;
486        CHECK(msg->findPointer("subtitleSource", &ptr));
487
488        mPacketSources.add(
489                LiveSession::STREAMTYPE_SUBTITLES,
490                static_cast<AnotherPacketSource *>(ptr));
491    }
492
493    mStreamTypeMask = streamTypeMask;
494
495    mSegmentStartTimeUs = segmentStartTimeUs;
496    mDiscontinuitySeq = startDiscontinuitySeq;
497
498    if (startTimeUs >= 0) {
499        mStartTimeUs = startTimeUs;
500        mSeqNumber = -1;
501        mStartup = true;
502        mPrepared = false;
503        mAdaptive = adaptive;
504    }
505
506    postMonitorQueue();
507
508    return OK;
509}
510
511void PlaylistFetcher::onPause() {
512    cancelMonitorQueue();
513}
514
515void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
516    cancelMonitorQueue();
517
518    int32_t clear;
519    CHECK(msg->findInt32("clear", &clear));
520    if (clear) {
521        for (size_t i = 0; i < mPacketSources.size(); i++) {
522            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
523            packetSource->clear();
524        }
525    }
526
527    mPacketSources.clear();
528    mStreamTypeMask = 0;
529}
530
531// Resume until we have reached the boundary timestamps listed in `msg`; when
532// the remaining time is too short (within a resume threshold) stop immediately
533// instead.
534status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
535    sp<AMessage> params;
536    CHECK(msg->findMessage("params", &params));
537
538    bool stop = false;
539    for (size_t i = 0; i < mPacketSources.size(); i++) {
540        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
541
542        const char *stopKey;
543        int streamType = mPacketSources.keyAt(i);
544        switch (streamType) {
545        case LiveSession::STREAMTYPE_VIDEO:
546            stopKey = "timeUsVideo";
547            break;
548
549        case LiveSession::STREAMTYPE_AUDIO:
550            stopKey = "timeUsAudio";
551            break;
552
553        case LiveSession::STREAMTYPE_SUBTITLES:
554            stopKey = "timeUsSubtitle";
555            break;
556
557        default:
558            TRESPASS();
559        }
560
561        // Don't resume if we would stop within a resume threshold.
562        int32_t discontinuitySeq;
563        int64_t latestTimeUs = 0, stopTimeUs = 0;
564        sp<AMessage> latestMeta = packetSource->getLatestDequeuedMeta();
565        if (latestMeta != NULL
566                && latestMeta->findInt32("discontinuitySeq", &discontinuitySeq)
567                && discontinuitySeq == mDiscontinuitySeq
568                && latestMeta->findInt64("timeUs", &latestTimeUs)
569                && params->findInt64(stopKey, &stopTimeUs)
570                && stopTimeUs - latestTimeUs < resumeThreshold(latestMeta)) {
571            stop = true;
572        }
573    }
574
575    if (stop) {
576        for (size_t i = 0; i < mPacketSources.size(); i++) {
577            mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer());
578        }
579        stopAsync(/* clear = */ false);
580        return OK;
581    }
582
583    mStopParams = params;
584    postMonitorQueue();
585
586    return OK;
587}
588
589void PlaylistFetcher::notifyError(status_t err) {
590    sp<AMessage> notify = mNotify->dup();
591    notify->setInt32("what", kWhatError);
592    notify->setInt32("err", err);
593    notify->post();
594}
595
596void PlaylistFetcher::queueDiscontinuity(
597        ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
598    for (size_t i = 0; i < mPacketSources.size(); ++i) {
599        // do not discard buffer upon #EXT-X-DISCONTINUITY tag
600        // (seek will discard buffer by abandoning old fetchers)
601        mPacketSources.valueAt(i)->queueDiscontinuity(
602                type, extra, false /* discard */);
603    }
604}
605
606void PlaylistFetcher::onMonitorQueue() {
607    bool downloadMore = false;
608    refreshPlaylist();
609
610    int32_t targetDurationSecs;
611    int64_t targetDurationUs = kMinBufferedDurationUs;
612    if (mPlaylist != NULL) {
613        CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
614        targetDurationUs = targetDurationSecs * 1000000ll;
615    }
616
617    // buffer at least 3 times the target duration, or up to 10 seconds
618    int64_t durationToBufferUs = targetDurationUs * 3;
619    if (durationToBufferUs > kMinBufferedDurationUs)  {
620        durationToBufferUs = kMinBufferedDurationUs;
621    }
622
623    int64_t bufferedDurationUs = 0ll;
624    status_t finalResult = NOT_ENOUGH_DATA;
625    if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
626        sp<AnotherPacketSource> packetSource =
627            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
628
629        bufferedDurationUs =
630                packetSource->getBufferedDurationUs(&finalResult);
631        finalResult = OK;
632    } else {
633        // Use max stream duration to prevent us from waiting on a non-existent stream;
634        // when we cannot make out from the manifest what streams are included in a playlist
635        // we might assume extra streams.
636        for (size_t i = 0; i < mPacketSources.size(); ++i) {
637            if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
638                continue;
639            }
640
641            int64_t bufferedStreamDurationUs =
642                mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
643            ALOGV("buffered %" PRId64 " for stream %d",
644                    bufferedStreamDurationUs, mPacketSources.keyAt(i));
645            if (bufferedStreamDurationUs > bufferedDurationUs) {
646                bufferedDurationUs = bufferedStreamDurationUs;
647            }
648        }
649    }
650    downloadMore = (bufferedDurationUs < durationToBufferUs);
651
652    // signal start if buffered up at least the target size
653    if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) {
654        mPrepared = true;
655
656        ALOGV("prepared, buffered=%" PRId64 " > %" PRId64 "",
657                bufferedDurationUs, targetDurationUs);
658        sp<AMessage> msg = mNotify->dup();
659        msg->setInt32("what", kWhatTemporarilyDoneFetching);
660        msg->post();
661    }
662
663    if (finalResult == OK && downloadMore) {
664        ALOGV("monitoring, buffered=%" PRId64 " < %" PRId64 "",
665                bufferedDurationUs, durationToBufferUs);
666        // delay the next download slightly; hopefully this gives other concurrent fetchers
667        // a better chance to run.
668        // onDownloadNext();
669        sp<AMessage> msg = new AMessage(kWhatDownloadNext, id());
670        msg->setInt32("generation", mMonitorQueueGeneration);
671        msg->post(1000l);
672    } else {
673        // Nothing to do yet, try again in a second.
674
675        sp<AMessage> msg = mNotify->dup();
676        msg->setInt32("what", kWhatTemporarilyDoneFetching);
677        msg->post();
678
679        int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2;
680        ALOGV("pausing for %" PRId64 ", buffered=%" PRId64 " > %" PRId64 "",
681                delayUs, bufferedDurationUs, durationToBufferUs);
682        // :TRICKY: need to enforce minimum delay because the delay to
683        // refresh the playlist will become 0
684        postMonitorQueue(delayUs, mPrepared ? targetDurationUs * 2 : 0);
685    }
686}
687
688status_t PlaylistFetcher::refreshPlaylist() {
689    if (delayUsToRefreshPlaylist() <= 0) {
690        bool unchanged;
691        sp<M3UParser> playlist = mSession->fetchPlaylist(
692                mURI.c_str(), mPlaylistHash, &unchanged);
693
694        if (playlist == NULL) {
695            if (unchanged) {
696                // We succeeded in fetching the playlist, but it was
697                // unchanged from the last time we tried.
698
699                if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
700                    mRefreshState = (RefreshState)(mRefreshState + 1);
701                }
702            } else {
703                ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str());
704                return ERROR_IO;
705            }
706        } else {
707            mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
708            mPlaylist = playlist;
709
710            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
711                updateDuration();
712            }
713        }
714
715        mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
716    }
717    return OK;
718}
719
720// static
721bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
722    return buffer->size() > 0 && buffer->data()[0] == 0x47;
723}
724
725void PlaylistFetcher::onDownloadNext() {
726    status_t err = refreshPlaylist();
727    int32_t firstSeqNumberInPlaylist = 0;
728    int32_t lastSeqNumberInPlaylist = 0;
729    bool discontinuity = false;
730
731    if (mPlaylist != NULL) {
732        if (mPlaylist->meta() != NULL) {
733            mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist);
734        }
735
736        lastSeqNumberInPlaylist =
737                firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
738
739        if (mDiscontinuitySeq < 0) {
740            mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
741        }
742    }
743
744    if (mPlaylist != NULL && mSeqNumber < 0) {
745        CHECK_GE(mStartTimeUs, 0ll);
746
747        if (mSegmentStartTimeUs < 0) {
748            if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
749                // If this is a live session, start 3 segments from the end on connect
750                mSeqNumber = lastSeqNumberInPlaylist - 3;
751                if (mSeqNumber < firstSeqNumberInPlaylist) {
752                    mSeqNumber = firstSeqNumberInPlaylist;
753                }
754            } else {
755                mSeqNumber = getSeqNumberForTime(mStartTimeUs);
756                mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
757            }
758            mStartTimeUsRelative = true;
759            ALOGV("Initial sequence number for time %" PRId64 " is %d from (%d .. %d)",
760                    mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
761                    lastSeqNumberInPlaylist);
762        } else {
763            mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
764            if (mAdaptive) {
765                // avoid double fetch/decode
766                mSeqNumber += 1;
767            }
768            ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
769            if (mSeqNumber < minSeq) {
770                mSeqNumber = minSeq;
771            }
772
773            if (mSeqNumber < firstSeqNumberInPlaylist) {
774                mSeqNumber = firstSeqNumberInPlaylist;
775            }
776
777            if (mSeqNumber > lastSeqNumberInPlaylist) {
778                mSeqNumber = lastSeqNumberInPlaylist;
779            }
780            ALOGV("Initial sequence number for live event %d from (%d .. %d)",
781                    mSeqNumber, firstSeqNumberInPlaylist,
782                    lastSeqNumberInPlaylist);
783        }
784    }
785
786    // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true
787    if (mSeqNumber < firstSeqNumberInPlaylist
788            || mSeqNumber > lastSeqNumberInPlaylist
789            || err != OK) {
790        if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) {
791            ++mNumRetries;
792
793            if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) {
794                // make sure we reach this retry logic on refresh failures
795                // by adding an err != OK clause to all enclosing if's.
796
797                // refresh in increasing fraction (1/2, 1/3, ...) of the
798                // playlist's target duration or 3 seconds, whichever is less
799                int64_t delayUs = kMaxMonitorDelayUs;
800                if (mPlaylist != NULL && mPlaylist->meta() != NULL) {
801                    int32_t targetDurationSecs;
802                    CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
803                    delayUs = mPlaylist->size() * targetDurationSecs *
804                            1000000ll / (1 + mNumRetries);
805                }
806                if (delayUs > kMaxMonitorDelayUs) {
807                    delayUs = kMaxMonitorDelayUs;
808                }
809                ALOGV("sequence number high: %d from (%d .. %d), "
810                      "monitor in %" PRId64 " (retry=%d)",
811                        mSeqNumber, firstSeqNumberInPlaylist,
812                        lastSeqNumberInPlaylist, delayUs, mNumRetries);
813                postMonitorQueue(delayUs);
814                return;
815            }
816
817            if (err != OK) {
818                notifyError(err);
819                return;
820            }
821
822            // we've missed the boat, let's start 3 segments prior to the latest sequence
823            // number available and signal a discontinuity.
824
825            ALOGI("We've missed the boat, restarting playback."
826                  "  mStartup=%d, was  looking for %d in %d-%d",
827                    mStartup, mSeqNumber, firstSeqNumberInPlaylist,
828                    lastSeqNumberInPlaylist);
829            if (mStopParams != NULL) {
830                // we should have kept on fetching until we hit the boundaries in mStopParams,
831                // but since the segments we are supposed to fetch have already rolled off
832                // the playlist, i.e. we have already missed the boat, we inevitably have to
833                // skip.
834                for (size_t i = 0; i < mPacketSources.size(); i++) {
835                    sp<ABuffer> formatChange = mSession->createFormatChangeBuffer();
836                    mPacketSources.valueAt(i)->queueAccessUnit(formatChange);
837                }
838                stopAsync(/* clear = */ false);
839                return;
840            }
841            mSeqNumber = lastSeqNumberInPlaylist - 3;
842            if (mSeqNumber < firstSeqNumberInPlaylist) {
843                mSeqNumber = firstSeqNumberInPlaylist;
844            }
845            discontinuity = true;
846
847            // fall through
848        } else {
849            ALOGE("Cannot find sequence number %d in playlist "
850                 "(contains %d - %d)",
851                 mSeqNumber, firstSeqNumberInPlaylist,
852                  firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
853
854            notifyError(ERROR_END_OF_STREAM);
855            return;
856        }
857    }
858
859    mNumRetries = 0;
860
861    AString uri;
862    sp<AMessage> itemMeta;
863    CHECK(mPlaylist->itemAt(
864                mSeqNumber - firstSeqNumberInPlaylist,
865                &uri,
866                &itemMeta));
867
868    int32_t val;
869    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
870        mDiscontinuitySeq++;
871        discontinuity = true;
872    }
873
874    int64_t range_offset, range_length;
875    if (!itemMeta->findInt64("range-offset", &range_offset)
876            || !itemMeta->findInt64("range-length", &range_length)) {
877        range_offset = 0;
878        range_length = -1;
879    }
880
881    ALOGV("fetching segment %d from (%d .. %d)",
882          mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
883
884    ALOGV("fetching '%s'", uri.c_str());
885
886    sp<DataSource> source;
887    sp<ABuffer> buffer, tsBuffer;
888    // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
889    // this avoids interleaved connections to the key and segment file.
890    {
891        sp<ABuffer> junk = new ABuffer(16);
892        junk->setRange(0, 16);
893        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
894                true /* first */);
895        if (err != OK) {
896            notifyError(err);
897            return;
898        }
899    }
900
901    // block-wise download
902    bool startup = mStartup;
903    ssize_t bytesRead;
904    do {
905        bytesRead = mSession->fetchFile(
906                uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source);
907
908        if (bytesRead < 0) {
909            status_t err = bytesRead;
910            ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
911            notifyError(err);
912            return;
913        }
914
915        CHECK(buffer != NULL);
916
917        size_t size = buffer->size();
918        // Set decryption range.
919        buffer->setRange(size - bytesRead, bytesRead);
920        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
921                buffer->offset() == 0 /* first */);
922        // Unset decryption range.
923        buffer->setRange(0, size);
924
925        if (err != OK) {
926            ALOGE("decryptBuffer failed w/ error %d", err);
927
928            notifyError(err);
929            return;
930        }
931
932        if (startup || discontinuity) {
933            // Signal discontinuity.
934
935            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
936                // If this was a live event this made no sense since
937                // we don't have access to all the segment before the current
938                // one.
939                mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
940            }
941
942            if (discontinuity) {
943                ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
944
945                queueDiscontinuity(
946                        ATSParser::DISCONTINUITY_FORMATCHANGE,
947                        NULL /* extra */);
948
949                discontinuity = false;
950            }
951
952            startup = false;
953        }
954
955        err = OK;
956        if (bufferStartsWithTsSyncByte(buffer)) {
957            // Incremental extraction is only supported for MPEG2 transport streams.
958            if (tsBuffer == NULL) {
959                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
960                tsBuffer->setRange(0, 0);
961            } else if (tsBuffer->capacity() != buffer->capacity()) {
962                size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
963                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
964                tsBuffer->setRange(tsOff, tsSize);
965            }
966            tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
967
968            err = extractAndQueueAccessUnitsFromTs(tsBuffer);
969        }
970
971        if (err == -EAGAIN) {
972            // starting sequence number too low/high
973            mTSParser.clear();
974            postMonitorQueue();
975            return;
976        } else if (err == ERROR_OUT_OF_RANGE) {
977            // reached stopping point
978            stopAsync(/* clear = */ false);
979            return;
980        } else if (err != OK) {
981            notifyError(err);
982            return;
983        }
984
985    } while (bytesRead != 0);
986
987    if (bufferStartsWithTsSyncByte(buffer)) {
988        // If we don't see a stream in the program table after fetching a full ts segment
989        // mark it as nonexistent.
990        const size_t kNumTypes = ATSParser::NUM_SOURCE_TYPES;
991        ATSParser::SourceType srcTypes[kNumTypes] =
992                { ATSParser::VIDEO, ATSParser::AUDIO };
993        LiveSession::StreamType streamTypes[kNumTypes] =
994                { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
995
996        for (size_t i = 0; i < kNumTypes; i++) {
997            ATSParser::SourceType srcType = srcTypes[i];
998            LiveSession::StreamType streamType = streamTypes[i];
999
1000            sp<AnotherPacketSource> source =
1001                static_cast<AnotherPacketSource *>(
1002                    mTSParser->getSource(srcType).get());
1003
1004            if (!mTSParser->hasSource(srcType)) {
1005                ALOGW("MPEG2 Transport stream does not contain %s data.",
1006                      srcType == ATSParser::VIDEO ? "video" : "audio");
1007
1008                mStreamTypeMask &= ~streamType;
1009                mPacketSources.removeItem(streamType);
1010            }
1011        }
1012
1013    }
1014
1015    if (checkDecryptPadding(buffer) != OK) {
1016        ALOGE("Incorrect padding bytes after decryption.");
1017        notifyError(ERROR_MALFORMED);
1018        return;
1019    }
1020
1021    err = OK;
1022    if (tsBuffer != NULL) {
1023        AString method;
1024        CHECK(buffer->meta()->findString("cipher-method", &method));
1025        if ((tsBuffer->size() > 0 && method == "NONE")
1026                || tsBuffer->size() > 16) {
1027            ALOGE("MPEG2 transport stream is not an even multiple of 188 "
1028                    "bytes in length.");
1029            notifyError(ERROR_MALFORMED);
1030            return;
1031        }
1032    }
1033
1034    // bulk extract non-ts files
1035    if (tsBuffer == NULL) {
1036        err = extractAndQueueAccessUnits(buffer, itemMeta);
1037        if (err == -EAGAIN) {
1038            // starting sequence number too low/high
1039            postMonitorQueue();
1040            return;
1041        } else if (err == ERROR_OUT_OF_RANGE) {
1042            // reached stopping point
1043            stopAsync(/* clear = */false);
1044            return;
1045        }
1046    }
1047
1048    if (err != OK) {
1049        notifyError(err);
1050        return;
1051    }
1052
1053    ++mSeqNumber;
1054
1055    postMonitorQueue();
1056}
1057
1058int32_t PlaylistFetcher::getSeqNumberWithAnchorTime(int64_t anchorTimeUs) const {
1059    int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
1060    if (mPlaylist->meta() == NULL
1061            || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) {
1062        firstSeqNumberInPlaylist = 0;
1063    }
1064    lastSeqNumberInPlaylist = firstSeqNumberInPlaylist + mPlaylist->size() - 1;
1065
1066    int32_t index = mSeqNumber - firstSeqNumberInPlaylist - 1;
1067    while (index >= 0 && anchorTimeUs > mStartTimeUs) {
1068        sp<AMessage> itemMeta;
1069        CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1070
1071        int64_t itemDurationUs;
1072        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1073
1074        anchorTimeUs -= itemDurationUs;
1075        --index;
1076    }
1077
1078    int32_t newSeqNumber = firstSeqNumberInPlaylist + index + 1;
1079    if (newSeqNumber <= lastSeqNumberInPlaylist) {
1080        return newSeqNumber;
1081    } else {
1082        return lastSeqNumberInPlaylist;
1083    }
1084}
1085
1086int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1087    int32_t firstSeqNumberInPlaylist;
1088    if (mPlaylist->meta() == NULL
1089            || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) {
1090        firstSeqNumberInPlaylist = 0;
1091    }
1092
1093    size_t curDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
1094    if (discontinuitySeq < curDiscontinuitySeq) {
1095        return firstSeqNumberInPlaylist <= 0 ? 0 : (firstSeqNumberInPlaylist - 1);
1096    }
1097
1098    size_t index = 0;
1099    while (index < mPlaylist->size()) {
1100        sp<AMessage> itemMeta;
1101        CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1102
1103        int64_t discontinuity;
1104        if (itemMeta->findInt64("discontinuity", &discontinuity)) {
1105            curDiscontinuitySeq++;
1106        }
1107
1108        if (curDiscontinuitySeq == discontinuitySeq) {
1109            return firstSeqNumberInPlaylist + index;
1110        }
1111
1112        ++index;
1113    }
1114
1115    return firstSeqNumberInPlaylist + mPlaylist->size();
1116}
1117
1118int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1119    int32_t firstSeqNumberInPlaylist;
1120    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
1121                "media-sequence", &firstSeqNumberInPlaylist)) {
1122        firstSeqNumberInPlaylist = 0;
1123    }
1124
1125    size_t index = 0;
1126    int64_t segmentStartUs = 0;
1127    while (index < mPlaylist->size()) {
1128        sp<AMessage> itemMeta;
1129        CHECK(mPlaylist->itemAt(
1130                    index, NULL /* uri */, &itemMeta));
1131
1132        int64_t itemDurationUs;
1133        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1134
1135        if (timeUs < segmentStartUs + itemDurationUs) {
1136            break;
1137        }
1138
1139        segmentStartUs += itemDurationUs;
1140        ++index;
1141    }
1142
1143    if (index >= mPlaylist->size()) {
1144        index = mPlaylist->size() - 1;
1145    }
1146
1147    return firstSeqNumberInPlaylist + index;
1148}
1149
1150const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
1151        const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
1152    sp<MetaData> format = source->getFormat();
1153    if (format != NULL) {
1154        // for simplicity, store a reference to the format in each unit
1155        accessUnit->meta()->setObject("format", format);
1156    }
1157
1158    if (discard) {
1159        accessUnit->meta()->setInt32("discard", discard);
1160    }
1161
1162    accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1163    accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1164    return accessUnit;
1165}
1166
1167status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
1168    if (mTSParser == NULL) {
1169        // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
1170        mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
1171    }
1172
1173    if (mNextPTSTimeUs >= 0ll) {
1174        sp<AMessage> extra = new AMessage;
1175        // Since we are using absolute timestamps, signal an offset of 0 to prevent
1176        // ATSParser from skewing the timestamps of access units.
1177        extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
1178
1179        mTSParser->signalDiscontinuity(
1180                ATSParser::DISCONTINUITY_TIME, extra);
1181
1182        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
1183        mNextPTSTimeUs = -1ll;
1184        mFirstPTSValid = false;
1185    }
1186
1187    size_t offset = 0;
1188    while (offset + 188 <= buffer->size()) {
1189        status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
1190
1191        if (err != OK) {
1192            return err;
1193        }
1194
1195        offset += 188;
1196    }
1197    // setRange to indicate consumed bytes.
1198    buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
1199
1200    status_t err = OK;
1201    for (size_t i = mPacketSources.size(); i-- > 0;) {
1202        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1203
1204        const char *key;
1205        ATSParser::SourceType type;
1206        const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1207        switch (stream) {
1208            case LiveSession::STREAMTYPE_VIDEO:
1209                type = ATSParser::VIDEO;
1210                key = "timeUsVideo";
1211                break;
1212
1213            case LiveSession::STREAMTYPE_AUDIO:
1214                type = ATSParser::AUDIO;
1215                key = "timeUsAudio";
1216                break;
1217
1218            case LiveSession::STREAMTYPE_SUBTITLES:
1219            {
1220                ALOGE("MPEG2 Transport streams do not contain subtitles.");
1221                return ERROR_MALFORMED;
1222                break;
1223            }
1224
1225            default:
1226                TRESPASS();
1227        }
1228
1229        sp<AnotherPacketSource> source =
1230            static_cast<AnotherPacketSource *>(
1231                    mTSParser->getSource(type).get());
1232
1233        if (source == NULL) {
1234            continue;
1235        }
1236
1237        int64_t timeUs;
1238        sp<ABuffer> accessUnit;
1239        status_t finalResult;
1240        while (source->hasBufferAvailable(&finalResult)
1241                && source->dequeueAccessUnit(&accessUnit) == OK) {
1242
1243            CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1244
1245            if (mStartup) {
1246                if (!mFirstPTSValid) {
1247                    mFirstTimeUs = timeUs;
1248                    mFirstPTSValid = true;
1249                }
1250                if (mStartTimeUsRelative) {
1251                    timeUs -= mFirstTimeUs;
1252                    if (timeUs < 0) {
1253                        timeUs = 0;
1254                    }
1255                }
1256
1257                if (timeUs < mStartTimeUs) {
1258                    // buffer up to the closest preceding IDR frame
1259                    ALOGV("timeUs %" PRId64 " us < mStartTimeUs %" PRId64 " us",
1260                            timeUs, mStartTimeUs);
1261                    const char *mime;
1262                    sp<MetaData> format  = source->getFormat();
1263                    bool isAvc = false;
1264                    if (format != NULL && format->findCString(kKeyMIMEType, &mime)
1265                            && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC)) {
1266                        isAvc = true;
1267                    }
1268                    if (isAvc && IsIDR(accessUnit)) {
1269                        mVideoBuffer->clear();
1270                    }
1271                    if (isAvc) {
1272                        mVideoBuffer->queueAccessUnit(accessUnit);
1273                    }
1274
1275                    continue;
1276                }
1277            }
1278
1279            CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1280            if (mStartTimeUsNotify != NULL && timeUs > mStartTimeUs) {
1281                int32_t firstSeqNumberInPlaylist;
1282                if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
1283                            "media-sequence", &firstSeqNumberInPlaylist)) {
1284                    firstSeqNumberInPlaylist = 0;
1285                }
1286
1287                int32_t targetDurationSecs;
1288                CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
1289                int64_t targetDurationUs = targetDurationSecs * 1000000ll;
1290                // mStartup
1291                //   mStartup is true until we have queued a packet for all the streams
1292                //   we are fetching. We queue packets whose timestamps are greater than
1293                //   mStartTimeUs.
1294                // mSegmentStartTimeUs >= 0
1295                //   mSegmentStartTimeUs is non-negative when adapting or switching tracks
1296                // mSeqNumber > firstSeqNumberInPlaylist
1297                //   don't decrement mSeqNumber if it already points to the 1st segment
1298                // timeUs - mStartTimeUs > targetDurationUs:
1299                //   This and the 2 above conditions should only happen when adapting in a live
1300                //   stream; the old fetcher has already fetched to mStartTimeUs; the new fetcher
1301                //   would start fetching after timeUs, which should be greater than mStartTimeUs;
1302                //   the old fetcher would then continue fetching data until timeUs. We don't want
1303                //   timeUs to be too far ahead of mStartTimeUs because we want the old fetcher to
1304                //   stop as early as possible. The definition of being "too far ahead" is
1305                //   arbitrary; here we use targetDurationUs as threshold.
1306                if (mStartup && mSegmentStartTimeUs >= 0
1307                        && mSeqNumber > firstSeqNumberInPlaylist
1308                        && timeUs - mStartTimeUs > targetDurationUs) {
1309                    // we just guessed a starting timestamp that is too high when adapting in a
1310                    // live stream; re-adjust based on the actual timestamp extracted from the
1311                    // media segment; if we didn't move backward after the re-adjustment
1312                    // (newSeqNumber), start at least 1 segment prior.
1313                    int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs);
1314                    if (newSeqNumber >= mSeqNumber) {
1315                        --mSeqNumber;
1316                    } else {
1317                        mSeqNumber = newSeqNumber;
1318                    }
1319                    mStartTimeUsNotify = mNotify->dup();
1320                    mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
1321                    return -EAGAIN;
1322                }
1323
1324                int32_t seq;
1325                if (!mStartTimeUsNotify->findInt32("discontinuitySeq", &seq)) {
1326                    mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq);
1327                }
1328                int64_t startTimeUs;
1329                if (!mStartTimeUsNotify->findInt64(key, &startTimeUs)) {
1330                    mStartTimeUsNotify->setInt64(key, timeUs);
1331
1332                    uint32_t streamMask = 0;
1333                    mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1334                    streamMask |= mPacketSources.keyAt(i);
1335                    mStartTimeUsNotify->setInt32("streamMask", streamMask);
1336
1337                    if (streamMask == mStreamTypeMask) {
1338                        mStartup = false;
1339                        mStartTimeUsNotify->post();
1340                        mStartTimeUsNotify.clear();
1341                    }
1342                }
1343            }
1344
1345            if (mStopParams != NULL) {
1346                // Queue discontinuity in original stream.
1347                int32_t discontinuitySeq;
1348                int64_t stopTimeUs;
1349                if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1350                        || discontinuitySeq > mDiscontinuitySeq
1351                        || !mStopParams->findInt64(key, &stopTimeUs)
1352                        || (discontinuitySeq == mDiscontinuitySeq
1353                                && timeUs >= stopTimeUs)) {
1354                    packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
1355                    mStreamTypeMask &= ~stream;
1356                    mPacketSources.removeItemsAt(i);
1357                    break;
1358                }
1359            }
1360
1361            // Note that we do NOT dequeue any discontinuities except for format change.
1362            if (stream == LiveSession::STREAMTYPE_VIDEO) {
1363                const bool discard = true;
1364                status_t status;
1365                while (mVideoBuffer->hasBufferAvailable(&status)) {
1366                    sp<ABuffer> videoBuffer;
1367                    mVideoBuffer->dequeueAccessUnit(&videoBuffer);
1368                    setAccessUnitProperties(videoBuffer, source, discard);
1369                    packetSource->queueAccessUnit(videoBuffer);
1370                }
1371            }
1372
1373            setAccessUnitProperties(accessUnit, source);
1374            packetSource->queueAccessUnit(accessUnit);
1375        }
1376
1377        if (err != OK) {
1378            break;
1379        }
1380    }
1381
1382    if (err != OK) {
1383        for (size_t i = mPacketSources.size(); i-- > 0;) {
1384            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1385            packetSource->clear();
1386        }
1387        return err;
1388    }
1389
1390    if (!mStreamTypeMask) {
1391        // Signal gap is filled between original and new stream.
1392        ALOGV("ERROR OUT OF RANGE");
1393        return ERROR_OUT_OF_RANGE;
1394    }
1395
1396    return OK;
1397}
1398
1399/* static */
1400bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1401        const sp<ABuffer> &buffer) {
1402    size_t pos = 0;
1403
1404    // skip possible BOM
1405    if (buffer->size() >= pos + 3 &&
1406            !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1407        pos += 3;
1408    }
1409
1410    // accept WEBVTT followed by SPACE, TAB or (CR) LF
1411    if (buffer->size() < pos + 6 ||
1412            memcmp("WEBVTT", buffer->data() + pos, 6)) {
1413        return false;
1414    }
1415    pos += 6;
1416
1417    if (buffer->size() == pos) {
1418        return true;
1419    }
1420
1421    uint8_t sep = buffer->data()[pos];
1422    return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1423}
1424
1425status_t PlaylistFetcher::extractAndQueueAccessUnits(
1426        const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1427    if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1428        if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1429            ALOGE("This stream only contains subtitles.");
1430            return ERROR_MALFORMED;
1431        }
1432
1433        const sp<AnotherPacketSource> packetSource =
1434            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1435
1436        int64_t durationUs;
1437        CHECK(itemMeta->findInt64("durationUs", &durationUs));
1438        buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1439        buffer->meta()->setInt64("durationUs", durationUs);
1440        buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1441        buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1442        buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration);
1443
1444        packetSource->queueAccessUnit(buffer);
1445        return OK;
1446    }
1447
1448    if (mNextPTSTimeUs >= 0ll) {
1449        mFirstPTSValid = false;
1450        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
1451        mNextPTSTimeUs = -1ll;
1452    }
1453
1454    // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
1455    // stream prefixed by an ID3 tag.
1456
1457    bool firstID3Tag = true;
1458    uint64_t PTS = 0;
1459
1460    for (;;) {
1461        // Make sure to skip all ID3 tags preceding the audio data.
1462        // At least one must be present to provide the PTS timestamp.
1463
1464        ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
1465        if (!id3.isValid()) {
1466            if (firstID3Tag) {
1467                ALOGE("Unable to parse ID3 tag.");
1468                return ERROR_MALFORMED;
1469            } else {
1470                break;
1471            }
1472        }
1473
1474        if (firstID3Tag) {
1475            bool found = false;
1476
1477            ID3::Iterator it(id3, "PRIV");
1478            while (!it.done()) {
1479                size_t length;
1480                const uint8_t *data = it.getData(&length);
1481
1482                static const char *kMatchName =
1483                    "com.apple.streaming.transportStreamTimestamp";
1484                static const size_t kMatchNameLen = strlen(kMatchName);
1485
1486                if (length == kMatchNameLen + 1 + 8
1487                        && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
1488                    found = true;
1489                    PTS = U64_AT(&data[kMatchNameLen + 1]);
1490                }
1491
1492                it.next();
1493            }
1494
1495            if (!found) {
1496                ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
1497                return ERROR_MALFORMED;
1498            }
1499        }
1500
1501        // skip the ID3 tag
1502        buffer->setRange(
1503                buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
1504
1505        firstID3Tag = false;
1506    }
1507
1508    if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
1509        ALOGW("This stream only contains audio data!");
1510
1511        mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
1512
1513        if (mStreamTypeMask == 0) {
1514            return OK;
1515        }
1516    }
1517
1518    sp<AnotherPacketSource> packetSource =
1519        mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
1520
1521    if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
1522        ABitReader bits(buffer->data(), buffer->size());
1523
1524        // adts_fixed_header
1525
1526        CHECK_EQ(bits.getBits(12), 0xfffu);
1527        bits.skipBits(3);  // ID, layer
1528        bool protection_absent = bits.getBits(1) != 0;
1529
1530        unsigned profile = bits.getBits(2);
1531        CHECK_NE(profile, 3u);
1532        unsigned sampling_freq_index = bits.getBits(4);
1533        bits.getBits(1);  // private_bit
1534        unsigned channel_configuration = bits.getBits(3);
1535        CHECK_NE(channel_configuration, 0u);
1536        bits.skipBits(2);  // original_copy, home
1537
1538        sp<MetaData> meta = MakeAACCodecSpecificData(
1539                profile, sampling_freq_index, channel_configuration);
1540
1541        meta->setInt32(kKeyIsADTS, true);
1542
1543        packetSource->setFormat(meta);
1544    }
1545
1546    int64_t numSamples = 0ll;
1547    int32_t sampleRate;
1548    CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
1549
1550    int64_t timeUs = (PTS * 100ll) / 9ll;
1551    if (!mFirstPTSValid) {
1552        mFirstPTSValid = true;
1553        mFirstTimeUs = timeUs;
1554    }
1555
1556    size_t offset = 0;
1557    while (offset < buffer->size()) {
1558        const uint8_t *adtsHeader = buffer->data() + offset;
1559        CHECK_LT(offset + 5, buffer->size());
1560
1561        unsigned aac_frame_length =
1562            ((adtsHeader[3] & 3) << 11)
1563            | (adtsHeader[4] << 3)
1564            | (adtsHeader[5] >> 5);
1565
1566        if (aac_frame_length == 0) {
1567            const uint8_t *id3Header = adtsHeader;
1568            if (!memcmp(id3Header, "ID3", 3)) {
1569                ID3 id3(id3Header, buffer->size() - offset, true);
1570                if (id3.isValid()) {
1571                    offset += id3.rawSize();
1572                    continue;
1573                };
1574            }
1575            return ERROR_MALFORMED;
1576        }
1577
1578        CHECK_LE(offset + aac_frame_length, buffer->size());
1579
1580        int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
1581        offset += aac_frame_length;
1582
1583        // Each AAC frame encodes 1024 samples.
1584        numSamples += 1024;
1585
1586        if (mStartup) {
1587            int64_t startTimeUs = unitTimeUs;
1588            if (mStartTimeUsRelative) {
1589                startTimeUs -= mFirstTimeUs;
1590                if (startTimeUs  < 0) {
1591                    startTimeUs = 0;
1592                }
1593            }
1594            if (startTimeUs < mStartTimeUs) {
1595                continue;
1596            }
1597
1598            if (mStartTimeUsNotify != NULL) {
1599                int32_t targetDurationSecs;
1600                CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
1601                int64_t targetDurationUs = targetDurationSecs * 1000000ll;
1602
1603                // Duplicated logic from how we handle .ts playlists.
1604                if (mStartup && mSegmentStartTimeUs >= 0
1605                        && timeUs - mStartTimeUs > targetDurationUs) {
1606                    int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs);
1607                    if (newSeqNumber >= mSeqNumber) {
1608                        --mSeqNumber;
1609                    } else {
1610                        mSeqNumber = newSeqNumber;
1611                    }
1612                    return -EAGAIN;
1613                }
1614
1615                mStartTimeUsNotify->setInt64("timeUsAudio", timeUs);
1616                mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq);
1617                mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO);
1618                mStartTimeUsNotify->post();
1619                mStartTimeUsNotify.clear();
1620                mStartup = false;
1621            }
1622        }
1623
1624        if (mStopParams != NULL) {
1625            // Queue discontinuity in original stream.
1626            int32_t discontinuitySeq;
1627            int64_t stopTimeUs;
1628            if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1629                    || discontinuitySeq > mDiscontinuitySeq
1630                    || !mStopParams->findInt64("timeUsAudio", &stopTimeUs)
1631                    || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) {
1632                packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
1633                mStreamTypeMask = 0;
1634                mPacketSources.clear();
1635                return ERROR_OUT_OF_RANGE;
1636            }
1637        }
1638
1639        sp<ABuffer> unit = new ABuffer(aac_frame_length);
1640        memcpy(unit->data(), adtsHeader, aac_frame_length);
1641
1642        unit->meta()->setInt64("timeUs", unitTimeUs);
1643        setAccessUnitProperties(unit, packetSource);
1644        packetSource->queueAccessUnit(unit);
1645    }
1646
1647    return OK;
1648}
1649
1650void PlaylistFetcher::updateDuration() {
1651    int64_t durationUs = 0ll;
1652    for (size_t index = 0; index < mPlaylist->size(); ++index) {
1653        sp<AMessage> itemMeta;
1654        CHECK(mPlaylist->itemAt(
1655                    index, NULL /* uri */, &itemMeta));
1656
1657        int64_t itemDurationUs;
1658        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1659
1660        durationUs += itemDurationUs;
1661    }
1662
1663    sp<AMessage> msg = mNotify->dup();
1664    msg->setInt32("what", kWhatDurationUpdate);
1665    msg->setInt64("durationUs", durationUs);
1666    msg->post();
1667}
1668
1669int64_t PlaylistFetcher::resumeThreshold(const sp<AMessage> &msg) {
1670    int64_t durationUs, threshold;
1671    if (msg->findInt64("durationUs", &durationUs)) {
1672        return kNumSkipFrames * durationUs;
1673    }
1674
1675    sp<RefBase> obj;
1676    msg->findObject("format", &obj);
1677    MetaData *format = static_cast<MetaData *>(obj.get());
1678
1679    const char *mime;
1680    CHECK(format->findCString(kKeyMIMEType, &mime));
1681    bool audio = !strncasecmp(mime, "audio/", 6);
1682    if (audio) {
1683        // Assumes 1000 samples per frame.
1684        int32_t sampleRate;
1685        CHECK(format->findInt32(kKeySampleRate, &sampleRate));
1686        return kNumSkipFrames  /* frames */ * 1000 /* samples */
1687                * (1000000 / sampleRate) /* sample duration (us) */;
1688    } else {
1689        int32_t frameRate;
1690        if (format->findInt32(kKeyFrameRate, &frameRate) && frameRate > 0) {
1691            return kNumSkipFrames * (1000000 / frameRate);
1692        }
1693    }
1694
1695    return 500000ll;
1696}
1697
1698}  // namespace android
1699