AnotherPacketSource.cpp revision 7c8708046117e03c0d38006bdd9685139df3ac6b
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "AnotherPacketSource"
19
20#include "AnotherPacketSource.h"
21
22#include "include/avc_utils.h"
23
24#include <media/stagefright/foundation/ABuffer.h>
25#include <media/stagefright/foundation/ADebug.h>
26#include <media/stagefright/foundation/AMessage.h>
27#include <media/stagefright/foundation/AString.h>
28#include <media/stagefright/foundation/hexdump.h>
29#include <media/stagefright/MediaBuffer.h>
30#include <media/stagefright/MediaDefs.h>
31#include <media/stagefright/MetaData.h>
32#include <utils/Vector.h>
33
34#include <inttypes.h>
35
36namespace android {
37
// Half-width, in microseconds, of the window around the stream duration in
// which isFinished() reports end of stream (2 seconds).
const int64_t kNearEOSMarkUs = 2 * 1000000ll;
39
40AnotherPacketSource::AnotherPacketSource(const sp<MetaData> &meta)
41    : mIsAudio(false),
42      mIsVideo(false),
43      mEnabled(true),
44      mFormat(NULL),
45      mLastQueuedTimeUs(0),
46      mEOSResult(OK),
47      mLatestEnqueuedMeta(NULL),
48      mLatestDequeuedMeta(NULL),
49      mQueuedDiscontinuityCount(0) {
50    setFormat(meta);
51}
52
53void AnotherPacketSource::setFormat(const sp<MetaData> &meta) {
54    if (mFormat != NULL) {
55        // Only allowed to be set once. Requires explicit clear to reset.
56        return;
57    }
58
59    mIsAudio = false;
60    mIsVideo = false;
61
62    if (meta == NULL) {
63        return;
64    }
65
66    mFormat = meta;
67    const char *mime;
68    CHECK(meta->findCString(kKeyMIMEType, &mime));
69
70    if (!strncasecmp("audio/", mime, 6)) {
71        mIsAudio = true;
72    } else  if (!strncasecmp("video/", mime, 6)) {
73        mIsVideo = true;
74    } else {
75        CHECK(!strncasecmp("text/", mime, 5));
76    }
77}
78
79AnotherPacketSource::~AnotherPacketSource() {
80}
81
82status_t AnotherPacketSource::start(MetaData * /* params */) {
83    return OK;
84}
85
86status_t AnotherPacketSource::stop() {
87    return OK;
88}
89
90sp<MetaData> AnotherPacketSource::getFormat() {
91    Mutex::Autolock autoLock(mLock);
92    if (mFormat != NULL) {
93        return mFormat;
94    }
95
96    List<sp<ABuffer> >::iterator it = mBuffers.begin();
97    while (it != mBuffers.end()) {
98        sp<ABuffer> buffer = *it;
99        int32_t discontinuity;
100        if (!buffer->meta()->findInt32("discontinuity", &discontinuity)) {
101            sp<RefBase> object;
102            if (buffer->meta()->findObject("format", &object)) {
103                setFormat(static_cast<MetaData*>(object.get()));
104                return mFormat;
105            }
106        }
107
108        ++it;
109    }
110    return NULL;
111}
112
113status_t AnotherPacketSource::dequeueAccessUnit(sp<ABuffer> *buffer) {
114    buffer->clear();
115
116    Mutex::Autolock autoLock(mLock);
117    while (mEOSResult == OK && mBuffers.empty()) {
118        mCondition.wait(mLock);
119    }
120
121    if (!mBuffers.empty()) {
122        *buffer = *mBuffers.begin();
123        mBuffers.erase(mBuffers.begin());
124
125        int32_t discontinuity;
126        if ((*buffer)->meta()->findInt32("discontinuity", &discontinuity)) {
127            if (wasFormatChange(discontinuity)) {
128                mFormat.clear();
129            }
130
131            --mQueuedDiscontinuityCount;
132            return INFO_DISCONTINUITY;
133        }
134
135        mLatestDequeuedMeta = (*buffer)->meta()->dup();
136
137        sp<RefBase> object;
138        if ((*buffer)->meta()->findObject("format", &object)) {
139            setFormat(static_cast<MetaData*>(object.get()));
140        }
141
142        return OK;
143    }
144
145    return mEOSResult;
146}
147
148status_t AnotherPacketSource::read(
149        MediaBuffer **out, const ReadOptions *) {
150    *out = NULL;
151
152    Mutex::Autolock autoLock(mLock);
153    while (mEOSResult == OK && mBuffers.empty()) {
154        mCondition.wait(mLock);
155    }
156
157    if (!mBuffers.empty()) {
158
159        const sp<ABuffer> buffer = *mBuffers.begin();
160        mBuffers.erase(mBuffers.begin());
161
162        int32_t discontinuity;
163        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
164            if (wasFormatChange(discontinuity)) {
165                mFormat.clear();
166            }
167
168            return INFO_DISCONTINUITY;
169        }
170
171        mLatestDequeuedMeta = buffer->meta()->dup();
172
173        sp<RefBase> object;
174        if (buffer->meta()->findObject("format", &object)) {
175            setFormat(static_cast<MetaData*>(object.get()));
176        }
177
178        int64_t timeUs;
179        CHECK(buffer->meta()->findInt64("timeUs", &timeUs));
180
181        MediaBuffer *mediaBuffer = new MediaBuffer(buffer);
182
183        mediaBuffer->meta_data()->setInt64(kKeyTime, timeUs);
184
185        *out = mediaBuffer;
186        return OK;
187    }
188
189    return mEOSResult;
190}
191
192bool AnotherPacketSource::wasFormatChange(
193        int32_t discontinuityType) const {
194    if (mIsAudio) {
195        return (discontinuityType & ATSParser::DISCONTINUITY_AUDIO_FORMAT) != 0;
196    }
197
198    if (mIsVideo) {
199        return (discontinuityType & ATSParser::DISCONTINUITY_VIDEO_FORMAT) != 0;
200    }
201
202    return false;
203}
204
205void AnotherPacketSource::queueAccessUnit(const sp<ABuffer> &buffer) {
206    int32_t damaged;
207    if (buffer->meta()->findInt32("damaged", &damaged) && damaged) {
208        // LOG(VERBOSE) << "discarding damaged AU";
209        return;
210    }
211
212    Mutex::Autolock autoLock(mLock);
213    mBuffers.push_back(buffer);
214    mCondition.signal();
215
216    int32_t discontinuity;
217    if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
218        // discontinuity handling needs to be consistent with queueDiscontinuity()
219        ++mQueuedDiscontinuityCount;
220        mLastQueuedTimeUs = 0ll;
221        mEOSResult = OK;
222        mLatestEnqueuedMeta = NULL;
223        return;
224    }
225
226    int64_t lastQueuedTimeUs;
227    CHECK(buffer->meta()->findInt64("timeUs", &lastQueuedTimeUs));
228    mLastQueuedTimeUs = lastQueuedTimeUs;
229    ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)",
230            mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6);
231
232    if (mLatestEnqueuedMeta == NULL) {
233        mLatestEnqueuedMeta = buffer->meta()->dup();
234    } else {
235        int64_t latestTimeUs = 0;
236        int64_t frameDeltaUs = 0;
237        CHECK(mLatestEnqueuedMeta->findInt64("timeUs", &latestTimeUs));
238        if (lastQueuedTimeUs > latestTimeUs) {
239            mLatestEnqueuedMeta = buffer->meta()->dup();
240            frameDeltaUs = lastQueuedTimeUs - latestTimeUs;
241            mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
242        } else if (!mLatestEnqueuedMeta->findInt64("durationUs", &frameDeltaUs)) {
243            // For B frames
244            frameDeltaUs = latestTimeUs - lastQueuedTimeUs;
245            mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
246        }
247    }
248}
249
250void AnotherPacketSource::clear() {
251    Mutex::Autolock autoLock(mLock);
252
253    mBuffers.clear();
254    mEOSResult = OK;
255    mQueuedDiscontinuityCount = 0;
256
257    mFormat = NULL;
258    mLatestEnqueuedMeta = NULL;
259}
260
261void AnotherPacketSource::queueDiscontinuity(
262        ATSParser::DiscontinuityType type,
263        const sp<AMessage> &extra,
264        bool discard) {
265    Mutex::Autolock autoLock(mLock);
266
267    if (discard) {
268        // Leave only discontinuities in the queue.
269        List<sp<ABuffer> >::iterator it = mBuffers.begin();
270        while (it != mBuffers.end()) {
271            sp<ABuffer> oldBuffer = *it;
272
273            int32_t oldDiscontinuityType;
274            if (!oldBuffer->meta()->findInt32(
275                        "discontinuity", &oldDiscontinuityType)) {
276                it = mBuffers.erase(it);
277                continue;
278            }
279
280            ++it;
281        }
282    }
283
284    mEOSResult = OK;
285    mLastQueuedTimeUs = 0;
286    mLatestEnqueuedMeta = NULL;
287
288    if (type == ATSParser::DISCONTINUITY_NONE) {
289        return;
290    }
291
292    ++mQueuedDiscontinuityCount;
293    sp<ABuffer> buffer = new ABuffer(0);
294    buffer->meta()->setInt32("discontinuity", static_cast<int32_t>(type));
295    buffer->meta()->setMessage("extra", extra);
296
297    mBuffers.push_back(buffer);
298    mCondition.signal();
299}
300
301void AnotherPacketSource::signalEOS(status_t result) {
302    CHECK(result != OK);
303
304    Mutex::Autolock autoLock(mLock);
305    mEOSResult = result;
306    mCondition.signal();
307}
308
309bool AnotherPacketSource::hasBufferAvailable(status_t *finalResult) {
310    Mutex::Autolock autoLock(mLock);
311    *finalResult = OK;
312    if (!mEnabled) {
313        return false;
314    }
315    if (!mBuffers.empty()) {
316        return true;
317    }
318
319    *finalResult = mEOSResult;
320    return false;
321}
322
323bool AnotherPacketSource::hasDataBufferAvailable(status_t *finalResult) {
324    Mutex::Autolock autoLock(mLock);
325    *finalResult = OK;
326    if (!mEnabled) {
327        return false;
328    }
329    List<sp<ABuffer> >::iterator it;
330    for (it = mBuffers.begin(); it != mBuffers.end(); it++) {
331        int32_t discontinuity;
332        if (!(*it)->meta()->findInt32("discontinuity", &discontinuity)) {
333            return true;
334        }
335    }
336
337    *finalResult = mEOSResult;
338    return false;
339}
340
341int64_t AnotherPacketSource::getBufferedDurationUs(status_t *finalResult) {
342    Mutex::Autolock autoLock(mLock);
343    return getBufferedDurationUs_l(finalResult);
344}
345
346int64_t AnotherPacketSource::getBufferedDurationUs_l(status_t *finalResult) {
347    *finalResult = mEOSResult;
348
349    if (mBuffers.empty()) {
350        return 0;
351    }
352
353    int64_t time1 = -1;
354    int64_t time2 = -1;
355    int64_t durationUs = 0;
356
357    List<sp<ABuffer> >::iterator it = mBuffers.begin();
358    while (it != mBuffers.end()) {
359        const sp<ABuffer> &buffer = *it;
360
361        int64_t timeUs;
362        if (buffer->meta()->findInt64("timeUs", &timeUs)) {
363            if (time1 < 0 || timeUs < time1) {
364                time1 = timeUs;
365            }
366
367            if (time2 < 0 || timeUs > time2) {
368                time2 = timeUs;
369            }
370        } else {
371            // This is a discontinuity, reset everything.
372            durationUs += time2 - time1;
373            time1 = time2 = -1;
374        }
375
376        ++it;
377    }
378
379    return durationUs + (time2 - time1);
380}
381
382// A cheaper but less precise version of getBufferedDurationUs that we would like to use in
383// LiveSession::dequeueAccessUnit to trigger downwards adaptation.
384int64_t AnotherPacketSource::getEstimatedDurationUs() {
385    Mutex::Autolock autoLock(mLock);
386    if (mBuffers.empty()) {
387        return 0;
388    }
389
390    if (mQueuedDiscontinuityCount > 0) {
391        status_t finalResult;
392        return getBufferedDurationUs_l(&finalResult);
393    }
394
395    List<sp<ABuffer> >::iterator it = mBuffers.begin();
396    sp<ABuffer> buffer = *it;
397
398    int64_t startTimeUs;
399    buffer->meta()->findInt64("timeUs", &startTimeUs);
400    if (startTimeUs < 0) {
401        return 0;
402    }
403
404    it = mBuffers.end();
405    --it;
406    buffer = *it;
407
408    int64_t endTimeUs;
409    buffer->meta()->findInt64("timeUs", &endTimeUs);
410    if (endTimeUs < 0) {
411        return 0;
412    }
413
414    int64_t diffUs;
415    if (endTimeUs > startTimeUs) {
416        diffUs = endTimeUs - startTimeUs;
417    } else {
418        diffUs = startTimeUs - endTimeUs;
419    }
420    return diffUs;
421}
422
423status_t AnotherPacketSource::nextBufferTime(int64_t *timeUs) {
424    *timeUs = 0;
425
426    Mutex::Autolock autoLock(mLock);
427
428    if (mBuffers.empty()) {
429        return mEOSResult != OK ? mEOSResult : -EWOULDBLOCK;
430    }
431
432    sp<ABuffer> buffer = *mBuffers.begin();
433    CHECK(buffer->meta()->findInt64("timeUs", timeUs));
434
435    return OK;
436}
437
438bool AnotherPacketSource::isFinished(int64_t duration) const {
439    if (duration > 0) {
440        int64_t diff = duration - mLastQueuedTimeUs;
441        if (diff < kNearEOSMarkUs && diff > -kNearEOSMarkUs) {
442            ALOGV("Detecting EOS due to near end");
443            return true;
444        }
445    }
446    return (mEOSResult != OK);
447}
448
449sp<AMessage> AnotherPacketSource::getLatestEnqueuedMeta() {
450    Mutex::Autolock autoLock(mLock);
451    return mLatestEnqueuedMeta;
452}
453
454sp<AMessage> AnotherPacketSource::getLatestDequeuedMeta() {
455    Mutex::Autolock autoLock(mLock);
456    return mLatestDequeuedMeta;
457}
458
459void AnotherPacketSource::enable(bool enable) {
460    Mutex::Autolock autoLock(mLock);
461    mEnabled = enable;
462}
463
464sp<AMessage> AnotherPacketSource::getMetaAfterLastDequeued(int64_t delayUs) {
465    Mutex::Autolock autoLock(mLock);
466    int64_t firstUs = -1;
467    int64_t lastUs = -1;
468    int64_t durationUs = 0;
469
470    List<sp<ABuffer> >::iterator it;
471    for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
472        const sp<ABuffer> &buffer = *it;
473        int32_t discontinuity;
474        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
475            durationUs += lastUs - firstUs;
476            firstUs = -1;
477            lastUs = -1;
478            continue;
479        }
480        int64_t timeUs;
481        if (buffer->meta()->findInt64("timeUs", &timeUs)) {
482            if (firstUs < 0) {
483                firstUs = timeUs;
484            }
485            if (lastUs < 0 || timeUs > lastUs) {
486                lastUs = timeUs;
487            }
488            if (durationUs + (lastUs - firstUs) >= delayUs) {
489                return buffer->meta();
490            }
491        }
492    }
493    return mLatestEnqueuedMeta;
494}
495
496void AnotherPacketSource::trimBuffersAfterTimeUs(
497        size_t discontinuitySeq, int64_t timeUs) {
498    ALOGV("trimBuffersAfterTimeUs: discontinuitySeq %zu, timeUs %lld",
499            discontinuitySeq, (long long)timeUs);
500
501    Mutex::Autolock autoLock(mLock);
502    if (mBuffers.empty()) {
503        return;
504    }
505
506    List<sp<ABuffer> >::iterator it;
507    sp<AMessage> newLatestEnqueuedMeta = NULL;
508    int64_t newLastQueuedTimeUs = 0;
509    size_t newDiscontinuityCount = 0;
510    for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
511        const sp<ABuffer> &buffer = *it;
512        int32_t discontinuity;
513        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
514            newDiscontinuityCount++;
515            continue;
516        }
517        size_t curDiscontinuitySeq;
518        int64_t curTimeUs;
519        CHECK(buffer->meta()->findInt32(
520                "discontinuitySeq", (int32_t*)&curDiscontinuitySeq));
521        CHECK(buffer->meta()->findInt64("timeUs", &curTimeUs));
522        if ((curDiscontinuitySeq > discontinuitySeq
523                || (curDiscontinuitySeq == discontinuitySeq
524                        && curTimeUs >= timeUs))) {
525            ALOGI("trimming from %lld (inclusive) to end",
526                    (long long)curTimeUs);
527            break;
528        }
529        newLatestEnqueuedMeta = buffer->meta();
530        newLastQueuedTimeUs = curTimeUs;
531    }
532    mBuffers.erase(it, mBuffers.end());
533    mLatestEnqueuedMeta = newLatestEnqueuedMeta;
534    mLastQueuedTimeUs = newLastQueuedTimeUs;
535    mQueuedDiscontinuityCount = newDiscontinuityCount;
536}
537
538sp<AMessage> AnotherPacketSource::trimBuffersBeforeTimeUs(
539        size_t discontinuitySeq, int64_t timeUs) {
540    ALOGV("trimBuffersBeforeTimeUs: discontinuitySeq %zu, timeUs %lld",
541            discontinuitySeq, (long long)timeUs);
542    sp<AMessage> meta;
543    Mutex::Autolock autoLock(mLock);
544    if (mBuffers.empty()) {
545        return NULL;
546    }
547
548    sp<MetaData> format;
549    bool isAvc = false;
550
551    List<sp<ABuffer> >::iterator it;
552    size_t discontinuityCount = 0;
553    for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
554        const sp<ABuffer> &buffer = *it;
555        int32_t discontinuity;
556        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
557            format = NULL;
558            isAvc = false;
559            discontinuityCount++;
560            continue;
561        }
562        if (format == NULL) {
563            sp<RefBase> object;
564            if (buffer->meta()->findObject("format", &object)) {
565                const char* mime;
566                format = static_cast<MetaData*>(object.get());
567                isAvc = format != NULL
568                        && format->findCString(kKeyMIMEType, &mime)
569                        && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
570            }
571        }
572        if (isAvc && !IsIDR(buffer)) {
573            continue;
574        }
575        size_t curDiscontinuitySeq;
576        int64_t curTimeUs;
577        CHECK(buffer->meta()->findInt32(
578                "discontinuitySeq", (int32_t*)&curDiscontinuitySeq));
579        CHECK(buffer->meta()->findInt64("timeUs", &curTimeUs));
580        if ((curDiscontinuitySeq > discontinuitySeq
581                || (curDiscontinuitySeq == discontinuitySeq
582                        && curTimeUs > timeUs))) {
583            ALOGI("trimming from beginning to %lld (not inclusive)",
584                    (long long)curTimeUs);
585            meta = buffer->meta();
586            break;
587        }
588    }
589    mBuffers.erase(mBuffers.begin(), it);
590    mQueuedDiscontinuityCount -= discontinuityCount;
591    mLatestDequeuedMeta = NULL;
592    return meta;
593}
594
595}  // namespace android
596