1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "AnotherPacketSource"
19
20#include "AnotherPacketSource.h"
21
22#include "include/avc_utils.h"
23
24#include <media/stagefright/foundation/ABuffer.h>
25#include <media/stagefright/foundation/ADebug.h>
26#include <media/stagefright/foundation/AMessage.h>
27#include <media/stagefright/foundation/AString.h>
28#include <media/stagefright/foundation/hexdump.h>
29#include <media/stagefright/MediaBuffer.h>
30#include <media/stagefright/MediaDefs.h>
31#include <media/stagefright/MetaData.h>
32#include <media/stagefright/Utils.h>
33#include <utils/Vector.h>
34
35#include <inttypes.h>
36
37namespace android {
38
// Half-width, in microseconds, of the window around the nominal duration in
// which the last queued timestamp counts as "near end of stream".
const int64_t kNearEOSMarkUs = 2 * 1000000ll;  // 2 secs
40
41AnotherPacketSource::AnotherPacketSource(const sp<MetaData> &meta)
42    : mIsAudio(false),
43      mIsVideo(false),
44      mEnabled(true),
45      mFormat(NULL),
46      mLastQueuedTimeUs(0),
47      mEOSResult(OK),
48      mLatestEnqueuedMeta(NULL),
49      mLatestDequeuedMeta(NULL) {
50    setFormat(meta);
51
52    mDiscontinuitySegments.push_back(DiscontinuitySegment());
53}
54
55void AnotherPacketSource::setFormat(const sp<MetaData> &meta) {
56    if (mFormat != NULL) {
57        // Only allowed to be set once. Requires explicit clear to reset.
58        return;
59    }
60
61    mIsAudio = false;
62    mIsVideo = false;
63
64    if (meta == NULL) {
65        return;
66    }
67
68    mFormat = meta;
69    const char *mime;
70    CHECK(meta->findCString(kKeyMIMEType, &mime));
71
72    if (!strncasecmp("audio/", mime, 6)) {
73        mIsAudio = true;
74    } else  if (!strncasecmp("video/", mime, 6)) {
75        mIsVideo = true;
76    } else {
77        CHECK(!strncasecmp("text/", mime, 5) || !strncasecmp("application/", mime, 12));
78    }
79}
80
81AnotherPacketSource::~AnotherPacketSource() {
82}
83
84status_t AnotherPacketSource::start(MetaData * /* params */) {
85    return OK;
86}
87
88status_t AnotherPacketSource::stop() {
89    return OK;
90}
91
92sp<MetaData> AnotherPacketSource::getFormat() {
93    Mutex::Autolock autoLock(mLock);
94    if (mFormat != NULL) {
95        return mFormat;
96    }
97
98    List<sp<ABuffer> >::iterator it = mBuffers.begin();
99    while (it != mBuffers.end()) {
100        sp<ABuffer> buffer = *it;
101        int32_t discontinuity;
102        if (!buffer->meta()->findInt32("discontinuity", &discontinuity)) {
103            sp<RefBase> object;
104            if (buffer->meta()->findObject("format", &object)) {
105                setFormat(static_cast<MetaData*>(object.get()));
106                return mFormat;
107            }
108        }
109
110        ++it;
111    }
112    return NULL;
113}
114
115status_t AnotherPacketSource::dequeueAccessUnit(sp<ABuffer> *buffer) {
116    buffer->clear();
117
118    Mutex::Autolock autoLock(mLock);
119    while (mEOSResult == OK && mBuffers.empty()) {
120        mCondition.wait(mLock);
121    }
122
123    if (!mBuffers.empty()) {
124        *buffer = *mBuffers.begin();
125        mBuffers.erase(mBuffers.begin());
126
127        int32_t discontinuity;
128        if ((*buffer)->meta()->findInt32("discontinuity", &discontinuity)) {
129            if (wasFormatChange(discontinuity)) {
130                mFormat.clear();
131            }
132
133            mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
134            // CHECK(!mDiscontinuitySegments.empty());
135            return INFO_DISCONTINUITY;
136        }
137
138        // CHECK(!mDiscontinuitySegments.empty());
139        DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
140
141        int64_t timeUs;
142        mLatestDequeuedMeta = (*buffer)->meta()->dup();
143        CHECK(mLatestDequeuedMeta->findInt64("timeUs", &timeUs));
144        if (timeUs > seg.mMaxDequeTimeUs) {
145            seg.mMaxDequeTimeUs = timeUs;
146        }
147
148        sp<RefBase> object;
149        if ((*buffer)->meta()->findObject("format", &object)) {
150            setFormat(static_cast<MetaData*>(object.get()));
151        }
152
153        return OK;
154    }
155
156    return mEOSResult;
157}
158
159void AnotherPacketSource::requeueAccessUnit(const sp<ABuffer> &buffer) {
160    // TODO: update corresponding book keeping info.
161    Mutex::Autolock autoLock(mLock);
162    mBuffers.push_front(buffer);
163}
164
165status_t AnotherPacketSource::read(
166        MediaBuffer **out, const ReadOptions *) {
167    *out = NULL;
168
169    Mutex::Autolock autoLock(mLock);
170    while (mEOSResult == OK && mBuffers.empty()) {
171        mCondition.wait(mLock);
172    }
173
174    if (!mBuffers.empty()) {
175
176        const sp<ABuffer> buffer = *mBuffers.begin();
177        mBuffers.erase(mBuffers.begin());
178
179        int32_t discontinuity;
180        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
181            if (wasFormatChange(discontinuity)) {
182                mFormat.clear();
183            }
184
185            mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
186            // CHECK(!mDiscontinuitySegments.empty());
187            return INFO_DISCONTINUITY;
188        }
189
190        mLatestDequeuedMeta = buffer->meta()->dup();
191
192        sp<RefBase> object;
193        if (buffer->meta()->findObject("format", &object)) {
194            setFormat(static_cast<MetaData*>(object.get()));
195        }
196
197        int64_t timeUs;
198        CHECK(buffer->meta()->findInt64("timeUs", &timeUs));
199        // CHECK(!mDiscontinuitySegments.empty());
200        DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
201        if (timeUs > seg.mMaxDequeTimeUs) {
202            seg.mMaxDequeTimeUs = timeUs;
203        }
204
205        MediaBuffer *mediaBuffer = new MediaBuffer(buffer);
206
207        mediaBuffer->meta_data()->setInt64(kKeyTime, timeUs);
208
209        int32_t isSync;
210        if (buffer->meta()->findInt32("isSync", &isSync)) {
211            mediaBuffer->meta_data()->setInt32(kKeyIsSyncFrame, isSync);
212        }
213
214        sp<ABuffer> sei;
215        if (buffer->meta()->findBuffer("sei", &sei) && sei != NULL) {
216            mediaBuffer->meta_data()->setData(kKeySEI, 0, sei->data(), sei->size());
217        }
218
219        sp<ABuffer> mpegUserData;
220        if (buffer->meta()->findBuffer("mpegUserData", &mpegUserData) && mpegUserData != NULL) {
221            mediaBuffer->meta_data()->setData(
222                    kKeyMpegUserData, 0, mpegUserData->data(), mpegUserData->size());
223        }
224
225        *out = mediaBuffer;
226        return OK;
227    }
228
229    return mEOSResult;
230}
231
232bool AnotherPacketSource::wasFormatChange(
233        int32_t discontinuityType) const {
234    if (mIsAudio) {
235        return (discontinuityType & ATSParser::DISCONTINUITY_AUDIO_FORMAT) != 0;
236    }
237
238    if (mIsVideo) {
239        return (discontinuityType & ATSParser::DISCONTINUITY_VIDEO_FORMAT) != 0;
240    }
241
242    return false;
243}
244
245void AnotherPacketSource::queueAccessUnit(const sp<ABuffer> &buffer) {
246    int32_t damaged;
247    if (buffer->meta()->findInt32("damaged", &damaged) && damaged) {
248        // LOG(VERBOSE) << "discarding damaged AU";
249        return;
250    }
251
252    Mutex::Autolock autoLock(mLock);
253    mBuffers.push_back(buffer);
254    mCondition.signal();
255
256    int32_t discontinuity;
257    if (buffer->meta()->findInt32("discontinuity", &discontinuity)){
258        ALOGV("queueing a discontinuity with queueAccessUnit");
259
260        mLastQueuedTimeUs = 0ll;
261        mEOSResult = OK;
262        mLatestEnqueuedMeta = NULL;
263
264        mDiscontinuitySegments.push_back(DiscontinuitySegment());
265        return;
266    }
267
268    int64_t lastQueuedTimeUs;
269    CHECK(buffer->meta()->findInt64("timeUs", &lastQueuedTimeUs));
270    mLastQueuedTimeUs = lastQueuedTimeUs;
271    ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)",
272            mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6);
273
274    // CHECK(!mDiscontinuitySegments.empty());
275    DiscontinuitySegment &tailSeg = *(--mDiscontinuitySegments.end());
276    if (lastQueuedTimeUs > tailSeg.mMaxEnqueTimeUs) {
277        tailSeg.mMaxEnqueTimeUs = lastQueuedTimeUs;
278    }
279    if (tailSeg.mMaxDequeTimeUs == -1) {
280        tailSeg.mMaxDequeTimeUs = lastQueuedTimeUs;
281    }
282
283    if (mLatestEnqueuedMeta == NULL) {
284        mLatestEnqueuedMeta = buffer->meta()->dup();
285    } else {
286        int64_t latestTimeUs = 0;
287        int64_t frameDeltaUs = 0;
288        CHECK(mLatestEnqueuedMeta->findInt64("timeUs", &latestTimeUs));
289        if (lastQueuedTimeUs > latestTimeUs) {
290            mLatestEnqueuedMeta = buffer->meta()->dup();
291            frameDeltaUs = lastQueuedTimeUs - latestTimeUs;
292            mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
293        } else if (!mLatestEnqueuedMeta->findInt64("durationUs", &frameDeltaUs)) {
294            // For B frames
295            frameDeltaUs = latestTimeUs - lastQueuedTimeUs;
296            mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
297        }
298    }
299}
300
301void AnotherPacketSource::clear() {
302    Mutex::Autolock autoLock(mLock);
303
304    mBuffers.clear();
305    mEOSResult = OK;
306
307    mDiscontinuitySegments.clear();
308    mDiscontinuitySegments.push_back(DiscontinuitySegment());
309
310    mFormat = NULL;
311    mLatestEnqueuedMeta = NULL;
312}
313
314void AnotherPacketSource::queueDiscontinuity(
315        ATSParser::DiscontinuityType type,
316        const sp<AMessage> &extra,
317        bool discard) {
318    Mutex::Autolock autoLock(mLock);
319
320    if (discard) {
321        // Leave only discontinuities in the queue.
322        List<sp<ABuffer> >::iterator it = mBuffers.begin();
323        while (it != mBuffers.end()) {
324            sp<ABuffer> oldBuffer = *it;
325
326            int32_t oldDiscontinuityType;
327            if (!oldBuffer->meta()->findInt32(
328                        "discontinuity", &oldDiscontinuityType)) {
329                it = mBuffers.erase(it);
330                continue;
331            }
332
333            ++it;
334        }
335
336        for (List<DiscontinuitySegment>::iterator it2 = mDiscontinuitySegments.begin();
337                it2 != mDiscontinuitySegments.end();
338                ++it2) {
339            DiscontinuitySegment &seg = *it2;
340            seg.clear();
341        }
342
343    }
344
345    mEOSResult = OK;
346    mLastQueuedTimeUs = 0;
347    mLatestEnqueuedMeta = NULL;
348
349    if (type == ATSParser::DISCONTINUITY_NONE) {
350        return;
351    }
352
353    mDiscontinuitySegments.push_back(DiscontinuitySegment());
354
355    sp<ABuffer> buffer = new ABuffer(0);
356    buffer->meta()->setInt32("discontinuity", static_cast<int32_t>(type));
357    buffer->meta()->setMessage("extra", extra);
358
359    mBuffers.push_back(buffer);
360    mCondition.signal();
361}
362
363void AnotherPacketSource::signalEOS(status_t result) {
364    CHECK(result != OK);
365
366    Mutex::Autolock autoLock(mLock);
367    mEOSResult = result;
368    mCondition.signal();
369}
370
371bool AnotherPacketSource::hasBufferAvailable(status_t *finalResult) {
372    Mutex::Autolock autoLock(mLock);
373    *finalResult = OK;
374    if (!mEnabled) {
375        return false;
376    }
377    if (!mBuffers.empty()) {
378        return true;
379    }
380
381    *finalResult = mEOSResult;
382    return false;
383}
384
385bool AnotherPacketSource::hasDataBufferAvailable(status_t *finalResult) {
386    Mutex::Autolock autoLock(mLock);
387    *finalResult = OK;
388    if (!mEnabled) {
389        return false;
390    }
391    List<sp<ABuffer> >::iterator it;
392    for (it = mBuffers.begin(); it != mBuffers.end(); it++) {
393        int32_t discontinuity;
394        if (!(*it)->meta()->findInt32("discontinuity", &discontinuity)) {
395            return true;
396        }
397    }
398
399    *finalResult = mEOSResult;
400    return false;
401}
402
403size_t AnotherPacketSource::getAvailableBufferCount(status_t *finalResult) {
404    Mutex::Autolock autoLock(mLock);
405
406    *finalResult = OK;
407    if (!mEnabled) {
408        return 0;
409    }
410    if (!mBuffers.empty()) {
411        return mBuffers.size();
412    }
413    *finalResult = mEOSResult;
414    return 0;
415}
416
417int64_t AnotherPacketSource::getBufferedDurationUs(status_t *finalResult) {
418    Mutex::Autolock autoLock(mLock);
419    *finalResult = mEOSResult;
420
421    int64_t durationUs = 0;
422    for (List<DiscontinuitySegment>::iterator it = mDiscontinuitySegments.begin();
423            it != mDiscontinuitySegments.end();
424            ++it) {
425        const DiscontinuitySegment &seg = *it;
426        // dequeued access units should be a subset of enqueued access units
427        // CHECK(seg.maxEnqueTimeUs >= seg.mMaxDequeTimeUs);
428        durationUs += (seg.mMaxEnqueTimeUs - seg.mMaxDequeTimeUs);
429    }
430
431    return durationUs;
432}
433
434status_t AnotherPacketSource::nextBufferTime(int64_t *timeUs) {
435    *timeUs = 0;
436
437    Mutex::Autolock autoLock(mLock);
438
439    if (mBuffers.empty()) {
440        return mEOSResult != OK ? mEOSResult : -EWOULDBLOCK;
441    }
442
443    sp<ABuffer> buffer = *mBuffers.begin();
444    CHECK(buffer->meta()->findInt64("timeUs", timeUs));
445
446    return OK;
447}
448
449bool AnotherPacketSource::isFinished(int64_t duration) const {
450    if (duration > 0) {
451        int64_t diff = duration - mLastQueuedTimeUs;
452        if (diff < kNearEOSMarkUs && diff > -kNearEOSMarkUs) {
453            ALOGV("Detecting EOS due to near end");
454            return true;
455        }
456    }
457    return (mEOSResult != OK);
458}
459
460sp<AMessage> AnotherPacketSource::getLatestEnqueuedMeta() {
461    Mutex::Autolock autoLock(mLock);
462    return mLatestEnqueuedMeta;
463}
464
465sp<AMessage> AnotherPacketSource::getLatestDequeuedMeta() {
466    Mutex::Autolock autoLock(mLock);
467    return mLatestDequeuedMeta;
468}
469
470void AnotherPacketSource::enable(bool enable) {
471    Mutex::Autolock autoLock(mLock);
472    mEnabled = enable;
473}
474
475/*
476 * returns the sample meta that's delayUs after queue head
477 * (NULL if such sample is unavailable)
478 */
479sp<AMessage> AnotherPacketSource::getMetaAfterLastDequeued(int64_t delayUs) {
480    Mutex::Autolock autoLock(mLock);
481    int64_t firstUs = -1;
482    int64_t lastUs = -1;
483    int64_t durationUs = 0;
484
485    List<sp<ABuffer> >::iterator it;
486    for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
487        const sp<ABuffer> &buffer = *it;
488        int32_t discontinuity;
489        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
490            durationUs += lastUs - firstUs;
491            firstUs = -1;
492            lastUs = -1;
493            continue;
494        }
495        int64_t timeUs;
496        if (buffer->meta()->findInt64("timeUs", &timeUs)) {
497            if (firstUs < 0) {
498                firstUs = timeUs;
499            }
500            if (lastUs < 0 || timeUs > lastUs) {
501                lastUs = timeUs;
502            }
503            if (durationUs + (lastUs - firstUs) >= delayUs) {
504                return buffer->meta();
505            }
506        }
507    }
508    return NULL;
509}
510
511/*
512 * removes samples with time equal or after meta
513 */
514void AnotherPacketSource::trimBuffersAfterMeta(
515        const sp<AMessage> &meta) {
516    if (meta == NULL) {
517        ALOGW("trimming with NULL meta, ignoring");
518        return;
519    }
520
521    Mutex::Autolock autoLock(mLock);
522    if (mBuffers.empty()) {
523        return;
524    }
525
526    HLSTime stopTime(meta);
527    ALOGV("trimBuffersAfterMeta: discontinuitySeq %d, timeUs %lld",
528            stopTime.mSeq, (long long)stopTime.mTimeUs);
529
530    List<sp<ABuffer> >::iterator it;
531    List<DiscontinuitySegment >::iterator it2;
532    sp<AMessage> newLatestEnqueuedMeta = NULL;
533    int64_t newLastQueuedTimeUs = 0;
534    for (it = mBuffers.begin(), it2 = mDiscontinuitySegments.begin(); it != mBuffers.end(); ++it) {
535        const sp<ABuffer> &buffer = *it;
536        int32_t discontinuity;
537        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
538            // CHECK(it2 != mDiscontinuitySegments.end());
539            ++it2;
540            continue;
541        }
542
543        HLSTime curTime(buffer->meta());
544        if (!(curTime < stopTime)) {
545            ALOGV("trimming from %lld (inclusive) to end",
546                    (long long)curTime.mTimeUs);
547            break;
548        }
549        newLatestEnqueuedMeta = buffer->meta();
550        newLastQueuedTimeUs = curTime.mTimeUs;
551    }
552
553    mBuffers.erase(it, mBuffers.end());
554    mLatestEnqueuedMeta = newLatestEnqueuedMeta;
555    mLastQueuedTimeUs = newLastQueuedTimeUs;
556
557    DiscontinuitySegment &seg = *it2;
558    if (newLatestEnqueuedMeta != NULL) {
559        seg.mMaxEnqueTimeUs = newLastQueuedTimeUs;
560    } else {
561        seg.clear();
562    }
563    mDiscontinuitySegments.erase(++it2, mDiscontinuitySegments.end());
564}
565
566/*
567 * removes samples with time equal or before meta;
568 * returns first sample left in the queue.
569 *
570 * (for AVC, if trim happens, the samples left will always start
571 * at next IDR.)
572 */
573sp<AMessage> AnotherPacketSource::trimBuffersBeforeMeta(
574        const sp<AMessage> &meta) {
575    HLSTime startTime(meta);
576    ALOGV("trimBuffersBeforeMeta: discontinuitySeq %d, timeUs %lld",
577            startTime.mSeq, (long long)startTime.mTimeUs);
578
579    sp<AMessage> firstMeta;
580    int64_t firstTimeUs = -1;
581    Mutex::Autolock autoLock(mLock);
582    if (mBuffers.empty()) {
583        return NULL;
584    }
585
586    sp<MetaData> format;
587    bool isAvc = false;
588
589    List<sp<ABuffer> >::iterator it;
590    for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
591        const sp<ABuffer> &buffer = *it;
592        int32_t discontinuity;
593        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
594            mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
595            // CHECK(!mDiscontinuitySegments.empty());
596            format = NULL;
597            isAvc = false;
598            continue;
599        }
600        if (format == NULL) {
601            sp<RefBase> object;
602            if (buffer->meta()->findObject("format", &object)) {
603                const char* mime;
604                format = static_cast<MetaData*>(object.get());
605                isAvc = format != NULL
606                        && format->findCString(kKeyMIMEType, &mime)
607                        && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
608            }
609        }
610        if (isAvc && !IsIDR(buffer)) {
611            continue;
612        }
613
614        HLSTime curTime(buffer->meta());
615        if (startTime < curTime) {
616            ALOGV("trimming from beginning to %lld (not inclusive)",
617                    (long long)curTime.mTimeUs);
618            firstMeta = buffer->meta();
619            firstTimeUs = curTime.mTimeUs;
620            break;
621        }
622    }
623    mBuffers.erase(mBuffers.begin(), it);
624    mLatestDequeuedMeta = NULL;
625
626    // CHECK(!mDiscontinuitySegments.empty());
627    DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
628    if (firstTimeUs >= 0) {
629        seg.mMaxDequeTimeUs = firstTimeUs;
630    } else {
631        seg.clear();
632    }
633
634    return firstMeta;
635}
636
637}  // namespace android
638