PlaylistFetcher.cpp revision f6d0c1fd6d9e697bb3a891fae14c7e9d4b685de6
1/*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "PlaylistFetcher"
19#include <utils/Log.h>
20
21#include "PlaylistFetcher.h"
22
23#include "LiveDataSource.h"
24#include "LiveSession.h"
25#include "M3UParser.h"
26
27#include "include/avc_utils.h"
28#include "include/HTTPBase.h"
29#include "include/ID3.h"
30#include "mpeg2ts/AnotherPacketSource.h"
31
32#include <media/IStreamSource.h>
33#include <media/stagefright/foundation/ABitReader.h>
34#include <media/stagefright/foundation/ABuffer.h>
35#include <media/stagefright/foundation/ADebug.h>
36#include <media/stagefright/foundation/hexdump.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaDefs.h>
39#include <media/stagefright/MetaData.h>
40#include <media/stagefright/Utils.h>
41
42#include <ctype.h>
43#include <inttypes.h>
44#include <openssl/aes.h>
45#include <openssl/md5.h>
46
47namespace android {
48
49// static
50const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll;
51
52PlaylistFetcher::PlaylistFetcher(
53        const sp<AMessage> &notify,
54        const sp<LiveSession> &session,
55        const char *uri)
56    : mNotify(notify),
57      mSession(session),
58      mURI(uri),
59      mStreamTypeMask(0),
60      mStartTimeUs(-1ll),
61      mLastPlaylistFetchTimeUs(-1ll),
62      mSeqNumber(-1),
63      mNumRetries(0),
64      mStartup(true),
65      mNextPTSTimeUs(-1ll),
66      mMonitorQueueGeneration(0),
67      mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
68      mFirstPTSValid(false),
69      mAbsoluteTimeAnchorUs(0ll) {
70    memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
71}
72
73PlaylistFetcher::~PlaylistFetcher() {
74}
75
76int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
77    CHECK(mPlaylist != NULL);
78
79    int32_t firstSeqNumberInPlaylist;
80    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
81                "media-sequence", &firstSeqNumberInPlaylist)) {
82        firstSeqNumberInPlaylist = 0;
83    }
84
85    int32_t lastSeqNumberInPlaylist =
86        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
87
88    CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
89    CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
90
91    int64_t segmentStartUs = 0ll;
92    for (int32_t index = 0;
93            index < seqNumber - firstSeqNumberInPlaylist; ++index) {
94        sp<AMessage> itemMeta;
95        CHECK(mPlaylist->itemAt(
96                    index, NULL /* uri */, &itemMeta));
97
98        int64_t itemDurationUs;
99        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
100
101        segmentStartUs += itemDurationUs;
102    }
103
104    return segmentStartUs;
105}
106
107bool PlaylistFetcher::timeToRefreshPlaylist(int64_t nowUs) const {
108    if (mPlaylist == NULL) {
109        CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
110        return true;
111    }
112
113    int32_t targetDurationSecs;
114    CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
115
116    int64_t targetDurationUs = targetDurationSecs * 1000000ll;
117
118    int64_t minPlaylistAgeUs;
119
120    switch (mRefreshState) {
121        case INITIAL_MINIMUM_RELOAD_DELAY:
122        {
123            size_t n = mPlaylist->size();
124            if (n > 0) {
125                sp<AMessage> itemMeta;
126                CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
127
128                int64_t itemDurationUs;
129                CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
130
131                minPlaylistAgeUs = itemDurationUs;
132                break;
133            }
134
135            // fall through
136        }
137
138        case FIRST_UNCHANGED_RELOAD_ATTEMPT:
139        {
140            minPlaylistAgeUs = targetDurationUs / 2;
141            break;
142        }
143
144        case SECOND_UNCHANGED_RELOAD_ATTEMPT:
145        {
146            minPlaylistAgeUs = (targetDurationUs * 3) / 2;
147            break;
148        }
149
150        case THIRD_UNCHANGED_RELOAD_ATTEMPT:
151        {
152            minPlaylistAgeUs = targetDurationUs * 3;
153            break;
154        }
155
156        default:
157            TRESPASS();
158            break;
159    }
160
161    return mLastPlaylistFetchTimeUs + minPlaylistAgeUs <= nowUs;
162}
163
164status_t PlaylistFetcher::decryptBuffer(
165        size_t playlistIndex, const sp<ABuffer> &buffer) {
166    sp<AMessage> itemMeta;
167    bool found = false;
168    AString method;
169
170    for (ssize_t i = playlistIndex; i >= 0; --i) {
171        AString uri;
172        CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
173
174        if (itemMeta->findString("cipher-method", &method)) {
175            found = true;
176            break;
177        }
178    }
179
180    if (!found) {
181        method = "NONE";
182    }
183
184    if (method == "NONE") {
185        return OK;
186    } else if (!(method == "AES-128")) {
187        ALOGE("Unsupported cipher method '%s'", method.c_str());
188        return ERROR_UNSUPPORTED;
189    }
190
191    AString keyURI;
192    if (!itemMeta->findString("cipher-uri", &keyURI)) {
193        ALOGE("Missing key uri");
194        return ERROR_MALFORMED;
195    }
196
197    ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
198
199    sp<ABuffer> key;
200    if (index >= 0) {
201        key = mAESKeyForURI.valueAt(index);
202    } else {
203        status_t err = mSession->fetchFile(keyURI.c_str(), &key);
204
205        if (err != OK) {
206            ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
207            return ERROR_IO;
208        } else if (key->size() != 16) {
209            ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
210            return ERROR_MALFORMED;
211        }
212
213        mAESKeyForURI.add(keyURI, key);
214    }
215
216    AES_KEY aes_key;
217    if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
218        ALOGE("failed to set AES decryption key.");
219        return UNKNOWN_ERROR;
220    }
221
222    unsigned char aes_ivec[16];
223
224    AString iv;
225    if (itemMeta->findString("cipher-iv", &iv)) {
226        if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
227                || iv.size() != 16 * 2 + 2) {
228            ALOGE("malformed cipher IV '%s'.", iv.c_str());
229            return ERROR_MALFORMED;
230        }
231
232        memset(aes_ivec, 0, sizeof(aes_ivec));
233        for (size_t i = 0; i < 16; ++i) {
234            char c1 = tolower(iv.c_str()[2 + 2 * i]);
235            char c2 = tolower(iv.c_str()[3 + 2 * i]);
236            if (!isxdigit(c1) || !isxdigit(c2)) {
237                ALOGE("malformed cipher IV '%s'.", iv.c_str());
238                return ERROR_MALFORMED;
239            }
240            uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
241            uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
242
243            aes_ivec[i] = nibble1 << 4 | nibble2;
244        }
245    } else {
246        memset(aes_ivec, 0, sizeof(aes_ivec));
247        aes_ivec[15] = mSeqNumber & 0xff;
248        aes_ivec[14] = (mSeqNumber >> 8) & 0xff;
249        aes_ivec[13] = (mSeqNumber >> 16) & 0xff;
250        aes_ivec[12] = (mSeqNumber >> 24) & 0xff;
251    }
252
253    AES_cbc_encrypt(
254            buffer->data(), buffer->data(), buffer->size(),
255            &aes_key, aes_ivec, AES_DECRYPT);
256
257    // hexdump(buffer->data(), buffer->size());
258
259    size_t n = buffer->size();
260    CHECK_GT(n, 0u);
261
262    size_t pad = buffer->data()[n - 1];
263
264    CHECK_GT(pad, 0u);
265    CHECK_LE(pad, 16u);
266    CHECK_GE((size_t)n, pad);
267    for (size_t i = 0; i < pad; ++i) {
268        CHECK_EQ((unsigned)buffer->data()[n - 1 - i], pad);
269    }
270
271    n -= pad;
272
273    buffer->setRange(buffer->offset(), n);
274
275    return OK;
276}
277
278void PlaylistFetcher::postMonitorQueue(int64_t delayUs) {
279    sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id());
280    msg->setInt32("generation", mMonitorQueueGeneration);
281    msg->post(delayUs);
282}
283
284void PlaylistFetcher::cancelMonitorQueue() {
285    ++mMonitorQueueGeneration;
286}
287
288void PlaylistFetcher::startAsync(
289        const sp<AnotherPacketSource> &audioSource,
290        const sp<AnotherPacketSource> &videoSource,
291        const sp<AnotherPacketSource> &subtitleSource,
292        int64_t startTimeUs) {
293    sp<AMessage> msg = new AMessage(kWhatStart, id());
294
295    uint32_t streamTypeMask = 0ul;
296
297    if (audioSource != NULL) {
298        msg->setPointer("audioSource", audioSource.get());
299        streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
300    }
301
302    if (videoSource != NULL) {
303        msg->setPointer("videoSource", videoSource.get());
304        streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
305    }
306
307    if (subtitleSource != NULL) {
308        msg->setPointer("subtitleSource", subtitleSource.get());
309        streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
310    }
311
312    msg->setInt32("streamTypeMask", streamTypeMask);
313    msg->setInt64("startTimeUs", startTimeUs);
314    msg->post();
315}
316
317void PlaylistFetcher::pauseAsync() {
318    (new AMessage(kWhatPause, id()))->post();
319}
320
321void PlaylistFetcher::stopAsync() {
322    (new AMessage(kWhatStop, id()))->post();
323}
324
325void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
326    switch (msg->what()) {
327        case kWhatStart:
328        {
329            status_t err = onStart(msg);
330
331            sp<AMessage> notify = mNotify->dup();
332            notify->setInt32("what", kWhatStarted);
333            notify->setInt32("err", err);
334            notify->post();
335            break;
336        }
337
338        case kWhatPause:
339        {
340            onPause();
341
342            sp<AMessage> notify = mNotify->dup();
343            notify->setInt32("what", kWhatPaused);
344            notify->post();
345            break;
346        }
347
348        case kWhatStop:
349        {
350            onStop();
351
352            sp<AMessage> notify = mNotify->dup();
353            notify->setInt32("what", kWhatStopped);
354            notify->post();
355            break;
356        }
357
358        case kWhatMonitorQueue:
359        {
360            int32_t generation;
361            CHECK(msg->findInt32("generation", &generation));
362
363            if (generation != mMonitorQueueGeneration) {
364                // Stale event
365                break;
366            }
367
368            onMonitorQueue();
369            break;
370        }
371
372        default:
373            TRESPASS();
374    }
375}
376
377status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
378    mPacketSources.clear();
379
380    uint32_t streamTypeMask;
381    CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
382
383    int64_t startTimeUs;
384    CHECK(msg->findInt64("startTimeUs", &startTimeUs));
385
386    if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
387        void *ptr;
388        CHECK(msg->findPointer("audioSource", &ptr));
389
390        mPacketSources.add(
391                LiveSession::STREAMTYPE_AUDIO,
392                static_cast<AnotherPacketSource *>(ptr));
393    }
394
395    if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
396        void *ptr;
397        CHECK(msg->findPointer("videoSource", &ptr));
398
399        mPacketSources.add(
400                LiveSession::STREAMTYPE_VIDEO,
401                static_cast<AnotherPacketSource *>(ptr));
402    }
403
404    if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
405        void *ptr;
406        CHECK(msg->findPointer("subtitleSource", &ptr));
407
408        mPacketSources.add(
409                LiveSession::STREAMTYPE_SUBTITLES,
410                static_cast<AnotherPacketSource *>(ptr));
411    }
412
413    mStreamTypeMask = streamTypeMask;
414    mStartTimeUs = startTimeUs;
415
416    if (mStartTimeUs >= 0ll) {
417        mSeqNumber = -1;
418        mStartup = true;
419    }
420
421    postMonitorQueue();
422
423    return OK;
424}
425
426void PlaylistFetcher::onPause() {
427    cancelMonitorQueue();
428
429    mPacketSources.clear();
430    mStreamTypeMask = 0;
431}
432
433void PlaylistFetcher::onStop() {
434    cancelMonitorQueue();
435
436    for (size_t i = 0; i < mPacketSources.size(); ++i) {
437        mPacketSources.valueAt(i)->clear();
438    }
439
440    mPacketSources.clear();
441    mStreamTypeMask = 0;
442}
443
444void PlaylistFetcher::notifyError(status_t err) {
445    sp<AMessage> notify = mNotify->dup();
446    notify->setInt32("what", kWhatError);
447    notify->setInt32("err", err);
448    notify->post();
449}
450
451void PlaylistFetcher::queueDiscontinuity(
452        ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
453    for (size_t i = 0; i < mPacketSources.size(); ++i) {
454        mPacketSources.valueAt(i)->queueDiscontinuity(type, extra);
455    }
456}
457
458void PlaylistFetcher::onMonitorQueue() {
459    bool downloadMore = false;
460
461    status_t finalResult;
462    if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
463        sp<AnotherPacketSource> packetSource =
464            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
465
466        int64_t bufferedDurationUs =
467                packetSource->getBufferedDurationUs(&finalResult);
468
469        downloadMore = (bufferedDurationUs < kMinBufferedDurationUs);
470        finalResult = OK;
471    } else {
472        bool first = true;
473        int64_t minBufferedDurationUs = 0ll;
474
475        for (size_t i = 0; i < mPacketSources.size(); ++i) {
476            if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
477                continue;
478            }
479
480            int64_t bufferedDurationUs =
481                mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
482
483            if (first || bufferedDurationUs < minBufferedDurationUs) {
484                minBufferedDurationUs = bufferedDurationUs;
485                first = false;
486            }
487        }
488
489        downloadMore =
490            !first && (minBufferedDurationUs < kMinBufferedDurationUs);
491    }
492
493    if (finalResult == OK && downloadMore) {
494        onDownloadNext();
495    } else {
496        // Nothing to do yet, try again in a second.
497
498        sp<AMessage> msg = mNotify->dup();
499        msg->setInt32("what", kWhatTemporarilyDoneFetching);
500        msg->post();
501
502        postMonitorQueue(1000000ll);
503    }
504}
505
506void PlaylistFetcher::onDownloadNext() {
507    int64_t nowUs = ALooper::GetNowUs();
508
509    if (mLastPlaylistFetchTimeUs < 0ll
510            || (!mPlaylist->isComplete() && timeToRefreshPlaylist(nowUs))) {
511        bool unchanged;
512        sp<M3UParser> playlist = mSession->fetchPlaylist(
513                mURI.c_str(), mPlaylistHash, &unchanged);
514
515        if (playlist == NULL) {
516            if (unchanged) {
517                // We succeeded in fetching the playlist, but it was
518                // unchanged from the last time we tried.
519
520                if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
521                    mRefreshState = (RefreshState)(mRefreshState + 1);
522                }
523            } else {
524                ALOGE("failed to load playlist at url '%s'", mURI.c_str());
525                notifyError(ERROR_IO);
526                return;
527            }
528        } else {
529            mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
530            mPlaylist = playlist;
531
532            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
533                updateDuration();
534            }
535        }
536
537        mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
538    }
539
540    int32_t firstSeqNumberInPlaylist;
541    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
542                "media-sequence", &firstSeqNumberInPlaylist)) {
543        firstSeqNumberInPlaylist = 0;
544    }
545
546    bool seekDiscontinuity = false;
547    bool explicitDiscontinuity = false;
548
549    const int32_t lastSeqNumberInPlaylist =
550        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
551
552    if (mSeqNumber < 0) {
553        CHECK_GE(mStartTimeUs, 0ll);
554
555        if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
556            mSeqNumber = getSeqNumberForTime(mStartTimeUs);
557        } else {
558            // If this is a live session, start 3 segments from the end.
559            mSeqNumber = lastSeqNumberInPlaylist - 3;
560            if (mSeqNumber < firstSeqNumberInPlaylist) {
561                mSeqNumber = firstSeqNumberInPlaylist;
562            }
563        }
564
565        mStartTimeUs = -1ll;
566    }
567
568    if (mSeqNumber < firstSeqNumberInPlaylist
569            || mSeqNumber > lastSeqNumberInPlaylist) {
570        if (!mPlaylist->isComplete() && mNumRetries < kMaxNumRetries) {
571            ++mNumRetries;
572
573            if (mSeqNumber > lastSeqNumberInPlaylist) {
574                mLastPlaylistFetchTimeUs = -1;
575                postMonitorQueue(3000000ll);
576                return;
577            }
578
579            // we've missed the boat, let's start from the lowest sequence
580            // number available and signal a discontinuity.
581
582            ALOGI("We've missed the boat, restarting playback.");
583            mSeqNumber = lastSeqNumberInPlaylist;
584            explicitDiscontinuity = true;
585
586            // fall through
587        } else {
588            ALOGE("Cannot find sequence number %d in playlist "
589                 "(contains %d - %d)",
590                 mSeqNumber, firstSeqNumberInPlaylist,
591                  firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
592
593            notifyError(ERROR_END_OF_STREAM);
594            return;
595        }
596    }
597
598    mNumRetries = 0;
599
600    AString uri;
601    sp<AMessage> itemMeta;
602    CHECK(mPlaylist->itemAt(
603                mSeqNumber - firstSeqNumberInPlaylist,
604                &uri,
605                &itemMeta));
606
607    int32_t val;
608    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
609        explicitDiscontinuity = true;
610    }
611
612    int64_t range_offset, range_length;
613    if (!itemMeta->findInt64("range-offset", &range_offset)
614            || !itemMeta->findInt64("range-length", &range_length)) {
615        range_offset = 0;
616        range_length = -1;
617    }
618
619    ALOGV("fetching segment %d from (%d .. %d)",
620          mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
621
622    ALOGV("fetching '%s'", uri.c_str());
623
624    sp<ABuffer> buffer;
625    status_t err = mSession->fetchFile(
626            uri.c_str(), &buffer, range_offset, range_length);
627
628    if (err != OK) {
629        ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
630        notifyError(err);
631        return;
632    }
633
634    CHECK(buffer != NULL);
635
636    err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer);
637
638    if (err != OK) {
639        ALOGE("decryptBuffer failed w/ error %d", err);
640
641        notifyError(err);
642        return;
643    }
644
645    if (mStartup || seekDiscontinuity || explicitDiscontinuity) {
646        // Signal discontinuity.
647
648        if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
649            // If this was a live event this made no sense since
650            // we don't have access to all the segment before the current
651            // one.
652            mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
653        }
654
655        if (seekDiscontinuity || explicitDiscontinuity) {
656            ALOGI("queueing discontinuity (seek=%d, explicit=%d)",
657                 seekDiscontinuity, explicitDiscontinuity);
658
659            queueDiscontinuity(
660                    explicitDiscontinuity
661                        ? ATSParser::DISCONTINUITY_FORMATCHANGE
662                        : ATSParser::DISCONTINUITY_SEEK,
663                    NULL /* extra */);
664        }
665    }
666
667    err = extractAndQueueAccessUnits(buffer, itemMeta);
668
669    if (err != OK) {
670        notifyError(err);
671        return;
672    }
673
674    ++mSeqNumber;
675
676    postMonitorQueue();
677
678    mStartup = false;
679}
680
681int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
682    int32_t firstSeqNumberInPlaylist;
683    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
684                "media-sequence", &firstSeqNumberInPlaylist)) {
685        firstSeqNumberInPlaylist = 0;
686    }
687
688    size_t index = 0;
689    int64_t segmentStartUs = 0;
690    while (index < mPlaylist->size()) {
691        sp<AMessage> itemMeta;
692        CHECK(mPlaylist->itemAt(
693                    index, NULL /* uri */, &itemMeta));
694
695        int64_t itemDurationUs;
696        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
697
698        if (timeUs < segmentStartUs + itemDurationUs) {
699            break;
700        }
701
702        segmentStartUs += itemDurationUs;
703        ++index;
704    }
705
706    if (index >= mPlaylist->size()) {
707        index = mPlaylist->size() - 1;
708    }
709
710    return firstSeqNumberInPlaylist + index;
711}
712
713status_t PlaylistFetcher::extractAndQueueAccessUnits(
714        const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
715    if (buffer->size() > 0 && buffer->data()[0] == 0x47) {
716        // Let's assume this is an MPEG2 transport stream.
717
718        if ((buffer->size() % 188) != 0) {
719            ALOGE("MPEG2 transport stream is not an even multiple of 188 "
720                  "bytes in length.");
721            return ERROR_MALFORMED;
722        }
723
724        if (mTSParser == NULL) {
725            mTSParser = new ATSParser;
726        }
727
728        if (mNextPTSTimeUs >= 0ll) {
729            sp<AMessage> extra = new AMessage;
730            extra->setInt64(IStreamListener::kKeyMediaTimeUs, mNextPTSTimeUs);
731
732            mTSParser->signalDiscontinuity(
733                    ATSParser::DISCONTINUITY_SEEK, extra);
734
735            mNextPTSTimeUs = -1ll;
736        }
737
738        size_t offset = 0;
739        while (offset < buffer->size()) {
740            status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
741
742            if (err != OK) {
743                return err;
744            }
745
746            offset += 188;
747        }
748
749        for (size_t i = mPacketSources.size(); i-- > 0;) {
750            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
751
752            ATSParser::SourceType type;
753            switch (mPacketSources.keyAt(i)) {
754                case LiveSession::STREAMTYPE_VIDEO:
755                    type = ATSParser::VIDEO;
756                    break;
757
758                case LiveSession::STREAMTYPE_AUDIO:
759                    type = ATSParser::AUDIO;
760                    break;
761
762                case LiveSession::STREAMTYPE_SUBTITLES:
763                {
764                    ALOGE("MPEG2 Transport streams do not contain subtitles.");
765                    return ERROR_MALFORMED;
766                    break;
767                }
768
769                default:
770                    TRESPASS();
771            }
772
773            sp<AnotherPacketSource> source =
774                static_cast<AnotherPacketSource *>(
775                        mTSParser->getSource(type).get());
776
777            if (source == NULL) {
778                ALOGW("MPEG2 Transport stream does not contain %s data.",
779                      type == ATSParser::VIDEO ? "video" : "audio");
780
781                mStreamTypeMask &= ~mPacketSources.keyAt(i);
782                mPacketSources.removeItemsAt(i);
783                continue;
784            }
785
786            sp<ABuffer> accessUnit;
787            status_t finalResult;
788            while (source->hasBufferAvailable(&finalResult)
789                    && source->dequeueAccessUnit(&accessUnit) == OK) {
790                // Note that we do NOT dequeue any discontinuities.
791
792                packetSource->queueAccessUnit(accessUnit);
793            }
794
795            if (packetSource->getFormat() == NULL) {
796                packetSource->setFormat(source->getFormat());
797            }
798        }
799
800        return OK;
801    } else if (buffer->size() >= 7 && !memcmp("WEBVTT\n", buffer->data(), 7)) {
802        if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
803            ALOGE("This stream only contains subtitles.");
804            return ERROR_MALFORMED;
805        }
806
807        const sp<AnotherPacketSource> packetSource =
808            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
809
810        int64_t durationUs;
811        CHECK(itemMeta->findInt64("durationUs", &durationUs));
812        buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
813        buffer->meta()->setInt64("durationUs", durationUs);
814
815        packetSource->queueAccessUnit(buffer);
816        return OK;
817    }
818
819    if (mNextPTSTimeUs >= 0ll) {
820        mFirstPTSValid = false;
821        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
822        mNextPTSTimeUs = -1ll;
823    }
824
825    // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
826    // stream prefixed by an ID3 tag.
827
828    bool firstID3Tag = true;
829    uint64_t PTS = 0;
830
831    for (;;) {
832        // Make sure to skip all ID3 tags preceding the audio data.
833        // At least one must be present to provide the PTS timestamp.
834
835        ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
836        if (!id3.isValid()) {
837            if (firstID3Tag) {
838                ALOGE("Unable to parse ID3 tag.");
839                return ERROR_MALFORMED;
840            } else {
841                break;
842            }
843        }
844
845        if (firstID3Tag) {
846            bool found = false;
847
848            ID3::Iterator it(id3, "PRIV");
849            while (!it.done()) {
850                size_t length;
851                const uint8_t *data = it.getData(&length);
852
853                static const char *kMatchName =
854                    "com.apple.streaming.transportStreamTimestamp";
855                static const size_t kMatchNameLen = strlen(kMatchName);
856
857                if (length == kMatchNameLen + 1 + 8
858                        && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
859                    found = true;
860                    PTS = U64_AT(&data[kMatchNameLen + 1]);
861                }
862
863                it.next();
864            }
865
866            if (!found) {
867                ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
868                return ERROR_MALFORMED;
869            }
870        }
871
872        // skip the ID3 tag
873        buffer->setRange(
874                buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
875
876        firstID3Tag = false;
877    }
878
879    if (!mFirstPTSValid) {
880        mFirstPTSValid = true;
881        mFirstPTS = PTS;
882    }
883    PTS -= mFirstPTS;
884
885    int64_t timeUs = (PTS * 100ll) / 9ll + mAbsoluteTimeAnchorUs;
886
887    if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
888        ALOGW("This stream only contains audio data!");
889
890        mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
891
892        if (mStreamTypeMask == 0) {
893            return OK;
894        }
895    }
896
897    sp<AnotherPacketSource> packetSource =
898        mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
899
900    if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
901        ABitReader bits(buffer->data(), buffer->size());
902
903        // adts_fixed_header
904
905        CHECK_EQ(bits.getBits(12), 0xfffu);
906        bits.skipBits(3);  // ID, layer
907        bool protection_absent = bits.getBits(1) != 0;
908
909        unsigned profile = bits.getBits(2);
910        CHECK_NE(profile, 3u);
911        unsigned sampling_freq_index = bits.getBits(4);
912        bits.getBits(1);  // private_bit
913        unsigned channel_configuration = bits.getBits(3);
914        CHECK_NE(channel_configuration, 0u);
915        bits.skipBits(2);  // original_copy, home
916
917        sp<MetaData> meta = MakeAACCodecSpecificData(
918                profile, sampling_freq_index, channel_configuration);
919
920        meta->setInt32(kKeyIsADTS, true);
921
922        packetSource->setFormat(meta);
923    }
924
925    int64_t numSamples = 0ll;
926    int32_t sampleRate;
927    CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
928
929    size_t offset = 0;
930    while (offset < buffer->size()) {
931        const uint8_t *adtsHeader = buffer->data() + offset;
932        CHECK_LT(offset + 5, buffer->size());
933
934        unsigned aac_frame_length =
935            ((adtsHeader[3] & 3) << 11)
936            | (adtsHeader[4] << 3)
937            | (adtsHeader[5] >> 5);
938
939        CHECK_LE(offset + aac_frame_length, buffer->size());
940
941        sp<ABuffer> unit = new ABuffer(aac_frame_length);
942        memcpy(unit->data(), adtsHeader, aac_frame_length);
943
944        int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
945        unit->meta()->setInt64("timeUs", unitTimeUs);
946
947        // Each AAC frame encodes 1024 samples.
948        numSamples += 1024;
949
950        packetSource->queueAccessUnit(unit);
951
952        offset += aac_frame_length;
953    }
954
955    return OK;
956}
957
958void PlaylistFetcher::updateDuration() {
959    int64_t durationUs = 0ll;
960    for (size_t index = 0; index < mPlaylist->size(); ++index) {
961        sp<AMessage> itemMeta;
962        CHECK(mPlaylist->itemAt(
963                    index, NULL /* uri */, &itemMeta));
964
965        int64_t itemDurationUs;
966        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
967
968        durationUs += itemDurationUs;
969    }
970
971    sp<AMessage> msg = mNotify->dup();
972    msg->setInt32("what", kWhatDurationUpdate);
973    msg->setInt64("durationUs", durationUs);
974    msg->post();
975}
976
977}  // namespace android
978