PlaylistFetcher.cpp revision 84333e0475bc911adc16417f4ca327c975cf6c36
1/*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "PlaylistFetcher"
19#include <utils/Log.h>
20
21#include "PlaylistFetcher.h"
22
23#include "LiveDataSource.h"
24#include "LiveSession.h"
25#include "M3UParser.h"
26
27#include "include/avc_utils.h"
28#include "include/HTTPBase.h"
29#include "include/ID3.h"
30#include "mpeg2ts/AnotherPacketSource.h"
31
32#include <media/IStreamSource.h>
33#include <media/stagefright/foundation/ABitReader.h>
34#include <media/stagefright/foundation/ABuffer.h>
35#include <media/stagefright/foundation/ADebug.h>
36#include <media/stagefright/foundation/hexdump.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaDefs.h>
39#include <media/stagefright/MetaData.h>
40#include <media/stagefright/Utils.h>
41
42#include <ctype.h>
43#include <openssl/aes.h>
44#include <openssl/md5.h>
45
46namespace android {
47
48// static
49const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll;
50const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
51
52PlaylistFetcher::PlaylistFetcher(
53        const sp<AMessage> &notify,
54        const sp<LiveSession> &session,
55        const char *uri)
56    : mNotify(notify),
57      mSession(session),
58      mURI(uri),
59      mStreamTypeMask(0),
60      mStartTimeUs(-1ll),
61      mLastPlaylistFetchTimeUs(-1ll),
62      mSeqNumber(-1),
63      mNumRetries(0),
64      mStartup(true),
65      mPrepared(false),
66      mNextPTSTimeUs(-1ll),
67      mMonitorQueueGeneration(0),
68      mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
69      mFirstPTSValid(false),
70      mAbsoluteTimeAnchorUs(0ll) {
71    memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
72}
73
74PlaylistFetcher::~PlaylistFetcher() {
75}
76
77int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
78    CHECK(mPlaylist != NULL);
79
80    int32_t firstSeqNumberInPlaylist;
81    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
82                "media-sequence", &firstSeqNumberInPlaylist)) {
83        firstSeqNumberInPlaylist = 0;
84    }
85
86    int32_t lastSeqNumberInPlaylist =
87        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
88
89    CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
90    CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
91
92    int64_t segmentStartUs = 0ll;
93    for (int32_t index = 0;
94            index < seqNumber - firstSeqNumberInPlaylist; ++index) {
95        sp<AMessage> itemMeta;
96        CHECK(mPlaylist->itemAt(
97                    index, NULL /* uri */, &itemMeta));
98
99        int64_t itemDurationUs;
100        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
101
102        segmentStartUs += itemDurationUs;
103    }
104
105    return segmentStartUs;
106}
107
108int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
109    int64_t nowUs = ALooper::GetNowUs();
110
111    if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
112        CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
113        return 0ll;
114    }
115
116    if (mPlaylist->isComplete()) {
117        return (~0llu >> 1);
118    }
119
120    int32_t targetDurationSecs;
121    CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
122
123    int64_t targetDurationUs = targetDurationSecs * 1000000ll;
124
125    int64_t minPlaylistAgeUs;
126
127    switch (mRefreshState) {
128        case INITIAL_MINIMUM_RELOAD_DELAY:
129        {
130            size_t n = mPlaylist->size();
131            if (n > 0) {
132                sp<AMessage> itemMeta;
133                CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
134
135                int64_t itemDurationUs;
136                CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
137
138                minPlaylistAgeUs = itemDurationUs;
139                break;
140            }
141
142            // fall through
143        }
144
145        case FIRST_UNCHANGED_RELOAD_ATTEMPT:
146        {
147            minPlaylistAgeUs = targetDurationUs / 2;
148            break;
149        }
150
151        case SECOND_UNCHANGED_RELOAD_ATTEMPT:
152        {
153            minPlaylistAgeUs = (targetDurationUs * 3) / 2;
154            break;
155        }
156
157        case THIRD_UNCHANGED_RELOAD_ATTEMPT:
158        {
159            minPlaylistAgeUs = targetDurationUs * 3;
160            break;
161        }
162
163        default:
164            TRESPASS();
165            break;
166    }
167
168    int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
169    return delayUs > 0ll ? delayUs : 0ll;
170}
171
172status_t PlaylistFetcher::decryptBuffer(
173        size_t playlistIndex, const sp<ABuffer> &buffer) {
174    sp<AMessage> itemMeta;
175    bool found = false;
176    AString method;
177
178    for (ssize_t i = playlistIndex; i >= 0; --i) {
179        AString uri;
180        CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
181
182        if (itemMeta->findString("cipher-method", &method)) {
183            found = true;
184            break;
185        }
186    }
187
188    if (!found) {
189        method = "NONE";
190    }
191
192    if (method == "NONE") {
193        return OK;
194    } else if (!(method == "AES-128")) {
195        ALOGE("Unsupported cipher method '%s'", method.c_str());
196        return ERROR_UNSUPPORTED;
197    }
198
199    AString keyURI;
200    if (!itemMeta->findString("cipher-uri", &keyURI)) {
201        ALOGE("Missing key uri");
202        return ERROR_MALFORMED;
203    }
204
205    ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
206
207    sp<ABuffer> key;
208    if (index >= 0) {
209        key = mAESKeyForURI.valueAt(index);
210    } else {
211        status_t err = mSession->fetchFile(keyURI.c_str(), &key);
212
213        if (err != OK) {
214            ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
215            return ERROR_IO;
216        } else if (key->size() != 16) {
217            ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
218            return ERROR_MALFORMED;
219        }
220
221        mAESKeyForURI.add(keyURI, key);
222    }
223
224    AES_KEY aes_key;
225    if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
226        ALOGE("failed to set AES decryption key.");
227        return UNKNOWN_ERROR;
228    }
229
230    unsigned char aes_ivec[16];
231
232    AString iv;
233    if (itemMeta->findString("cipher-iv", &iv)) {
234        if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
235                || iv.size() != 16 * 2 + 2) {
236            ALOGE("malformed cipher IV '%s'.", iv.c_str());
237            return ERROR_MALFORMED;
238        }
239
240        memset(aes_ivec, 0, sizeof(aes_ivec));
241        for (size_t i = 0; i < 16; ++i) {
242            char c1 = tolower(iv.c_str()[2 + 2 * i]);
243            char c2 = tolower(iv.c_str()[3 + 2 * i]);
244            if (!isxdigit(c1) || !isxdigit(c2)) {
245                ALOGE("malformed cipher IV '%s'.", iv.c_str());
246                return ERROR_MALFORMED;
247            }
248            uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
249            uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
250
251            aes_ivec[i] = nibble1 << 4 | nibble2;
252        }
253    } else {
254        memset(aes_ivec, 0, sizeof(aes_ivec));
255        aes_ivec[15] = mSeqNumber & 0xff;
256        aes_ivec[14] = (mSeqNumber >> 8) & 0xff;
257        aes_ivec[13] = (mSeqNumber >> 16) & 0xff;
258        aes_ivec[12] = (mSeqNumber >> 24) & 0xff;
259    }
260
261    AES_cbc_encrypt(
262            buffer->data(), buffer->data(), buffer->size(),
263            &aes_key, aes_ivec, AES_DECRYPT);
264
265    // hexdump(buffer->data(), buffer->size());
266
267    size_t n = buffer->size();
268    CHECK_GT(n, 0u);
269
270    size_t pad = buffer->data()[n - 1];
271
272    CHECK_GT(pad, 0u);
273    CHECK_LE(pad, 16u);
274    CHECK_GE((size_t)n, pad);
275    for (size_t i = 0; i < pad; ++i) {
276        CHECK_EQ((unsigned)buffer->data()[n - 1 - i], pad);
277    }
278
279    n -= pad;
280
281    buffer->setRange(buffer->offset(), n);
282
283    return OK;
284}
285
286void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
287    int64_t maxDelayUs = delayUsToRefreshPlaylist();
288    if (maxDelayUs < minDelayUs) {
289        maxDelayUs = minDelayUs;
290    }
291    if (delayUs > maxDelayUs) {
292        ALOGV("Need to refresh playlist in %lld", maxDelayUs);
293        delayUs = maxDelayUs;
294    }
295    sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id());
296    msg->setInt32("generation", mMonitorQueueGeneration);
297    msg->post(delayUs);
298}
299
300void PlaylistFetcher::cancelMonitorQueue() {
301    ++mMonitorQueueGeneration;
302}
303
304void PlaylistFetcher::startAsync(
305        const sp<AnotherPacketSource> &audioSource,
306        const sp<AnotherPacketSource> &videoSource,
307        const sp<AnotherPacketSource> &subtitleSource,
308        int64_t startTimeUs) {
309    sp<AMessage> msg = new AMessage(kWhatStart, id());
310
311    uint32_t streamTypeMask = 0ul;
312
313    if (audioSource != NULL) {
314        msg->setPointer("audioSource", audioSource.get());
315        streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
316    }
317
318    if (videoSource != NULL) {
319        msg->setPointer("videoSource", videoSource.get());
320        streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
321    }
322
323    if (subtitleSource != NULL) {
324        msg->setPointer("subtitleSource", subtitleSource.get());
325        streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
326    }
327
328    msg->setInt32("streamTypeMask", streamTypeMask);
329    msg->setInt64("startTimeUs", startTimeUs);
330    msg->post();
331}
332
333void PlaylistFetcher::pauseAsync() {
334    (new AMessage(kWhatPause, id()))->post();
335}
336
337void PlaylistFetcher::stopAsync() {
338    (new AMessage(kWhatStop, id()))->post();
339}
340
341void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
342    switch (msg->what()) {
343        case kWhatStart:
344        {
345            status_t err = onStart(msg);
346
347            sp<AMessage> notify = mNotify->dup();
348            notify->setInt32("what", kWhatStarted);
349            notify->setInt32("err", err);
350            notify->post();
351            break;
352        }
353
354        case kWhatPause:
355        {
356            onPause();
357
358            sp<AMessage> notify = mNotify->dup();
359            notify->setInt32("what", kWhatPaused);
360            notify->post();
361            break;
362        }
363
364        case kWhatStop:
365        {
366            onStop();
367
368            sp<AMessage> notify = mNotify->dup();
369            notify->setInt32("what", kWhatStopped);
370            notify->post();
371            break;
372        }
373
374        case kWhatMonitorQueue:
375        {
376            int32_t generation;
377            CHECK(msg->findInt32("generation", &generation));
378
379            if (generation != mMonitorQueueGeneration) {
380                // Stale event
381                break;
382            }
383
384            onMonitorQueue();
385            break;
386        }
387
388        default:
389            TRESPASS();
390    }
391}
392
393status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
394    mPacketSources.clear();
395
396    uint32_t streamTypeMask;
397    CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
398
399    int64_t startTimeUs;
400    CHECK(msg->findInt64("startTimeUs", &startTimeUs));
401
402    if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
403        void *ptr;
404        CHECK(msg->findPointer("audioSource", &ptr));
405
406        mPacketSources.add(
407                LiveSession::STREAMTYPE_AUDIO,
408                static_cast<AnotherPacketSource *>(ptr));
409    }
410
411    if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
412        void *ptr;
413        CHECK(msg->findPointer("videoSource", &ptr));
414
415        mPacketSources.add(
416                LiveSession::STREAMTYPE_VIDEO,
417                static_cast<AnotherPacketSource *>(ptr));
418    }
419
420    if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
421        void *ptr;
422        CHECK(msg->findPointer("subtitleSource", &ptr));
423
424        mPacketSources.add(
425                LiveSession::STREAMTYPE_SUBTITLES,
426                static_cast<AnotherPacketSource *>(ptr));
427    }
428
429    mStreamTypeMask = streamTypeMask;
430    mStartTimeUs = startTimeUs;
431
432    if (mStartTimeUs >= 0ll) {
433        mSeqNumber = -1;
434        mStartup = true;
435        mPrepared = false;
436    }
437
438    postMonitorQueue();
439
440    return OK;
441}
442
443void PlaylistFetcher::onPause() {
444    cancelMonitorQueue();
445
446    mPacketSources.clear();
447    mStreamTypeMask = 0;
448}
449
450void PlaylistFetcher::onStop() {
451    cancelMonitorQueue();
452
453    for (size_t i = 0; i < mPacketSources.size(); ++i) {
454        mPacketSources.valueAt(i)->clear();
455    }
456
457    mPacketSources.clear();
458    mStreamTypeMask = 0;
459}
460
461void PlaylistFetcher::notifyError(status_t err) {
462    sp<AMessage> notify = mNotify->dup();
463    notify->setInt32("what", kWhatError);
464    notify->setInt32("err", err);
465    notify->post();
466}
467
468void PlaylistFetcher::queueDiscontinuity(
469        ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
470    for (size_t i = 0; i < mPacketSources.size(); ++i) {
471        mPacketSources.valueAt(i)->queueDiscontinuity(type, extra);
472    }
473}
474
475void PlaylistFetcher::onMonitorQueue() {
476    bool downloadMore = false;
477    refreshPlaylist();
478
479    int32_t targetDurationSecs;
480    int64_t targetDurationUs = kMinBufferedDurationUs;
481    if (mPlaylist != NULL) {
482        CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
483        targetDurationUs = targetDurationSecs * 1000000ll;
484    }
485
486    // buffer at least 3 times the target duration, or up to 10 seconds
487    int64_t durationToBufferUs = targetDurationUs * 3;
488    if (durationToBufferUs > kMinBufferedDurationUs)  {
489        durationToBufferUs = kMinBufferedDurationUs;
490    }
491
492    int64_t bufferedDurationUs = 0ll;
493    status_t finalResult = NOT_ENOUGH_DATA;
494    if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
495        sp<AnotherPacketSource> packetSource =
496            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
497
498        bufferedDurationUs =
499                packetSource->getBufferedDurationUs(&finalResult);
500        finalResult = OK;
501    } else {
502        bool first = true;
503
504        for (size_t i = 0; i < mPacketSources.size(); ++i) {
505            if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
506                continue;
507            }
508
509            int64_t bufferedStreamDurationUs =
510                mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
511            if (first || bufferedStreamDurationUs < bufferedDurationUs) {
512                bufferedDurationUs = bufferedStreamDurationUs;
513                first = false;
514            }
515        }
516    }
517    downloadMore = (bufferedDurationUs < durationToBufferUs);
518
519    // signal start if buffered up at least the target size
520    if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) {
521        mPrepared = true;
522
523        ALOGV("prepared, buffered=%lld > %lld",
524                bufferedDurationUs, targetDurationUs);
525        sp<AMessage> msg = mNotify->dup();
526        msg->setInt32("what", kWhatTemporarilyDoneFetching);
527        msg->post();
528    }
529
530    if (finalResult == OK && downloadMore) {
531        ALOGV("monitoring, buffered=%lld < %lld",
532                bufferedDurationUs, durationToBufferUs);
533        onDownloadNext();
534    } else {
535        // Nothing to do yet, try again in a second.
536
537        sp<AMessage> msg = mNotify->dup();
538        msg->setInt32("what", kWhatTemporarilyDoneFetching);
539        msg->post();
540
541        int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2;
542        ALOGV("pausing for %lld, buffered=%lld > %lld",
543                delayUs, bufferedDurationUs, durationToBufferUs);
544        // :TRICKY: need to enforce minimum delay because the delay to
545        // refresh the playlist will become 0
546        postMonitorQueue(delayUs, mPrepared ? targetDurationUs * 2 : 0);
547    }
548}
549
550status_t PlaylistFetcher::refreshPlaylist() {
551    if (delayUsToRefreshPlaylist() <= 0) {
552        bool unchanged;
553        sp<M3UParser> playlist = mSession->fetchPlaylist(
554                mURI.c_str(), mPlaylistHash, &unchanged);
555
556        if (playlist == NULL) {
557            if (unchanged) {
558                // We succeeded in fetching the playlist, but it was
559                // unchanged from the last time we tried.
560
561                if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
562                    mRefreshState = (RefreshState)(mRefreshState + 1);
563                }
564            } else {
565                ALOGE("failed to load playlist at url '%s'", mURI.c_str());
566                notifyError(ERROR_IO);
567                return ERROR_IO;
568            }
569        } else {
570            mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
571            mPlaylist = playlist;
572
573            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
574                updateDuration();
575            }
576        }
577
578        mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
579    }
580    return OK;
581}
582
583void PlaylistFetcher::onDownloadNext() {
584    if (refreshPlaylist() != OK) {
585        return;
586    }
587
588    int32_t firstSeqNumberInPlaylist;
589    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
590                "media-sequence", &firstSeqNumberInPlaylist)) {
591        firstSeqNumberInPlaylist = 0;
592    }
593
594    bool seekDiscontinuity = false;
595    bool explicitDiscontinuity = false;
596
597    const int32_t lastSeqNumberInPlaylist =
598        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
599
600    if (mSeqNumber < 0) {
601        CHECK_GE(mStartTimeUs, 0ll);
602
603        if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
604            mSeqNumber = getSeqNumberForTime(mStartTimeUs);
605            ALOGV("Initial sequence number for time %lld is %d from (%d .. %d)",
606                    mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
607                    lastSeqNumberInPlaylist);
608        } else {
609            // If this is a live session, start 3 segments from the end.
610            mSeqNumber = lastSeqNumberInPlaylist - 3;
611            if (mSeqNumber < firstSeqNumberInPlaylist) {
612                mSeqNumber = firstSeqNumberInPlaylist;
613            }
614            ALOGV("Initial sequence number for live event %d from (%d .. %d)",
615                    mSeqNumber, firstSeqNumberInPlaylist,
616                    lastSeqNumberInPlaylist);
617        }
618
619        mStartTimeUs = -1ll;
620    }
621
622    if (mSeqNumber < firstSeqNumberInPlaylist
623            || mSeqNumber > lastSeqNumberInPlaylist) {
624        if (!mPlaylist->isComplete() && mNumRetries < kMaxNumRetries) {
625            ++mNumRetries;
626
627            if (mSeqNumber > lastSeqNumberInPlaylist) {
628                // refresh in increasing fraction (1/2, 1/3, ...) of the
629                // playlist's target duration or 3 seconds, whichever is less
630                int32_t targetDurationSecs;
631                CHECK(mPlaylist->meta()->findInt32(
632                        "target-duration", &targetDurationSecs));
633                int64_t delayUs = mPlaylist->size() * targetDurationSecs *
634                        1000000ll / (1 + mNumRetries);
635                if (delayUs > kMaxMonitorDelayUs) {
636                    delayUs = kMaxMonitorDelayUs;
637                }
638                ALOGV("sequence number high: %d from (%d .. %d), "
639                      "monitor in %lld (retry=%d)",
640                        mSeqNumber, firstSeqNumberInPlaylist,
641                        lastSeqNumberInPlaylist, delayUs, mNumRetries);
642                postMonitorQueue(delayUs);
643                return;
644            }
645
646            // we've missed the boat, let's start from the lowest sequence
647            // number available and signal a discontinuity.
648
649            ALOGI("We've missed the boat, restarting playback."
650                  "  mStartup=%d, was  looking for %d in %d-%d",
651                    mStartup, mSeqNumber, firstSeqNumberInPlaylist,
652                    lastSeqNumberInPlaylist);
653            mSeqNumber = lastSeqNumberInPlaylist - 3;
654            if (mSeqNumber < firstSeqNumberInPlaylist) {
655                mSeqNumber = firstSeqNumberInPlaylist;
656            }
657            explicitDiscontinuity = true;
658
659            // fall through
660        } else {
661            ALOGE("Cannot find sequence number %d in playlist "
662                 "(contains %d - %d)",
663                 mSeqNumber, firstSeqNumberInPlaylist,
664                 firstSeqNumberInPlaylist + mPlaylist->size() - 1);
665
666            notifyError(ERROR_END_OF_STREAM);
667            return;
668        }
669    }
670
671    mNumRetries = 0;
672
673    AString uri;
674    sp<AMessage> itemMeta;
675    CHECK(mPlaylist->itemAt(
676                mSeqNumber - firstSeqNumberInPlaylist,
677                &uri,
678                &itemMeta));
679
680    int32_t val;
681    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
682        explicitDiscontinuity = true;
683    }
684
685    int64_t range_offset, range_length;
686    if (!itemMeta->findInt64("range-offset", &range_offset)
687            || !itemMeta->findInt64("range-length", &range_length)) {
688        range_offset = 0;
689        range_length = -1;
690    }
691
692    ALOGV("fetching segment %d from (%d .. %d)",
693          mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
694
695    ALOGV("fetching '%s'", uri.c_str());
696
697    sp<ABuffer> buffer;
698    status_t err = mSession->fetchFile(
699            uri.c_str(), &buffer, range_offset, range_length);
700
701    if (err != OK) {
702        ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
703        notifyError(err);
704        return;
705    }
706
707    CHECK(buffer != NULL);
708
709    err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer);
710
711    if (err != OK) {
712        ALOGE("decryptBuffer failed w/ error %d", err);
713
714        notifyError(err);
715        return;
716    }
717
718    if (mStartup || seekDiscontinuity || explicitDiscontinuity) {
719        // Signal discontinuity.
720
721        if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
722            // If this was a live event this made no sense since
723            // we don't have access to all the segment before the current
724            // one.
725            mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
726        }
727
728        if (seekDiscontinuity || explicitDiscontinuity) {
729            ALOGI("queueing discontinuity (seek=%d, explicit=%d)",
730                 seekDiscontinuity, explicitDiscontinuity);
731
732            queueDiscontinuity(
733                    explicitDiscontinuity
734                        ? ATSParser::DISCONTINUITY_FORMATCHANGE
735                        : ATSParser::DISCONTINUITY_SEEK,
736                    NULL /* extra */);
737        }
738    }
739
740    err = extractAndQueueAccessUnits(buffer, itemMeta);
741
742    if (err != OK) {
743        notifyError(err);
744        return;
745    }
746
747    ++mSeqNumber;
748
749    postMonitorQueue();
750
751    mStartup = false;
752}
753
754int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
755    int32_t firstSeqNumberInPlaylist;
756    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
757                "media-sequence", &firstSeqNumberInPlaylist)) {
758        firstSeqNumberInPlaylist = 0;
759    }
760
761    size_t index = 0;
762    int64_t segmentStartUs = 0;
763    while (index < mPlaylist->size()) {
764        sp<AMessage> itemMeta;
765        CHECK(mPlaylist->itemAt(
766                    index, NULL /* uri */, &itemMeta));
767
768        int64_t itemDurationUs;
769        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
770
771        if (timeUs < segmentStartUs + itemDurationUs) {
772            break;
773        }
774
775        segmentStartUs += itemDurationUs;
776        ++index;
777    }
778
779    if (index >= mPlaylist->size()) {
780        index = mPlaylist->size() - 1;
781    }
782
783    return firstSeqNumberInPlaylist + index;
784}
785
786status_t PlaylistFetcher::extractAndQueueAccessUnits(
787        const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
788    if (buffer->size() > 0 && buffer->data()[0] == 0x47) {
789        // Let's assume this is an MPEG2 transport stream.
790
791        if ((buffer->size() % 188) != 0) {
792            ALOGE("MPEG2 transport stream is not an even multiple of 188 "
793                  "bytes in length.");
794            return ERROR_MALFORMED;
795        }
796
797        if (mTSParser == NULL) {
798            mTSParser = new ATSParser;
799        }
800
801        if (mNextPTSTimeUs >= 0ll) {
802            sp<AMessage> extra = new AMessage;
803            extra->setInt64(IStreamListener::kKeyMediaTimeUs, mNextPTSTimeUs);
804
805            mTSParser->signalDiscontinuity(
806                    ATSParser::DISCONTINUITY_SEEK, extra);
807
808            mNextPTSTimeUs = -1ll;
809        }
810
811        size_t offset = 0;
812        while (offset < buffer->size()) {
813            status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
814
815            if (err != OK) {
816                return err;
817            }
818
819            offset += 188;
820        }
821
822        for (size_t i = mPacketSources.size(); i-- > 0;) {
823            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
824
825            ATSParser::SourceType type;
826            switch (mPacketSources.keyAt(i)) {
827                case LiveSession::STREAMTYPE_VIDEO:
828                    type = ATSParser::VIDEO;
829                    break;
830
831                case LiveSession::STREAMTYPE_AUDIO:
832                    type = ATSParser::AUDIO;
833                    break;
834
835                case LiveSession::STREAMTYPE_SUBTITLES:
836                {
837                    ALOGE("MPEG2 Transport streams do not contain subtitles.");
838                    return ERROR_MALFORMED;
839                    break;
840                }
841
842                default:
843                    TRESPASS();
844            }
845
846            sp<AnotherPacketSource> source =
847                static_cast<AnotherPacketSource *>(
848                        mTSParser->getSource(type).get());
849
850            if (source == NULL) {
851                ALOGW("MPEG2 Transport stream does not contain %s data.",
852                      type == ATSParser::VIDEO ? "video" : "audio");
853
854                mStreamTypeMask &= ~mPacketSources.keyAt(i);
855                mPacketSources.removeItemsAt(i);
856                continue;
857            }
858
859            sp<ABuffer> accessUnit;
860            status_t finalResult;
861            while (source->hasBufferAvailable(&finalResult)
862                    && source->dequeueAccessUnit(&accessUnit) == OK) {
863                // Note that we do NOT dequeue any discontinuities.
864
865                // for simplicity, store a reference to the format in each unit
866                sp<MetaData> format = source->getFormat();
867                if (format != NULL) {
868                    accessUnit->meta()->setObject("format", format);
869                }
870                packetSource->queueAccessUnit(accessUnit);
871            }
872        }
873
874        return OK;
875    } else if (buffer->size() >= 7 && !memcmp("WEBVTT\n", buffer->data(), 7)) {
876        if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
877            ALOGE("This stream only contains subtitles.");
878            return ERROR_MALFORMED;
879        }
880
881        const sp<AnotherPacketSource> packetSource =
882            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
883
884        int64_t durationUs;
885        CHECK(itemMeta->findInt64("durationUs", &durationUs));
886        buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
887        buffer->meta()->setInt64("durationUs", durationUs);
888
889        packetSource->queueAccessUnit(buffer);
890        return OK;
891    }
892
893    if (mNextPTSTimeUs >= 0ll) {
894        mFirstPTSValid = false;
895        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
896        mNextPTSTimeUs = -1ll;
897    }
898
899    // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
900    // stream prefixed by an ID3 tag.
901
902    bool firstID3Tag = true;
903    uint64_t PTS = 0;
904
905    for (;;) {
906        // Make sure to skip all ID3 tags preceding the audio data.
907        // At least one must be present to provide the PTS timestamp.
908
909        ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
910        if (!id3.isValid()) {
911            if (firstID3Tag) {
912                ALOGE("Unable to parse ID3 tag.");
913                return ERROR_MALFORMED;
914            } else {
915                break;
916            }
917        }
918
919        if (firstID3Tag) {
920            bool found = false;
921
922            ID3::Iterator it(id3, "PRIV");
923            while (!it.done()) {
924                size_t length;
925                const uint8_t *data = it.getData(&length);
926
927                static const char *kMatchName =
928                    "com.apple.streaming.transportStreamTimestamp";
929                static const size_t kMatchNameLen = strlen(kMatchName);
930
931                if (length == kMatchNameLen + 1 + 8
932                        && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
933                    found = true;
934                    PTS = U64_AT(&data[kMatchNameLen + 1]);
935                }
936
937                it.next();
938            }
939
940            if (!found) {
941                ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
942                return ERROR_MALFORMED;
943            }
944        }
945
946        // skip the ID3 tag
947        buffer->setRange(
948                buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
949
950        firstID3Tag = false;
951    }
952
953    if (!mFirstPTSValid) {
954        mFirstPTSValid = true;
955        mFirstPTS = PTS;
956    }
957    PTS -= mFirstPTS;
958
959    int64_t timeUs = (PTS * 100ll) / 9ll + mAbsoluteTimeAnchorUs;
960
961    if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
962        ALOGW("This stream only contains audio data!");
963
964        mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
965
966        if (mStreamTypeMask == 0) {
967            return OK;
968        }
969    }
970
971    sp<AnotherPacketSource> packetSource =
972        mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
973
974    if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
975        ABitReader bits(buffer->data(), buffer->size());
976
977        // adts_fixed_header
978
979        CHECK_EQ(bits.getBits(12), 0xfffu);
980        bits.skipBits(3);  // ID, layer
981        bool protection_absent = bits.getBits(1) != 0;
982
983        unsigned profile = bits.getBits(2);
984        CHECK_NE(profile, 3u);
985        unsigned sampling_freq_index = bits.getBits(4);
986        bits.getBits(1);  // private_bit
987        unsigned channel_configuration = bits.getBits(3);
988        CHECK_NE(channel_configuration, 0u);
989        bits.skipBits(2);  // original_copy, home
990
991        sp<MetaData> meta = MakeAACCodecSpecificData(
992                profile, sampling_freq_index, channel_configuration);
993
994        meta->setInt32(kKeyIsADTS, true);
995
996        packetSource->setFormat(meta);
997    }
998
999    int64_t numSamples = 0ll;
1000    int32_t sampleRate;
1001    CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
1002
1003    size_t offset = 0;
1004    while (offset < buffer->size()) {
1005        const uint8_t *adtsHeader = buffer->data() + offset;
1006        CHECK_LT(offset + 5, buffer->size());
1007
1008        unsigned aac_frame_length =
1009            ((adtsHeader[3] & 3) << 11)
1010            | (adtsHeader[4] << 3)
1011            | (adtsHeader[5] >> 5);
1012
1013        CHECK_LE(offset + aac_frame_length, buffer->size());
1014
1015        sp<ABuffer> unit = new ABuffer(aac_frame_length);
1016        memcpy(unit->data(), adtsHeader, aac_frame_length);
1017
1018        int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
1019        unit->meta()->setInt64("timeUs", unitTimeUs);
1020
1021        // Each AAC frame encodes 1024 samples.
1022        numSamples += 1024;
1023
1024        packetSource->queueAccessUnit(unit);
1025
1026        offset += aac_frame_length;
1027    }
1028
1029    return OK;
1030}
1031
1032void PlaylistFetcher::updateDuration() {
1033    int64_t durationUs = 0ll;
1034    for (size_t index = 0; index < mPlaylist->size(); ++index) {
1035        sp<AMessage> itemMeta;
1036        CHECK(mPlaylist->itemAt(
1037                    index, NULL /* uri */, &itemMeta));
1038
1039        int64_t itemDurationUs;
1040        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1041
1042        durationUs += itemDurationUs;
1043    }
1044
1045    sp<AMessage> msg = mNotify->dup();
1046    msg->setInt32("what", kWhatDurationUpdate);
1047    msg->setInt64("durationUs", durationUs);
1048    msg->post();
1049}
1050
1051}  // namespace android
1052