1/*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "PlaylistFetcher"
19#include <utils/Log.h>
20#include <utils/misc.h>
21
22#include "PlaylistFetcher.h"
23#include "HTTPDownloader.h"
24#include "LiveSession.h"
25#include "M3UParser.h"
26#include "include/avc_utils.h"
27#include "include/ID3.h"
28#include "mpeg2ts/AnotherPacketSource.h"
29
30#include <media/stagefright/foundation/ABitReader.h>
31#include <media/stagefright/foundation/ABuffer.h>
32#include <media/stagefright/foundation/ADebug.h>
33#include <media/stagefright/MediaDefs.h>
34#include <media/stagefright/MetaData.h>
35#include <media/stagefright/Utils.h>
36
37#include <ctype.h>
38#include <inttypes.h>
39#include <openssl/aes.h>
40
41#define FLOGV(fmt, ...) ALOGV("[fetcher-%d] " fmt, mFetcherID, ##__VA_ARGS__)
42#define FSLOGV(stream, fmt, ...) ALOGV("[fetcher-%d] [%s] " fmt, mFetcherID, \
43         LiveSession::getNameForStream(stream), ##__VA_ARGS__)
44
45namespace android {
46
47// static
48const int64_t PlaylistFetcher::kMinBufferedDurationUs = 30000000ll;
49const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
50// LCM of 188 (size of a TS packet) & 1k works well
51const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024;
52
53struct PlaylistFetcher::DownloadState : public RefBase {
54    DownloadState();
55    void resetState();
56    bool hasSavedState() const;
57    void restoreState(
58            AString &uri,
59            sp<AMessage> &itemMeta,
60            sp<ABuffer> &buffer,
61            sp<ABuffer> &tsBuffer,
62            int32_t &firstSeqNumberInPlaylist,
63            int32_t &lastSeqNumberInPlaylist);
64    void saveState(
65            AString &uri,
66            sp<AMessage> &itemMeta,
67            sp<ABuffer> &buffer,
68            sp<ABuffer> &tsBuffer,
69            int32_t &firstSeqNumberInPlaylist,
70            int32_t &lastSeqNumberInPlaylist);
71
72private:
73    bool mHasSavedState;
74    AString mUri;
75    sp<AMessage> mItemMeta;
76    sp<ABuffer> mBuffer;
77    sp<ABuffer> mTsBuffer;
78    int32_t mFirstSeqNumberInPlaylist;
79    int32_t mLastSeqNumberInPlaylist;
80};
81
82PlaylistFetcher::DownloadState::DownloadState() {
83    resetState();
84}
85
86bool PlaylistFetcher::DownloadState::hasSavedState() const {
87    return mHasSavedState;
88}
89
90void PlaylistFetcher::DownloadState::resetState() {
91    mHasSavedState = false;
92
93    mUri.clear();
94    mItemMeta = NULL;
95    mBuffer = NULL;
96    mTsBuffer = NULL;
97    mFirstSeqNumberInPlaylist = 0;
98    mLastSeqNumberInPlaylist = 0;
99}
100
101void PlaylistFetcher::DownloadState::restoreState(
102        AString &uri,
103        sp<AMessage> &itemMeta,
104        sp<ABuffer> &buffer,
105        sp<ABuffer> &tsBuffer,
106        int32_t &firstSeqNumberInPlaylist,
107        int32_t &lastSeqNumberInPlaylist) {
108    if (!mHasSavedState) {
109        return;
110    }
111
112    uri = mUri;
113    itemMeta = mItemMeta;
114    buffer = mBuffer;
115    tsBuffer = mTsBuffer;
116    firstSeqNumberInPlaylist = mFirstSeqNumberInPlaylist;
117    lastSeqNumberInPlaylist = mLastSeqNumberInPlaylist;
118
119    resetState();
120}
121
122void PlaylistFetcher::DownloadState::saveState(
123        AString &uri,
124        sp<AMessage> &itemMeta,
125        sp<ABuffer> &buffer,
126        sp<ABuffer> &tsBuffer,
127        int32_t &firstSeqNumberInPlaylist,
128        int32_t &lastSeqNumberInPlaylist) {
129    mHasSavedState = true;
130
131    mUri = uri;
132    mItemMeta = itemMeta;
133    mBuffer = buffer;
134    mTsBuffer = tsBuffer;
135    mFirstSeqNumberInPlaylist = firstSeqNumberInPlaylist;
136    mLastSeqNumberInPlaylist = lastSeqNumberInPlaylist;
137}
138
139PlaylistFetcher::PlaylistFetcher(
140        const sp<AMessage> &notify,
141        const sp<LiveSession> &session,
142        const char *uri,
143        int32_t id,
144        int32_t subtitleGeneration)
145    : mNotify(notify),
146      mSession(session),
147      mURI(uri),
148      mFetcherID(id),
149      mStreamTypeMask(0),
150      mStartTimeUs(-1ll),
151      mSegmentStartTimeUs(-1ll),
152      mDiscontinuitySeq(-1ll),
153      mStartTimeUsRelative(false),
154      mLastPlaylistFetchTimeUs(-1ll),
155      mPlaylistTimeUs(-1ll),
156      mSeqNumber(-1),
157      mNumRetries(0),
158      mStartup(true),
159      mIDRFound(false),
160      mSeekMode(LiveSession::kSeekModeExactPosition),
161      mTimeChangeSignaled(false),
162      mNextPTSTimeUs(-1ll),
163      mMonitorQueueGeneration(0),
164      mSubtitleGeneration(subtitleGeneration),
165      mLastDiscontinuitySeq(-1ll),
166      mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
167      mFirstPTSValid(false),
168      mFirstTimeUs(-1ll),
169      mVideoBuffer(new AnotherPacketSource(NULL)),
170      mThresholdRatio(-1.0f),
171      mDownloadState(new DownloadState()),
172      mHasMetadata(false) {
173    memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
174    mHTTPDownloader = mSession->getHTTPDownloader();
175}
176
177PlaylistFetcher::~PlaylistFetcher() {
178}
179
180int32_t PlaylistFetcher::getFetcherID() const {
181    return mFetcherID;
182}
183
184int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
185    CHECK(mPlaylist != NULL);
186
187    int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
188    mPlaylist->getSeqNumberRange(
189            &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
190
191    CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
192    CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
193
194    int64_t segmentStartUs = 0ll;
195    for (int32_t index = 0;
196            index < seqNumber - firstSeqNumberInPlaylist; ++index) {
197        sp<AMessage> itemMeta;
198        CHECK(mPlaylist->itemAt(
199                    index, NULL /* uri */, &itemMeta));
200
201        int64_t itemDurationUs;
202        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
203
204        segmentStartUs += itemDurationUs;
205    }
206
207    return segmentStartUs;
208}
209
210int64_t PlaylistFetcher::getSegmentDurationUs(int32_t seqNumber) const {
211    CHECK(mPlaylist != NULL);
212
213    int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
214    mPlaylist->getSeqNumberRange(
215            &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
216
217    CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
218    CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
219
220    int32_t index = seqNumber - firstSeqNumberInPlaylist;
221    sp<AMessage> itemMeta;
222    CHECK(mPlaylist->itemAt(
223                index, NULL /* uri */, &itemMeta));
224
225    int64_t itemDurationUs;
226    CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
227
228    return itemDurationUs;
229}
230
231int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
232    int64_t nowUs = ALooper::GetNowUs();
233
234    if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
235        CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
236        return 0ll;
237    }
238
239    if (mPlaylist->isComplete()) {
240        return (~0llu >> 1);
241    }
242
243    int64_t targetDurationUs = mPlaylist->getTargetDuration();
244
245    int64_t minPlaylistAgeUs;
246
247    switch (mRefreshState) {
248        case INITIAL_MINIMUM_RELOAD_DELAY:
249        {
250            size_t n = mPlaylist->size();
251            if (n > 0) {
252                sp<AMessage> itemMeta;
253                CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
254
255                int64_t itemDurationUs;
256                CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
257
258                minPlaylistAgeUs = itemDurationUs;
259                break;
260            }
261
262            // fall through
263        }
264
265        case FIRST_UNCHANGED_RELOAD_ATTEMPT:
266        {
267            minPlaylistAgeUs = targetDurationUs / 2;
268            break;
269        }
270
271        case SECOND_UNCHANGED_RELOAD_ATTEMPT:
272        {
273            minPlaylistAgeUs = (targetDurationUs * 3) / 2;
274            break;
275        }
276
277        case THIRD_UNCHANGED_RELOAD_ATTEMPT:
278        {
279            minPlaylistAgeUs = targetDurationUs * 3;
280            break;
281        }
282
283        default:
284            TRESPASS();
285            break;
286    }
287
288    int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
289    return delayUs > 0ll ? delayUs : 0ll;
290}
291
292status_t PlaylistFetcher::decryptBuffer(
293        size_t playlistIndex, const sp<ABuffer> &buffer,
294        bool first) {
295    sp<AMessage> itemMeta;
296    bool found = false;
297    AString method;
298
299    for (ssize_t i = playlistIndex; i >= 0; --i) {
300        AString uri;
301        CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
302
303        if (itemMeta->findString("cipher-method", &method)) {
304            found = true;
305            break;
306        }
307    }
308
309    if (!found) {
310        method = "NONE";
311    }
312    buffer->meta()->setString("cipher-method", method.c_str());
313
314    if (method == "NONE") {
315        return OK;
316    } else if (!(method == "AES-128")) {
317        ALOGE("Unsupported cipher method '%s'", method.c_str());
318        return ERROR_UNSUPPORTED;
319    }
320
321    AString keyURI;
322    if (!itemMeta->findString("cipher-uri", &keyURI)) {
323        ALOGE("Missing key uri");
324        return ERROR_MALFORMED;
325    }
326
327    ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
328
329    sp<ABuffer> key;
330    if (index >= 0) {
331        key = mAESKeyForURI.valueAt(index);
332    } else {
333        ssize_t err = mHTTPDownloader->fetchFile(keyURI.c_str(), &key);
334
335        if (err == ERROR_NOT_CONNECTED) {
336            return ERROR_NOT_CONNECTED;
337        } else if (err < 0) {
338            ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
339            return ERROR_IO;
340        } else if (key->size() != 16) {
341            ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
342            return ERROR_MALFORMED;
343        }
344
345        mAESKeyForURI.add(keyURI, key);
346    }
347
348    AES_KEY aes_key;
349    if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
350        ALOGE("failed to set AES decryption key.");
351        return UNKNOWN_ERROR;
352    }
353
354    size_t n = buffer->size();
355    if (!n) {
356        return OK;
357    }
358
359    if (n < 16 || n % 16) {
360        ALOGE("not enough or trailing bytes (%zu) in encrypted buffer", n);
361        return ERROR_MALFORMED;
362    }
363
364    if (first) {
365        // If decrypting the first block in a file, read the iv from the manifest
366        // or derive the iv from the file's sequence number.
367
368        AString iv;
369        if (itemMeta->findString("cipher-iv", &iv)) {
370            if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
371                    || iv.size() > 16 * 2 + 2) {
372                ALOGE("malformed cipher IV '%s'.", iv.c_str());
373                return ERROR_MALFORMED;
374            }
375
376            while (iv.size() < 16 * 2 + 2) {
377                iv.insert("0", 1, 2);
378            }
379
380            memset(mAESInitVec, 0, sizeof(mAESInitVec));
381            for (size_t i = 0; i < 16; ++i) {
382                char c1 = tolower(iv.c_str()[2 + 2 * i]);
383                char c2 = tolower(iv.c_str()[3 + 2 * i]);
384                if (!isxdigit(c1) || !isxdigit(c2)) {
385                    ALOGE("malformed cipher IV '%s'.", iv.c_str());
386                    return ERROR_MALFORMED;
387                }
388                uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
389                uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
390
391                mAESInitVec[i] = nibble1 << 4 | nibble2;
392            }
393        } else {
394            memset(mAESInitVec, 0, sizeof(mAESInitVec));
395            mAESInitVec[15] = mSeqNumber & 0xff;
396            mAESInitVec[14] = (mSeqNumber >> 8) & 0xff;
397            mAESInitVec[13] = (mSeqNumber >> 16) & 0xff;
398            mAESInitVec[12] = (mSeqNumber >> 24) & 0xff;
399        }
400    }
401
402    AES_cbc_encrypt(
403            buffer->data(), buffer->data(), buffer->size(),
404            &aes_key, mAESInitVec, AES_DECRYPT);
405
406    return OK;
407}
408
409status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
410    AString method;
411    CHECK(buffer->meta()->findString("cipher-method", &method));
412    if (method == "NONE") {
413        return OK;
414    }
415
416    uint8_t padding = 0;
417    if (buffer->size() > 0) {
418        padding = buffer->data()[buffer->size() - 1];
419    }
420
421    if (padding > 16) {
422        return ERROR_MALFORMED;
423    }
424
425    for (size_t i = buffer->size() - padding; i < padding; i++) {
426        if (buffer->data()[i] != padding) {
427            return ERROR_MALFORMED;
428        }
429    }
430
431    buffer->setRange(buffer->offset(), buffer->size() - padding);
432    return OK;
433}
434
435void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
436    int64_t maxDelayUs = delayUsToRefreshPlaylist();
437    if (maxDelayUs < minDelayUs) {
438        maxDelayUs = minDelayUs;
439    }
440    if (delayUs > maxDelayUs) {
441        FLOGV("Need to refresh playlist in %lld", (long long)maxDelayUs);
442        delayUs = maxDelayUs;
443    }
444    sp<AMessage> msg = new AMessage(kWhatMonitorQueue, this);
445    msg->setInt32("generation", mMonitorQueueGeneration);
446    msg->post(delayUs);
447}
448
449void PlaylistFetcher::cancelMonitorQueue() {
450    ++mMonitorQueueGeneration;
451}
452
453void PlaylistFetcher::setStoppingThreshold(float thresholdRatio, bool disconnect) {
454    {
455        AutoMutex _l(mThresholdLock);
456        mThresholdRatio = thresholdRatio;
457    }
458    if (disconnect) {
459        mHTTPDownloader->disconnect();
460    }
461}
462
463void PlaylistFetcher::resetStoppingThreshold(bool disconnect) {
464    {
465        AutoMutex _l(mThresholdLock);
466        mThresholdRatio = -1.0f;
467    }
468    if (disconnect) {
469        mHTTPDownloader->disconnect();
470    } else {
471        // allow reconnect
472        mHTTPDownloader->reconnect();
473    }
474}
475
476float PlaylistFetcher::getStoppingThreshold() {
477    AutoMutex _l(mThresholdLock);
478    return mThresholdRatio;
479}
480
481void PlaylistFetcher::startAsync(
482        const sp<AnotherPacketSource> &audioSource,
483        const sp<AnotherPacketSource> &videoSource,
484        const sp<AnotherPacketSource> &subtitleSource,
485        const sp<AnotherPacketSource> &metadataSource,
486        int64_t startTimeUs,
487        int64_t segmentStartTimeUs,
488        int32_t startDiscontinuitySeq,
489        LiveSession::SeekMode seekMode) {
490    sp<AMessage> msg = new AMessage(kWhatStart, this);
491
492    uint32_t streamTypeMask = 0ul;
493
494    if (audioSource != NULL) {
495        msg->setPointer("audioSource", audioSource.get());
496        streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
497    }
498
499    if (videoSource != NULL) {
500        msg->setPointer("videoSource", videoSource.get());
501        streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
502    }
503
504    if (subtitleSource != NULL) {
505        msg->setPointer("subtitleSource", subtitleSource.get());
506        streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
507    }
508
509    if (metadataSource != NULL) {
510        msg->setPointer("metadataSource", metadataSource.get());
511        // metadataSource does not affect streamTypeMask.
512    }
513
514    msg->setInt32("streamTypeMask", streamTypeMask);
515    msg->setInt64("startTimeUs", startTimeUs);
516    msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
517    msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
518    msg->setInt32("seekMode", seekMode);
519    msg->post();
520}
521
522/*
523 * pauseAsync
524 *
525 * threshold: 0.0f - pause after current fetch block (default 47Kbytes)
526 *           -1.0f - pause after finishing current segment
527 *        0.0~1.0f - pause if remaining of current segment exceeds threshold
528 */
529void PlaylistFetcher::pauseAsync(
530        float thresholdRatio, bool disconnect) {
531    setStoppingThreshold(thresholdRatio, disconnect);
532
533    (new AMessage(kWhatPause, this))->post();
534}
535
536void PlaylistFetcher::stopAsync(bool clear) {
537    setStoppingThreshold(0.0f, true /* disconncect */);
538
539    sp<AMessage> msg = new AMessage(kWhatStop, this);
540    msg->setInt32("clear", clear);
541    msg->post();
542}
543
544void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
545    FLOGV("resumeUntilAsync: params=%s", params->debugString().c_str());
546
547    AMessage* msg = new AMessage(kWhatResumeUntil, this);
548    msg->setMessage("params", params);
549    msg->post();
550}
551
552void PlaylistFetcher::fetchPlaylistAsync() {
553    (new AMessage(kWhatFetchPlaylist, this))->post();
554}
555
556void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
557    switch (msg->what()) {
558        case kWhatStart:
559        {
560            status_t err = onStart(msg);
561
562            sp<AMessage> notify = mNotify->dup();
563            notify->setInt32("what", kWhatStarted);
564            notify->setInt32("err", err);
565            notify->post();
566            break;
567        }
568
569        case kWhatPause:
570        {
571            onPause();
572
573            sp<AMessage> notify = mNotify->dup();
574            notify->setInt32("what", kWhatPaused);
575            notify->setInt32("seekMode",
576                    mDownloadState->hasSavedState()
577                    ? LiveSession::kSeekModeNextSample
578                    : LiveSession::kSeekModeNextSegment);
579            notify->post();
580            break;
581        }
582
583        case kWhatStop:
584        {
585            onStop(msg);
586
587            sp<AMessage> notify = mNotify->dup();
588            notify->setInt32("what", kWhatStopped);
589            notify->post();
590            break;
591        }
592
593        case kWhatFetchPlaylist:
594        {
595            bool unchanged;
596            sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
597                    mURI.c_str(), NULL /* curPlaylistHash */, &unchanged);
598
599            sp<AMessage> notify = mNotify->dup();
600            notify->setInt32("what", kWhatPlaylistFetched);
601            notify->setObject("playlist", playlist);
602            notify->post();
603            break;
604        }
605
606        case kWhatMonitorQueue:
607        case kWhatDownloadNext:
608        {
609            int32_t generation;
610            CHECK(msg->findInt32("generation", &generation));
611
612            if (generation != mMonitorQueueGeneration) {
613                // Stale event
614                break;
615            }
616
617            if (msg->what() == kWhatMonitorQueue) {
618                onMonitorQueue();
619            } else {
620                onDownloadNext();
621            }
622            break;
623        }
624
625        case kWhatResumeUntil:
626        {
627            onResumeUntil(msg);
628            break;
629        }
630
631        default:
632            TRESPASS();
633    }
634}
635
636status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
637    mPacketSources.clear();
638    mStopParams.clear();
639    mStartTimeUsNotify = mNotify->dup();
640    mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
641    mStartTimeUsNotify->setString("uri", mURI);
642
643    uint32_t streamTypeMask;
644    CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
645
646    int64_t startTimeUs;
647    int64_t segmentStartTimeUs;
648    int32_t startDiscontinuitySeq;
649    int32_t seekMode;
650    CHECK(msg->findInt64("startTimeUs", &startTimeUs));
651    CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
652    CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
653    CHECK(msg->findInt32("seekMode", &seekMode));
654
655    if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
656        void *ptr;
657        CHECK(msg->findPointer("audioSource", &ptr));
658
659        mPacketSources.add(
660                LiveSession::STREAMTYPE_AUDIO,
661                static_cast<AnotherPacketSource *>(ptr));
662    }
663
664    if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
665        void *ptr;
666        CHECK(msg->findPointer("videoSource", &ptr));
667
668        mPacketSources.add(
669                LiveSession::STREAMTYPE_VIDEO,
670                static_cast<AnotherPacketSource *>(ptr));
671    }
672
673    if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
674        void *ptr;
675        CHECK(msg->findPointer("subtitleSource", &ptr));
676
677        mPacketSources.add(
678                LiveSession::STREAMTYPE_SUBTITLES,
679                static_cast<AnotherPacketSource *>(ptr));
680    }
681
682    void *ptr;
683    // metadataSource is not part of streamTypeMask
684    if ((streamTypeMask & (LiveSession::STREAMTYPE_AUDIO | LiveSession::STREAMTYPE_VIDEO))
685            && msg->findPointer("metadataSource", &ptr)) {
686        mPacketSources.add(
687                LiveSession::STREAMTYPE_METADATA,
688                static_cast<AnotherPacketSource *>(ptr));
689    }
690
691    mStreamTypeMask = streamTypeMask;
692
693    mSegmentStartTimeUs = segmentStartTimeUs;
694
695    if (startDiscontinuitySeq >= 0) {
696        mDiscontinuitySeq = startDiscontinuitySeq;
697    }
698
699    mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
700    mSeekMode = (LiveSession::SeekMode) seekMode;
701
702    if (startTimeUs >= 0 || mSeekMode == LiveSession::kSeekModeNextSample) {
703        mStartup = true;
704        mIDRFound = false;
705        mVideoBuffer->clear();
706    }
707
708    if (startTimeUs >= 0) {
709        mStartTimeUs = startTimeUs;
710        mFirstPTSValid = false;
711        mSeqNumber = -1;
712        mTimeChangeSignaled = false;
713        mDownloadState->resetState();
714    }
715
716    postMonitorQueue();
717
718    return OK;
719}
720
721void PlaylistFetcher::onPause() {
722    cancelMonitorQueue();
723    mLastDiscontinuitySeq = mDiscontinuitySeq;
724
725    resetStoppingThreshold(false /* disconnect */);
726}
727
728void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
729    cancelMonitorQueue();
730
731    int32_t clear;
732    CHECK(msg->findInt32("clear", &clear));
733    if (clear) {
734        for (size_t i = 0; i < mPacketSources.size(); i++) {
735            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
736            packetSource->clear();
737        }
738    }
739
740    mDownloadState->resetState();
741    mPacketSources.clear();
742    mStreamTypeMask = 0;
743
744    resetStoppingThreshold(true /* disconnect */);
745}
746
747// Resume until we have reached the boundary timestamps listed in `msg`; when
748// the remaining time is too short (within a resume threshold) stop immediately
749// instead.
750status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
751    sp<AMessage> params;
752    CHECK(msg->findMessage("params", &params));
753
754    mStopParams = params;
755    onDownloadNext();
756
757    return OK;
758}
759
760void PlaylistFetcher::notifyStopReached() {
761    sp<AMessage> notify = mNotify->dup();
762    notify->setInt32("what", kWhatStopReached);
763    notify->post();
764}
765
766void PlaylistFetcher::notifyError(status_t err) {
767    sp<AMessage> notify = mNotify->dup();
768    notify->setInt32("what", kWhatError);
769    notify->setInt32("err", err);
770    notify->post();
771}
772
773void PlaylistFetcher::queueDiscontinuity(
774        ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
775    for (size_t i = 0; i < mPacketSources.size(); ++i) {
776        // do not discard buffer upon #EXT-X-DISCONTINUITY tag
777        // (seek will discard buffer by abandoning old fetchers)
778        mPacketSources.valueAt(i)->queueDiscontinuity(
779                type, extra, false /* discard */);
780    }
781}
782
783void PlaylistFetcher::onMonitorQueue() {
784    // in the middle of an unfinished download, delay
785    // playlist refresh as it'll change seq numbers
786    if (!mDownloadState->hasSavedState()) {
787        refreshPlaylist();
788    }
789
790    int64_t targetDurationUs = kMinBufferedDurationUs;
791    if (mPlaylist != NULL) {
792        targetDurationUs = mPlaylist->getTargetDuration();
793    }
794
795    int64_t bufferedDurationUs = 0ll;
796    status_t finalResult = OK;
797    if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
798        sp<AnotherPacketSource> packetSource =
799            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
800
801        bufferedDurationUs =
802                packetSource->getBufferedDurationUs(&finalResult);
803    } else {
804        // Use min stream duration, but ignore streams that never have any packet
805        // enqueued to prevent us from waiting on a non-existent stream;
806        // when we cannot make out from the manifest what streams are included in
807        // a playlist we might assume extra streams.
808        bufferedDurationUs = -1ll;
809        for (size_t i = 0; i < mPacketSources.size(); ++i) {
810            if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0
811                    || mPacketSources[i]->getLatestEnqueuedMeta() == NULL) {
812                continue;
813            }
814
815            int64_t bufferedStreamDurationUs =
816                mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
817
818            FSLOGV(mPacketSources.keyAt(i), "buffered %lld", (long long)bufferedStreamDurationUs);
819
820            if (bufferedDurationUs == -1ll
821                 || bufferedStreamDurationUs < bufferedDurationUs) {
822                bufferedDurationUs = bufferedStreamDurationUs;
823            }
824        }
825        if (bufferedDurationUs == -1ll) {
826            bufferedDurationUs = 0ll;
827        }
828    }
829
830    if (finalResult == OK && bufferedDurationUs < kMinBufferedDurationUs) {
831        FLOGV("monitoring, buffered=%lld < %lld",
832                (long long)bufferedDurationUs, (long long)kMinBufferedDurationUs);
833
834        // delay the next download slightly; hopefully this gives other concurrent fetchers
835        // a better chance to run.
836        // onDownloadNext();
837        sp<AMessage> msg = new AMessage(kWhatDownloadNext, this);
838        msg->setInt32("generation", mMonitorQueueGeneration);
839        msg->post(1000l);
840    } else {
841        // We'd like to maintain buffering above durationToBufferUs, so try
842        // again when buffer just about to go below durationToBufferUs
843        // (or after targetDurationUs / 2, whichever is smaller).
844        int64_t delayUs = bufferedDurationUs - kMinBufferedDurationUs + 1000000ll;
845        if (delayUs > targetDurationUs / 2) {
846            delayUs = targetDurationUs / 2;
847        }
848
849        FLOGV("pausing for %lld, buffered=%lld > %lld",
850                (long long)delayUs,
851                (long long)bufferedDurationUs,
852                (long long)kMinBufferedDurationUs);
853
854        postMonitorQueue(delayUs);
855    }
856}
857
858status_t PlaylistFetcher::refreshPlaylist() {
859    if (delayUsToRefreshPlaylist() <= 0) {
860        bool unchanged;
861        sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
862                mURI.c_str(), mPlaylistHash, &unchanged);
863
864        if (playlist == NULL) {
865            if (unchanged) {
866                // We succeeded in fetching the playlist, but it was
867                // unchanged from the last time we tried.
868
869                if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
870                    mRefreshState = (RefreshState)(mRefreshState + 1);
871                }
872            } else {
873                ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str());
874                return ERROR_IO;
875            }
876        } else {
877            mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
878            mPlaylist = playlist;
879
880            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
881                updateDuration();
882            }
883            // Notify LiveSession to use target-duration based buffering level
884            // for up/down switch. Default LiveSession::kUpSwitchMark may not
885            // be reachable for live streams, as our max buffering amount is
886            // limited to 3 segments.
887            if (!mPlaylist->isComplete()) {
888                updateTargetDuration();
889            }
890            mPlaylistTimeUs = ALooper::GetNowUs();
891        }
892
893        mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
894    }
895    return OK;
896}
897
898// static
899bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
900    return buffer->size() > 0 && buffer->data()[0] == 0x47;
901}
902
903bool PlaylistFetcher::shouldPauseDownload() {
904    if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
905        // doesn't apply to subtitles
906        return false;
907    }
908
909    // Calculate threshold to abort current download
910    float thresholdRatio = getStoppingThreshold();
911
912    if (thresholdRatio < 0.0f) {
913        // never abort
914        return false;
915    } else if (thresholdRatio == 0.0f) {
916        // immediately abort
917        return true;
918    }
919
920    // now we have a positive thresholdUs, abort if remaining
921    // portion to download is over that threshold.
922    if (mSegmentFirstPTS < 0) {
923        // this means we haven't even find the first access unit,
924        // abort now as we must be very far away from the end.
925        return true;
926    }
927    int64_t lastEnqueueUs = mSegmentFirstPTS;
928    for (size_t i = 0; i < mPacketSources.size(); ++i) {
929        if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
930            continue;
931        }
932        sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
933        int32_t type;
934        if (meta == NULL || meta->findInt32("discontinuity", &type)) {
935            continue;
936        }
937        int64_t tmpUs;
938        CHECK(meta->findInt64("timeUs", &tmpUs));
939        if (tmpUs > lastEnqueueUs) {
940            lastEnqueueUs = tmpUs;
941        }
942    }
943    lastEnqueueUs -= mSegmentFirstPTS;
944
945    int64_t targetDurationUs = mPlaylist->getTargetDuration();
946    int64_t thresholdUs = thresholdRatio * targetDurationUs;
947
948    FLOGV("%spausing now, thresholdUs %lld, remaining %lld",
949            targetDurationUs - lastEnqueueUs > thresholdUs ? "" : "not ",
950            (long long)thresholdUs,
951            (long long)(targetDurationUs - lastEnqueueUs));
952
953    if (targetDurationUs - lastEnqueueUs > thresholdUs) {
954        return true;
955    }
956    return false;
957}
958
959bool PlaylistFetcher::initDownloadState(
960        AString &uri,
961        sp<AMessage> &itemMeta,
962        int32_t &firstSeqNumberInPlaylist,
963        int32_t &lastSeqNumberInPlaylist) {
964    status_t err = refreshPlaylist();
965    firstSeqNumberInPlaylist = 0;
966    lastSeqNumberInPlaylist = 0;
967    bool discontinuity = false;
968
969    if (mPlaylist != NULL) {
970        mPlaylist->getSeqNumberRange(
971                &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
972
973        if (mDiscontinuitySeq < 0) {
974            mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
975        }
976    }
977
978    mSegmentFirstPTS = -1ll;
979
980    if (mPlaylist != NULL && mSeqNumber < 0) {
981        CHECK_GE(mStartTimeUs, 0ll);
982
983        if (mSegmentStartTimeUs < 0) {
984            if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
985                // If this is a live session, start 3 segments from the end on connect
986                mSeqNumber = lastSeqNumberInPlaylist - 3;
987                if (mSeqNumber < firstSeqNumberInPlaylist) {
988                    mSeqNumber = firstSeqNumberInPlaylist;
989                }
990            } else {
991                // When seeking mSegmentStartTimeUs is unavailable (< 0), we
992                // use mStartTimeUs (client supplied timestamp) to determine both start segment
993                // and relative position inside a segment
994                mSeqNumber = getSeqNumberForTime(mStartTimeUs);
995                mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
996            }
997            mStartTimeUsRelative = true;
998            FLOGV("Initial sequence number for time %lld is %d from (%d .. %d)",
999                    (long long)mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
1000                    lastSeqNumberInPlaylist);
1001        } else {
1002            // When adapting or track switching, mSegmentStartTimeUs (relative
1003            // to media time 0) is used to determine the start segment; mStartTimeUs (absolute
1004            // timestamps coming from the media container) is used to determine the position
1005            // inside a segments.
1006            if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES
1007                    && mSeekMode != LiveSession::kSeekModeNextSample) {
1008                // avoid double fetch/decode
1009                // Use (mSegmentStartTimeUs + 1/2 * targetDurationUs) to search
1010                // for the starting segment in new variant.
1011                // If the two variants' segments are aligned, this gives the
1012                // next segment. If they're not aligned, this gives the segment
1013                // that overlaps no more than 1/2 * targetDurationUs.
1014                mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs
1015                        + mPlaylist->getTargetDuration() / 2);
1016            } else {
1017                mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
1018            }
1019            ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
1020            if (mSeqNumber < minSeq) {
1021                mSeqNumber = minSeq;
1022            }
1023
1024            if (mSeqNumber < firstSeqNumberInPlaylist) {
1025                mSeqNumber = firstSeqNumberInPlaylist;
1026            }
1027
1028            if (mSeqNumber > lastSeqNumberInPlaylist) {
1029                mSeqNumber = lastSeqNumberInPlaylist;
1030            }
1031            FLOGV("Initial sequence number is %d from (%d .. %d)",
1032                    mSeqNumber, firstSeqNumberInPlaylist,
1033                    lastSeqNumberInPlaylist);
1034        }
1035    }
1036
1037    // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true
1038    if (mSeqNumber < firstSeqNumberInPlaylist
1039            || mSeqNumber > lastSeqNumberInPlaylist
1040            || err != OK) {
1041        if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) {
1042            ++mNumRetries;
1043
1044            if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) {
1045                // make sure we reach this retry logic on refresh failures
1046                // by adding an err != OK clause to all enclosing if's.
1047
1048                // refresh in increasing fraction (1/2, 1/3, ...) of the
1049                // playlist's target duration or 3 seconds, whichever is less
1050                int64_t delayUs = kMaxMonitorDelayUs;
1051                if (mPlaylist != NULL) {
1052                    delayUs = mPlaylist->size() * mPlaylist->getTargetDuration()
1053                            / (1 + mNumRetries);
1054                }
1055                if (delayUs > kMaxMonitorDelayUs) {
1056                    delayUs = kMaxMonitorDelayUs;
1057                }
1058                FLOGV("sequence number high: %d from (%d .. %d), "
1059                      "monitor in %lld (retry=%d)",
1060                        mSeqNumber, firstSeqNumberInPlaylist,
1061                        lastSeqNumberInPlaylist, (long long)delayUs, mNumRetries);
1062                postMonitorQueue(delayUs);
1063                return false;
1064            }
1065
1066            if (err != OK) {
1067                notifyError(err);
1068                return false;
1069            }
1070
1071            // we've missed the boat, let's start 3 segments prior to the latest sequence
1072            // number available and signal a discontinuity.
1073
1074            ALOGI("We've missed the boat, restarting playback."
1075                  "  mStartup=%d, was  looking for %d in %d-%d",
1076                    mStartup, mSeqNumber, firstSeqNumberInPlaylist,
1077                    lastSeqNumberInPlaylist);
1078            if (mStopParams != NULL) {
1079                // we should have kept on fetching until we hit the boundaries in mStopParams,
1080                // but since the segments we are supposed to fetch have already rolled off
1081                // the playlist, i.e. we have already missed the boat, we inevitably have to
1082                // skip.
1083                notifyStopReached();
1084                return false;
1085            }
1086            mSeqNumber = lastSeqNumberInPlaylist - 3;
1087            if (mSeqNumber < firstSeqNumberInPlaylist) {
1088                mSeqNumber = firstSeqNumberInPlaylist;
1089            }
1090            discontinuity = true;
1091
1092            // fall through
1093        } else {
1094            if (mPlaylist != NULL) {
1095                if (mSeqNumber >= firstSeqNumberInPlaylist + (int32_t)mPlaylist->size()
1096                        && !mPlaylist->isComplete()) {
1097                    // Live playlists
1098                    ALOGW("sequence number %d not yet available", mSeqNumber);
1099                    postMonitorQueue(delayUsToRefreshPlaylist());
1100                    return false;
1101                }
1102                ALOGE("Cannot find sequence number %d in playlist "
1103                     "(contains %d - %d)",
1104                     mSeqNumber, firstSeqNumberInPlaylist,
1105                      firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
1106
1107                if (mTSParser != NULL) {
1108                    mTSParser->signalEOS(ERROR_END_OF_STREAM);
1109                    // Use an empty buffer; we don't have any new data, just want to extract
1110                    // potential new access units after flush.  Reset mSeqNumber to
1111                    // lastSeqNumberInPlaylist such that we set the correct access unit
1112                    // properties in extractAndQueueAccessUnitsFromTs.
1113                    sp<ABuffer> buffer = new ABuffer(0);
1114                    mSeqNumber = lastSeqNumberInPlaylist;
1115                    extractAndQueueAccessUnitsFromTs(buffer);
1116                }
1117                notifyError(ERROR_END_OF_STREAM);
1118            } else {
1119                // It's possible that we were never able to download the playlist.
1120                // In this case we should notify error, instead of EOS, as EOS during
1121                // prepare means we succeeded in downloading everything.
1122                ALOGE("Failed to download playlist!");
1123                notifyError(ERROR_IO);
1124            }
1125
1126            return false;
1127        }
1128    }
1129
1130    mNumRetries = 0;
1131
1132    CHECK(mPlaylist->itemAt(
1133                mSeqNumber - firstSeqNumberInPlaylist,
1134                &uri,
1135                &itemMeta));
1136
1137    CHECK(itemMeta->findInt32("discontinuity-sequence", &mDiscontinuitySeq));
1138
1139    int32_t val;
1140    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
1141        discontinuity = true;
1142    } else if (mLastDiscontinuitySeq >= 0
1143            && mDiscontinuitySeq != mLastDiscontinuitySeq) {
1144        // Seek jumped to a new discontinuity sequence. We need to signal
1145        // a format change to decoder. Decoder needs to shutdown and be
1146        // created again if seamless format change is unsupported.
1147        FLOGV("saw discontinuity: mStartup %d, mLastDiscontinuitySeq %d, "
1148                "mDiscontinuitySeq %d, mStartTimeUs %lld",
1149                mStartup, mLastDiscontinuitySeq, mDiscontinuitySeq, (long long)mStartTimeUs);
1150        discontinuity = true;
1151    }
1152    mLastDiscontinuitySeq = -1;
1153
1154    // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
1155    // this avoids interleaved connections to the key and segment file.
1156    {
1157        sp<ABuffer> junk = new ABuffer(16);
1158        junk->setRange(0, 16);
1159        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
1160                true /* first */);
1161        if (err == ERROR_NOT_CONNECTED) {
1162            return false;
1163        } else if (err != OK) {
1164            notifyError(err);
1165            return false;
1166        }
1167    }
1168
1169    if ((mStartup && !mTimeChangeSignaled) || discontinuity) {
1170        // We need to signal a time discontinuity to ATSParser on the
1171        // first segment after start, or on a discontinuity segment.
1172        // Setting mNextPTSTimeUs informs extractAndQueueAccessUnitsXX()
1173        // to send the time discontinuity.
1174        if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
1175            // If this was a live event this made no sense since
1176            // we don't have access to all the segment before the current
1177            // one.
1178            mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
1179        }
1180
1181        // Setting mTimeChangeSignaled to true, so that if start time
1182        // searching goes into 2nd segment (without a discontinuity),
1183        // we don't reset time again. It causes corruption when pending
1184        // data in ATSParser is cleared.
1185        mTimeChangeSignaled = true;
1186    }
1187
1188    if (discontinuity) {
1189        ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
1190
1191        // Signal a format discontinuity to ATSParser to clear partial data
1192        // from previous streams. Not doing this causes bitstream corruption.
1193        if (mTSParser != NULL) {
1194            mTSParser.clear();
1195        }
1196
1197        queueDiscontinuity(
1198                ATSParser::DISCONTINUITY_FORMAT_ONLY,
1199                NULL /* extra */);
1200
1201        if (mStartup && mStartTimeUsRelative && mFirstPTSValid) {
1202            // This means we guessed mStartTimeUs to be in the previous
1203            // segment (likely very close to the end), but either video or
1204            // audio has not found start by the end of that segment.
1205            //
1206            // If this new segment is not a discontinuity, keep searching.
1207            //
1208            // If this new segment even got a discontinuity marker, just
1209            // set mStartTimeUs=0, and take all samples from now on.
1210            mStartTimeUs = 0;
1211            mFirstPTSValid = false;
1212            mIDRFound = false;
1213            mVideoBuffer->clear();
1214        }
1215    }
1216
1217    FLOGV("fetching segment %d from (%d .. %d)",
1218            mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
1219    return true;
1220}
1221
1222void PlaylistFetcher::onDownloadNext() {
1223    AString uri;
1224    sp<AMessage> itemMeta;
1225    sp<ABuffer> buffer;
1226    sp<ABuffer> tsBuffer;
1227    int32_t firstSeqNumberInPlaylist = 0;
1228    int32_t lastSeqNumberInPlaylist = 0;
1229    bool connectHTTP = true;
1230
1231    if (mDownloadState->hasSavedState()) {
1232        mDownloadState->restoreState(
1233                uri,
1234                itemMeta,
1235                buffer,
1236                tsBuffer,
1237                firstSeqNumberInPlaylist,
1238                lastSeqNumberInPlaylist);
1239        connectHTTP = false;
1240        FLOGV("resuming: '%s'", uri.c_str());
1241    } else {
1242        if (!initDownloadState(
1243                uri,
1244                itemMeta,
1245                firstSeqNumberInPlaylist,
1246                lastSeqNumberInPlaylist)) {
1247            return;
1248        }
1249        FLOGV("fetching: '%s'", uri.c_str());
1250    }
1251
1252    int64_t range_offset, range_length;
1253    if (!itemMeta->findInt64("range-offset", &range_offset)
1254            || !itemMeta->findInt64("range-length", &range_length)) {
1255        range_offset = 0;
1256        range_length = -1;
1257    }
1258
1259    // block-wise download
1260    bool shouldPause = false;
1261    ssize_t bytesRead;
1262    do {
1263        int64_t startUs = ALooper::GetNowUs();
1264        bytesRead = mHTTPDownloader->fetchBlock(
1265                uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize,
1266                NULL /* actualURL */, connectHTTP);
1267        int64_t delayUs = ALooper::GetNowUs() - startUs;
1268
1269        if (bytesRead == ERROR_NOT_CONNECTED) {
1270            return;
1271        }
1272        if (bytesRead < 0) {
1273            status_t err = bytesRead;
1274            ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
1275            notifyError(err);
1276            return;
1277        }
1278
1279        // add sample for bandwidth estimation, excluding samples from subtitles (as
1280        // its too small), or during startup/resumeUntil (when we could have more than
1281        // one connection open which affects bandwidth)
1282        if (!mStartup && mStopParams == NULL && bytesRead > 0
1283                && (mStreamTypeMask
1284                        & (LiveSession::STREAMTYPE_AUDIO
1285                        | LiveSession::STREAMTYPE_VIDEO))) {
1286            mSession->addBandwidthMeasurement(bytesRead, delayUs);
1287            if (delayUs > 2000000ll) {
1288                FLOGV("bytesRead %zd took %.2f seconds - abnormal bandwidth dip",
1289                        bytesRead, (double)delayUs / 1.0e6);
1290            }
1291        }
1292
1293        connectHTTP = false;
1294
1295        CHECK(buffer != NULL);
1296
1297        size_t size = buffer->size();
1298        // Set decryption range.
1299        buffer->setRange(size - bytesRead, bytesRead);
1300        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
1301                buffer->offset() == 0 /* first */);
1302        // Unset decryption range.
1303        buffer->setRange(0, size);
1304
1305        if (err != OK) {
1306            ALOGE("decryptBuffer failed w/ error %d", err);
1307
1308            notifyError(err);
1309            return;
1310        }
1311
1312        bool startUp = mStartup; // save current start up state
1313
1314        err = OK;
1315        if (bufferStartsWithTsSyncByte(buffer)) {
1316            // Incremental extraction is only supported for MPEG2 transport streams.
1317            if (tsBuffer == NULL) {
1318                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1319                tsBuffer->setRange(0, 0);
1320            } else if (tsBuffer->capacity() != buffer->capacity()) {
1321                size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
1322                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1323                tsBuffer->setRange(tsOff, tsSize);
1324            }
1325            tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
1326            err = extractAndQueueAccessUnitsFromTs(tsBuffer);
1327        }
1328
1329        if (err == -EAGAIN) {
1330            // starting sequence number too low/high
1331            mTSParser.clear();
1332            for (size_t i = 0; i < mPacketSources.size(); i++) {
1333                sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1334                packetSource->clear();
1335            }
1336            postMonitorQueue();
1337            return;
1338        } else if (err == ERROR_OUT_OF_RANGE) {
1339            // reached stopping point
1340            notifyStopReached();
1341            return;
1342        } else if (err != OK) {
1343            notifyError(err);
1344            return;
1345        }
1346        // If we're switching, post start notification
1347        // this should only be posted when the last chunk is full processed by TSParser
1348        if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1349            CHECK(mStartTimeUsNotify != NULL);
1350            mStartTimeUsNotify->post();
1351            mStartTimeUsNotify.clear();
1352            shouldPause = true;
1353        }
1354        if (shouldPause || shouldPauseDownload()) {
1355            // save state and return if this is not the last chunk,
1356            // leaving the fetcher in paused state.
1357            if (bytesRead != 0) {
1358                mDownloadState->saveState(
1359                        uri,
1360                        itemMeta,
1361                        buffer,
1362                        tsBuffer,
1363                        firstSeqNumberInPlaylist,
1364                        lastSeqNumberInPlaylist);
1365                return;
1366            }
1367            shouldPause = true;
1368        }
1369    } while (bytesRead != 0);
1370
1371    if (bufferStartsWithTsSyncByte(buffer)) {
1372        // If we don't see a stream in the program table after fetching a full ts segment
1373        // mark it as nonexistent.
1374        ATSParser::SourceType srcTypes[] =
1375                { ATSParser::VIDEO, ATSParser::AUDIO };
1376        LiveSession::StreamType streamTypes[] =
1377                { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
1378        const size_t kNumTypes = NELEM(srcTypes);
1379
1380        for (size_t i = 0; i < kNumTypes; i++) {
1381            ATSParser::SourceType srcType = srcTypes[i];
1382            LiveSession::StreamType streamType = streamTypes[i];
1383
1384            sp<AnotherPacketSource> source =
1385                static_cast<AnotherPacketSource *>(
1386                    mTSParser->getSource(srcType).get());
1387
1388            if (!mTSParser->hasSource(srcType)) {
1389                ALOGW("MPEG2 Transport stream does not contain %s data.",
1390                      srcType == ATSParser::VIDEO ? "video" : "audio");
1391
1392                mStreamTypeMask &= ~streamType;
1393                mPacketSources.removeItem(streamType);
1394            }
1395        }
1396
1397    }
1398
1399    if (checkDecryptPadding(buffer) != OK) {
1400        ALOGE("Incorrect padding bytes after decryption.");
1401        notifyError(ERROR_MALFORMED);
1402        return;
1403    }
1404
1405    if (tsBuffer != NULL) {
1406        AString method;
1407        CHECK(buffer->meta()->findString("cipher-method", &method));
1408        if ((tsBuffer->size() > 0 && method == "NONE")
1409                || tsBuffer->size() > 16) {
1410            ALOGE("MPEG2 transport stream is not an even multiple of 188 "
1411                    "bytes in length.");
1412            notifyError(ERROR_MALFORMED);
1413            return;
1414        }
1415    }
1416
1417    // bulk extract non-ts files
1418    bool startUp = mStartup;
1419    if (tsBuffer == NULL) {
1420        status_t err = extractAndQueueAccessUnits(buffer, itemMeta);
1421        if (err == -EAGAIN) {
1422            // starting sequence number too low/high
1423            postMonitorQueue();
1424            return;
1425        } else if (err == ERROR_OUT_OF_RANGE) {
1426            // reached stopping point
1427            notifyStopReached();
1428            return;
1429        } else if (err != OK) {
1430            notifyError(err);
1431            return;
1432        }
1433    }
1434
1435    ++mSeqNumber;
1436
1437    // if adapting, pause after found the next starting point
1438    if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1439        CHECK(mStartTimeUsNotify != NULL);
1440        mStartTimeUsNotify->post();
1441        mStartTimeUsNotify.clear();
1442        shouldPause = true;
1443    }
1444
1445    if (!shouldPause) {
1446        postMonitorQueue();
1447    }
1448}
1449
1450/*
1451 * returns true if we need to adjust mSeqNumber
1452 */
1453bool PlaylistFetcher::adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs) {
1454    int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1455
1456    int64_t minDiffUs, maxDiffUs;
1457    if (mSeekMode == LiveSession::kSeekModeNextSample) {
1458        // if the previous fetcher paused in the middle of a segment, we
1459        // want to start at a segment that overlaps the last sample
1460        minDiffUs = -mPlaylist->getTargetDuration();
1461        maxDiffUs = 0ll;
1462    } else {
1463        // if the previous fetcher paused at the end of a segment, ideally
1464        // we want to start at the segment that's roughly aligned with its
1465        // next segment, but if the two variants are not well aligned we
1466        // adjust the diff to within (-T/2, T/2)
1467        minDiffUs = -mPlaylist->getTargetDuration() / 2;
1468        maxDiffUs = mPlaylist->getTargetDuration() / 2;
1469    }
1470
1471    int32_t oldSeqNumber = mSeqNumber;
1472    ssize_t index = mSeqNumber - firstSeqNumberInPlaylist;
1473
1474    // adjust anchorTimeUs to within (minDiffUs, maxDiffUs) from mStartTimeUs
1475    int64_t diffUs = anchorTimeUs - mStartTimeUs;
1476    if (diffUs > maxDiffUs) {
1477        while (index > 0 && diffUs > maxDiffUs) {
1478            --index;
1479
1480            sp<AMessage> itemMeta;
1481            CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1482
1483            int64_t itemDurationUs;
1484            CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1485
1486            diffUs -= itemDurationUs;
1487        }
1488    } else if (diffUs < minDiffUs) {
1489        while (index + 1 < (ssize_t) mPlaylist->size()
1490                && diffUs < minDiffUs) {
1491            ++index;
1492
1493            sp<AMessage> itemMeta;
1494            CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1495
1496            int64_t itemDurationUs;
1497            CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1498
1499            diffUs += itemDurationUs;
1500        }
1501    }
1502
1503    mSeqNumber = firstSeqNumberInPlaylist + index;
1504
1505    if (mSeqNumber != oldSeqNumber) {
1506        FLOGV("guessed wrong seg number: diff %lld out of [%lld, %lld]",
1507                (long long) anchorTimeUs - mStartTimeUs,
1508                (long long) minDiffUs,
1509                (long long) maxDiffUs);
1510        return true;
1511    }
1512    return false;
1513}
1514
1515int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1516    int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1517
1518    size_t index = 0;
1519    while (index < mPlaylist->size()) {
1520        sp<AMessage> itemMeta;
1521        CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1522        size_t curDiscontinuitySeq;
1523        CHECK(itemMeta->findInt32("discontinuity-sequence", (int32_t *)&curDiscontinuitySeq));
1524        int32_t seqNumber = firstSeqNumberInPlaylist + index;
1525        if (curDiscontinuitySeq == discontinuitySeq) {
1526            return seqNumber;
1527        } else if (curDiscontinuitySeq > discontinuitySeq) {
1528            return seqNumber <= 0 ? 0 : seqNumber - 1;
1529        }
1530
1531        ++index;
1532    }
1533
1534    return firstSeqNumberInPlaylist + mPlaylist->size();
1535}
1536
1537int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1538    size_t index = 0;
1539    int64_t segmentStartUs = 0;
1540    while (index < mPlaylist->size()) {
1541        sp<AMessage> itemMeta;
1542        CHECK(mPlaylist->itemAt(
1543                    index, NULL /* uri */, &itemMeta));
1544
1545        int64_t itemDurationUs;
1546        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1547
1548        if (timeUs < segmentStartUs + itemDurationUs) {
1549            break;
1550        }
1551
1552        segmentStartUs += itemDurationUs;
1553        ++index;
1554    }
1555
1556    if (index >= mPlaylist->size()) {
1557        index = mPlaylist->size() - 1;
1558    }
1559
1560    return mPlaylist->getFirstSeqNumber() + index;
1561}
1562
1563const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
1564        const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
1565    sp<MetaData> format = source->getFormat();
1566    if (format != NULL) {
1567        // for simplicity, store a reference to the format in each unit
1568        accessUnit->meta()->setObject("format", format);
1569    }
1570
1571    if (discard) {
1572        accessUnit->meta()->setInt32("discard", discard);
1573    }
1574
1575    accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1576    accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1577    accessUnit->meta()->setInt64("segmentFirstTimeUs", mSegmentFirstPTS);
1578    accessUnit->meta()->setInt64("segmentDurationUs", getSegmentDurationUs(mSeqNumber));
1579    if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
1580        accessUnit->meta()->setInt64("playlistTimeUs", mPlaylistTimeUs);
1581    }
1582    return accessUnit;
1583}
1584
1585bool PlaylistFetcher::isStartTimeReached(int64_t timeUs) {
1586    if (!mFirstPTSValid) {
1587        mFirstTimeUs = timeUs;
1588        mFirstPTSValid = true;
1589    }
1590    bool startTimeReached = true;
1591    if (mStartTimeUsRelative) {
1592        FLOGV("startTimeUsRelative, timeUs (%lld) - %lld = %lld",
1593                (long long)timeUs,
1594                (long long)mFirstTimeUs,
1595                (long long)(timeUs - mFirstTimeUs));
1596        timeUs -= mFirstTimeUs;
1597        if (timeUs < 0) {
1598            FLOGV("clamp negative timeUs to 0");
1599            timeUs = 0;
1600        }
1601        startTimeReached = (timeUs >= mStartTimeUs);
1602    }
1603    return startTimeReached;
1604}
1605
1606status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
1607    if (mTSParser == NULL) {
1608        // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
1609        mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
1610    }
1611
1612    if (mNextPTSTimeUs >= 0ll) {
1613        sp<AMessage> extra = new AMessage;
1614        // Since we are using absolute timestamps, signal an offset of 0 to prevent
1615        // ATSParser from skewing the timestamps of access units.
1616        extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
1617
1618        // When adapting, signal a recent media time to the parser,
1619        // so that PTS wrap around is handled for the new variant.
1620        if (mStartTimeUs >= 0 && !mStartTimeUsRelative) {
1621            extra->setInt64(IStreamListener::kKeyRecentMediaTimeUs, mStartTimeUs);
1622        }
1623
1624        mTSParser->signalDiscontinuity(
1625                ATSParser::DISCONTINUITY_TIME, extra);
1626
1627        mNextPTSTimeUs = -1ll;
1628    }
1629
1630    size_t offset = 0;
1631    while (offset + 188 <= buffer->size()) {
1632        status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
1633
1634        if (err != OK) {
1635            return err;
1636        }
1637
1638        offset += 188;
1639    }
1640    // setRange to indicate consumed bytes.
1641    buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
1642
1643    if (mSegmentFirstPTS < 0ll) {
1644        // get the smallest first PTS from all streams present in this parser
1645        for (size_t i = mPacketSources.size(); i > 0;) {
1646            i--;
1647            const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1648            if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1649                ALOGE("MPEG2 Transport streams do not contain subtitles.");
1650                return ERROR_MALFORMED;
1651            }
1652            if (stream == LiveSession::STREAMTYPE_METADATA) {
1653                continue;
1654            }
1655            ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1656            sp<AnotherPacketSource> source =
1657                static_cast<AnotherPacketSource *>(
1658                        mTSParser->getSource(type).get());
1659
1660            if (source == NULL) {
1661                continue;
1662            }
1663            sp<AMessage> meta = source->getMetaAfterLastDequeued(0);
1664            if (meta != NULL) {
1665                int64_t timeUs;
1666                CHECK(meta->findInt64("timeUs", &timeUs));
1667                if (mSegmentFirstPTS < 0ll || timeUs < mSegmentFirstPTS) {
1668                    mSegmentFirstPTS = timeUs;
1669                }
1670            }
1671        }
1672        if (mSegmentFirstPTS < 0ll) {
1673            // didn't find any TS packet, can return early
1674            return OK;
1675        }
1676        if (!mStartTimeUsRelative) {
1677            // mStartup
1678            //   mStartup is true until we have queued a packet for all the streams
1679            //   we are fetching. We queue packets whose timestamps are greater than
1680            //   mStartTimeUs.
1681            // mSegmentStartTimeUs >= 0
1682            //   mSegmentStartTimeUs is non-negative when adapting or switching tracks
1683            // adjustSeqNumberWithAnchorTime(timeUs) == true
1684            //   we guessed a seq number that's either too large or too small.
1685            // If this happens, we'll adjust mSeqNumber and restart fetching from new
1686            // location. Note that we only want to adjust once, so set mSegmentStartTimeUs
1687            // to -1 so that we don't enter this chunk next time.
1688            if (mStartup && mSegmentStartTimeUs >= 0
1689                    && adjustSeqNumberWithAnchorTime(mSegmentFirstPTS)) {
1690                mStartTimeUsNotify = mNotify->dup();
1691                mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
1692                mStartTimeUsNotify->setString("uri", mURI);
1693                mIDRFound = false;
1694                mSegmentStartTimeUs = -1;
1695                return -EAGAIN;
1696            }
1697        }
1698    }
1699
1700    status_t err = OK;
1701    for (size_t i = mPacketSources.size(); i > 0;) {
1702        i--;
1703        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1704
1705        const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1706        if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1707            ALOGE("MPEG2 Transport streams do not contain subtitles.");
1708            return ERROR_MALFORMED;
1709        }
1710
1711        const char *key = LiveSession::getKeyForStream(stream);
1712        ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1713
1714        sp<AnotherPacketSource> source =
1715            static_cast<AnotherPacketSource *>(
1716                    mTSParser->getSource(type).get());
1717
1718        if (source == NULL) {
1719            continue;
1720        }
1721
1722        const char *mime;
1723        sp<MetaData> format  = source->getFormat();
1724        bool isAvc = format != NULL && format->findCString(kKeyMIMEType, &mime)
1725                && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
1726
1727        sp<ABuffer> accessUnit;
1728        status_t finalResult;
1729        while (source->hasBufferAvailable(&finalResult)
1730                && source->dequeueAccessUnit(&accessUnit) == OK) {
1731
1732            int64_t timeUs;
1733            CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1734
1735            if (mStartup) {
1736                bool startTimeReached = isStartTimeReached(timeUs);
1737
1738                if (!startTimeReached || (isAvc && !mIDRFound)) {
1739                    // buffer up to the closest preceding IDR frame in the next segement,
1740                    // or the closest succeeding IDR frame after the exact position
1741                    FSLOGV(stream, "timeUs(%lld)-mStartTimeUs(%lld)=%lld, mIDRFound=%d",
1742                            (long long)timeUs,
1743                            (long long)mStartTimeUs,
1744                            (long long)timeUs - mStartTimeUs,
1745                            mIDRFound);
1746                    if (isAvc) {
1747                        if (IsIDR(accessUnit)) {
1748                            mVideoBuffer->clear();
1749                            FSLOGV(stream, "found IDR, clear mVideoBuffer");
1750                            mIDRFound = true;
1751                        }
1752                        if (mIDRFound && mStartTimeUsRelative && !startTimeReached) {
1753                            mVideoBuffer->queueAccessUnit(accessUnit);
1754                            FSLOGV(stream, "saving AVC video AccessUnit");
1755                        }
1756                    }
1757                    if (!startTimeReached || (isAvc && !mIDRFound)) {
1758                        continue;
1759                    }
1760                }
1761            }
1762
1763            if (mStartTimeUsNotify != NULL) {
1764                uint32_t streamMask = 0;
1765                mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1766                if ((mStreamTypeMask & mPacketSources.keyAt(i))
1767                        && !(streamMask & mPacketSources.keyAt(i))) {
1768                    streamMask |= mPacketSources.keyAt(i);
1769                    mStartTimeUsNotify->setInt32("streamMask", streamMask);
1770                    FSLOGV(stream, "found start point, timeUs=%lld, streamMask becomes %x",
1771                            (long long)timeUs, streamMask);
1772
1773                    if (streamMask == mStreamTypeMask) {
1774                        FLOGV("found start point for all streams");
1775                        mStartup = false;
1776                    }
1777                }
1778            }
1779
1780            if (mStopParams != NULL) {
1781                int32_t discontinuitySeq;
1782                int64_t stopTimeUs;
1783                if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1784                        || discontinuitySeq > mDiscontinuitySeq
1785                        || !mStopParams->findInt64(key, &stopTimeUs)
1786                        || (discontinuitySeq == mDiscontinuitySeq
1787                                && timeUs >= stopTimeUs)) {
1788                    FSLOGV(stream, "reached stop point, timeUs=%lld", (long long)timeUs);
1789                    mStreamTypeMask &= ~stream;
1790                    mPacketSources.removeItemsAt(i);
1791                    break;
1792                }
1793            }
1794
1795            if (stream == LiveSession::STREAMTYPE_VIDEO) {
1796                const bool discard = true;
1797                status_t status;
1798                while (mVideoBuffer->hasBufferAvailable(&status)) {
1799                    sp<ABuffer> videoBuffer;
1800                    mVideoBuffer->dequeueAccessUnit(&videoBuffer);
1801                    setAccessUnitProperties(videoBuffer, source, discard);
1802                    packetSource->queueAccessUnit(videoBuffer);
1803                    int64_t bufferTimeUs;
1804                    CHECK(videoBuffer->meta()->findInt64("timeUs", &bufferTimeUs));
1805                    FSLOGV(stream, "queueAccessUnit (saved), timeUs=%lld",
1806                            (long long)bufferTimeUs);
1807                }
1808            } else if (stream == LiveSession::STREAMTYPE_METADATA && !mHasMetadata) {
1809                mHasMetadata = true;
1810                sp<AMessage> notify = mNotify->dup();
1811                notify->setInt32("what", kWhatMetadataDetected);
1812                notify->post();
1813            }
1814
1815            setAccessUnitProperties(accessUnit, source);
1816            packetSource->queueAccessUnit(accessUnit);
1817            FSLOGV(stream, "queueAccessUnit, timeUs=%lld", (long long)timeUs);
1818        }
1819
1820        if (err != OK) {
1821            break;
1822        }
1823    }
1824
1825    if (err != OK) {
1826        for (size_t i = mPacketSources.size(); i > 0;) {
1827            i--;
1828            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1829            packetSource->clear();
1830        }
1831        return err;
1832    }
1833
1834    if (!mStreamTypeMask) {
1835        // Signal gap is filled between original and new stream.
1836        FLOGV("reached stop point for all streams");
1837        return ERROR_OUT_OF_RANGE;
1838    }
1839
1840    return OK;
1841}
1842
1843/* static */
1844bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1845        const sp<ABuffer> &buffer) {
1846    size_t pos = 0;
1847
1848    // skip possible BOM
1849    if (buffer->size() >= pos + 3 &&
1850            !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1851        pos += 3;
1852    }
1853
1854    // accept WEBVTT followed by SPACE, TAB or (CR) LF
1855    if (buffer->size() < pos + 6 ||
1856            memcmp("WEBVTT", buffer->data() + pos, 6)) {
1857        return false;
1858    }
1859    pos += 6;
1860
1861    if (buffer->size() == pos) {
1862        return true;
1863    }
1864
1865    uint8_t sep = buffer->data()[pos];
1866    return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1867}
1868
1869status_t PlaylistFetcher::extractAndQueueAccessUnits(
1870        const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1871    if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1872        if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1873            ALOGE("This stream only contains subtitles.");
1874            return ERROR_MALFORMED;
1875        }
1876
1877        const sp<AnotherPacketSource> packetSource =
1878            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1879
1880        int64_t durationUs;
1881        CHECK(itemMeta->findInt64("durationUs", &durationUs));
1882        buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1883        buffer->meta()->setInt64("durationUs", durationUs);
1884        buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1885        buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1886        buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration);
1887        packetSource->queueAccessUnit(buffer);
1888        return OK;
1889    }
1890
1891    if (mNextPTSTimeUs >= 0ll) {
1892        mNextPTSTimeUs = -1ll;
1893    }
1894
1895    // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
1896    // stream prefixed by an ID3 tag.
1897
1898    bool firstID3Tag = true;
1899    uint64_t PTS = 0;
1900
1901    for (;;) {
1902        // Make sure to skip all ID3 tags preceding the audio data.
1903        // At least one must be present to provide the PTS timestamp.
1904
1905        ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
1906        if (!id3.isValid()) {
1907            if (firstID3Tag) {
1908                ALOGE("Unable to parse ID3 tag.");
1909                return ERROR_MALFORMED;
1910            } else {
1911                break;
1912            }
1913        }
1914
1915        if (firstID3Tag) {
1916            bool found = false;
1917
1918            ID3::Iterator it(id3, "PRIV");
1919            while (!it.done()) {
1920                size_t length;
1921                const uint8_t *data = it.getData(&length);
1922                if (!data) {
1923                    return ERROR_MALFORMED;
1924                }
1925
1926                static const char *kMatchName =
1927                    "com.apple.streaming.transportStreamTimestamp";
1928                static const size_t kMatchNameLen = strlen(kMatchName);
1929
1930                if (length == kMatchNameLen + 1 + 8
1931                        && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
1932                    found = true;
1933                    PTS = U64_AT(&data[kMatchNameLen + 1]);
1934                }
1935
1936                it.next();
1937            }
1938
1939            if (!found) {
1940                ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
1941                return ERROR_MALFORMED;
1942            }
1943        }
1944
1945        // skip the ID3 tag
1946        buffer->setRange(
1947                buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
1948
1949        firstID3Tag = false;
1950    }
1951
1952    if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
1953        ALOGW("This stream only contains audio data!");
1954
1955        mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
1956
1957        if (mStreamTypeMask == 0) {
1958            return OK;
1959        }
1960    }
1961
1962    sp<AnotherPacketSource> packetSource =
1963        mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
1964
1965    if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
1966        ABitReader bits(buffer->data(), buffer->size());
1967
1968        // adts_fixed_header
1969
1970        CHECK_EQ(bits.getBits(12), 0xfffu);
1971        bits.skipBits(3);  // ID, layer
1972        bool protection_absent __unused = bits.getBits(1) != 0;
1973
1974        unsigned profile = bits.getBits(2);
1975        CHECK_NE(profile, 3u);
1976        unsigned sampling_freq_index = bits.getBits(4);
1977        bits.getBits(1);  // private_bit
1978        unsigned channel_configuration = bits.getBits(3);
1979        CHECK_NE(channel_configuration, 0u);
1980        bits.skipBits(2);  // original_copy, home
1981
1982        sp<MetaData> meta = MakeAACCodecSpecificData(
1983                profile, sampling_freq_index, channel_configuration);
1984
1985        meta->setInt32(kKeyIsADTS, true);
1986
1987        packetSource->setFormat(meta);
1988    }
1989
1990    int64_t numSamples = 0ll;
1991    int32_t sampleRate;
1992    CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
1993
1994    int64_t timeUs = (PTS * 100ll) / 9ll;
1995    if (mStartup && !mFirstPTSValid) {
1996        mFirstPTSValid = true;
1997        mFirstTimeUs = timeUs;
1998    }
1999
2000    if (mSegmentFirstPTS < 0ll) {
2001        mSegmentFirstPTS = timeUs;
2002        if (!mStartTimeUsRelative) {
2003            // Duplicated logic from how we handle .ts playlists.
2004            if (mStartup && mSegmentStartTimeUs >= 0
2005                    && adjustSeqNumberWithAnchorTime(timeUs)) {
2006                mSegmentStartTimeUs = -1;
2007                return -EAGAIN;
2008            }
2009        }
2010    }
2011
2012    size_t offset = 0;
2013    while (offset < buffer->size()) {
2014        const uint8_t *adtsHeader = buffer->data() + offset;
2015        CHECK_LT(offset + 5, buffer->size());
2016
2017        unsigned aac_frame_length =
2018            ((adtsHeader[3] & 3) << 11)
2019            | (adtsHeader[4] << 3)
2020            | (adtsHeader[5] >> 5);
2021
2022        if (aac_frame_length == 0) {
2023            const uint8_t *id3Header = adtsHeader;
2024            if (!memcmp(id3Header, "ID3", 3)) {
2025                ID3 id3(id3Header, buffer->size() - offset, true);
2026                if (id3.isValid()) {
2027                    offset += id3.rawSize();
2028                    continue;
2029                };
2030            }
2031            return ERROR_MALFORMED;
2032        }
2033
2034        CHECK_LE(offset + aac_frame_length, buffer->size());
2035
2036        int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
2037        offset += aac_frame_length;
2038
2039        // Each AAC frame encodes 1024 samples.
2040        numSamples += 1024;
2041
2042        if (mStartup) {
2043            int64_t startTimeUs = unitTimeUs;
2044            if (mStartTimeUsRelative) {
2045                startTimeUs -= mFirstTimeUs;
2046                if (startTimeUs  < 0) {
2047                    startTimeUs = 0;
2048                }
2049            }
2050            if (startTimeUs < mStartTimeUs) {
2051                continue;
2052            }
2053
2054            if (mStartTimeUsNotify != NULL) {
2055                mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO);
2056                mStartup = false;
2057            }
2058        }
2059
2060        if (mStopParams != NULL) {
2061            int32_t discontinuitySeq;
2062            int64_t stopTimeUs;
2063            if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
2064                    || discontinuitySeq > mDiscontinuitySeq
2065                    || !mStopParams->findInt64("timeUsAudio", &stopTimeUs)
2066                    || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) {
2067                mStreamTypeMask = 0;
2068                mPacketSources.clear();
2069                return ERROR_OUT_OF_RANGE;
2070            }
2071        }
2072
2073        sp<ABuffer> unit = new ABuffer(aac_frame_length);
2074        memcpy(unit->data(), adtsHeader, aac_frame_length);
2075
2076        unit->meta()->setInt64("timeUs", unitTimeUs);
2077        setAccessUnitProperties(unit, packetSource);
2078        packetSource->queueAccessUnit(unit);
2079    }
2080
2081    return OK;
2082}
2083
2084void PlaylistFetcher::updateDuration() {
2085    int64_t durationUs = 0ll;
2086    for (size_t index = 0; index < mPlaylist->size(); ++index) {
2087        sp<AMessage> itemMeta;
2088        CHECK(mPlaylist->itemAt(
2089                    index, NULL /* uri */, &itemMeta));
2090
2091        int64_t itemDurationUs;
2092        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
2093
2094        durationUs += itemDurationUs;
2095    }
2096
2097    sp<AMessage> msg = mNotify->dup();
2098    msg->setInt32("what", kWhatDurationUpdate);
2099    msg->setInt64("durationUs", durationUs);
2100    msg->post();
2101}
2102
2103void PlaylistFetcher::updateTargetDuration() {
2104    sp<AMessage> msg = mNotify->dup();
2105    msg->setInt32("what", kWhatTargetDurationUpdate);
2106    msg->setInt64("targetDurationUs", mPlaylist->getTargetDuration());
2107    msg->post();
2108}
2109
2110}  // namespace android
2111