PlaylistFetcher.cpp revision 5ce50c1931e1e3d8f113394bbe2c9f99354f4c5f
1/*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "PlaylistFetcher"
19#include <utils/Log.h>
20
21#include "PlaylistFetcher.h"
22
23#include "LiveDataSource.h"
24#include "LiveSession.h"
25#include "M3UParser.h"
26
27#include "include/avc_utils.h"
28#include "include/HTTPBase.h"
29#include "include/ID3.h"
30#include "mpeg2ts/AnotherPacketSource.h"
31
32#include <media/IStreamSource.h>
33#include <media/stagefright/foundation/ABitReader.h>
34#include <media/stagefright/foundation/ABuffer.h>
35#include <media/stagefright/foundation/ADebug.h>
36#include <media/stagefright/foundation/hexdump.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaDefs.h>
39#include <media/stagefright/MetaData.h>
40#include <media/stagefright/Utils.h>
41
42#include <ctype.h>
43#include <openssl/aes.h>
44#include <openssl/md5.h>
45
46namespace android {
47
48// static
49const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll;
50const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
51const int32_t PlaylistFetcher::kNumSkipFrames = 10;
52
53PlaylistFetcher::PlaylistFetcher(
54        const sp<AMessage> &notify,
55        const sp<LiveSession> &session,
56        const char *uri)
57    : mNotify(notify),
58      mStartTimeUsNotify(notify->dup()),
59      mSession(session),
60      mURI(uri),
61      mStreamTypeMask(0),
62      mStartTimeUs(-1ll),
63      mMinStartTimeUs(0ll),
64      mStopParams(NULL),
65      mLastPlaylistFetchTimeUs(-1ll),
66      mSeqNumber(-1),
67      mNumRetries(0),
68      mStartup(true),
69      mPrepared(false),
70      mNextPTSTimeUs(-1ll),
71      mMonitorQueueGeneration(0),
72      mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
73      mFirstPTSValid(false),
74      mAbsoluteTimeAnchorUs(0ll) {
75    memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
76    mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
77    mStartTimeUsNotify->setInt32("streamMask", 0);
78}
79
80PlaylistFetcher::~PlaylistFetcher() {
81}
82
83int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
84    CHECK(mPlaylist != NULL);
85
86    int32_t firstSeqNumberInPlaylist;
87    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
88                "media-sequence", &firstSeqNumberInPlaylist)) {
89        firstSeqNumberInPlaylist = 0;
90    }
91
92    int32_t lastSeqNumberInPlaylist =
93        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
94
95    CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
96    CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
97
98    int64_t segmentStartUs = 0ll;
99    for (int32_t index = 0;
100            index < seqNumber - firstSeqNumberInPlaylist; ++index) {
101        sp<AMessage> itemMeta;
102        CHECK(mPlaylist->itemAt(
103                    index, NULL /* uri */, &itemMeta));
104
105        int64_t itemDurationUs;
106        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
107
108        segmentStartUs += itemDurationUs;
109    }
110
111    return segmentStartUs;
112}
113
114int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
115    int64_t nowUs = ALooper::GetNowUs();
116
117    if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
118        CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
119        return 0ll;
120    }
121
122    if (mPlaylist->isComplete()) {
123        return (~0llu >> 1);
124    }
125
126    int32_t targetDurationSecs;
127    CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
128
129    int64_t targetDurationUs = targetDurationSecs * 1000000ll;
130
131    int64_t minPlaylistAgeUs;
132
133    switch (mRefreshState) {
134        case INITIAL_MINIMUM_RELOAD_DELAY:
135        {
136            size_t n = mPlaylist->size();
137            if (n > 0) {
138                sp<AMessage> itemMeta;
139                CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
140
141                int64_t itemDurationUs;
142                CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
143
144                minPlaylistAgeUs = itemDurationUs;
145                break;
146            }
147
148            // fall through
149        }
150
151        case FIRST_UNCHANGED_RELOAD_ATTEMPT:
152        {
153            minPlaylistAgeUs = targetDurationUs / 2;
154            break;
155        }
156
157        case SECOND_UNCHANGED_RELOAD_ATTEMPT:
158        {
159            minPlaylistAgeUs = (targetDurationUs * 3) / 2;
160            break;
161        }
162
163        case THIRD_UNCHANGED_RELOAD_ATTEMPT:
164        {
165            minPlaylistAgeUs = targetDurationUs * 3;
166            break;
167        }
168
169        default:
170            TRESPASS();
171            break;
172    }
173
174    int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
175    return delayUs > 0ll ? delayUs : 0ll;
176}
177
178status_t PlaylistFetcher::decryptBuffer(
179        size_t playlistIndex, const sp<ABuffer> &buffer,
180        bool first) {
181    sp<AMessage> itemMeta;
182    bool found = false;
183    AString method;
184
185    for (ssize_t i = playlistIndex; i >= 0; --i) {
186        AString uri;
187        CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
188
189        if (itemMeta->findString("cipher-method", &method)) {
190            found = true;
191            break;
192        }
193    }
194
195    if (!found) {
196        method = "NONE";
197    }
198    buffer->meta()->setString("cipher-method", method.c_str());
199
200    if (method == "NONE") {
201        return OK;
202    } else if (!(method == "AES-128")) {
203        ALOGE("Unsupported cipher method '%s'", method.c_str());
204        return ERROR_UNSUPPORTED;
205    }
206
207    AString keyURI;
208    if (!itemMeta->findString("cipher-uri", &keyURI)) {
209        ALOGE("Missing key uri");
210        return ERROR_MALFORMED;
211    }
212
213    ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
214
215    sp<ABuffer> key;
216    if (index >= 0) {
217        key = mAESKeyForURI.valueAt(index);
218    } else {
219        status_t err = mSession->fetchFile(keyURI.c_str(), &key);
220
221        if (err != OK) {
222            ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
223            return ERROR_IO;
224        } else if (key->size() != 16) {
225            ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
226            return ERROR_MALFORMED;
227        }
228
229        mAESKeyForURI.add(keyURI, key);
230    }
231
232    AES_KEY aes_key;
233    if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
234        ALOGE("failed to set AES decryption key.");
235        return UNKNOWN_ERROR;
236    }
237
238    size_t n = buffer->size();
239    if (!n) {
240        return OK;
241    }
242    CHECK(n % 16 == 0);
243
244    if (first) {
245        // If decrypting the first block in a file, read the iv from the manifest
246        // or derive the iv from the file's sequence number.
247
248        AString iv;
249        if (itemMeta->findString("cipher-iv", &iv)) {
250            if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
251                    || iv.size() != 16 * 2 + 2) {
252                ALOGE("malformed cipher IV '%s'.", iv.c_str());
253                return ERROR_MALFORMED;
254            }
255
256            memset(mAESInitVec, 0, sizeof(mAESInitVec));
257            for (size_t i = 0; i < 16; ++i) {
258                char c1 = tolower(iv.c_str()[2 + 2 * i]);
259                char c2 = tolower(iv.c_str()[3 + 2 * i]);
260                if (!isxdigit(c1) || !isxdigit(c2)) {
261                    ALOGE("malformed cipher IV '%s'.", iv.c_str());
262                    return ERROR_MALFORMED;
263                }
264                uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
265                uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
266
267                mAESInitVec[i] = nibble1 << 4 | nibble2;
268            }
269        } else {
270            memset(mAESInitVec, 0, sizeof(mAESInitVec));
271            mAESInitVec[15] = mSeqNumber & 0xff;
272            mAESInitVec[14] = (mSeqNumber >> 8) & 0xff;
273            mAESInitVec[13] = (mSeqNumber >> 16) & 0xff;
274            mAESInitVec[12] = (mSeqNumber >> 24) & 0xff;
275        }
276    }
277
278    AES_cbc_encrypt(
279            buffer->data(), buffer->data(), buffer->size(),
280            &aes_key, mAESInitVec, AES_DECRYPT);
281
282    return OK;
283}
284
285status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
286    status_t err;
287    AString method;
288    CHECK(buffer->meta()->findString("cipher-method", &method));
289    if (method == "NONE") {
290        return OK;
291    }
292
293    uint8_t padding = 0;
294    if (buffer->size() > 0) {
295        padding = buffer->data()[buffer->size() - 1];
296    }
297
298    if (padding > 16) {
299        return ERROR_MALFORMED;
300    }
301
302    for (size_t i = buffer->size() - padding; i < padding; i++) {
303        if (buffer->data()[i] != padding) {
304            return ERROR_MALFORMED;
305        }
306    }
307
308    buffer->setRange(buffer->offset(), buffer->size() - padding);
309    return OK;
310}
311
312void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
313    int64_t maxDelayUs = delayUsToRefreshPlaylist();
314    if (maxDelayUs < minDelayUs) {
315        maxDelayUs = minDelayUs;
316    }
317    if (delayUs > maxDelayUs) {
318        ALOGV("Need to refresh playlist in %lld", maxDelayUs);
319        delayUs = maxDelayUs;
320    }
321    sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id());
322    msg->setInt32("generation", mMonitorQueueGeneration);
323    msg->post(delayUs);
324}
325
326void PlaylistFetcher::cancelMonitorQueue() {
327    ++mMonitorQueueGeneration;
328}
329
330void PlaylistFetcher::startAsync(
331        const sp<AnotherPacketSource> &audioSource,
332        const sp<AnotherPacketSource> &videoSource,
333        const sp<AnotherPacketSource> &subtitleSource,
334        int64_t startTimeUs,
335        int64_t minStartTimeUs,
336        int32_t startSeqNumberHint) {
337    sp<AMessage> msg = new AMessage(kWhatStart, id());
338
339    uint32_t streamTypeMask = 0ul;
340
341    if (audioSource != NULL) {
342        msg->setPointer("audioSource", audioSource.get());
343        streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
344    }
345
346    if (videoSource != NULL) {
347        msg->setPointer("videoSource", videoSource.get());
348        streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
349    }
350
351    if (subtitleSource != NULL) {
352        msg->setPointer("subtitleSource", subtitleSource.get());
353        streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
354    }
355
356    msg->setInt32("streamTypeMask", streamTypeMask);
357    msg->setInt64("startTimeUs", startTimeUs);
358    msg->setInt64("minStartTimeUs", minStartTimeUs);
359    msg->setInt32("startSeqNumberHint", startSeqNumberHint);
360    msg->post();
361}
362
363void PlaylistFetcher::pauseAsync() {
364    (new AMessage(kWhatPause, id()))->post();
365}
366
367void PlaylistFetcher::stopAsync(bool selfTriggered) {
368    sp<AMessage> msg = new AMessage(kWhatStop, id());
369    msg->setInt32("selfTriggered", selfTriggered);
370    msg->post();
371}
372
373void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
374    AMessage* msg = new AMessage(kWhatResumeUntil, id());
375    msg->setMessage("params", params);
376    msg->post();
377}
378
379void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
380    switch (msg->what()) {
381        case kWhatStart:
382        {
383            status_t err = onStart(msg);
384
385            sp<AMessage> notify = mNotify->dup();
386            notify->setInt32("what", kWhatStarted);
387            notify->setInt32("err", err);
388            notify->post();
389            break;
390        }
391
392        case kWhatPause:
393        {
394            onPause();
395
396            sp<AMessage> notify = mNotify->dup();
397            notify->setInt32("what", kWhatPaused);
398            notify->post();
399            break;
400        }
401
402        case kWhatStop:
403        {
404            onStop(msg);
405
406            sp<AMessage> notify = mNotify->dup();
407            notify->setInt32("what", kWhatStopped);
408            notify->post();
409            break;
410        }
411
412        case kWhatMonitorQueue:
413        case kWhatDownloadNext:
414        {
415            int32_t generation;
416            CHECK(msg->findInt32("generation", &generation));
417
418            if (generation != mMonitorQueueGeneration) {
419                // Stale event
420                break;
421            }
422
423            if (msg->what() == kWhatMonitorQueue) {
424                onMonitorQueue();
425            } else {
426                onDownloadNext();
427            }
428            break;
429        }
430
431        case kWhatResumeUntil:
432        {
433            onResumeUntil(msg);
434            break;
435        }
436
437        default:
438            TRESPASS();
439    }
440}
441
442status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
443    mPacketSources.clear();
444
445    uint32_t streamTypeMask;
446    CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
447
448    int64_t startTimeUs;
449    int32_t startSeqNumberHint;
450    CHECK(msg->findInt64("startTimeUs", &startTimeUs));
451    CHECK(msg->findInt64("minStartTimeUs", (int64_t *) &mMinStartTimeUs));
452    CHECK(msg->findInt32("startSeqNumberHint", &startSeqNumberHint));
453
454    if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
455        void *ptr;
456        CHECK(msg->findPointer("audioSource", &ptr));
457
458        mPacketSources.add(
459                LiveSession::STREAMTYPE_AUDIO,
460                static_cast<AnotherPacketSource *>(ptr));
461    }
462
463    if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
464        void *ptr;
465        CHECK(msg->findPointer("videoSource", &ptr));
466
467        mPacketSources.add(
468                LiveSession::STREAMTYPE_VIDEO,
469                static_cast<AnotherPacketSource *>(ptr));
470    }
471
472    if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
473        void *ptr;
474        CHECK(msg->findPointer("subtitleSource", &ptr));
475
476        mPacketSources.add(
477                LiveSession::STREAMTYPE_SUBTITLES,
478                static_cast<AnotherPacketSource *>(ptr));
479    }
480
481    mStreamTypeMask = streamTypeMask;
482    mStartTimeUs = startTimeUs;
483
484    if (mStartTimeUs >= 0ll) {
485        mSeqNumber = -1;
486        mStartup = true;
487        mPrepared = false;
488    }
489
490    if (startSeqNumberHint >= 0) {
491        mSeqNumber = startSeqNumberHint;
492    }
493
494    postMonitorQueue();
495
496    return OK;
497}
498
499void PlaylistFetcher::onPause() {
500    cancelMonitorQueue();
501}
502
503void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
504    cancelMonitorQueue();
505
506    int32_t selfTriggered;
507    CHECK(msg->findInt32("selfTriggered", &selfTriggered));
508    if (!selfTriggered) {
509        // Self triggered stops only happen during switching, in which case we do not want
510        // to clear the discontinuities queued at the end of packet sources.
511        for (size_t i = 0; i < mPacketSources.size(); i++) {
512            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
513            packetSource->clear();
514        }
515    }
516
517    mPacketSources.clear();
518    mStreamTypeMask = 0;
519}
520
521// Resume until we have reached the boundary timestamps listed in `msg`; when
522// the remaining time is too short (within a resume threshold) stop immediately
523// instead.
524status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
525    sp<AMessage> params;
526    CHECK(msg->findMessage("params", &params));
527
528    bool stop = false;
529    for (size_t i = 0; i < mPacketSources.size(); i++) {
530        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
531
532        const char *stopKey;
533        int streamType = mPacketSources.keyAt(i);
534        switch (streamType) {
535        case LiveSession::STREAMTYPE_VIDEO:
536            stopKey = "timeUsVideo";
537            break;
538
539        case LiveSession::STREAMTYPE_AUDIO:
540            stopKey = "timeUsAudio";
541            break;
542
543        case LiveSession::STREAMTYPE_SUBTITLES:
544            stopKey = "timeUsSubtitle";
545            break;
546
547        default:
548            TRESPASS();
549        }
550
551        // Don't resume if we would stop within a resume threshold.
552        int64_t latestTimeUs = 0, stopTimeUs = 0;
553        sp<AMessage> latestMeta = packetSource->getLatestMeta();
554        if (latestMeta != NULL
555                && (latestMeta->findInt64("timeUs", &latestTimeUs)
556                && params->findInt64(stopKey, &stopTimeUs))) {
557            int64_t diffUs = stopTimeUs - latestTimeUs;
558            if (diffUs < resumeThreshold(latestMeta)) {
559                stop = true;
560            }
561        }
562    }
563
564    if (stop) {
565        for (size_t i = 0; i < mPacketSources.size(); i++) {
566            mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer());
567        }
568        stopAsync(/* selfTriggered = */ true);
569        return OK;
570    }
571
572    mStopParams = params;
573    postMonitorQueue();
574
575    return OK;
576}
577
578void PlaylistFetcher::notifyError(status_t err) {
579    sp<AMessage> notify = mNotify->dup();
580    notify->setInt32("what", kWhatError);
581    notify->setInt32("err", err);
582    notify->post();
583}
584
585void PlaylistFetcher::queueDiscontinuity(
586        ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
587    for (size_t i = 0; i < mPacketSources.size(); ++i) {
588        mPacketSources.valueAt(i)->queueDiscontinuity(type, extra);
589    }
590}
591
592void PlaylistFetcher::onMonitorQueue() {
593    bool downloadMore = false;
594    refreshPlaylist();
595
596    int32_t targetDurationSecs;
597    int64_t targetDurationUs = kMinBufferedDurationUs;
598    if (mPlaylist != NULL) {
599        CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
600        targetDurationUs = targetDurationSecs * 1000000ll;
601    }
602
603    // buffer at least 3 times the target duration, or up to 10 seconds
604    int64_t durationToBufferUs = targetDurationUs * 3;
605    if (durationToBufferUs > kMinBufferedDurationUs)  {
606        durationToBufferUs = kMinBufferedDurationUs;
607    }
608
609    int64_t bufferedDurationUs = 0ll;
610    status_t finalResult = NOT_ENOUGH_DATA;
611    if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
612        sp<AnotherPacketSource> packetSource =
613            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
614
615        bufferedDurationUs =
616                packetSource->getBufferedDurationUs(&finalResult);
617        finalResult = OK;
618    } else {
619        // Use max stream duration to prevent us from waiting on a non-existent stream;
620        // when we cannot make out from the manifest what streams are included in a playlist
621        // we might assume extra streams.
622        for (size_t i = 0; i < mPacketSources.size(); ++i) {
623            if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
624                continue;
625            }
626
627            int64_t bufferedStreamDurationUs =
628                mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
629            ALOGV("buffered %lld for stream %d",
630                    bufferedStreamDurationUs, mPacketSources.keyAt(i));
631            if (bufferedStreamDurationUs > bufferedDurationUs) {
632                bufferedDurationUs = bufferedStreamDurationUs;
633            }
634        }
635    }
636    downloadMore = (bufferedDurationUs < durationToBufferUs);
637
638    // signal start if buffered up at least the target size
639    if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) {
640        mPrepared = true;
641
642        ALOGV("prepared, buffered=%lld > %lld",
643                bufferedDurationUs, targetDurationUs);
644        sp<AMessage> msg = mNotify->dup();
645        msg->setInt32("what", kWhatTemporarilyDoneFetching);
646        msg->post();
647    }
648
649    if (finalResult == OK && downloadMore) {
650        ALOGV("monitoring, buffered=%lld < %lld",
651                bufferedDurationUs, durationToBufferUs);
652        // delay the next download slightly; hopefully this gives other concurrent fetchers
653        // a better chance to run.
654        // onDownloadNext();
655        sp<AMessage> msg = new AMessage(kWhatDownloadNext, id());
656        msg->setInt32("generation", mMonitorQueueGeneration);
657        msg->post(1000l);
658    } else {
659        // Nothing to do yet, try again in a second.
660
661        sp<AMessage> msg = mNotify->dup();
662        msg->setInt32("what", kWhatTemporarilyDoneFetching);
663        msg->post();
664
665        int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2;
666        ALOGV("pausing for %lld, buffered=%lld > %lld",
667                delayUs, bufferedDurationUs, durationToBufferUs);
668        // :TRICKY: need to enforce minimum delay because the delay to
669        // refresh the playlist will become 0
670        postMonitorQueue(delayUs, mPrepared ? targetDurationUs * 2 : 0);
671    }
672}
673
674status_t PlaylistFetcher::refreshPlaylist() {
675    if (delayUsToRefreshPlaylist() <= 0) {
676        bool unchanged;
677        sp<M3UParser> playlist = mSession->fetchPlaylist(
678                mURI.c_str(), mPlaylistHash, &unchanged);
679
680        if (playlist == NULL) {
681            if (unchanged) {
682                // We succeeded in fetching the playlist, but it was
683                // unchanged from the last time we tried.
684
685                if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
686                    mRefreshState = (RefreshState)(mRefreshState + 1);
687                }
688            } else {
689                ALOGE("failed to load playlist at url '%s'", mURI.c_str());
690                notifyError(ERROR_IO);
691                return ERROR_IO;
692            }
693        } else {
694            mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
695            mPlaylist = playlist;
696
697            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
698                updateDuration();
699            }
700        }
701
702        mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
703    }
704    return OK;
705}
706
707void PlaylistFetcher::onDownloadNext() {
708    if (refreshPlaylist() != OK) {
709        return;
710    }
711
712    int32_t firstSeqNumberInPlaylist;
713    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
714                "media-sequence", &firstSeqNumberInPlaylist)) {
715        firstSeqNumberInPlaylist = 0;
716    }
717
718    bool seekDiscontinuity = false;
719    bool explicitDiscontinuity = false;
720
721    const int32_t lastSeqNumberInPlaylist =
722        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
723
724    if (mStartup && mSeqNumber >= 0
725            && (mSeqNumber < firstSeqNumberInPlaylist || mSeqNumber > lastSeqNumberInPlaylist)) {
726        // in case we guessed wrong during reconfiguration, try fetching the latest content.
727        mSeqNumber = lastSeqNumberInPlaylist;
728    }
729
730    if (mSeqNumber < 0) {
731        CHECK_GE(mStartTimeUs, 0ll);
732
733        if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
734            mSeqNumber = getSeqNumberForTime(mStartTimeUs);
735            ALOGV("Initial sequence number for time %lld is %d from (%d .. %d)",
736                    mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
737                    lastSeqNumberInPlaylist);
738        } else {
739            // If this is a live session, start 3 segments from the end.
740            mSeqNumber = lastSeqNumberInPlaylist - 3;
741            if (mSeqNumber < firstSeqNumberInPlaylist) {
742                mSeqNumber = firstSeqNumberInPlaylist;
743            }
744            ALOGV("Initial sequence number for live event %d from (%d .. %d)",
745                    mSeqNumber, firstSeqNumberInPlaylist,
746                    lastSeqNumberInPlaylist);
747        }
748
749        mStartTimeUs = -1ll;
750    }
751
752    if (mSeqNumber < firstSeqNumberInPlaylist
753            || mSeqNumber > lastSeqNumberInPlaylist) {
754        if (!mPlaylist->isComplete() && mNumRetries < kMaxNumRetries) {
755            ++mNumRetries;
756
757            if (mSeqNumber > lastSeqNumberInPlaylist) {
758                // refresh in increasing fraction (1/2, 1/3, ...) of the
759                // playlist's target duration or 3 seconds, whichever is less
760                int32_t targetDurationSecs;
761                CHECK(mPlaylist->meta()->findInt32(
762                        "target-duration", &targetDurationSecs));
763                int64_t delayUs = mPlaylist->size() * targetDurationSecs *
764                        1000000ll / (1 + mNumRetries);
765                if (delayUs > kMaxMonitorDelayUs) {
766                    delayUs = kMaxMonitorDelayUs;
767                }
768                ALOGV("sequence number high: %d from (%d .. %d), "
769                      "monitor in %lld (retry=%d)",
770                        mSeqNumber, firstSeqNumberInPlaylist,
771                        lastSeqNumberInPlaylist, delayUs, mNumRetries);
772                postMonitorQueue(delayUs);
773                return;
774            }
775
776            // we've missed the boat, let's start from the lowest sequence
777            // number available and signal a discontinuity.
778
779            ALOGI("We've missed the boat, restarting playback."
780                  "  mStartup=%d, was  looking for %d in %d-%d",
781                    mStartup, mSeqNumber, firstSeqNumberInPlaylist,
782                    lastSeqNumberInPlaylist);
783            mSeqNumber = lastSeqNumberInPlaylist - 3;
784            if (mSeqNumber < firstSeqNumberInPlaylist) {
785                mSeqNumber = firstSeqNumberInPlaylist;
786            }
787            explicitDiscontinuity = true;
788
789            // fall through
790        } else {
791            ALOGE("Cannot find sequence number %d in playlist "
792                 "(contains %d - %d)",
793                 mSeqNumber, firstSeqNumberInPlaylist,
794                 firstSeqNumberInPlaylist + mPlaylist->size() - 1);
795
796            notifyError(ERROR_END_OF_STREAM);
797            return;
798        }
799    }
800
801    mNumRetries = 0;
802
803    AString uri;
804    sp<AMessage> itemMeta;
805    CHECK(mPlaylist->itemAt(
806                mSeqNumber - firstSeqNumberInPlaylist,
807                &uri,
808                &itemMeta));
809
810    int32_t val;
811    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
812        explicitDiscontinuity = true;
813    }
814
815    int64_t range_offset, range_length;
816    if (!itemMeta->findInt64("range-offset", &range_offset)
817            || !itemMeta->findInt64("range-length", &range_length)) {
818        range_offset = 0;
819        range_length = -1;
820    }
821
822    ALOGV("fetching segment %d from (%d .. %d)",
823          mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
824
825    ALOGV("fetching '%s'", uri.c_str());
826
827    sp<ABuffer> buffer;
828    status_t err = mSession->fetchFile(
829            uri.c_str(), &buffer, range_offset, range_length);
830
831    if (err != OK) {
832        ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
833        notifyError(err);
834        return;
835    }
836
837    CHECK(buffer != NULL);
838
839    err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer);
840    if (err == OK) {
841        err = checkDecryptPadding(buffer);
842    }
843
844    if (err != OK) {
845        ALOGE("decryptBuffer failed w/ error %d", err);
846
847        notifyError(err);
848        return;
849    }
850
851    if (mStartup || seekDiscontinuity || explicitDiscontinuity) {
852        // Signal discontinuity.
853
854        if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
855            // If this was a live event this made no sense since
856            // we don't have access to all the segment before the current
857            // one.
858            mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
859        }
860
861        if (seekDiscontinuity || explicitDiscontinuity) {
862            ALOGI("queueing discontinuity (seek=%d, explicit=%d)",
863                 seekDiscontinuity, explicitDiscontinuity);
864
865            queueDiscontinuity(
866                    explicitDiscontinuity
867                        ? ATSParser::DISCONTINUITY_FORMATCHANGE
868                        : ATSParser::DISCONTINUITY_SEEK,
869                    NULL /* extra */);
870        }
871    }
872
873    err = extractAndQueueAccessUnits(buffer, itemMeta);
874
875    if (err == -EAGAIN) {
876        // bad starting sequence number hint
877        postMonitorQueue();
878        return;
879    }
880
881    if (err == ERROR_OUT_OF_RANGE) {
882        // reached stopping point
883        stopAsync(/* selfTriggered = */ true);
884        return;
885    }
886
887    if (err != OK) {
888        notifyError(err);
889        return;
890    }
891
892    ++mSeqNumber;
893
894    postMonitorQueue();
895
896    mStartup = false;
897}
898
899int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
900    int32_t firstSeqNumberInPlaylist;
901    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
902                "media-sequence", &firstSeqNumberInPlaylist)) {
903        firstSeqNumberInPlaylist = 0;
904    }
905
906    size_t index = 0;
907    int64_t segmentStartUs = 0;
908    while (index < mPlaylist->size()) {
909        sp<AMessage> itemMeta;
910        CHECK(mPlaylist->itemAt(
911                    index, NULL /* uri */, &itemMeta));
912
913        int64_t itemDurationUs;
914        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
915
916        if (timeUs < segmentStartUs + itemDurationUs) {
917            break;
918        }
919
920        segmentStartUs += itemDurationUs;
921        ++index;
922    }
923
924    if (index >= mPlaylist->size()) {
925        index = mPlaylist->size() - 1;
926    }
927
928    return firstSeqNumberInPlaylist + index;
929}
930
931status_t PlaylistFetcher::extractAndQueueAccessUnits(
932        const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
933    if (buffer->size() > 0 && buffer->data()[0] == 0x47) {
934        // Let's assume this is an MPEG2 transport stream.
935
936        if ((buffer->size() % 188) != 0) {
937            ALOGE("MPEG2 transport stream is not an even multiple of 188 "
938                  "bytes in length.");
939            return ERROR_MALFORMED;
940        }
941
942        if (mTSParser == NULL) {
943            // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
944            mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
945        }
946
947        if (mNextPTSTimeUs >= 0ll) {
948            sp<AMessage> extra = new AMessage;
949            // Since we are using absolute timestamps, signal an offset of 0 to prevent
950            // ATSParser from skewing the timestamps of access units.
951            extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
952
953            mTSParser->signalDiscontinuity(
954                    ATSParser::DISCONTINUITY_SEEK, extra);
955
956            mNextPTSTimeUs = -1ll;
957        }
958
959        size_t offset = 0;
960        while (offset < buffer->size()) {
961            status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
962
963            if (err != OK) {
964                return err;
965            }
966
967            offset += 188;
968        }
969
970        status_t err = OK;
971        for (size_t i = mPacketSources.size(); i-- > 0;) {
972            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
973
974            const char *key;
975            ATSParser::SourceType type;
976            const LiveSession::StreamType stream = mPacketSources.keyAt(i);
977            switch (stream) {
978
979                case LiveSession::STREAMTYPE_VIDEO:
980                    type = ATSParser::VIDEO;
981                    key = "timeUsVideo";
982                    break;
983
984                case LiveSession::STREAMTYPE_AUDIO:
985                    type = ATSParser::AUDIO;
986                    key = "timeUsAudio";
987                    break;
988
989                case LiveSession::STREAMTYPE_SUBTITLES:
990                {
991                    ALOGE("MPEG2 Transport streams do not contain subtitles.");
992                    return ERROR_MALFORMED;
993                    break;
994                }
995
996                default:
997                    TRESPASS();
998            }
999
1000            sp<AnotherPacketSource> source =
1001                static_cast<AnotherPacketSource *>(
1002                        mTSParser->getSource(type).get());
1003
1004            if (source == NULL) {
1005                ALOGW("MPEG2 Transport stream does not contain %s data.",
1006                      type == ATSParser::VIDEO ? "video" : "audio");
1007
1008                mStreamTypeMask &= ~mPacketSources.keyAt(i);
1009                mPacketSources.removeItemsAt(i);
1010                continue;
1011            }
1012
1013            int64_t timeUs;
1014            sp<ABuffer> accessUnit;
1015            status_t finalResult;
1016            while (source->hasBufferAvailable(&finalResult)
1017                    && source->dequeueAccessUnit(&accessUnit) == OK) {
1018
1019                CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1020                if (mMinStartTimeUs > 0) {
1021                    if (timeUs < mMinStartTimeUs) {
1022                        // TODO untested path
1023                        // try a later ts
1024                        int32_t targetDuration;
1025                        mPlaylist->meta()->findInt32("target-duration", &targetDuration);
1026                        int32_t incr = (mMinStartTimeUs - timeUs) / 1000000 / targetDuration;
1027                        if (incr == 0) {
1028                            // increment mSeqNumber by at least one
1029                            incr = 1;
1030                        }
1031                        mSeqNumber += incr;
1032                        err = -EAGAIN;
1033                        break;
1034                    } else {
1035                        int64_t startTimeUs;
1036                        if (mStartTimeUsNotify != NULL
1037                                && !mStartTimeUsNotify->findInt64(key, &startTimeUs)) {
1038                            mStartTimeUsNotify->setInt64(key, timeUs);
1039
1040                            uint32_t streamMask = 0;
1041                            mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1042                            streamMask |= mPacketSources.keyAt(i);
1043                            mStartTimeUsNotify->setInt32("streamMask", streamMask);
1044
1045                            if (streamMask == mStreamTypeMask) {
1046                                mStartTimeUsNotify->post();
1047                                mStartTimeUsNotify.clear();
1048                            }
1049                        }
1050                    }
1051                }
1052
1053                if (mStopParams != NULL) {
1054                    // Queue discontinuity in original stream.
1055                    int64_t stopTimeUs;
1056                    if (!mStopParams->findInt64(key, &stopTimeUs) || timeUs >= stopTimeUs) {
1057                        packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
1058                        mStreamTypeMask &= ~stream;
1059                        mPacketSources.removeItemsAt(i);
1060                        break;
1061                    }
1062                }
1063
1064                // Note that we do NOT dequeue any discontinuities except for format change.
1065
1066                // for simplicity, store a reference to the format in each unit
1067                sp<MetaData> format = source->getFormat();
1068                if (format != NULL) {
1069                    accessUnit->meta()->setObject("format", format);
1070                }
1071
1072                // Stash the sequence number so we can hint future fetchers where to start at.
1073                accessUnit->meta()->setInt32("seq", mSeqNumber);
1074                packetSource->queueAccessUnit(accessUnit);
1075            }
1076
1077            if (err != OK) {
1078                break;
1079            }
1080        }
1081
1082        if (err != OK) {
1083            for (size_t i = mPacketSources.size(); i-- > 0;) {
1084                sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1085                packetSource->clear();
1086            }
1087            return err;
1088        }
1089
1090        if (!mStreamTypeMask) {
1091            // Signal gap is filled between original and new stream.
1092            ALOGV("ERROR OUT OF RANGE");
1093            return ERROR_OUT_OF_RANGE;
1094        }
1095
1096        return OK;
1097    } else if (buffer->size() >= 7 && !memcmp("WEBVTT\n", buffer->data(), 7)) {
1098        if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1099            ALOGE("This stream only contains subtitles.");
1100            return ERROR_MALFORMED;
1101        }
1102
1103        const sp<AnotherPacketSource> packetSource =
1104            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1105
1106        int64_t durationUs;
1107        CHECK(itemMeta->findInt64("durationUs", &durationUs));
1108        buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1109        buffer->meta()->setInt64("durationUs", durationUs);
1110        buffer->meta()->setInt32("seq", mSeqNumber);
1111
1112        packetSource->queueAccessUnit(buffer);
1113        return OK;
1114    }
1115
1116    if (mNextPTSTimeUs >= 0ll) {
1117        mFirstPTSValid = false;
1118        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
1119        mNextPTSTimeUs = -1ll;
1120    }
1121
1122    // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
1123    // stream prefixed by an ID3 tag.
1124
1125    bool firstID3Tag = true;
1126    uint64_t PTS = 0;
1127
1128    for (;;) {
1129        // Make sure to skip all ID3 tags preceding the audio data.
1130        // At least one must be present to provide the PTS timestamp.
1131
1132        ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
1133        if (!id3.isValid()) {
1134            if (firstID3Tag) {
1135                ALOGE("Unable to parse ID3 tag.");
1136                return ERROR_MALFORMED;
1137            } else {
1138                break;
1139            }
1140        }
1141
1142        if (firstID3Tag) {
1143            bool found = false;
1144
1145            ID3::Iterator it(id3, "PRIV");
1146            while (!it.done()) {
1147                size_t length;
1148                const uint8_t *data = it.getData(&length);
1149
1150                static const char *kMatchName =
1151                    "com.apple.streaming.transportStreamTimestamp";
1152                static const size_t kMatchNameLen = strlen(kMatchName);
1153
1154                if (length == kMatchNameLen + 1 + 8
1155                        && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
1156                    found = true;
1157                    PTS = U64_AT(&data[kMatchNameLen + 1]);
1158                }
1159
1160                it.next();
1161            }
1162
1163            if (!found) {
1164                ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
1165                return ERROR_MALFORMED;
1166            }
1167        }
1168
1169        // skip the ID3 tag
1170        buffer->setRange(
1171                buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
1172
1173        firstID3Tag = false;
1174    }
1175
1176    if (!mFirstPTSValid) {
1177        mFirstPTSValid = true;
1178        mFirstPTS = PTS;
1179    }
1180    PTS -= mFirstPTS;
1181
1182    int64_t timeUs = (PTS * 100ll) / 9ll + mAbsoluteTimeAnchorUs;
1183
1184    if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
1185        ALOGW("This stream only contains audio data!");
1186
1187        mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
1188
1189        if (mStreamTypeMask == 0) {
1190            return OK;
1191        }
1192    }
1193
1194    sp<AnotherPacketSource> packetSource =
1195        mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
1196
1197    if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
1198        ABitReader bits(buffer->data(), buffer->size());
1199
1200        // adts_fixed_header
1201
1202        CHECK_EQ(bits.getBits(12), 0xfffu);
1203        bits.skipBits(3);  // ID, layer
1204        bool protection_absent = bits.getBits(1) != 0;
1205
1206        unsigned profile = bits.getBits(2);
1207        CHECK_NE(profile, 3u);
1208        unsigned sampling_freq_index = bits.getBits(4);
1209        bits.getBits(1);  // private_bit
1210        unsigned channel_configuration = bits.getBits(3);
1211        CHECK_NE(channel_configuration, 0u);
1212        bits.skipBits(2);  // original_copy, home
1213
1214        sp<MetaData> meta = MakeAACCodecSpecificData(
1215                profile, sampling_freq_index, channel_configuration);
1216
1217        meta->setInt32(kKeyIsADTS, true);
1218
1219        packetSource->setFormat(meta);
1220    }
1221
1222    int64_t numSamples = 0ll;
1223    int32_t sampleRate;
1224    CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
1225
1226    size_t offset = 0;
1227    while (offset < buffer->size()) {
1228        const uint8_t *adtsHeader = buffer->data() + offset;
1229        CHECK_LT(offset + 5, buffer->size());
1230
1231        unsigned aac_frame_length =
1232            ((adtsHeader[3] & 3) << 11)
1233            | (adtsHeader[4] << 3)
1234            | (adtsHeader[5] >> 5);
1235
1236        CHECK_LE(offset + aac_frame_length, buffer->size());
1237
1238        sp<ABuffer> unit = new ABuffer(aac_frame_length);
1239        memcpy(unit->data(), adtsHeader, aac_frame_length);
1240
1241        int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
1242        unit->meta()->setInt64("timeUs", unitTimeUs);
1243
1244        // Each AAC frame encodes 1024 samples.
1245        numSamples += 1024;
1246
1247        unit->meta()->setInt32("seq", mSeqNumber);
1248        packetSource->queueAccessUnit(unit);
1249
1250        offset += aac_frame_length;
1251    }
1252
1253    return OK;
1254}
1255
1256void PlaylistFetcher::updateDuration() {
1257    int64_t durationUs = 0ll;
1258    for (size_t index = 0; index < mPlaylist->size(); ++index) {
1259        sp<AMessage> itemMeta;
1260        CHECK(mPlaylist->itemAt(
1261                    index, NULL /* uri */, &itemMeta));
1262
1263        int64_t itemDurationUs;
1264        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1265
1266        durationUs += itemDurationUs;
1267    }
1268
1269    sp<AMessage> msg = mNotify->dup();
1270    msg->setInt32("what", kWhatDurationUpdate);
1271    msg->setInt64("durationUs", durationUs);
1272    msg->post();
1273}
1274
1275int64_t PlaylistFetcher::resumeThreshold(const sp<AMessage> &msg) {
1276    int64_t durationUs, threshold;
1277    if (msg->findInt64("durationUs", &durationUs)) {
1278        return kNumSkipFrames * durationUs;
1279    }
1280
1281    sp<RefBase> obj;
1282    msg->findObject("format", &obj);
1283    MetaData *format = static_cast<MetaData *>(obj.get());
1284
1285    const char *mime;
1286    CHECK(format->findCString(kKeyMIMEType, &mime));
1287    bool audio = !strncasecmp(mime, "audio/", 6);
1288    if (audio) {
1289        // Assumes 1000 samples per frame.
1290        int32_t sampleRate;
1291        CHECK(format->findInt32(kKeySampleRate, &sampleRate));
1292        return kNumSkipFrames  /* frames */ * 1000 /* samples */
1293                * (1000000 / sampleRate) /* sample duration (us) */;
1294    } else {
1295        int32_t frameRate;
1296        if (format->findInt32(kKeyFrameRate, &frameRate) && frameRate > 0) {
1297            return kNumSkipFrames * (1000000 / frameRate);
1298        }
1299    }
1300
1301    return 500000ll;
1302}
1303
1304}  // namespace android
1305