1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "MPEG2TSExtractor"
19
20#include <inttypes.h>
21#include <utils/Log.h>
22
23#include "include/MPEG2TSExtractor.h"
24#include "include/NuCachedSource2.h"
25
26#include <media/stagefright/foundation/ABuffer.h>
27#include <media/stagefright/foundation/ADebug.h>
28#include <media/stagefright/foundation/ALooper.h>
29#include <media/stagefright/foundation/AUtils.h>
30#include <media/stagefright/DataSource.h>
31#include <media/stagefright/MediaDefs.h>
32#include <media/stagefright/MediaErrors.h>
33#include <media/stagefright/MediaSource.h>
34#include <media/stagefright/MetaData.h>
35#include <media/IStreamSource.h>
36#include <utils/String8.h>
37
38#include "AnotherPacketSource.h"
39#include "ATSParser.h"
40
41namespace android {
42
// Size in bytes of one transport stream packet as read from the data source.
static const size_t kTSPacketSize = 188;
// Base size in bytes of the window scanned at the end of the file when
// estimating durations; the window is shifted left by the retry count.
static const int kMaxDurationReadSize = 250000;
// The end-of-file scan is repeated while the retry counter is <= this value.
static const int kMaxDurationRetry = 6;
46
47struct MPEG2TSSource : public MediaSource {
48    MPEG2TSSource(
49            const sp<MPEG2TSExtractor> &extractor,
50            const sp<AnotherPacketSource> &impl,
51            bool doesSeek);
52
53    virtual status_t start(MetaData *params = NULL);
54    virtual status_t stop();
55    virtual sp<MetaData> getFormat();
56
57    virtual status_t read(
58            MediaBuffer **buffer, const ReadOptions *options = NULL);
59
60private:
61    sp<MPEG2TSExtractor> mExtractor;
62    sp<AnotherPacketSource> mImpl;
63
64    // If there are both audio and video streams, only the video stream
65    // will signal seek on the extractor; otherwise the single stream will seek.
66    bool mDoesSeek;
67
68    DISALLOW_EVIL_CONSTRUCTORS(MPEG2TSSource);
69};
70
71MPEG2TSSource::MPEG2TSSource(
72        const sp<MPEG2TSExtractor> &extractor,
73        const sp<AnotherPacketSource> &impl,
74        bool doesSeek)
75    : mExtractor(extractor),
76      mImpl(impl),
77      mDoesSeek(doesSeek) {
78}
79
80status_t MPEG2TSSource::start(MetaData *params) {
81    return mImpl->start(params);
82}
83
84status_t MPEG2TSSource::stop() {
85    return mImpl->stop();
86}
87
88sp<MetaData> MPEG2TSSource::getFormat() {
89    return mImpl->getFormat();
90}
91
92status_t MPEG2TSSource::read(
93        MediaBuffer **out, const ReadOptions *options) {
94    *out = NULL;
95
96    int64_t seekTimeUs;
97    ReadOptions::SeekMode seekMode;
98    if (mDoesSeek && options && options->getSeekTo(&seekTimeUs, &seekMode)) {
99        // seek is needed
100        status_t err = mExtractor->seek(seekTimeUs, seekMode);
101        if (err != OK) {
102            return err;
103        }
104    }
105
106    if (mExtractor->feedUntilBufferAvailable(mImpl) != OK) {
107        return ERROR_END_OF_STREAM;
108    }
109
110    return mImpl->read(out, options);
111}
112
113////////////////////////////////////////////////////////////////////////////////
114
115MPEG2TSExtractor::MPEG2TSExtractor(const sp<DataSource> &source)
116    : mDataSource(source),
117      mParser(new ATSParser),
118      mLastSyncEvent(0),
119      mOffset(0) {
120    init();
121}
122
123size_t MPEG2TSExtractor::countTracks() {
124    return mSourceImpls.size();
125}
126
127sp<IMediaSource> MPEG2TSExtractor::getTrack(size_t index) {
128    if (index >= mSourceImpls.size()) {
129        return NULL;
130    }
131
132    // The seek reference track (video if present; audio otherwise) performs
133    // seek requests, while other tracks ignore requests.
134    return new MPEG2TSSource(this, mSourceImpls.editItemAt(index),
135            (mSeekSyncPoints == &mSyncPoints.editItemAt(index)));
136}
137
138sp<MetaData> MPEG2TSExtractor::getTrackMetaData(
139        size_t index, uint32_t /* flags */) {
140    return index < mSourceImpls.size()
141        ? mSourceImpls.editItemAt(index)->getFormat() : NULL;
142}
143
144sp<MetaData> MPEG2TSExtractor::getMetaData() {
145    sp<MetaData> meta = new MetaData;
146    meta->setCString(kKeyMIMEType, MEDIA_MIMETYPE_CONTAINER_MPEG2TS);
147
148    return meta;
149}
150
151//static
152bool MPEG2TSExtractor::isScrambledFormat(const sp<MetaData> &format) {
153    const char *mime;
154    return format->findCString(kKeyMIMEType, &mime)
155            && (!strcasecmp(MEDIA_MIMETYPE_VIDEO_SCRAMBLED, mime)
156                    || !strcasecmp(MEDIA_MIMETYPE_AUDIO_SCRAMBLED, mime));
157}
158
159status_t MPEG2TSExtractor::setMediaCas(const sp<ICas> &cas) {
160    ALOGD("setMediaCas: %p", cas.get());
161
162    status_t err = mParser->setMediaCas(cas);
163    if (err == OK) {
164        ALOGI("All tracks now have descramblers");
165        init();
166    }
167    return err;
168}
169
170void MPEG2TSExtractor::addSource(const sp<AnotherPacketSource> &impl) {
171    bool found = false;
172    for (size_t i = 0; i < mSourceImpls.size(); i++) {
173        if (mSourceImpls[i] == impl) {
174            found = true;
175            break;
176        }
177    }
178    if (!found) {
179        mSourceImpls.push(impl);
180    }
181}
182
183void MPEG2TSExtractor::init() {
184    bool haveAudio = false;
185    bool haveVideo = false;
186    int64_t startTime = ALooper::GetNowUs();
187
188    status_t err;
189    while ((err = feedMore(true /* isInit */)) == OK
190            || err == ERROR_DRM_DECRYPT_UNIT_NOT_INITIALIZED) {
191        if (haveAudio && haveVideo) {
192            addSyncPoint_l(mLastSyncEvent);
193            mLastSyncEvent.reset();
194            break;
195        }
196        if (!haveVideo) {
197            sp<AnotherPacketSource> impl =
198                (AnotherPacketSource *)mParser->getSource(
199                        ATSParser::VIDEO).get();
200
201            if (impl != NULL) {
202                sp<MetaData> format = impl->getFormat();
203                if (format != NULL) {
204                    haveVideo = true;
205                    addSource(impl);
206                    if (!isScrambledFormat(format)) {
207                        mSyncPoints.push();
208                        mSeekSyncPoints = &mSyncPoints.editTop();
209                    }
210                }
211            }
212        }
213
214        if (!haveAudio) {
215            sp<AnotherPacketSource> impl =
216                (AnotherPacketSource *)mParser->getSource(
217                        ATSParser::AUDIO).get();
218
219            if (impl != NULL) {
220                sp<MetaData> format = impl->getFormat();
221                if (format != NULL) {
222                    haveAudio = true;
223                    addSource(impl);
224                    if (!isScrambledFormat(format)) {
225                        mSyncPoints.push();
226                        if (!haveVideo) {
227                            mSeekSyncPoints = &mSyncPoints.editTop();
228                        }
229                    }
230                }
231            }
232        }
233
234        addSyncPoint_l(mLastSyncEvent);
235        mLastSyncEvent.reset();
236
237        // ERROR_DRM_DECRYPT_UNIT_NOT_INITIALIZED is returned when the mpeg2ts
238        // is scrambled but we don't have a MediaCas object set. The extraction
239        // will only continue when setMediaCas() is called successfully.
240        if (err == ERROR_DRM_DECRYPT_UNIT_NOT_INITIALIZED) {
241            ALOGI("stopped parsing scrambled content, "
242                  "haveAudio=%d, haveVideo=%d, elaspedTime=%" PRId64,
243                    haveAudio, haveVideo, ALooper::GetNowUs() - startTime);
244            return;
245        }
246
247        // Wait only for 2 seconds to detect audio/video streams.
248        if (ALooper::GetNowUs() - startTime > 2000000ll) {
249            break;
250        }
251    }
252
253    off64_t size;
254    if (mDataSource->getSize(&size) == OK && (haveAudio || haveVideo)) {
255        sp<AnotherPacketSource> impl = haveVideo
256                ? (AnotherPacketSource *)mParser->getSource(
257                        ATSParser::VIDEO).get()
258                : (AnotherPacketSource *)mParser->getSource(
259                        ATSParser::AUDIO).get();
260        size_t prevSyncSize = 1;
261        int64_t durationUs = -1;
262        List<int64_t> durations;
263        // Estimate duration --- stabilize until you get <500ms deviation.
264        while (feedMore() == OK
265                && ALooper::GetNowUs() - startTime <= 2000000ll) {
266            if (mSeekSyncPoints->size() > prevSyncSize) {
267                prevSyncSize = mSeekSyncPoints->size();
268                int64_t diffUs = mSeekSyncPoints->keyAt(prevSyncSize - 1)
269                        - mSeekSyncPoints->keyAt(0);
270                off64_t diffOffset = mSeekSyncPoints->valueAt(prevSyncSize - 1)
271                        - mSeekSyncPoints->valueAt(0);
272                int64_t currentDurationUs = size * diffUs / diffOffset;
273                durations.push_back(currentDurationUs);
274                if (durations.size() > 5) {
275                    durations.erase(durations.begin());
276                    int64_t min = *durations.begin();
277                    int64_t max = *durations.begin();
278                    for (auto duration : durations) {
279                        if (min > duration) {
280                            min = duration;
281                        }
282                        if (max < duration) {
283                            max = duration;
284                        }
285                    }
286                    if (max - min < 500 * 1000) {
287                        durationUs = currentDurationUs;
288                        break;
289                    }
290                }
291            }
292        }
293        status_t err;
294        int64_t bufferedDurationUs;
295        bufferedDurationUs = impl->getBufferedDurationUs(&err);
296        if (err == ERROR_END_OF_STREAM) {
297            durationUs = bufferedDurationUs;
298        }
299        if (durationUs > 0) {
300            const sp<MetaData> meta = impl->getFormat();
301            meta->setInt64(kKeyDuration, durationUs);
302            impl->setFormat(meta);
303        } else {
304            estimateDurationsFromTimesUsAtEnd();
305        }
306    }
307
308    ALOGI("haveAudio=%d, haveVideo=%d, elaspedTime=%" PRId64,
309            haveAudio, haveVideo, ALooper::GetNowUs() - startTime);
310}
311
312status_t MPEG2TSExtractor::feedMore(bool isInit) {
313    Mutex::Autolock autoLock(mLock);
314
315    uint8_t packet[kTSPacketSize];
316    ssize_t n = mDataSource->readAt(mOffset, packet, kTSPacketSize);
317
318    if (n < (ssize_t)kTSPacketSize) {
319        if (n >= 0) {
320            mParser->signalEOS(ERROR_END_OF_STREAM);
321        }
322        return (n < 0) ? (status_t)n : ERROR_END_OF_STREAM;
323    }
324
325    ATSParser::SyncEvent event(mOffset);
326    mOffset += n;
327    status_t err = mParser->feedTSPacket(packet, kTSPacketSize, &event);
328    if (event.hasReturnedData()) {
329        if (isInit) {
330            mLastSyncEvent = event;
331        } else {
332            addSyncPoint_l(event);
333        }
334    }
335    return err;
336}
337
338void MPEG2TSExtractor::addSyncPoint_l(const ATSParser::SyncEvent &event) {
339    if (!event.hasReturnedData()) {
340        return;
341    }
342
343    for (size_t i = 0; i < mSourceImpls.size(); ++i) {
344        if (mSourceImpls[i].get() == event.getMediaSource().get()) {
345            KeyedVector<int64_t, off64_t> *syncPoints = &mSyncPoints.editItemAt(i);
346            syncPoints->add(event.getTimeUs(), event.getOffset());
347            // We're keeping the size of the sync points at most 5mb per a track.
348            size_t size = syncPoints->size();
349            if (size >= 327680) {
350                int64_t firstTimeUs = syncPoints->keyAt(0);
351                int64_t lastTimeUs = syncPoints->keyAt(size - 1);
352                if (event.getTimeUs() - firstTimeUs > lastTimeUs - event.getTimeUs()) {
353                    syncPoints->removeItemsAt(0, 4096);
354                } else {
355                    syncPoints->removeItemsAt(size - 4096, 4096);
356                }
357            }
358            break;
359        }
360    }
361}
362
363status_t MPEG2TSExtractor::estimateDurationsFromTimesUsAtEnd()  {
364    if (!(mDataSource->flags() & DataSource::kIsLocalFileSource)) {
365        return ERROR_UNSUPPORTED;
366    }
367
368    off64_t size = 0;
369    status_t err = mDataSource->getSize(&size);
370    if (err != OK) {
371        return err;
372    }
373
374    uint8_t packet[kTSPacketSize];
375    const off64_t zero = 0;
376    off64_t offset = max(zero, size - kMaxDurationReadSize);
377    if (mDataSource->readAt(offset, &packet, 0) < 0) {
378        return ERROR_IO;
379    }
380
381    int retry = 0;
382    bool allDurationsFound = false;
383    int64_t timeAnchorUs = mParser->getFirstPTSTimeUs();
384    do {
385        int bytesRead = 0;
386        sp<ATSParser> parser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
387        ATSParser::SyncEvent ev(0);
388        offset = max(zero, size - (kMaxDurationReadSize << retry));
389        offset = (offset / kTSPacketSize) * kTSPacketSize;
390        for (;;) {
391            if (bytesRead >= kMaxDurationReadSize << max(0, retry - 1)) {
392                break;
393            }
394
395            ssize_t n = mDataSource->readAt(offset, packet, kTSPacketSize);
396            if (n < 0) {
397                return n;
398            } else if (n < (ssize_t)kTSPacketSize) {
399                break;
400            }
401
402            offset += kTSPacketSize;
403            bytesRead += kTSPacketSize;
404            err = parser->feedTSPacket(packet, kTSPacketSize, &ev);
405            if (err != OK) {
406                return err;
407            }
408
409            if (ev.hasReturnedData()) {
410                int64_t durationUs = ev.getTimeUs();
411                ATSParser::SourceType type = ev.getType();
412                ev.reset();
413
414                int64_t firstTimeUs;
415                sp<AnotherPacketSource> src =
416                    (AnotherPacketSource *)mParser->getSource(type).get();
417                if (src == NULL || src->nextBufferTime(&firstTimeUs) != OK) {
418                    continue;
419                }
420                durationUs += src->getEstimatedBufferDurationUs();
421                durationUs -= timeAnchorUs;
422                durationUs -= firstTimeUs;
423                if (durationUs > 0) {
424                    int64_t origDurationUs, lastDurationUs;
425                    const sp<MetaData> meta = src->getFormat();
426                    const uint32_t kKeyLastDuration = 'ldur';
427                    // Require two consecutive duration calculations to be within 1 sec before
428                    // updating; use MetaData to store previous duration estimate in per-stream
429                    // context.
430                    if (!meta->findInt64(kKeyDuration, &origDurationUs)
431                            || !meta->findInt64(kKeyLastDuration, &lastDurationUs)
432                            || (origDurationUs < durationUs
433                             && abs(durationUs - lastDurationUs) < 60000000)) {
434                        meta->setInt64(kKeyDuration, durationUs);
435                    }
436                    meta->setInt64(kKeyLastDuration, durationUs);
437                }
438            }
439        }
440
441        if (!allDurationsFound) {
442            allDurationsFound = true;
443            for (auto t: {ATSParser::VIDEO, ATSParser::AUDIO}) {
444                sp<AnotherPacketSource> src = (AnotherPacketSource *)mParser->getSource(t).get();
445                if (src == NULL) {
446                    continue;
447                }
448                int64_t durationUs;
449                const sp<MetaData> meta = src->getFormat();
450                if (!meta->findInt64(kKeyDuration, &durationUs)) {
451                    allDurationsFound = false;
452                    break;
453                }
454            }
455        }
456
457        ++retry;
458    } while(!allDurationsFound && offset > 0 && retry <= kMaxDurationRetry);
459
460    return allDurationsFound? OK : ERROR_UNSUPPORTED;
461}
462
463uint32_t MPEG2TSExtractor::flags() const {
464    return CAN_PAUSE | CAN_SEEK_BACKWARD | CAN_SEEK_FORWARD;
465}
466
467status_t MPEG2TSExtractor::seek(int64_t seekTimeUs,
468        const MediaSource::ReadOptions::SeekMode &seekMode) {
469    if (mSeekSyncPoints == NULL || mSeekSyncPoints->isEmpty()) {
470        ALOGW("No sync point to seek to.");
471        // ... and therefore we have nothing useful to do here.
472        return OK;
473    }
474
475    // Determine whether we're seeking beyond the known area.
476    bool shouldSeekBeyond =
477            (seekTimeUs > mSeekSyncPoints->keyAt(mSeekSyncPoints->size() - 1));
478
479    // Determine the sync point to seek.
480    size_t index = 0;
481    for (; index < mSeekSyncPoints->size(); ++index) {
482        int64_t timeUs = mSeekSyncPoints->keyAt(index);
483        if (timeUs > seekTimeUs) {
484            break;
485        }
486    }
487
488    switch (seekMode) {
489        case MediaSource::ReadOptions::SEEK_NEXT_SYNC:
490            if (index == mSeekSyncPoints->size()) {
491                ALOGW("Next sync not found; starting from the latest sync.");
492                --index;
493            }
494            break;
495        case MediaSource::ReadOptions::SEEK_CLOSEST_SYNC:
496        case MediaSource::ReadOptions::SEEK_CLOSEST:
497            ALOGW("seekMode not supported: %d; falling back to PREVIOUS_SYNC",
498                    seekMode);
499            // fall-through
500        case MediaSource::ReadOptions::SEEK_PREVIOUS_SYNC:
501            if (index == 0) {
502                ALOGW("Previous sync not found; starting from the earliest "
503                        "sync.");
504            } else {
505                --index;
506            }
507            break;
508    }
509    if (!shouldSeekBeyond || mOffset <= mSeekSyncPoints->valueAt(index)) {
510        int64_t actualSeekTimeUs = mSeekSyncPoints->keyAt(index);
511        mOffset = mSeekSyncPoints->valueAt(index);
512        status_t err = queueDiscontinuityForSeek(actualSeekTimeUs);
513        if (err != OK) {
514            return err;
515        }
516    }
517
518    if (shouldSeekBeyond) {
519        status_t err = seekBeyond(seekTimeUs);
520        if (err != OK) {
521            return err;
522        }
523    }
524
525    // Fast-forward to sync frame.
526    for (size_t i = 0; i < mSourceImpls.size(); ++i) {
527        const sp<AnotherPacketSource> &impl = mSourceImpls[i];
528        status_t err;
529        feedUntilBufferAvailable(impl);
530        while (impl->hasBufferAvailable(&err)) {
531            sp<AMessage> meta = impl->getMetaAfterLastDequeued(0);
532            sp<ABuffer> buffer;
533            if (meta == NULL) {
534                return UNKNOWN_ERROR;
535            }
536            int32_t sync;
537            if (meta->findInt32("isSync", &sync) && sync) {
538                break;
539            }
540            err = impl->dequeueAccessUnit(&buffer);
541            if (err != OK) {
542                return err;
543            }
544            feedUntilBufferAvailable(impl);
545        }
546    }
547
548    return OK;
549}
550
551status_t MPEG2TSExtractor::queueDiscontinuityForSeek(int64_t actualSeekTimeUs) {
552    // Signal discontinuity
553    sp<AMessage> extra(new AMessage);
554    extra->setInt64(IStreamListener::kKeyMediaTimeUs, actualSeekTimeUs);
555    mParser->signalDiscontinuity(ATSParser::DISCONTINUITY_TIME, extra);
556
557    // After discontinuity, impl should only have discontinuities
558    // with the last being what we queued. Dequeue them all here.
559    for (size_t i = 0; i < mSourceImpls.size(); ++i) {
560        const sp<AnotherPacketSource> &impl = mSourceImpls.itemAt(i);
561        sp<ABuffer> buffer;
562        status_t err;
563        while (impl->hasBufferAvailable(&err)) {
564            if (err != OK) {
565                return err;
566            }
567            err = impl->dequeueAccessUnit(&buffer);
568            // If the source contains anything but discontinuity, that's
569            // a programming mistake.
570            CHECK(err == INFO_DISCONTINUITY);
571        }
572    }
573
574    // Feed until we have a buffer for each source.
575    for (size_t i = 0; i < mSourceImpls.size(); ++i) {
576        const sp<AnotherPacketSource> &impl = mSourceImpls.itemAt(i);
577        sp<ABuffer> buffer;
578        status_t err = feedUntilBufferAvailable(impl);
579        if (err != OK) {
580            return err;
581        }
582    }
583
584    return OK;
585}
586
587status_t MPEG2TSExtractor::seekBeyond(int64_t seekTimeUs) {
588    // If we're seeking beyond where we know --- read until we reach there.
589    size_t syncPointsSize = mSeekSyncPoints->size();
590
591    while (seekTimeUs > mSeekSyncPoints->keyAt(
592            mSeekSyncPoints->size() - 1)) {
593        status_t err;
594        if (syncPointsSize < mSeekSyncPoints->size()) {
595            syncPointsSize = mSeekSyncPoints->size();
596            int64_t syncTimeUs = mSeekSyncPoints->keyAt(syncPointsSize - 1);
597            // Dequeue buffers before sync point in order to avoid too much
598            // cache building up.
599            sp<ABuffer> buffer;
600            for (size_t i = 0; i < mSourceImpls.size(); ++i) {
601                const sp<AnotherPacketSource> &impl = mSourceImpls[i];
602                int64_t timeUs;
603                while ((err = impl->nextBufferTime(&timeUs)) == OK) {
604                    if (timeUs < syncTimeUs) {
605                        impl->dequeueAccessUnit(&buffer);
606                    } else {
607                        break;
608                    }
609                }
610                if (err != OK && err != -EWOULDBLOCK) {
611                    return err;
612                }
613            }
614        }
615        if (feedMore() != OK) {
616            return ERROR_END_OF_STREAM;
617        }
618    }
619
620    return OK;
621}
622
623status_t MPEG2TSExtractor::feedUntilBufferAvailable(
624        const sp<AnotherPacketSource> &impl) {
625    status_t finalResult;
626    while (!impl->hasBufferAvailable(&finalResult)) {
627        if (finalResult != OK) {
628            return finalResult;
629        }
630
631        status_t err = feedMore();
632        if (err != OK) {
633            impl->signalEOS(err);
634        }
635    }
636    return OK;
637}
638
639////////////////////////////////////////////////////////////////////////////////
640
641bool SniffMPEG2TS(
642        const sp<DataSource> &source, String8 *mimeType, float *confidence,
643        sp<AMessage> *) {
644    for (int i = 0; i < 5; ++i) {
645        char header;
646        if (source->readAt(kTSPacketSize * i, &header, 1) != 1
647                || header != 0x47) {
648            return false;
649        }
650    }
651
652    *confidence = 0.1f;
653    mimeType->setTo(MEDIA_MIMETYPE_CONTAINER_MPEG2TS);
654
655    return true;
656}
657
658}  // namespace android
659