PlaylistFetcher.cpp revision 0e2c09d56bb1d7d33b0de2f446fe0cf2d5b59fcb
1/*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "PlaylistFetcher"
19#include <utils/Log.h>
20
21#include "PlaylistFetcher.h"
22
23#include "LiveDataSource.h"
24#include "LiveSession.h"
25#include "M3UParser.h"
26
27#include "include/avc_utils.h"
28#include "include/HTTPBase.h"
29#include "include/ID3.h"
30#include "mpeg2ts/AnotherPacketSource.h"
31
32#include <media/IStreamSource.h>
33#include <media/stagefright/foundation/ABitReader.h>
34#include <media/stagefright/foundation/ABuffer.h>
35#include <media/stagefright/foundation/ADebug.h>
36#include <media/stagefright/foundation/hexdump.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaDefs.h>
39#include <media/stagefright/MetaData.h>
40#include <media/stagefright/Utils.h>
41
42#include <ctype.h>
43#include <inttypes.h>
44#include <openssl/aes.h>
45#include <openssl/md5.h>
46
47namespace android {
48
49// static
50const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll;
51const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
52const int32_t PlaylistFetcher::kDownloadBlockSize = 2048;
53const int32_t PlaylistFetcher::kNumSkipFrames = 10;
54
55PlaylistFetcher::PlaylistFetcher(
56        const sp<AMessage> &notify,
57        const sp<LiveSession> &session,
58        const char *uri)
59    : mNotify(notify),
60      mStartTimeUsNotify(notify->dup()),
61      mSession(session),
62      mURI(uri),
63      mStreamTypeMask(0),
64      mStartTimeUs(-1ll),
65      mSegmentStartTimeUs(-1ll),
66      mDiscontinuitySeq(-1ll),
67      mStartTimeUsRelative(false),
68      mLastPlaylistFetchTimeUs(-1ll),
69      mSeqNumber(-1),
70      mNumRetries(0),
71      mStartup(true),
72      mAdaptive(false),
73      mPrepared(false),
74      mNextPTSTimeUs(-1ll),
75      mMonitorQueueGeneration(0),
76      mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
77      mFirstPTSValid(false),
78      mAbsoluteTimeAnchorUs(0ll),
79      mVideoBuffer(new AnotherPacketSource(NULL)) {
80    memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
81    mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
82    mStartTimeUsNotify->setInt32("streamMask", 0);
83}
84
85PlaylistFetcher::~PlaylistFetcher() {
86}
87
88int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
89    CHECK(mPlaylist != NULL);
90
91    int32_t firstSeqNumberInPlaylist;
92    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
93                "media-sequence", &firstSeqNumberInPlaylist)) {
94        firstSeqNumberInPlaylist = 0;
95    }
96
97    int32_t lastSeqNumberInPlaylist =
98        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
99
100    CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
101    CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
102
103    int64_t segmentStartUs = 0ll;
104    for (int32_t index = 0;
105            index < seqNumber - firstSeqNumberInPlaylist; ++index) {
106        sp<AMessage> itemMeta;
107        CHECK(mPlaylist->itemAt(
108                    index, NULL /* uri */, &itemMeta));
109
110        int64_t itemDurationUs;
111        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
112
113        segmentStartUs += itemDurationUs;
114    }
115
116    return segmentStartUs;
117}
118
119int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
120    int64_t nowUs = ALooper::GetNowUs();
121
122    if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
123        CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
124        return 0ll;
125    }
126
127    if (mPlaylist->isComplete()) {
128        return (~0llu >> 1);
129    }
130
131    int32_t targetDurationSecs;
132    CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
133
134    int64_t targetDurationUs = targetDurationSecs * 1000000ll;
135
136    int64_t minPlaylistAgeUs;
137
138    switch (mRefreshState) {
139        case INITIAL_MINIMUM_RELOAD_DELAY:
140        {
141            size_t n = mPlaylist->size();
142            if (n > 0) {
143                sp<AMessage> itemMeta;
144                CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
145
146                int64_t itemDurationUs;
147                CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
148
149                minPlaylistAgeUs = itemDurationUs;
150                break;
151            }
152
153            // fall through
154        }
155
156        case FIRST_UNCHANGED_RELOAD_ATTEMPT:
157        {
158            minPlaylistAgeUs = targetDurationUs / 2;
159            break;
160        }
161
162        case SECOND_UNCHANGED_RELOAD_ATTEMPT:
163        {
164            minPlaylistAgeUs = (targetDurationUs * 3) / 2;
165            break;
166        }
167
168        case THIRD_UNCHANGED_RELOAD_ATTEMPT:
169        {
170            minPlaylistAgeUs = targetDurationUs * 3;
171            break;
172        }
173
174        default:
175            TRESPASS();
176            break;
177    }
178
179    int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
180    return delayUs > 0ll ? delayUs : 0ll;
181}
182
183status_t PlaylistFetcher::decryptBuffer(
184        size_t playlistIndex, const sp<ABuffer> &buffer,
185        bool first) {
186    sp<AMessage> itemMeta;
187    bool found = false;
188    AString method;
189
190    for (ssize_t i = playlistIndex; i >= 0; --i) {
191        AString uri;
192        CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
193
194        if (itemMeta->findString("cipher-method", &method)) {
195            found = true;
196            break;
197        }
198    }
199
200    if (!found) {
201        method = "NONE";
202    }
203    buffer->meta()->setString("cipher-method", method.c_str());
204
205    if (method == "NONE") {
206        return OK;
207    } else if (!(method == "AES-128")) {
208        ALOGE("Unsupported cipher method '%s'", method.c_str());
209        return ERROR_UNSUPPORTED;
210    }
211
212    AString keyURI;
213    if (!itemMeta->findString("cipher-uri", &keyURI)) {
214        ALOGE("Missing key uri");
215        return ERROR_MALFORMED;
216    }
217
218    ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
219
220    sp<ABuffer> key;
221    if (index >= 0) {
222        key = mAESKeyForURI.valueAt(index);
223    } else {
224        ssize_t err = mSession->fetchFile(keyURI.c_str(), &key);
225
226        if (err < 0) {
227            ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
228            return ERROR_IO;
229        } else if (key->size() != 16) {
230            ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
231            return ERROR_MALFORMED;
232        }
233
234        mAESKeyForURI.add(keyURI, key);
235    }
236
237    AES_KEY aes_key;
238    if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
239        ALOGE("failed to set AES decryption key.");
240        return UNKNOWN_ERROR;
241    }
242
243    size_t n = buffer->size();
244    if (!n) {
245        return OK;
246    }
247    CHECK(n % 16 == 0);
248
249    if (first) {
250        // If decrypting the first block in a file, read the iv from the manifest
251        // or derive the iv from the file's sequence number.
252
253        AString iv;
254        if (itemMeta->findString("cipher-iv", &iv)) {
255            if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
256                    || iv.size() != 16 * 2 + 2) {
257                ALOGE("malformed cipher IV '%s'.", iv.c_str());
258                return ERROR_MALFORMED;
259            }
260
261            memset(mAESInitVec, 0, sizeof(mAESInitVec));
262            for (size_t i = 0; i < 16; ++i) {
263                char c1 = tolower(iv.c_str()[2 + 2 * i]);
264                char c2 = tolower(iv.c_str()[3 + 2 * i]);
265                if (!isxdigit(c1) || !isxdigit(c2)) {
266                    ALOGE("malformed cipher IV '%s'.", iv.c_str());
267                    return ERROR_MALFORMED;
268                }
269                uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
270                uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
271
272                mAESInitVec[i] = nibble1 << 4 | nibble2;
273            }
274        } else {
275            memset(mAESInitVec, 0, sizeof(mAESInitVec));
276            mAESInitVec[15] = mSeqNumber & 0xff;
277            mAESInitVec[14] = (mSeqNumber >> 8) & 0xff;
278            mAESInitVec[13] = (mSeqNumber >> 16) & 0xff;
279            mAESInitVec[12] = (mSeqNumber >> 24) & 0xff;
280        }
281    }
282
283    AES_cbc_encrypt(
284            buffer->data(), buffer->data(), buffer->size(),
285            &aes_key, mAESInitVec, AES_DECRYPT);
286
287    return OK;
288}
289
290status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
291    status_t err;
292    AString method;
293    CHECK(buffer->meta()->findString("cipher-method", &method));
294    if (method == "NONE") {
295        return OK;
296    }
297
298    uint8_t padding = 0;
299    if (buffer->size() > 0) {
300        padding = buffer->data()[buffer->size() - 1];
301    }
302
303    if (padding > 16) {
304        return ERROR_MALFORMED;
305    }
306
307    for (size_t i = buffer->size() - padding; i < padding; i++) {
308        if (buffer->data()[i] != padding) {
309            return ERROR_MALFORMED;
310        }
311    }
312
313    buffer->setRange(buffer->offset(), buffer->size() - padding);
314    return OK;
315}
316
317void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
318    int64_t maxDelayUs = delayUsToRefreshPlaylist();
319    if (maxDelayUs < minDelayUs) {
320        maxDelayUs = minDelayUs;
321    }
322    if (delayUs > maxDelayUs) {
323        ALOGV("Need to refresh playlist in %" PRId64 , maxDelayUs);
324        delayUs = maxDelayUs;
325    }
326    sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id());
327    msg->setInt32("generation", mMonitorQueueGeneration);
328    msg->post(delayUs);
329}
330
331void PlaylistFetcher::cancelMonitorQueue() {
332    ++mMonitorQueueGeneration;
333}
334
335void PlaylistFetcher::startAsync(
336        const sp<AnotherPacketSource> &audioSource,
337        const sp<AnotherPacketSource> &videoSource,
338        const sp<AnotherPacketSource> &subtitleSource,
339        int64_t startTimeUs,
340        int64_t segmentStartTimeUs,
341        int32_t startDiscontinuitySeq,
342        bool adaptive) {
343    sp<AMessage> msg = new AMessage(kWhatStart, id());
344
345    uint32_t streamTypeMask = 0ul;
346
347    if (audioSource != NULL) {
348        msg->setPointer("audioSource", audioSource.get());
349        streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
350    }
351
352    if (videoSource != NULL) {
353        msg->setPointer("videoSource", videoSource.get());
354        streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
355    }
356
357    if (subtitleSource != NULL) {
358        msg->setPointer("subtitleSource", subtitleSource.get());
359        streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
360    }
361
362    msg->setInt32("streamTypeMask", streamTypeMask);
363    msg->setInt64("startTimeUs", startTimeUs);
364    msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
365    msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
366    msg->setInt32("adaptive", adaptive);
367    msg->post();
368}
369
370void PlaylistFetcher::pauseAsync() {
371    (new AMessage(kWhatPause, id()))->post();
372}
373
374void PlaylistFetcher::stopAsync(bool clear) {
375    sp<AMessage> msg = new AMessage(kWhatStop, id());
376    msg->setInt32("clear", clear);
377    msg->post();
378}
379
380void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
381    AMessage* msg = new AMessage(kWhatResumeUntil, id());
382    msg->setMessage("params", params);
383    msg->post();
384}
385
386void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
387    switch (msg->what()) {
388        case kWhatStart:
389        {
390            status_t err = onStart(msg);
391
392            sp<AMessage> notify = mNotify->dup();
393            notify->setInt32("what", kWhatStarted);
394            notify->setInt32("err", err);
395            notify->post();
396            break;
397        }
398
399        case kWhatPause:
400        {
401            onPause();
402
403            sp<AMessage> notify = mNotify->dup();
404            notify->setInt32("what", kWhatPaused);
405            notify->post();
406            break;
407        }
408
409        case kWhatStop:
410        {
411            onStop(msg);
412
413            sp<AMessage> notify = mNotify->dup();
414            notify->setInt32("what", kWhatStopped);
415            notify->post();
416            break;
417        }
418
419        case kWhatMonitorQueue:
420        case kWhatDownloadNext:
421        {
422            int32_t generation;
423            CHECK(msg->findInt32("generation", &generation));
424
425            if (generation != mMonitorQueueGeneration) {
426                // Stale event
427                break;
428            }
429
430            if (msg->what() == kWhatMonitorQueue) {
431                onMonitorQueue();
432            } else {
433                onDownloadNext();
434            }
435            break;
436        }
437
438        case kWhatResumeUntil:
439        {
440            onResumeUntil(msg);
441            break;
442        }
443
444        default:
445            TRESPASS();
446    }
447}
448
449status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
450    mPacketSources.clear();
451
452    uint32_t streamTypeMask;
453    CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
454
455    int64_t startTimeUs;
456    int64_t segmentStartTimeUs;
457    int32_t startDiscontinuitySeq;
458    int32_t adaptive;
459    CHECK(msg->findInt64("startTimeUs", &startTimeUs));
460    CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
461    CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
462    CHECK(msg->findInt32("adaptive", &adaptive));
463
464    if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
465        void *ptr;
466        CHECK(msg->findPointer("audioSource", &ptr));
467
468        mPacketSources.add(
469                LiveSession::STREAMTYPE_AUDIO,
470                static_cast<AnotherPacketSource *>(ptr));
471    }
472
473    if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
474        void *ptr;
475        CHECK(msg->findPointer("videoSource", &ptr));
476
477        mPacketSources.add(
478                LiveSession::STREAMTYPE_VIDEO,
479                static_cast<AnotherPacketSource *>(ptr));
480    }
481
482    if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
483        void *ptr;
484        CHECK(msg->findPointer("subtitleSource", &ptr));
485
486        mPacketSources.add(
487                LiveSession::STREAMTYPE_SUBTITLES,
488                static_cast<AnotherPacketSource *>(ptr));
489    }
490
491    mStreamTypeMask = streamTypeMask;
492
493    mSegmentStartTimeUs = segmentStartTimeUs;
494    mDiscontinuitySeq = startDiscontinuitySeq;
495
496    if (startTimeUs >= 0) {
497        mStartTimeUs = startTimeUs;
498        mSeqNumber = -1;
499        mStartup = true;
500        mPrepared = false;
501        mAdaptive = adaptive;
502    }
503
504    postMonitorQueue();
505
506    return OK;
507}
508
509void PlaylistFetcher::onPause() {
510    cancelMonitorQueue();
511}
512
513void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
514    cancelMonitorQueue();
515
516    int32_t clear;
517    CHECK(msg->findInt32("clear", &clear));
518    if (clear) {
519        for (size_t i = 0; i < mPacketSources.size(); i++) {
520            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
521            packetSource->clear();
522        }
523    }
524
525    mPacketSources.clear();
526    mStreamTypeMask = 0;
527}
528
529// Resume until we have reached the boundary timestamps listed in `msg`; when
530// the remaining time is too short (within a resume threshold) stop immediately
531// instead.
532status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
533    sp<AMessage> params;
534    CHECK(msg->findMessage("params", &params));
535
536    bool stop = false;
537    for (size_t i = 0; i < mPacketSources.size(); i++) {
538        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
539
540        const char *stopKey;
541        int streamType = mPacketSources.keyAt(i);
542        switch (streamType) {
543        case LiveSession::STREAMTYPE_VIDEO:
544            stopKey = "timeUsVideo";
545            break;
546
547        case LiveSession::STREAMTYPE_AUDIO:
548            stopKey = "timeUsAudio";
549            break;
550
551        case LiveSession::STREAMTYPE_SUBTITLES:
552            stopKey = "timeUsSubtitle";
553            break;
554
555        default:
556            TRESPASS();
557        }
558
559        // Don't resume if we would stop within a resume threshold.
560        int32_t discontinuitySeq;
561        int64_t latestTimeUs = 0, stopTimeUs = 0;
562        sp<AMessage> latestMeta = packetSource->getLatestDequeuedMeta();
563        if (latestMeta != NULL
564                && latestMeta->findInt32("discontinuitySeq", &discontinuitySeq)
565                && discontinuitySeq == mDiscontinuitySeq
566                && latestMeta->findInt64("timeUs", &latestTimeUs)
567                && params->findInt64(stopKey, &stopTimeUs)
568                && stopTimeUs - latestTimeUs < resumeThreshold(latestMeta)) {
569            stop = true;
570        }
571    }
572
573    if (stop) {
574        for (size_t i = 0; i < mPacketSources.size(); i++) {
575            mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer());
576        }
577        stopAsync(/* clear = */ false);
578        return OK;
579    }
580
581    mStopParams = params;
582    postMonitorQueue();
583
584    return OK;
585}
586
587void PlaylistFetcher::notifyError(status_t err) {
588    sp<AMessage> notify = mNotify->dup();
589    notify->setInt32("what", kWhatError);
590    notify->setInt32("err", err);
591    notify->post();
592}
593
594void PlaylistFetcher::queueDiscontinuity(
595        ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
596    for (size_t i = 0; i < mPacketSources.size(); ++i) {
597        // do not discard buffer upon #EXT-X-DISCONTINUITY tag
598        // (seek will discard buffer by abandoning old fetchers)
599        mPacketSources.valueAt(i)->queueDiscontinuity(
600                type, extra, false /* discard */);
601    }
602}
603
604void PlaylistFetcher::onMonitorQueue() {
605    bool downloadMore = false;
606    refreshPlaylist();
607
608    int32_t targetDurationSecs;
609    int64_t targetDurationUs = kMinBufferedDurationUs;
610    if (mPlaylist != NULL) {
611        CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
612        targetDurationUs = targetDurationSecs * 1000000ll;
613    }
614
615    // buffer at least 3 times the target duration, or up to 10 seconds
616    int64_t durationToBufferUs = targetDurationUs * 3;
617    if (durationToBufferUs > kMinBufferedDurationUs)  {
618        durationToBufferUs = kMinBufferedDurationUs;
619    }
620
621    int64_t bufferedDurationUs = 0ll;
622    status_t finalResult = NOT_ENOUGH_DATA;
623    if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
624        sp<AnotherPacketSource> packetSource =
625            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
626
627        bufferedDurationUs =
628                packetSource->getBufferedDurationUs(&finalResult);
629        finalResult = OK;
630    } else {
631        // Use max stream duration to prevent us from waiting on a non-existent stream;
632        // when we cannot make out from the manifest what streams are included in a playlist
633        // we might assume extra streams.
634        for (size_t i = 0; i < mPacketSources.size(); ++i) {
635            if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
636                continue;
637            }
638
639            int64_t bufferedStreamDurationUs =
640                mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
641            ALOGV("buffered %" PRId64 " for stream %d",
642                    bufferedStreamDurationUs, mPacketSources.keyAt(i));
643            if (bufferedStreamDurationUs > bufferedDurationUs) {
644                bufferedDurationUs = bufferedStreamDurationUs;
645            }
646        }
647    }
648    downloadMore = (bufferedDurationUs < durationToBufferUs);
649
650    // signal start if buffered up at least the target size
651    if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) {
652        mPrepared = true;
653
654        ALOGV("prepared, buffered=%" PRId64 " > %" PRId64 "",
655                bufferedDurationUs, targetDurationUs);
656        sp<AMessage> msg = mNotify->dup();
657        msg->setInt32("what", kWhatTemporarilyDoneFetching);
658        msg->post();
659    }
660
661    if (finalResult == OK && downloadMore) {
662        ALOGV("monitoring, buffered=%" PRId64 " < %" PRId64 "",
663                bufferedDurationUs, durationToBufferUs);
664        // delay the next download slightly; hopefully this gives other concurrent fetchers
665        // a better chance to run.
666        // onDownloadNext();
667        sp<AMessage> msg = new AMessage(kWhatDownloadNext, id());
668        msg->setInt32("generation", mMonitorQueueGeneration);
669        msg->post(1000l);
670    } else {
671        // Nothing to do yet, try again in a second.
672
673        sp<AMessage> msg = mNotify->dup();
674        msg->setInt32("what", kWhatTemporarilyDoneFetching);
675        msg->post();
676
677        int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2;
678        ALOGV("pausing for %" PRId64 ", buffered=%" PRId64 " > %" PRId64 "",
679                delayUs, bufferedDurationUs, durationToBufferUs);
680        // :TRICKY: need to enforce minimum delay because the delay to
681        // refresh the playlist will become 0
682        postMonitorQueue(delayUs, mPrepared ? targetDurationUs * 2 : 0);
683    }
684}
685
686status_t PlaylistFetcher::refreshPlaylist() {
687    if (delayUsToRefreshPlaylist() <= 0) {
688        bool unchanged;
689        sp<M3UParser> playlist = mSession->fetchPlaylist(
690                mURI.c_str(), mPlaylistHash, &unchanged);
691
692        if (playlist == NULL) {
693            if (unchanged) {
694                // We succeeded in fetching the playlist, but it was
695                // unchanged from the last time we tried.
696
697                if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
698                    mRefreshState = (RefreshState)(mRefreshState + 1);
699                }
700            } else {
701                ALOGE("failed to load playlist at url '%s'", mURI.c_str());
702                notifyError(ERROR_IO);
703                return ERROR_IO;
704            }
705        } else {
706            mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
707            mPlaylist = playlist;
708
709            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
710                updateDuration();
711            }
712        }
713
714        mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
715    }
716    return OK;
717}
718
719// static
720bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
721    return buffer->size() > 0 && buffer->data()[0] == 0x47;
722}
723
724void PlaylistFetcher::onDownloadNext() {
725    if (refreshPlaylist() != OK) {
726        return;
727    }
728
729    int32_t firstSeqNumberInPlaylist;
730    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
731                "media-sequence", &firstSeqNumberInPlaylist)) {
732        firstSeqNumberInPlaylist = 0;
733    }
734
735    bool discontinuity = false;
736
737    const int32_t lastSeqNumberInPlaylist =
738        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
739
740    if (mDiscontinuitySeq < 0) {
741        mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
742    }
743
744    if (mSeqNumber < 0) {
745        CHECK_GE(mStartTimeUs, 0ll);
746
747        if (mSegmentStartTimeUs < 0) {
748            if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
749                // If this is a live session, start 3 segments from the end on connect
750                mSeqNumber = lastSeqNumberInPlaylist - 3;
751                if (mSeqNumber < firstSeqNumberInPlaylist) {
752                    mSeqNumber = firstSeqNumberInPlaylist;
753                }
754            } else {
755                mSeqNumber = getSeqNumberForTime(mStartTimeUs);
756                mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
757            }
758            mStartTimeUsRelative = true;
759            ALOGV("Initial sequence number for time %" PRId64 " is %d from (%d .. %d)",
760                    mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
761                    lastSeqNumberInPlaylist);
762        } else {
763            mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
764            if (mAdaptive) {
765                // avoid double fetch/decode
766                mSeqNumber += 1;
767            }
768            ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
769            if (mSeqNumber < minSeq) {
770                mSeqNumber = minSeq;
771            }
772
773            if (mSeqNumber < firstSeqNumberInPlaylist) {
774                mSeqNumber = firstSeqNumberInPlaylist;
775            }
776
777            if (mSeqNumber > lastSeqNumberInPlaylist) {
778                mSeqNumber = lastSeqNumberInPlaylist;
779            }
780            ALOGV("Initial sequence number for live event %d from (%d .. %d)",
781                    mSeqNumber, firstSeqNumberInPlaylist,
782                    lastSeqNumberInPlaylist);
783        }
784    }
785
786    if (mSeqNumber < firstSeqNumberInPlaylist
787            || mSeqNumber > lastSeqNumberInPlaylist) {
788        if (!mPlaylist->isComplete() && mNumRetries < kMaxNumRetries) {
789            ++mNumRetries;
790
791            if (mSeqNumber > lastSeqNumberInPlaylist) {
792                // refresh in increasing fraction (1/2, 1/3, ...) of the
793                // playlist's target duration or 3 seconds, whichever is less
794                int32_t targetDurationSecs;
795                CHECK(mPlaylist->meta()->findInt32(
796                        "target-duration", &targetDurationSecs));
797                int64_t delayUs = mPlaylist->size() * targetDurationSecs *
798                        1000000ll / (1 + mNumRetries);
799                if (delayUs > kMaxMonitorDelayUs) {
800                    delayUs = kMaxMonitorDelayUs;
801                }
802                ALOGV("sequence number high: %d from (%d .. %d), "
803                      "monitor in %" PRId64 " (retry=%d)",
804                        mSeqNumber, firstSeqNumberInPlaylist,
805                        lastSeqNumberInPlaylist, delayUs, mNumRetries);
806                postMonitorQueue(delayUs);
807                return;
808            }
809
810            // we've missed the boat, let's start from the lowest sequence
811            // number available and signal a discontinuity.
812
813            ALOGI("We've missed the boat, restarting playback."
814                  "  mStartup=%d, was  looking for %d in %d-%d",
815                    mStartup, mSeqNumber, firstSeqNumberInPlaylist,
816                    lastSeqNumberInPlaylist);
817            mSeqNumber = lastSeqNumberInPlaylist - 3;
818            if (mSeqNumber < firstSeqNumberInPlaylist) {
819                mSeqNumber = firstSeqNumberInPlaylist;
820            }
821            discontinuity = true;
822
823            // fall through
824        } else {
825            ALOGE("Cannot find sequence number %d in playlist "
826                 "(contains %d - %d)",
827                 mSeqNumber, firstSeqNumberInPlaylist,
828                  firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
829
830            notifyError(ERROR_END_OF_STREAM);
831            return;
832        }
833    }
834
835    mNumRetries = 0;
836
837    AString uri;
838    sp<AMessage> itemMeta;
839    CHECK(mPlaylist->itemAt(
840                mSeqNumber - firstSeqNumberInPlaylist,
841                &uri,
842                &itemMeta));
843
844    int32_t val;
845    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
846        mDiscontinuitySeq++;
847        discontinuity = true;
848    }
849
850    int64_t range_offset, range_length;
851    if (!itemMeta->findInt64("range-offset", &range_offset)
852            || !itemMeta->findInt64("range-length", &range_length)) {
853        range_offset = 0;
854        range_length = -1;
855    }
856
857    ALOGV("fetching segment %d from (%d .. %d)",
858          mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
859
860    ALOGV("fetching '%s'", uri.c_str());
861
862    sp<DataSource> source;
863    sp<ABuffer> buffer, tsBuffer;
864    // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
865    // this avoids interleaved connections to the key and segment file.
866    {
867        sp<ABuffer> junk = new ABuffer(16);
868        junk->setRange(0, 16);
869        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
870                true /* first */);
871        if (err != OK) {
872            notifyError(err);
873            return;
874        }
875    }
876
877    // block-wise download
878    bool startup = mStartup;
879    ssize_t bytesRead;
880    do {
881        bytesRead = mSession->fetchFile(
882                uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source);
883
884        if (bytesRead < 0) {
885            status_t err = bytesRead;
886            ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
887            notifyError(err);
888            return;
889        }
890
891        CHECK(buffer != NULL);
892
893        size_t size = buffer->size();
894        // Set decryption range.
895        buffer->setRange(size - bytesRead, bytesRead);
896        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
897                buffer->offset() == 0 /* first */);
898        // Unset decryption range.
899        buffer->setRange(0, size);
900
901        if (err != OK) {
902            ALOGE("decryptBuffer failed w/ error %d", err);
903
904            notifyError(err);
905            return;
906        }
907
908        if (startup || discontinuity) {
909            // Signal discontinuity.
910
911            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
912                // If this was a live event this made no sense since
913                // we don't have access to all the segment before the current
914                // one.
915                mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
916            }
917
918            if (discontinuity) {
919                ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
920
921                queueDiscontinuity(
922                        ATSParser::DISCONTINUITY_FORMATCHANGE,
923                        NULL /* extra */);
924
925                discontinuity = false;
926            }
927
928            startup = false;
929        }
930
931        err = OK;
932        if (bufferStartsWithTsSyncByte(buffer)) {
933            // Incremental extraction is only supported for MPEG2 transport streams.
934            if (tsBuffer == NULL) {
935                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
936                tsBuffer->setRange(0, 0);
937            } else if (tsBuffer->capacity() != buffer->capacity()) {
938                size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
939                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
940                tsBuffer->setRange(tsOff, tsSize);
941            }
942            tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
943
944            err = extractAndQueueAccessUnitsFromTs(tsBuffer);
945        }
946
947        if (err == -EAGAIN) {
948            // starting sequence number too low
949            mTSParser.clear();
950            postMonitorQueue();
951            return;
952        } else if (err == ERROR_OUT_OF_RANGE) {
953            // reached stopping point
954            stopAsync(/* clear = */ false);
955            return;
956        } else if (err != OK) {
957            notifyError(err);
958            return;
959        }
960
961    } while (bytesRead != 0);
962
963    if (bufferStartsWithTsSyncByte(buffer)) {
964        // If we still don't see a stream after fetching a full ts segment mark it as
965        // nonexistent.
966        const size_t kNumTypes = ATSParser::NUM_SOURCE_TYPES;
967        ATSParser::SourceType srcTypes[kNumTypes] =
968                { ATSParser::VIDEO, ATSParser::AUDIO };
969        LiveSession::StreamType streamTypes[kNumTypes] =
970                { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
971
972        for (size_t i = 0; i < kNumTypes; i++) {
973            ATSParser::SourceType srcType = srcTypes[i];
974            LiveSession::StreamType streamType = streamTypes[i];
975
976            sp<AnotherPacketSource> source =
977                static_cast<AnotherPacketSource *>(
978                    mTSParser->getSource(srcType).get());
979
980            if (source == NULL) {
981                ALOGW("MPEG2 Transport stream does not contain %s data.",
982                      srcType == ATSParser::VIDEO ? "video" : "audio");
983
984                mStreamTypeMask &= ~streamType;
985                mPacketSources.removeItem(streamType);
986            }
987        }
988
989    }
990
991    if (checkDecryptPadding(buffer) != OK) {
992        ALOGE("Incorrect padding bytes after decryption.");
993        notifyError(ERROR_MALFORMED);
994        return;
995    }
996
997    status_t err = OK;
998    if (tsBuffer != NULL) {
999        AString method;
1000        CHECK(buffer->meta()->findString("cipher-method", &method));
1001        if ((tsBuffer->size() > 0 && method == "NONE")
1002                || tsBuffer->size() > 16) {
1003            ALOGE("MPEG2 transport stream is not an even multiple of 188 "
1004                    "bytes in length.");
1005            notifyError(ERROR_MALFORMED);
1006            return;
1007        }
1008    }
1009
1010    // bulk extract non-ts files
1011    if (tsBuffer == NULL) {
1012      err = extractAndQueueAccessUnits(buffer, itemMeta);
1013    }
1014
1015    if (err != OK) {
1016        notifyError(err);
1017        return;
1018    }
1019
1020    mStartup = false;
1021    ++mSeqNumber;
1022
1023    postMonitorQueue();
1024}
1025
1026int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1027    int32_t firstSeqNumberInPlaylist;
1028    if (mPlaylist->meta() == NULL
1029            || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) {
1030        firstSeqNumberInPlaylist = 0;
1031    }
1032
1033    size_t curDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
1034    if (discontinuitySeq < curDiscontinuitySeq) {
1035        return firstSeqNumberInPlaylist <= 0 ? 0 : (firstSeqNumberInPlaylist - 1);
1036    }
1037
1038    size_t index = 0;
1039    while (index < mPlaylist->size()) {
1040        sp<AMessage> itemMeta;
1041        CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1042
1043        int64_t discontinuity;
1044        if (itemMeta->findInt64("discontinuity", &discontinuity)) {
1045            curDiscontinuitySeq++;
1046        }
1047
1048        if (curDiscontinuitySeq == discontinuitySeq) {
1049            return firstSeqNumberInPlaylist + index;
1050        }
1051
1052        ++index;
1053    }
1054
1055    return firstSeqNumberInPlaylist + mPlaylist->size();
1056}
1057
1058int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1059    int32_t firstSeqNumberInPlaylist;
1060    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
1061                "media-sequence", &firstSeqNumberInPlaylist)) {
1062        firstSeqNumberInPlaylist = 0;
1063    }
1064
1065    size_t index = 0;
1066    int64_t segmentStartUs = 0;
1067    while (index < mPlaylist->size()) {
1068        sp<AMessage> itemMeta;
1069        CHECK(mPlaylist->itemAt(
1070                    index, NULL /* uri */, &itemMeta));
1071
1072        int64_t itemDurationUs;
1073        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1074
1075        if (timeUs < segmentStartUs + itemDurationUs) {
1076            break;
1077        }
1078
1079        segmentStartUs += itemDurationUs;
1080        ++index;
1081    }
1082
1083    if (index >= mPlaylist->size()) {
1084        index = mPlaylist->size() - 1;
1085    }
1086
1087    return firstSeqNumberInPlaylist + index;
1088}
1089
1090const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
1091        const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
1092    sp<MetaData> format = source->getFormat();
1093    if (format != NULL) {
1094        // for simplicity, store a reference to the format in each unit
1095        accessUnit->meta()->setObject("format", format);
1096    }
1097
1098    if (discard) {
1099        accessUnit->meta()->setInt32("discard", discard);
1100    }
1101
1102    accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1103    accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1104    return accessUnit;
1105}
1106
1107status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
1108    if (mTSParser == NULL) {
1109        // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
1110        mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
1111    }
1112
1113    if (mNextPTSTimeUs >= 0ll) {
1114        sp<AMessage> extra = new AMessage;
1115        // Since we are using absolute timestamps, signal an offset of 0 to prevent
1116        // ATSParser from skewing the timestamps of access units.
1117        extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
1118
1119        mTSParser->signalDiscontinuity(
1120                ATSParser::DISCONTINUITY_SEEK, extra);
1121
1122        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
1123        mNextPTSTimeUs = -1ll;
1124        mFirstPTSValid = false;
1125    }
1126
1127    size_t offset = 0;
1128    while (offset + 188 <= buffer->size()) {
1129        status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
1130
1131        if (err != OK) {
1132            return err;
1133        }
1134
1135        offset += 188;
1136    }
1137    // setRange to indicate consumed bytes.
1138    buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
1139
1140    status_t err = OK;
1141    for (size_t i = mPacketSources.size(); i-- > 0;) {
1142        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1143
1144        const char *key;
1145        ATSParser::SourceType type;
1146        const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1147        switch (stream) {
1148            case LiveSession::STREAMTYPE_VIDEO:
1149                type = ATSParser::VIDEO;
1150                key = "timeUsVideo";
1151                break;
1152
1153            case LiveSession::STREAMTYPE_AUDIO:
1154                type = ATSParser::AUDIO;
1155                key = "timeUsAudio";
1156                break;
1157
1158            case LiveSession::STREAMTYPE_SUBTITLES:
1159            {
1160                ALOGE("MPEG2 Transport streams do not contain subtitles.");
1161                return ERROR_MALFORMED;
1162                break;
1163            }
1164
1165            default:
1166                TRESPASS();
1167        }
1168
1169        sp<AnotherPacketSource> source =
1170            static_cast<AnotherPacketSource *>(
1171                    mTSParser->getSource(type).get());
1172
1173        if (source == NULL) {
1174            continue;
1175        }
1176
1177        int64_t timeUs;
1178        sp<ABuffer> accessUnit;
1179        status_t finalResult;
1180        while (source->hasBufferAvailable(&finalResult)
1181                && source->dequeueAccessUnit(&accessUnit) == OK) {
1182
1183            CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1184
1185            if (mStartup) {
1186                if (!mFirstPTSValid) {
1187                    mFirstTimeUs = timeUs;
1188                    mFirstPTSValid = true;
1189                }
1190                if (mStartTimeUsRelative) {
1191                    timeUs -= mFirstTimeUs;
1192                    if (timeUs < 0) {
1193                        timeUs = 0;
1194                    }
1195                } else if (mAdaptive && timeUs > mStartTimeUs) {
1196                    int32_t seq;
1197                    if (mStartTimeUsNotify != NULL
1198                            && !mStartTimeUsNotify->findInt32("discontinuitySeq", &seq)) {
1199                        mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq);
1200                    }
1201                    int64_t startTimeUs;
1202                    if (mStartTimeUsNotify != NULL
1203                            && !mStartTimeUsNotify->findInt64(key, &startTimeUs)) {
1204                        mStartTimeUsNotify->setInt64(key, timeUs);
1205
1206                        uint32_t streamMask = 0;
1207                        mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1208                        streamMask |= mPacketSources.keyAt(i);
1209                        mStartTimeUsNotify->setInt32("streamMask", streamMask);
1210
1211                        if (streamMask == mStreamTypeMask) {
1212                            mStartTimeUsNotify->post();
1213                            mStartTimeUsNotify.clear();
1214                        }
1215                    }
1216                }
1217
1218                if (timeUs < mStartTimeUs) {
1219                    if (mAdaptive) {
1220                        int32_t targetDuration;
1221                        mPlaylist->meta()->findInt32("target-duration", &targetDuration);
1222                        int32_t incr = (mStartTimeUs - timeUs) / 1000000 / targetDuration;
1223                        if (incr == 0) {
1224                            // increment mSeqNumber by at least one
1225                            incr = 1;
1226                        }
1227                        mSeqNumber += incr;
1228                        err = -EAGAIN;
1229                        break;
1230                    } else {
1231                        // buffer up to the closest preceding IDR frame
1232                        ALOGV("timeUs %" PRId64 " us < mStartTimeUs %" PRId64 " us",
1233                                timeUs, mStartTimeUs);
1234                        const char *mime;
1235                        sp<MetaData> format  = source->getFormat();
1236                        bool isAvc = false;
1237                        if (format != NULL && format->findCString(kKeyMIMEType, &mime)
1238                                && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC)) {
1239                            isAvc = true;
1240                        }
1241                        if (isAvc && IsIDR(accessUnit)) {
1242                            mVideoBuffer->clear();
1243                        }
1244                        if (isAvc) {
1245                            mVideoBuffer->queueAccessUnit(accessUnit);
1246                        }
1247
1248                        continue;
1249                    }
1250                }
1251            }
1252
1253            if (mStopParams != NULL) {
1254                // Queue discontinuity in original stream.
1255                int32_t discontinuitySeq;
1256                int64_t stopTimeUs;
1257                if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1258                        || discontinuitySeq > mDiscontinuitySeq
1259                        || !mStopParams->findInt64(key, &stopTimeUs)
1260                        || (discontinuitySeq == mDiscontinuitySeq
1261                                && timeUs >= stopTimeUs)) {
1262                    packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
1263                    mStreamTypeMask &= ~stream;
1264                    mPacketSources.removeItemsAt(i);
1265                    break;
1266                }
1267            }
1268
1269            // Note that we do NOT dequeue any discontinuities except for format change.
1270            if (stream == LiveSession::STREAMTYPE_VIDEO) {
1271                const bool discard = true;
1272                status_t status;
1273                while (mVideoBuffer->hasBufferAvailable(&status)) {
1274                    sp<ABuffer> videoBuffer;
1275                    mVideoBuffer->dequeueAccessUnit(&videoBuffer);
1276                    setAccessUnitProperties(videoBuffer, source, discard);
1277                    packetSource->queueAccessUnit(videoBuffer);
1278                }
1279            }
1280
1281            setAccessUnitProperties(accessUnit, source);
1282            packetSource->queueAccessUnit(accessUnit);
1283        }
1284
1285        if (err != OK) {
1286            break;
1287        }
1288    }
1289
1290    if (err != OK) {
1291        for (size_t i = mPacketSources.size(); i-- > 0;) {
1292            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1293            packetSource->clear();
1294        }
1295        return err;
1296    }
1297
1298    if (!mStreamTypeMask) {
1299        // Signal gap is filled between original and new stream.
1300        ALOGV("ERROR OUT OF RANGE");
1301        return ERROR_OUT_OF_RANGE;
1302    }
1303
1304    return OK;
1305}
1306
1307/* static */
1308bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1309        const sp<ABuffer> &buffer) {
1310    size_t pos = 0;
1311
1312    // skip possible BOM
1313    if (buffer->size() >= pos + 3 &&
1314            !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1315        pos += 3;
1316    }
1317
1318    // accept WEBVTT followed by SPACE, TAB or (CR) LF
1319    if (buffer->size() < pos + 6 ||
1320            memcmp("WEBVTT", buffer->data() + pos, 6)) {
1321        return false;
1322    }
1323    pos += 6;
1324
1325    if (buffer->size() == pos) {
1326        return true;
1327    }
1328
1329    uint8_t sep = buffer->data()[pos];
1330    return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1331}
1332
1333status_t PlaylistFetcher::extractAndQueueAccessUnits(
1334        const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1335    if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1336        if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1337            ALOGE("This stream only contains subtitles.");
1338            return ERROR_MALFORMED;
1339        }
1340
1341        const sp<AnotherPacketSource> packetSource =
1342            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1343
1344        int64_t durationUs;
1345        CHECK(itemMeta->findInt64("durationUs", &durationUs));
1346        buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1347        buffer->meta()->setInt64("durationUs", durationUs);
1348        buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1349        buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1350
1351        packetSource->queueAccessUnit(buffer);
1352        return OK;
1353    }
1354
1355    if (mNextPTSTimeUs >= 0ll) {
1356        mFirstPTSValid = false;
1357        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
1358        mNextPTSTimeUs = -1ll;
1359    }
1360
1361    // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
1362    // stream prefixed by an ID3 tag.
1363
1364    bool firstID3Tag = true;
1365    uint64_t PTS = 0;
1366
1367    for (;;) {
1368        // Make sure to skip all ID3 tags preceding the audio data.
1369        // At least one must be present to provide the PTS timestamp.
1370
1371        ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
1372        if (!id3.isValid()) {
1373            if (firstID3Tag) {
1374                ALOGE("Unable to parse ID3 tag.");
1375                return ERROR_MALFORMED;
1376            } else {
1377                break;
1378            }
1379        }
1380
1381        if (firstID3Tag) {
1382            bool found = false;
1383
1384            ID3::Iterator it(id3, "PRIV");
1385            while (!it.done()) {
1386                size_t length;
1387                const uint8_t *data = it.getData(&length);
1388
1389                static const char *kMatchName =
1390                    "com.apple.streaming.transportStreamTimestamp";
1391                static const size_t kMatchNameLen = strlen(kMatchName);
1392
1393                if (length == kMatchNameLen + 1 + 8
1394                        && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
1395                    found = true;
1396                    PTS = U64_AT(&data[kMatchNameLen + 1]);
1397                }
1398
1399                it.next();
1400            }
1401
1402            if (!found) {
1403                ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
1404                return ERROR_MALFORMED;
1405            }
1406        }
1407
1408        // skip the ID3 tag
1409        buffer->setRange(
1410                buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
1411
1412        firstID3Tag = false;
1413    }
1414
1415    if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
1416        ALOGW("This stream only contains audio data!");
1417
1418        mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
1419
1420        if (mStreamTypeMask == 0) {
1421            return OK;
1422        }
1423    }
1424
1425    sp<AnotherPacketSource> packetSource =
1426        mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
1427
1428    if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
1429        ABitReader bits(buffer->data(), buffer->size());
1430
1431        // adts_fixed_header
1432
1433        CHECK_EQ(bits.getBits(12), 0xfffu);
1434        bits.skipBits(3);  // ID, layer
1435        bool protection_absent = bits.getBits(1) != 0;
1436
1437        unsigned profile = bits.getBits(2);
1438        CHECK_NE(profile, 3u);
1439        unsigned sampling_freq_index = bits.getBits(4);
1440        bits.getBits(1);  // private_bit
1441        unsigned channel_configuration = bits.getBits(3);
1442        CHECK_NE(channel_configuration, 0u);
1443        bits.skipBits(2);  // original_copy, home
1444
1445        sp<MetaData> meta = MakeAACCodecSpecificData(
1446                profile, sampling_freq_index, channel_configuration);
1447
1448        meta->setInt32(kKeyIsADTS, true);
1449
1450        packetSource->setFormat(meta);
1451    }
1452
1453    int64_t numSamples = 0ll;
1454    int32_t sampleRate;
1455    CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
1456
1457    int64_t timeUs = (PTS * 100ll) / 9ll;
1458    if (!mFirstPTSValid) {
1459        mFirstPTSValid = true;
1460        mFirstTimeUs = timeUs;
1461    }
1462
1463    size_t offset = 0;
1464    while (offset < buffer->size()) {
1465        const uint8_t *adtsHeader = buffer->data() + offset;
1466        CHECK_LT(offset + 5, buffer->size());
1467
1468        unsigned aac_frame_length =
1469            ((adtsHeader[3] & 3) << 11)
1470            | (adtsHeader[4] << 3)
1471            | (adtsHeader[5] >> 5);
1472
1473        if (aac_frame_length == 0) {
1474            const uint8_t *id3Header = adtsHeader;
1475            if (!memcmp(id3Header, "ID3", 3)) {
1476                ID3 id3(id3Header, buffer->size() - offset, true);
1477                if (id3.isValid()) {
1478                    offset += id3.rawSize();
1479                    continue;
1480                };
1481            }
1482            return ERROR_MALFORMED;
1483        }
1484
1485        CHECK_LE(offset + aac_frame_length, buffer->size());
1486
1487        int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
1488        offset += aac_frame_length;
1489
1490        // Each AAC frame encodes 1024 samples.
1491        numSamples += 1024;
1492
1493        if (mStartup) {
1494            int64_t startTimeUs = unitTimeUs;
1495            if (mStartTimeUsRelative) {
1496                startTimeUs -= mFirstTimeUs;
1497                if (startTimeUs  < 0) {
1498                    startTimeUs = 0;
1499                }
1500            }
1501            if (startTimeUs < mStartTimeUs) {
1502                continue;
1503            }
1504        }
1505
1506        sp<ABuffer> unit = new ABuffer(aac_frame_length);
1507        memcpy(unit->data(), adtsHeader, aac_frame_length);
1508
1509        unit->meta()->setInt64("timeUs", unitTimeUs);
1510        unit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1511        unit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1512        packetSource->queueAccessUnit(unit);
1513    }
1514
1515    return OK;
1516}
1517
1518void PlaylistFetcher::updateDuration() {
1519    int64_t durationUs = 0ll;
1520    for (size_t index = 0; index < mPlaylist->size(); ++index) {
1521        sp<AMessage> itemMeta;
1522        CHECK(mPlaylist->itemAt(
1523                    index, NULL /* uri */, &itemMeta));
1524
1525        int64_t itemDurationUs;
1526        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1527
1528        durationUs += itemDurationUs;
1529    }
1530
1531    sp<AMessage> msg = mNotify->dup();
1532    msg->setInt32("what", kWhatDurationUpdate);
1533    msg->setInt64("durationUs", durationUs);
1534    msg->post();
1535}
1536
1537int64_t PlaylistFetcher::resumeThreshold(const sp<AMessage> &msg) {
1538    int64_t durationUs, threshold;
1539    if (msg->findInt64("durationUs", &durationUs)) {
1540        return kNumSkipFrames * durationUs;
1541    }
1542
1543    sp<RefBase> obj;
1544    msg->findObject("format", &obj);
1545    MetaData *format = static_cast<MetaData *>(obj.get());
1546
1547    const char *mime;
1548    CHECK(format->findCString(kKeyMIMEType, &mime));
1549    bool audio = !strncasecmp(mime, "audio/", 6);
1550    if (audio) {
1551        // Assumes 1000 samples per frame.
1552        int32_t sampleRate;
1553        CHECK(format->findInt32(kKeySampleRate, &sampleRate));
1554        return kNumSkipFrames  /* frames */ * 1000 /* samples */
1555                * (1000000 / sampleRate) /* sample duration (us) */;
1556    } else {
1557        int32_t frameRate;
1558        if (format->findInt32(kKeyFrameRate, &frameRate) && frameRate > 0) {
1559            return kNumSkipFrames * (1000000 / frameRate);
1560        }
1561    }
1562
1563    return 500000ll;
1564}
1565
1566}  // namespace android
1567