AnotherPacketSource.cpp revision 37746afe186ce04f0f6252900b79726040d89a0d
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "AnotherPacketSource"
19
20#include "AnotherPacketSource.h"
21
22#include "include/avc_utils.h"
23
24#include <media/stagefright/foundation/ABuffer.h>
25#include <media/stagefright/foundation/ADebug.h>
26#include <media/stagefright/foundation/AMessage.h>
27#include <media/stagefright/foundation/AString.h>
28#include <media/stagefright/foundation/hexdump.h>
29#include <media/stagefright/MediaBuffer.h>
30#include <media/stagefright/MediaDefs.h>
31#include <media/stagefright/MetaData.h>
32#include <media/stagefright/Utils.h>
33#include <utils/Vector.h>
34
35#include <inttypes.h>
36
37namespace android {
38
// Half-width, in microseconds, of the window around the stream duration inside
// which isFinished() treats the last queued timestamp as "near the end".
const int64_t kNearEOSMarkUs = 2 * 1000000ll;  // 2 seconds
40
41AnotherPacketSource::AnotherPacketSource(const sp<MetaData> &meta)
42    : mIsAudio(false),
43      mIsVideo(false),
44      mEnabled(true),
45      mFormat(NULL),
46      mLastQueuedTimeUs(0),
47      mEOSResult(OK),
48      mLatestEnqueuedMeta(NULL),
49      mLatestDequeuedMeta(NULL),
50      mQueuedDiscontinuityCount(0) {
51    setFormat(meta);
52}
53
54void AnotherPacketSource::setFormat(const sp<MetaData> &meta) {
55    if (mFormat != NULL) {
56        // Only allowed to be set once. Requires explicit clear to reset.
57        return;
58    }
59
60    mIsAudio = false;
61    mIsVideo = false;
62
63    if (meta == NULL) {
64        return;
65    }
66
67    mFormat = meta;
68    const char *mime;
69    CHECK(meta->findCString(kKeyMIMEType, &mime));
70
71    if (!strncasecmp("audio/", mime, 6)) {
72        mIsAudio = true;
73    } else  if (!strncasecmp("video/", mime, 6)) {
74        mIsVideo = true;
75    } else {
76        CHECK(!strncasecmp("text/", mime, 5));
77    }
78}
79
80AnotherPacketSource::~AnotherPacketSource() {
81}
82
83status_t AnotherPacketSource::start(MetaData * /* params */) {
84    return OK;
85}
86
87status_t AnotherPacketSource::stop() {
88    return OK;
89}
90
91sp<MetaData> AnotherPacketSource::getFormat() {
92    Mutex::Autolock autoLock(mLock);
93    if (mFormat != NULL) {
94        return mFormat;
95    }
96
97    List<sp<ABuffer> >::iterator it = mBuffers.begin();
98    while (it != mBuffers.end()) {
99        sp<ABuffer> buffer = *it;
100        int32_t discontinuity;
101        if (!buffer->meta()->findInt32("discontinuity", &discontinuity)) {
102            sp<RefBase> object;
103            if (buffer->meta()->findObject("format", &object)) {
104                setFormat(static_cast<MetaData*>(object.get()));
105                return mFormat;
106            }
107        }
108
109        ++it;
110    }
111    return NULL;
112}
113
114status_t AnotherPacketSource::dequeueAccessUnit(sp<ABuffer> *buffer) {
115    buffer->clear();
116
117    Mutex::Autolock autoLock(mLock);
118    while (mEOSResult == OK && mBuffers.empty()) {
119        mCondition.wait(mLock);
120    }
121
122    if (!mBuffers.empty()) {
123        *buffer = *mBuffers.begin();
124        mBuffers.erase(mBuffers.begin());
125
126        int32_t discontinuity;
127        if ((*buffer)->meta()->findInt32("discontinuity", &discontinuity)) {
128            if (wasFormatChange(discontinuity)) {
129                mFormat.clear();
130            }
131
132            --mQueuedDiscontinuityCount;
133            return INFO_DISCONTINUITY;
134        }
135
136        mLatestDequeuedMeta = (*buffer)->meta()->dup();
137
138        sp<RefBase> object;
139        if ((*buffer)->meta()->findObject("format", &object)) {
140            setFormat(static_cast<MetaData*>(object.get()));
141        }
142
143        return OK;
144    }
145
146    return mEOSResult;
147}
148
149status_t AnotherPacketSource::read(
150        MediaBuffer **out, const ReadOptions *) {
151    *out = NULL;
152
153    Mutex::Autolock autoLock(mLock);
154    while (mEOSResult == OK && mBuffers.empty()) {
155        mCondition.wait(mLock);
156    }
157
158    if (!mBuffers.empty()) {
159
160        const sp<ABuffer> buffer = *mBuffers.begin();
161        mBuffers.erase(mBuffers.begin());
162
163        int32_t discontinuity;
164        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
165            if (wasFormatChange(discontinuity)) {
166                mFormat.clear();
167            }
168
169            return INFO_DISCONTINUITY;
170        }
171
172        mLatestDequeuedMeta = buffer->meta()->dup();
173
174        sp<RefBase> object;
175        if (buffer->meta()->findObject("format", &object)) {
176            setFormat(static_cast<MetaData*>(object.get()));
177        }
178
179        int64_t timeUs;
180        CHECK(buffer->meta()->findInt64("timeUs", &timeUs));
181
182        MediaBuffer *mediaBuffer = new MediaBuffer(buffer);
183
184        mediaBuffer->meta_data()->setInt64(kKeyTime, timeUs);
185
186        int32_t isSync;
187        if (buffer->meta()->findInt32("isSync", &isSync)) {
188            mediaBuffer->meta_data()->setInt32(kKeyIsSyncFrame, isSync);
189        }
190
191        *out = mediaBuffer;
192        return OK;
193    }
194
195    return mEOSResult;
196}
197
198bool AnotherPacketSource::wasFormatChange(
199        int32_t discontinuityType) const {
200    if (mIsAudio) {
201        return (discontinuityType & ATSParser::DISCONTINUITY_AUDIO_FORMAT) != 0;
202    }
203
204    if (mIsVideo) {
205        return (discontinuityType & ATSParser::DISCONTINUITY_VIDEO_FORMAT) != 0;
206    }
207
208    return false;
209}
210
211void AnotherPacketSource::queueAccessUnit(const sp<ABuffer> &buffer) {
212    int32_t damaged;
213    if (buffer->meta()->findInt32("damaged", &damaged) && damaged) {
214        // LOG(VERBOSE) << "discarding damaged AU";
215        return;
216    }
217
218    Mutex::Autolock autoLock(mLock);
219    mBuffers.push_back(buffer);
220    mCondition.signal();
221
222    int32_t discontinuity;
223    if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
224        // discontinuity handling needs to be consistent with queueDiscontinuity()
225        ++mQueuedDiscontinuityCount;
226        mLastQueuedTimeUs = 0ll;
227        mEOSResult = OK;
228        mLatestEnqueuedMeta = NULL;
229        return;
230    }
231
232    int64_t lastQueuedTimeUs;
233    CHECK(buffer->meta()->findInt64("timeUs", &lastQueuedTimeUs));
234    mLastQueuedTimeUs = lastQueuedTimeUs;
235    ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)",
236            mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6);
237
238    if (mLatestEnqueuedMeta == NULL) {
239        mLatestEnqueuedMeta = buffer->meta()->dup();
240    } else {
241        int64_t latestTimeUs = 0;
242        int64_t frameDeltaUs = 0;
243        CHECK(mLatestEnqueuedMeta->findInt64("timeUs", &latestTimeUs));
244        if (lastQueuedTimeUs > latestTimeUs) {
245            mLatestEnqueuedMeta = buffer->meta()->dup();
246            frameDeltaUs = lastQueuedTimeUs - latestTimeUs;
247            mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
248        } else if (!mLatestEnqueuedMeta->findInt64("durationUs", &frameDeltaUs)) {
249            // For B frames
250            frameDeltaUs = latestTimeUs - lastQueuedTimeUs;
251            mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
252        }
253    }
254}
255
256void AnotherPacketSource::clear() {
257    Mutex::Autolock autoLock(mLock);
258
259    mBuffers.clear();
260    mEOSResult = OK;
261    mQueuedDiscontinuityCount = 0;
262
263    mFormat = NULL;
264    mLatestEnqueuedMeta = NULL;
265}
266
267void AnotherPacketSource::queueDiscontinuity(
268        ATSParser::DiscontinuityType type,
269        const sp<AMessage> &extra,
270        bool discard) {
271    Mutex::Autolock autoLock(mLock);
272
273    if (discard) {
274        // Leave only discontinuities in the queue.
275        List<sp<ABuffer> >::iterator it = mBuffers.begin();
276        while (it != mBuffers.end()) {
277            sp<ABuffer> oldBuffer = *it;
278
279            int32_t oldDiscontinuityType;
280            if (!oldBuffer->meta()->findInt32(
281                        "discontinuity", &oldDiscontinuityType)) {
282                it = mBuffers.erase(it);
283                continue;
284            }
285
286            ++it;
287        }
288    }
289
290    mEOSResult = OK;
291    mLastQueuedTimeUs = 0;
292    mLatestEnqueuedMeta = NULL;
293
294    if (type == ATSParser::DISCONTINUITY_NONE) {
295        return;
296    }
297
298    ++mQueuedDiscontinuityCount;
299    sp<ABuffer> buffer = new ABuffer(0);
300    buffer->meta()->setInt32("discontinuity", static_cast<int32_t>(type));
301    buffer->meta()->setMessage("extra", extra);
302
303    mBuffers.push_back(buffer);
304    mCondition.signal();
305}
306
307void AnotherPacketSource::signalEOS(status_t result) {
308    CHECK(result != OK);
309
310    Mutex::Autolock autoLock(mLock);
311    mEOSResult = result;
312    mCondition.signal();
313}
314
315bool AnotherPacketSource::hasBufferAvailable(status_t *finalResult) {
316    Mutex::Autolock autoLock(mLock);
317    *finalResult = OK;
318    if (!mEnabled) {
319        return false;
320    }
321    if (!mBuffers.empty()) {
322        return true;
323    }
324
325    *finalResult = mEOSResult;
326    return false;
327}
328
329bool AnotherPacketSource::hasDataBufferAvailable(status_t *finalResult) {
330    Mutex::Autolock autoLock(mLock);
331    *finalResult = OK;
332    if (!mEnabled) {
333        return false;
334    }
335    List<sp<ABuffer> >::iterator it;
336    for (it = mBuffers.begin(); it != mBuffers.end(); it++) {
337        int32_t discontinuity;
338        if (!(*it)->meta()->findInt32("discontinuity", &discontinuity)) {
339            return true;
340        }
341    }
342
343    *finalResult = mEOSResult;
344    return false;
345}
346
347int64_t AnotherPacketSource::getBufferedDurationUs(status_t *finalResult) {
348    Mutex::Autolock autoLock(mLock);
349    return getBufferedDurationUs_l(finalResult);
350}
351
352int64_t AnotherPacketSource::getBufferedDurationUs_l(status_t *finalResult) {
353    *finalResult = mEOSResult;
354
355    if (mBuffers.empty()) {
356        return 0;
357    }
358
359    int64_t time1 = -1;
360    int64_t time2 = -1;
361    int64_t durationUs = 0;
362
363    List<sp<ABuffer> >::iterator it = mBuffers.begin();
364    while (it != mBuffers.end()) {
365        const sp<ABuffer> &buffer = *it;
366
367        int64_t timeUs;
368        if (buffer->meta()->findInt64("timeUs", &timeUs)) {
369            if (time1 < 0 || timeUs < time1) {
370                time1 = timeUs;
371            }
372
373            if (time2 < 0 || timeUs > time2) {
374                time2 = timeUs;
375            }
376        } else {
377            // This is a discontinuity, reset everything.
378            durationUs += time2 - time1;
379            time1 = time2 = -1;
380        }
381
382        ++it;
383    }
384
385    return durationUs + (time2 - time1);
386}
387
388// A cheaper but less precise version of getBufferedDurationUs that we would like to use in
389// LiveSession::dequeueAccessUnit to trigger downwards adaptation.
390int64_t AnotherPacketSource::getEstimatedDurationUs() {
391    Mutex::Autolock autoLock(mLock);
392    if (mBuffers.empty()) {
393        return 0;
394    }
395
396    if (mQueuedDiscontinuityCount > 0) {
397        status_t finalResult;
398        return getBufferedDurationUs_l(&finalResult);
399    }
400
401    List<sp<ABuffer> >::iterator it = mBuffers.begin();
402    sp<ABuffer> buffer = *it;
403
404    int64_t startTimeUs;
405    buffer->meta()->findInt64("timeUs", &startTimeUs);
406    if (startTimeUs < 0) {
407        return 0;
408    }
409
410    it = mBuffers.end();
411    --it;
412    buffer = *it;
413
414    int64_t endTimeUs;
415    buffer->meta()->findInt64("timeUs", &endTimeUs);
416    if (endTimeUs < 0) {
417        return 0;
418    }
419
420    int64_t diffUs;
421    if (endTimeUs > startTimeUs) {
422        diffUs = endTimeUs - startTimeUs;
423    } else {
424        diffUs = startTimeUs - endTimeUs;
425    }
426    return diffUs;
427}
428
429status_t AnotherPacketSource::nextBufferTime(int64_t *timeUs) {
430    *timeUs = 0;
431
432    Mutex::Autolock autoLock(mLock);
433
434    if (mBuffers.empty()) {
435        return mEOSResult != OK ? mEOSResult : -EWOULDBLOCK;
436    }
437
438    sp<ABuffer> buffer = *mBuffers.begin();
439    CHECK(buffer->meta()->findInt64("timeUs", timeUs));
440
441    return OK;
442}
443
444bool AnotherPacketSource::isFinished(int64_t duration) const {
445    if (duration > 0) {
446        int64_t diff = duration - mLastQueuedTimeUs;
447        if (diff < kNearEOSMarkUs && diff > -kNearEOSMarkUs) {
448            ALOGV("Detecting EOS due to near end");
449            return true;
450        }
451    }
452    return (mEOSResult != OK);
453}
454
455sp<AMessage> AnotherPacketSource::getLatestEnqueuedMeta() {
456    Mutex::Autolock autoLock(mLock);
457    return mLatestEnqueuedMeta;
458}
459
460sp<AMessage> AnotherPacketSource::getLatestDequeuedMeta() {
461    Mutex::Autolock autoLock(mLock);
462    return mLatestDequeuedMeta;
463}
464
465void AnotherPacketSource::enable(bool enable) {
466    Mutex::Autolock autoLock(mLock);
467    mEnabled = enable;
468}
469
470/*
471 * returns the sample meta that's delayUs after queue head
472 * (NULL if such sample is unavailable)
473 */
474sp<AMessage> AnotherPacketSource::getMetaAfterLastDequeued(int64_t delayUs) {
475    Mutex::Autolock autoLock(mLock);
476    int64_t firstUs = -1;
477    int64_t lastUs = -1;
478    int64_t durationUs = 0;
479
480    List<sp<ABuffer> >::iterator it;
481    for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
482        const sp<ABuffer> &buffer = *it;
483        int32_t discontinuity;
484        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
485            durationUs += lastUs - firstUs;
486            firstUs = -1;
487            lastUs = -1;
488            continue;
489        }
490        int64_t timeUs;
491        if (buffer->meta()->findInt64("timeUs", &timeUs)) {
492            if (firstUs < 0) {
493                firstUs = timeUs;
494            }
495            if (lastUs < 0 || timeUs > lastUs) {
496                lastUs = timeUs;
497            }
498            if (durationUs + (lastUs - firstUs) >= delayUs) {
499                return buffer->meta();
500            }
501        }
502    }
503    return NULL;
504}
505
506/*
507 * removes samples with time equal or after meta
508 */
509void AnotherPacketSource::trimBuffersAfterMeta(
510        const sp<AMessage> &meta) {
511    if (meta == NULL) {
512        ALOGW("trimming with NULL meta, ignoring");
513        return;
514    }
515
516    Mutex::Autolock autoLock(mLock);
517    if (mBuffers.empty()) {
518        return;
519    }
520
521    HLSTime stopTime(meta);
522    ALOGV("trimBuffersAfterMeta: discontinuitySeq %zu, timeUs %lld",
523            stopTime.mSeq, (long long)stopTime.mTimeUs);
524
525    List<sp<ABuffer> >::iterator it;
526    sp<AMessage> newLatestEnqueuedMeta = NULL;
527    int64_t newLastQueuedTimeUs = 0;
528    size_t newDiscontinuityCount = 0;
529    for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
530        const sp<ABuffer> &buffer = *it;
531        int32_t discontinuity;
532        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
533            newDiscontinuityCount++;
534            continue;
535        }
536
537        HLSTime curTime(buffer->meta());
538        if (!(curTime < stopTime)) {
539            ALOGV("trimming from %lld (inclusive) to end",
540                    (long long)curTime.mTimeUs);
541            break;
542        }
543        newLatestEnqueuedMeta = buffer->meta();
544        newLastQueuedTimeUs = curTime.mTimeUs;
545    }
546    mBuffers.erase(it, mBuffers.end());
547    mLatestEnqueuedMeta = newLatestEnqueuedMeta;
548    mLastQueuedTimeUs = newLastQueuedTimeUs;
549    mQueuedDiscontinuityCount = newDiscontinuityCount;
550}
551
552/*
553 * removes samples with time equal or before meta;
554 * returns first sample left in the queue.
555 *
556 * (for AVC, if trim happens, the samples left will always start
557 * at next IDR.)
558 */
559sp<AMessage> AnotherPacketSource::trimBuffersBeforeMeta(
560        const sp<AMessage> &meta) {
561    HLSTime startTime(meta);
562    ALOGV("trimBuffersBeforeMeta: discontinuitySeq %zu, timeUs %lld",
563            startTime.mSeq, (long long)startTime.mTimeUs);
564
565    sp<AMessage> firstMeta;
566    Mutex::Autolock autoLock(mLock);
567    if (mBuffers.empty()) {
568        return NULL;
569    }
570
571    sp<MetaData> format;
572    bool isAvc = false;
573
574    List<sp<ABuffer> >::iterator it;
575    size_t discontinuityCount = 0;
576    for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
577        const sp<ABuffer> &buffer = *it;
578        int32_t discontinuity;
579        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
580            format = NULL;
581            isAvc = false;
582            discontinuityCount++;
583            continue;
584        }
585        if (format == NULL) {
586            sp<RefBase> object;
587            if (buffer->meta()->findObject("format", &object)) {
588                const char* mime;
589                format = static_cast<MetaData*>(object.get());
590                isAvc = format != NULL
591                        && format->findCString(kKeyMIMEType, &mime)
592                        && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
593            }
594        }
595        if (isAvc && !IsIDR(buffer)) {
596            continue;
597        }
598
599        HLSTime curTime(buffer->meta());
600        if (startTime < curTime) {
601            ALOGV("trimming from beginning to %lld (not inclusive)",
602                    (long long)curTime.mTimeUs);
603            firstMeta = buffer->meta();
604            break;
605        }
606    }
607    mBuffers.erase(mBuffers.begin(), it);
608    mQueuedDiscontinuityCount -= discontinuityCount;
609    mLatestDequeuedMeta = NULL;
610    return firstMeta;
611}
612
613}  // namespace android
614