1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "AnotherPacketSource"
19
20#include "AnotherPacketSource.h"
21
22#include "include/avc_utils.h"
23
24#include <media/stagefright/foundation/ABuffer.h>
25#include <media/stagefright/foundation/ADebug.h>
26#include <media/stagefright/foundation/AMessage.h>
27#include <media/stagefright/foundation/AString.h>
28#include <media/stagefright/foundation/hexdump.h>
29#include <media/stagefright/MediaBuffer.h>
30#include <media/stagefright/MediaDefs.h>
31#include <media/stagefright/MetaData.h>
32#include <media/stagefright/Utils.h>
33#include <utils/Vector.h>
34
35#include <inttypes.h>
36
37namespace android {
38
// Window, in microseconds, around the reported duration inside which
// isFinished() treats the last queued timestamp as "near end of stream".
const int64_t kNearEOSMarkUs = 2 * 1000000LL;  // 2 secs
40
41AnotherPacketSource::AnotherPacketSource(const sp<MetaData> &meta)
42    : mIsAudio(false),
43      mIsVideo(false),
44      mEnabled(true),
45      mFormat(NULL),
46      mLastQueuedTimeUs(0),
47      mEOSResult(OK),
48      mLatestEnqueuedMeta(NULL),
49      mLatestDequeuedMeta(NULL) {
50    setFormat(meta);
51
52    mDiscontinuitySegments.push_back(DiscontinuitySegment());
53}
54
55void AnotherPacketSource::setFormat(const sp<MetaData> &meta) {
56    if (mFormat != NULL) {
57        // Only allowed to be set once. Requires explicit clear to reset.
58        return;
59    }
60
61    mIsAudio = false;
62    mIsVideo = false;
63
64    if (meta == NULL) {
65        return;
66    }
67
68    mFormat = meta;
69    const char *mime;
70    CHECK(meta->findCString(kKeyMIMEType, &mime));
71
72    if (!strncasecmp("audio/", mime, 6)) {
73        mIsAudio = true;
74    } else  if (!strncasecmp("video/", mime, 6)) {
75        mIsVideo = true;
76    } else {
77        CHECK(!strncasecmp("text/", mime, 5) || !strncasecmp("application/", mime, 12));
78    }
79}
80
81AnotherPacketSource::~AnotherPacketSource() {
82}
83
84status_t AnotherPacketSource::start(MetaData * /* params */) {
85    return OK;
86}
87
88status_t AnotherPacketSource::stop() {
89    return OK;
90}
91
92sp<MetaData> AnotherPacketSource::getFormat() {
93    Mutex::Autolock autoLock(mLock);
94    if (mFormat != NULL) {
95        return mFormat;
96    }
97
98    List<sp<ABuffer> >::iterator it = mBuffers.begin();
99    while (it != mBuffers.end()) {
100        sp<ABuffer> buffer = *it;
101        int32_t discontinuity;
102        if (!buffer->meta()->findInt32("discontinuity", &discontinuity)) {
103            sp<RefBase> object;
104            if (buffer->meta()->findObject("format", &object)) {
105                setFormat(static_cast<MetaData*>(object.get()));
106                return mFormat;
107            }
108        }
109
110        ++it;
111    }
112    return NULL;
113}
114
115status_t AnotherPacketSource::dequeueAccessUnit(sp<ABuffer> *buffer) {
116    buffer->clear();
117
118    Mutex::Autolock autoLock(mLock);
119    while (mEOSResult == OK && mBuffers.empty()) {
120        mCondition.wait(mLock);
121    }
122
123    if (!mBuffers.empty()) {
124        *buffer = *mBuffers.begin();
125        mBuffers.erase(mBuffers.begin());
126
127        int32_t discontinuity;
128        if ((*buffer)->meta()->findInt32("discontinuity", &discontinuity)) {
129            if (wasFormatChange(discontinuity)) {
130                mFormat.clear();
131            }
132
133            mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
134            // CHECK(!mDiscontinuitySegments.empty());
135            return INFO_DISCONTINUITY;
136        }
137
138        // CHECK(!mDiscontinuitySegments.empty());
139        DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
140
141        int64_t timeUs;
142        mLatestDequeuedMeta = (*buffer)->meta()->dup();
143        CHECK(mLatestDequeuedMeta->findInt64("timeUs", &timeUs));
144        if (timeUs > seg.mMaxDequeTimeUs) {
145            seg.mMaxDequeTimeUs = timeUs;
146        }
147
148        sp<RefBase> object;
149        if ((*buffer)->meta()->findObject("format", &object)) {
150            setFormat(static_cast<MetaData*>(object.get()));
151        }
152
153        return OK;
154    }
155
156    return mEOSResult;
157}
158
159void AnotherPacketSource::requeueAccessUnit(const sp<ABuffer> &buffer) {
160    // TODO: update corresponding book keeping info.
161    Mutex::Autolock autoLock(mLock);
162    mBuffers.push_front(buffer);
163}
164
165status_t AnotherPacketSource::read(
166        MediaBuffer **out, const ReadOptions *) {
167    *out = NULL;
168
169    Mutex::Autolock autoLock(mLock);
170    while (mEOSResult == OK && mBuffers.empty()) {
171        mCondition.wait(mLock);
172    }
173
174    if (!mBuffers.empty()) {
175
176        const sp<ABuffer> buffer = *mBuffers.begin();
177        mBuffers.erase(mBuffers.begin());
178
179        int32_t discontinuity;
180        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
181            if (wasFormatChange(discontinuity)) {
182                mFormat.clear();
183            }
184
185            mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
186            // CHECK(!mDiscontinuitySegments.empty());
187            return INFO_DISCONTINUITY;
188        }
189
190        mLatestDequeuedMeta = buffer->meta()->dup();
191
192        sp<RefBase> object;
193        if (buffer->meta()->findObject("format", &object)) {
194            setFormat(static_cast<MetaData*>(object.get()));
195        }
196
197        int64_t timeUs;
198        CHECK(buffer->meta()->findInt64("timeUs", &timeUs));
199        // CHECK(!mDiscontinuitySegments.empty());
200        DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
201        if (timeUs > seg.mMaxDequeTimeUs) {
202            seg.mMaxDequeTimeUs = timeUs;
203        }
204
205        MediaBuffer *mediaBuffer = new MediaBuffer(buffer);
206
207        mediaBuffer->meta_data()->setInt64(kKeyTime, timeUs);
208
209        int32_t isSync;
210        if (buffer->meta()->findInt32("isSync", &isSync)) {
211            mediaBuffer->meta_data()->setInt32(kKeyIsSyncFrame, isSync);
212        }
213
214        sp<ABuffer> sei;
215        if (buffer->meta()->findBuffer("sei", &sei) && sei != NULL) {
216            mediaBuffer->meta_data()->setData(kKeySEI, 0, sei->data(), sei->size());
217        }
218
219        *out = mediaBuffer;
220        return OK;
221    }
222
223    return mEOSResult;
224}
225
226bool AnotherPacketSource::wasFormatChange(
227        int32_t discontinuityType) const {
228    if (mIsAudio) {
229        return (discontinuityType & ATSParser::DISCONTINUITY_AUDIO_FORMAT) != 0;
230    }
231
232    if (mIsVideo) {
233        return (discontinuityType & ATSParser::DISCONTINUITY_VIDEO_FORMAT) != 0;
234    }
235
236    return false;
237}
238
239void AnotherPacketSource::queueAccessUnit(const sp<ABuffer> &buffer) {
240    int32_t damaged;
241    if (buffer->meta()->findInt32("damaged", &damaged) && damaged) {
242        // LOG(VERBOSE) << "discarding damaged AU";
243        return;
244    }
245
246    Mutex::Autolock autoLock(mLock);
247    mBuffers.push_back(buffer);
248    mCondition.signal();
249
250    int32_t discontinuity;
251    if (buffer->meta()->findInt32("discontinuity", &discontinuity)){
252        ALOGV("queueing a discontinuity with queueAccessUnit");
253
254        mLastQueuedTimeUs = 0ll;
255        mEOSResult = OK;
256        mLatestEnqueuedMeta = NULL;
257
258        mDiscontinuitySegments.push_back(DiscontinuitySegment());
259        return;
260    }
261
262    int64_t lastQueuedTimeUs;
263    CHECK(buffer->meta()->findInt64("timeUs", &lastQueuedTimeUs));
264    mLastQueuedTimeUs = lastQueuedTimeUs;
265    ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)",
266            mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6);
267
268    // CHECK(!mDiscontinuitySegments.empty());
269    DiscontinuitySegment &tailSeg = *(--mDiscontinuitySegments.end());
270    if (lastQueuedTimeUs > tailSeg.mMaxEnqueTimeUs) {
271        tailSeg.mMaxEnqueTimeUs = lastQueuedTimeUs;
272    }
273    if (tailSeg.mMaxDequeTimeUs == -1) {
274        tailSeg.mMaxDequeTimeUs = lastQueuedTimeUs;
275    }
276
277    if (mLatestEnqueuedMeta == NULL) {
278        mLatestEnqueuedMeta = buffer->meta()->dup();
279    } else {
280        int64_t latestTimeUs = 0;
281        int64_t frameDeltaUs = 0;
282        CHECK(mLatestEnqueuedMeta->findInt64("timeUs", &latestTimeUs));
283        if (lastQueuedTimeUs > latestTimeUs) {
284            mLatestEnqueuedMeta = buffer->meta()->dup();
285            frameDeltaUs = lastQueuedTimeUs - latestTimeUs;
286            mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
287        } else if (!mLatestEnqueuedMeta->findInt64("durationUs", &frameDeltaUs)) {
288            // For B frames
289            frameDeltaUs = latestTimeUs - lastQueuedTimeUs;
290            mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
291        }
292    }
293}
294
295void AnotherPacketSource::clear() {
296    Mutex::Autolock autoLock(mLock);
297
298    mBuffers.clear();
299    mEOSResult = OK;
300
301    mDiscontinuitySegments.clear();
302    mDiscontinuitySegments.push_back(DiscontinuitySegment());
303
304    mFormat = NULL;
305    mLatestEnqueuedMeta = NULL;
306}
307
308void AnotherPacketSource::queueDiscontinuity(
309        ATSParser::DiscontinuityType type,
310        const sp<AMessage> &extra,
311        bool discard) {
312    Mutex::Autolock autoLock(mLock);
313
314    if (discard) {
315        // Leave only discontinuities in the queue.
316        List<sp<ABuffer> >::iterator it = mBuffers.begin();
317        while (it != mBuffers.end()) {
318            sp<ABuffer> oldBuffer = *it;
319
320            int32_t oldDiscontinuityType;
321            if (!oldBuffer->meta()->findInt32(
322                        "discontinuity", &oldDiscontinuityType)) {
323                it = mBuffers.erase(it);
324                continue;
325            }
326
327            ++it;
328        }
329
330        for (List<DiscontinuitySegment>::iterator it2 = mDiscontinuitySegments.begin();
331                it2 != mDiscontinuitySegments.end();
332                ++it2) {
333            DiscontinuitySegment &seg = *it2;
334            seg.clear();
335        }
336
337    }
338
339    mEOSResult = OK;
340    mLastQueuedTimeUs = 0;
341    mLatestEnqueuedMeta = NULL;
342
343    if (type == ATSParser::DISCONTINUITY_NONE) {
344        return;
345    }
346
347    mDiscontinuitySegments.push_back(DiscontinuitySegment());
348
349    sp<ABuffer> buffer = new ABuffer(0);
350    buffer->meta()->setInt32("discontinuity", static_cast<int32_t>(type));
351    buffer->meta()->setMessage("extra", extra);
352
353    mBuffers.push_back(buffer);
354    mCondition.signal();
355}
356
357void AnotherPacketSource::signalEOS(status_t result) {
358    CHECK(result != OK);
359
360    Mutex::Autolock autoLock(mLock);
361    mEOSResult = result;
362    mCondition.signal();
363}
364
365bool AnotherPacketSource::hasBufferAvailable(status_t *finalResult) {
366    Mutex::Autolock autoLock(mLock);
367    *finalResult = OK;
368    if (!mEnabled) {
369        return false;
370    }
371    if (!mBuffers.empty()) {
372        return true;
373    }
374
375    *finalResult = mEOSResult;
376    return false;
377}
378
379bool AnotherPacketSource::hasDataBufferAvailable(status_t *finalResult) {
380    Mutex::Autolock autoLock(mLock);
381    *finalResult = OK;
382    if (!mEnabled) {
383        return false;
384    }
385    List<sp<ABuffer> >::iterator it;
386    for (it = mBuffers.begin(); it != mBuffers.end(); it++) {
387        int32_t discontinuity;
388        if (!(*it)->meta()->findInt32("discontinuity", &discontinuity)) {
389            return true;
390        }
391    }
392
393    *finalResult = mEOSResult;
394    return false;
395}
396
397size_t AnotherPacketSource::getAvailableBufferCount(status_t *finalResult) {
398    Mutex::Autolock autoLock(mLock);
399
400    *finalResult = OK;
401    if (!mEnabled) {
402        return 0;
403    }
404    if (!mBuffers.empty()) {
405        return mBuffers.size();
406    }
407    *finalResult = mEOSResult;
408    return 0;
409}
410
411int64_t AnotherPacketSource::getBufferedDurationUs(status_t *finalResult) {
412    Mutex::Autolock autoLock(mLock);
413    *finalResult = mEOSResult;
414
415    int64_t durationUs = 0;
416    for (List<DiscontinuitySegment>::iterator it = mDiscontinuitySegments.begin();
417            it != mDiscontinuitySegments.end();
418            ++it) {
419        const DiscontinuitySegment &seg = *it;
420        // dequeued access units should be a subset of enqueued access units
421        // CHECK(seg.maxEnqueTimeUs >= seg.mMaxDequeTimeUs);
422        durationUs += (seg.mMaxEnqueTimeUs - seg.mMaxDequeTimeUs);
423    }
424
425    return durationUs;
426}
427
428status_t AnotherPacketSource::nextBufferTime(int64_t *timeUs) {
429    *timeUs = 0;
430
431    Mutex::Autolock autoLock(mLock);
432
433    if (mBuffers.empty()) {
434        return mEOSResult != OK ? mEOSResult : -EWOULDBLOCK;
435    }
436
437    sp<ABuffer> buffer = *mBuffers.begin();
438    CHECK(buffer->meta()->findInt64("timeUs", timeUs));
439
440    return OK;
441}
442
443bool AnotherPacketSource::isFinished(int64_t duration) const {
444    if (duration > 0) {
445        int64_t diff = duration - mLastQueuedTimeUs;
446        if (diff < kNearEOSMarkUs && diff > -kNearEOSMarkUs) {
447            ALOGV("Detecting EOS due to near end");
448            return true;
449        }
450    }
451    return (mEOSResult != OK);
452}
453
454sp<AMessage> AnotherPacketSource::getLatestEnqueuedMeta() {
455    Mutex::Autolock autoLock(mLock);
456    return mLatestEnqueuedMeta;
457}
458
459sp<AMessage> AnotherPacketSource::getLatestDequeuedMeta() {
460    Mutex::Autolock autoLock(mLock);
461    return mLatestDequeuedMeta;
462}
463
464void AnotherPacketSource::enable(bool enable) {
465    Mutex::Autolock autoLock(mLock);
466    mEnabled = enable;
467}
468
469/*
470 * returns the sample meta that's delayUs after queue head
471 * (NULL if such sample is unavailable)
472 */
473sp<AMessage> AnotherPacketSource::getMetaAfterLastDequeued(int64_t delayUs) {
474    Mutex::Autolock autoLock(mLock);
475    int64_t firstUs = -1;
476    int64_t lastUs = -1;
477    int64_t durationUs = 0;
478
479    List<sp<ABuffer> >::iterator it;
480    for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
481        const sp<ABuffer> &buffer = *it;
482        int32_t discontinuity;
483        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
484            durationUs += lastUs - firstUs;
485            firstUs = -1;
486            lastUs = -1;
487            continue;
488        }
489        int64_t timeUs;
490        if (buffer->meta()->findInt64("timeUs", &timeUs)) {
491            if (firstUs < 0) {
492                firstUs = timeUs;
493            }
494            if (lastUs < 0 || timeUs > lastUs) {
495                lastUs = timeUs;
496            }
497            if (durationUs + (lastUs - firstUs) >= delayUs) {
498                return buffer->meta();
499            }
500        }
501    }
502    return NULL;
503}
504
505/*
506 * removes samples with time equal or after meta
507 */
508void AnotherPacketSource::trimBuffersAfterMeta(
509        const sp<AMessage> &meta) {
510    if (meta == NULL) {
511        ALOGW("trimming with NULL meta, ignoring");
512        return;
513    }
514
515    Mutex::Autolock autoLock(mLock);
516    if (mBuffers.empty()) {
517        return;
518    }
519
520    HLSTime stopTime(meta);
521    ALOGV("trimBuffersAfterMeta: discontinuitySeq %d, timeUs %lld",
522            stopTime.mSeq, (long long)stopTime.mTimeUs);
523
524    List<sp<ABuffer> >::iterator it;
525    List<DiscontinuitySegment >::iterator it2;
526    sp<AMessage> newLatestEnqueuedMeta = NULL;
527    int64_t newLastQueuedTimeUs = 0;
528    for (it = mBuffers.begin(), it2 = mDiscontinuitySegments.begin(); it != mBuffers.end(); ++it) {
529        const sp<ABuffer> &buffer = *it;
530        int32_t discontinuity;
531        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
532            // CHECK(it2 != mDiscontinuitySegments.end());
533            ++it2;
534            continue;
535        }
536
537        HLSTime curTime(buffer->meta());
538        if (!(curTime < stopTime)) {
539            ALOGV("trimming from %lld (inclusive) to end",
540                    (long long)curTime.mTimeUs);
541            break;
542        }
543        newLatestEnqueuedMeta = buffer->meta();
544        newLastQueuedTimeUs = curTime.mTimeUs;
545    }
546
547    mBuffers.erase(it, mBuffers.end());
548    mLatestEnqueuedMeta = newLatestEnqueuedMeta;
549    mLastQueuedTimeUs = newLastQueuedTimeUs;
550
551    DiscontinuitySegment &seg = *it2;
552    if (newLatestEnqueuedMeta != NULL) {
553        seg.mMaxEnqueTimeUs = newLastQueuedTimeUs;
554    } else {
555        seg.clear();
556    }
557    mDiscontinuitySegments.erase(++it2, mDiscontinuitySegments.end());
558}
559
560/*
561 * removes samples with time equal or before meta;
562 * returns first sample left in the queue.
563 *
564 * (for AVC, if trim happens, the samples left will always start
565 * at next IDR.)
566 */
567sp<AMessage> AnotherPacketSource::trimBuffersBeforeMeta(
568        const sp<AMessage> &meta) {
569    HLSTime startTime(meta);
570    ALOGV("trimBuffersBeforeMeta: discontinuitySeq %d, timeUs %lld",
571            startTime.mSeq, (long long)startTime.mTimeUs);
572
573    sp<AMessage> firstMeta;
574    int64_t firstTimeUs = -1;
575    Mutex::Autolock autoLock(mLock);
576    if (mBuffers.empty()) {
577        return NULL;
578    }
579
580    sp<MetaData> format;
581    bool isAvc = false;
582
583    List<sp<ABuffer> >::iterator it;
584    for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
585        const sp<ABuffer> &buffer = *it;
586        int32_t discontinuity;
587        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
588            mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
589            // CHECK(!mDiscontinuitySegments.empty());
590            format = NULL;
591            isAvc = false;
592            continue;
593        }
594        if (format == NULL) {
595            sp<RefBase> object;
596            if (buffer->meta()->findObject("format", &object)) {
597                const char* mime;
598                format = static_cast<MetaData*>(object.get());
599                isAvc = format != NULL
600                        && format->findCString(kKeyMIMEType, &mime)
601                        && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
602            }
603        }
604        if (isAvc && !IsIDR(buffer)) {
605            continue;
606        }
607
608        HLSTime curTime(buffer->meta());
609        if (startTime < curTime) {
610            ALOGV("trimming from beginning to %lld (not inclusive)",
611                    (long long)curTime.mTimeUs);
612            firstMeta = buffer->meta();
613            firstTimeUs = curTime.mTimeUs;
614            break;
615        }
616    }
617    mBuffers.erase(mBuffers.begin(), it);
618    mLatestDequeuedMeta = NULL;
619
620    // CHECK(!mDiscontinuitySegments.empty());
621    DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
622    if (firstTimeUs >= 0) {
623        seg.mMaxDequeTimeUs = firstTimeUs;
624    } else {
625        seg.clear();
626    }
627
628    return firstMeta;
629}
630
631}  // namespace android
632