AnotherPacketSource.cpp revision d47dfcb5a2e5901c96fc92662cec7aa30f7f8843
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "AnotherPacketSource"
19
20#include "AnotherPacketSource.h"
21
22#include "include/avc_utils.h"
23
24#include <media/stagefright/foundation/ABuffer.h>
25#include <media/stagefright/foundation/ADebug.h>
26#include <media/stagefright/foundation/AMessage.h>
27#include <media/stagefright/foundation/AString.h>
28#include <media/stagefright/foundation/hexdump.h>
29#include <media/stagefright/MediaBuffer.h>
30#include <media/stagefright/MediaDefs.h>
31#include <media/stagefright/MetaData.h>
32#include <media/stagefright/Utils.h>
33#include <utils/Vector.h>
34
35#include <inttypes.h>
36
37namespace android {
38
// Half-width, in microseconds, of the window around the total duration in
// which the last queued timestamp counts as "near end of stream".
const int64_t kNearEOSMarkUs = 2ll * 1000 * 1000;  // 2 secs
40
41AnotherPacketSource::AnotherPacketSource(const sp<MetaData> &meta)
42    : mIsAudio(false),
43      mIsVideo(false),
44      mEnabled(true),
45      mFormat(NULL),
46      mLastQueuedTimeUs(0),
47      mEOSResult(OK),
48      mLatestEnqueuedMeta(NULL),
49      mLatestDequeuedMeta(NULL),
50      mQueuedDiscontinuityCount(0) {
51    setFormat(meta);
52}
53
54void AnotherPacketSource::setFormat(const sp<MetaData> &meta) {
55    if (mFormat != NULL) {
56        // Only allowed to be set once. Requires explicit clear to reset.
57        return;
58    }
59
60    mIsAudio = false;
61    mIsVideo = false;
62
63    if (meta == NULL) {
64        return;
65    }
66
67    mFormat = meta;
68    const char *mime;
69    CHECK(meta->findCString(kKeyMIMEType, &mime));
70
71    if (!strncasecmp("audio/", mime, 6)) {
72        mIsAudio = true;
73    } else  if (!strncasecmp("video/", mime, 6)) {
74        mIsVideo = true;
75    } else {
76        CHECK(!strncasecmp("text/", mime, 5));
77    }
78}
79
80AnotherPacketSource::~AnotherPacketSource() {
81}
82
83status_t AnotherPacketSource::start(MetaData * /* params */) {
84    return OK;
85}
86
87status_t AnotherPacketSource::stop() {
88    return OK;
89}
90
91sp<MetaData> AnotherPacketSource::getFormat() {
92    Mutex::Autolock autoLock(mLock);
93    if (mFormat != NULL) {
94        return mFormat;
95    }
96
97    List<sp<ABuffer> >::iterator it = mBuffers.begin();
98    while (it != mBuffers.end()) {
99        sp<ABuffer> buffer = *it;
100        int32_t discontinuity;
101        if (!buffer->meta()->findInt32("discontinuity", &discontinuity)) {
102            sp<RefBase> object;
103            if (buffer->meta()->findObject("format", &object)) {
104                setFormat(static_cast<MetaData*>(object.get()));
105                return mFormat;
106            }
107        }
108
109        ++it;
110    }
111    return NULL;
112}
113
114status_t AnotherPacketSource::dequeueAccessUnit(sp<ABuffer> *buffer) {
115    buffer->clear();
116
117    Mutex::Autolock autoLock(mLock);
118    while (mEOSResult == OK && mBuffers.empty()) {
119        mCondition.wait(mLock);
120    }
121
122    if (!mBuffers.empty()) {
123        *buffer = *mBuffers.begin();
124        mBuffers.erase(mBuffers.begin());
125
126        int32_t discontinuity;
127        if ((*buffer)->meta()->findInt32("discontinuity", &discontinuity)) {
128            if (wasFormatChange(discontinuity)) {
129                mFormat.clear();
130            }
131
132            --mQueuedDiscontinuityCount;
133            return INFO_DISCONTINUITY;
134        }
135
136        mLatestDequeuedMeta = (*buffer)->meta()->dup();
137
138        sp<RefBase> object;
139        if ((*buffer)->meta()->findObject("format", &object)) {
140            setFormat(static_cast<MetaData*>(object.get()));
141        }
142
143        return OK;
144    }
145
146    return mEOSResult;
147}
148
149status_t AnotherPacketSource::read(
150        MediaBuffer **out, const ReadOptions *) {
151    *out = NULL;
152
153    Mutex::Autolock autoLock(mLock);
154    while (mEOSResult == OK && mBuffers.empty()) {
155        mCondition.wait(mLock);
156    }
157
158    if (!mBuffers.empty()) {
159
160        const sp<ABuffer> buffer = *mBuffers.begin();
161        mBuffers.erase(mBuffers.begin());
162
163        int32_t discontinuity;
164        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
165            if (wasFormatChange(discontinuity)) {
166                mFormat.clear();
167            }
168
169            return INFO_DISCONTINUITY;
170        }
171
172        mLatestDequeuedMeta = buffer->meta()->dup();
173
174        sp<RefBase> object;
175        if (buffer->meta()->findObject("format", &object)) {
176            setFormat(static_cast<MetaData*>(object.get()));
177        }
178
179        int64_t timeUs;
180        CHECK(buffer->meta()->findInt64("timeUs", &timeUs));
181
182        MediaBuffer *mediaBuffer = new MediaBuffer(buffer);
183
184        mediaBuffer->meta_data()->setInt64(kKeyTime, timeUs);
185
186        *out = mediaBuffer;
187        return OK;
188    }
189
190    return mEOSResult;
191}
192
193bool AnotherPacketSource::wasFormatChange(
194        int32_t discontinuityType) const {
195    if (mIsAudio) {
196        return (discontinuityType & ATSParser::DISCONTINUITY_AUDIO_FORMAT) != 0;
197    }
198
199    if (mIsVideo) {
200        return (discontinuityType & ATSParser::DISCONTINUITY_VIDEO_FORMAT) != 0;
201    }
202
203    return false;
204}
205
206void AnotherPacketSource::queueAccessUnit(const sp<ABuffer> &buffer) {
207    int32_t damaged;
208    if (buffer->meta()->findInt32("damaged", &damaged) && damaged) {
209        // LOG(VERBOSE) << "discarding damaged AU";
210        return;
211    }
212
213    Mutex::Autolock autoLock(mLock);
214    mBuffers.push_back(buffer);
215    mCondition.signal();
216
217    int32_t discontinuity;
218    if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
219        // discontinuity handling needs to be consistent with queueDiscontinuity()
220        ++mQueuedDiscontinuityCount;
221        mLastQueuedTimeUs = 0ll;
222        mEOSResult = OK;
223        mLatestEnqueuedMeta = NULL;
224        return;
225    }
226
227    int64_t lastQueuedTimeUs;
228    CHECK(buffer->meta()->findInt64("timeUs", &lastQueuedTimeUs));
229    mLastQueuedTimeUs = lastQueuedTimeUs;
230    ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)",
231            mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6);
232
233    if (mLatestEnqueuedMeta == NULL) {
234        mLatestEnqueuedMeta = buffer->meta()->dup();
235    } else {
236        int64_t latestTimeUs = 0;
237        int64_t frameDeltaUs = 0;
238        CHECK(mLatestEnqueuedMeta->findInt64("timeUs", &latestTimeUs));
239        if (lastQueuedTimeUs > latestTimeUs) {
240            mLatestEnqueuedMeta = buffer->meta()->dup();
241            frameDeltaUs = lastQueuedTimeUs - latestTimeUs;
242            mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
243        } else if (!mLatestEnqueuedMeta->findInt64("durationUs", &frameDeltaUs)) {
244            // For B frames
245            frameDeltaUs = latestTimeUs - lastQueuedTimeUs;
246            mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
247        }
248    }
249}
250
251void AnotherPacketSource::clear() {
252    Mutex::Autolock autoLock(mLock);
253
254    mBuffers.clear();
255    mEOSResult = OK;
256    mQueuedDiscontinuityCount = 0;
257
258    mFormat = NULL;
259    mLatestEnqueuedMeta = NULL;
260}
261
262void AnotherPacketSource::queueDiscontinuity(
263        ATSParser::DiscontinuityType type,
264        const sp<AMessage> &extra,
265        bool discard) {
266    Mutex::Autolock autoLock(mLock);
267
268    if (discard) {
269        // Leave only discontinuities in the queue.
270        List<sp<ABuffer> >::iterator it = mBuffers.begin();
271        while (it != mBuffers.end()) {
272            sp<ABuffer> oldBuffer = *it;
273
274            int32_t oldDiscontinuityType;
275            if (!oldBuffer->meta()->findInt32(
276                        "discontinuity", &oldDiscontinuityType)) {
277                it = mBuffers.erase(it);
278                continue;
279            }
280
281            ++it;
282        }
283    }
284
285    mEOSResult = OK;
286    mLastQueuedTimeUs = 0;
287    mLatestEnqueuedMeta = NULL;
288
289    if (type == ATSParser::DISCONTINUITY_NONE) {
290        return;
291    }
292
293    ++mQueuedDiscontinuityCount;
294    sp<ABuffer> buffer = new ABuffer(0);
295    buffer->meta()->setInt32("discontinuity", static_cast<int32_t>(type));
296    buffer->meta()->setMessage("extra", extra);
297
298    mBuffers.push_back(buffer);
299    mCondition.signal();
300}
301
302void AnotherPacketSource::signalEOS(status_t result) {
303    CHECK(result != OK);
304
305    Mutex::Autolock autoLock(mLock);
306    mEOSResult = result;
307    mCondition.signal();
308}
309
310bool AnotherPacketSource::hasBufferAvailable(status_t *finalResult) {
311    Mutex::Autolock autoLock(mLock);
312    *finalResult = OK;
313    if (!mEnabled) {
314        return false;
315    }
316    if (!mBuffers.empty()) {
317        return true;
318    }
319
320    *finalResult = mEOSResult;
321    return false;
322}
323
324bool AnotherPacketSource::hasDataBufferAvailable(status_t *finalResult) {
325    Mutex::Autolock autoLock(mLock);
326    *finalResult = OK;
327    if (!mEnabled) {
328        return false;
329    }
330    List<sp<ABuffer> >::iterator it;
331    for (it = mBuffers.begin(); it != mBuffers.end(); it++) {
332        int32_t discontinuity;
333        if (!(*it)->meta()->findInt32("discontinuity", &discontinuity)) {
334            return true;
335        }
336    }
337
338    *finalResult = mEOSResult;
339    return false;
340}
341
342int64_t AnotherPacketSource::getBufferedDurationUs(status_t *finalResult) {
343    Mutex::Autolock autoLock(mLock);
344    return getBufferedDurationUs_l(finalResult);
345}
346
347int64_t AnotherPacketSource::getBufferedDurationUs_l(status_t *finalResult) {
348    *finalResult = mEOSResult;
349
350    if (mBuffers.empty()) {
351        return 0;
352    }
353
354    int64_t time1 = -1;
355    int64_t time2 = -1;
356    int64_t durationUs = 0;
357
358    List<sp<ABuffer> >::iterator it = mBuffers.begin();
359    while (it != mBuffers.end()) {
360        const sp<ABuffer> &buffer = *it;
361
362        int64_t timeUs;
363        if (buffer->meta()->findInt64("timeUs", &timeUs)) {
364            if (time1 < 0 || timeUs < time1) {
365                time1 = timeUs;
366            }
367
368            if (time2 < 0 || timeUs > time2) {
369                time2 = timeUs;
370            }
371        } else {
372            // This is a discontinuity, reset everything.
373            durationUs += time2 - time1;
374            time1 = time2 = -1;
375        }
376
377        ++it;
378    }
379
380    return durationUs + (time2 - time1);
381}
382
383// A cheaper but less precise version of getBufferedDurationUs that we would like to use in
384// LiveSession::dequeueAccessUnit to trigger downwards adaptation.
385int64_t AnotherPacketSource::getEstimatedDurationUs() {
386    Mutex::Autolock autoLock(mLock);
387    if (mBuffers.empty()) {
388        return 0;
389    }
390
391    if (mQueuedDiscontinuityCount > 0) {
392        status_t finalResult;
393        return getBufferedDurationUs_l(&finalResult);
394    }
395
396    List<sp<ABuffer> >::iterator it = mBuffers.begin();
397    sp<ABuffer> buffer = *it;
398
399    int64_t startTimeUs;
400    buffer->meta()->findInt64("timeUs", &startTimeUs);
401    if (startTimeUs < 0) {
402        return 0;
403    }
404
405    it = mBuffers.end();
406    --it;
407    buffer = *it;
408
409    int64_t endTimeUs;
410    buffer->meta()->findInt64("timeUs", &endTimeUs);
411    if (endTimeUs < 0) {
412        return 0;
413    }
414
415    int64_t diffUs;
416    if (endTimeUs > startTimeUs) {
417        diffUs = endTimeUs - startTimeUs;
418    } else {
419        diffUs = startTimeUs - endTimeUs;
420    }
421    return diffUs;
422}
423
424status_t AnotherPacketSource::nextBufferTime(int64_t *timeUs) {
425    *timeUs = 0;
426
427    Mutex::Autolock autoLock(mLock);
428
429    if (mBuffers.empty()) {
430        return mEOSResult != OK ? mEOSResult : -EWOULDBLOCK;
431    }
432
433    sp<ABuffer> buffer = *mBuffers.begin();
434    CHECK(buffer->meta()->findInt64("timeUs", timeUs));
435
436    return OK;
437}
438
439bool AnotherPacketSource::isFinished(int64_t duration) const {
440    if (duration > 0) {
441        int64_t diff = duration - mLastQueuedTimeUs;
442        if (diff < kNearEOSMarkUs && diff > -kNearEOSMarkUs) {
443            ALOGV("Detecting EOS due to near end");
444            return true;
445        }
446    }
447    return (mEOSResult != OK);
448}
449
450sp<AMessage> AnotherPacketSource::getLatestEnqueuedMeta() {
451    Mutex::Autolock autoLock(mLock);
452    return mLatestEnqueuedMeta;
453}
454
455sp<AMessage> AnotherPacketSource::getLatestDequeuedMeta() {
456    Mutex::Autolock autoLock(mLock);
457    return mLatestDequeuedMeta;
458}
459
460void AnotherPacketSource::enable(bool enable) {
461    Mutex::Autolock autoLock(mLock);
462    mEnabled = enable;
463}
464
465/*
466 * returns the sample meta that's delayUs after queue head
467 * (NULL if such sample is unavailable)
468 */
469sp<AMessage> AnotherPacketSource::getMetaAfterLastDequeued(int64_t delayUs) {
470    Mutex::Autolock autoLock(mLock);
471    int64_t firstUs = -1;
472    int64_t lastUs = -1;
473    int64_t durationUs = 0;
474
475    List<sp<ABuffer> >::iterator it;
476    for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
477        const sp<ABuffer> &buffer = *it;
478        int32_t discontinuity;
479        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
480            durationUs += lastUs - firstUs;
481            firstUs = -1;
482            lastUs = -1;
483            continue;
484        }
485        int64_t timeUs;
486        if (buffer->meta()->findInt64("timeUs", &timeUs)) {
487            if (firstUs < 0) {
488                firstUs = timeUs;
489            }
490            if (lastUs < 0 || timeUs > lastUs) {
491                lastUs = timeUs;
492            }
493            if (durationUs + (lastUs - firstUs) >= delayUs) {
494                return buffer->meta();
495            }
496        }
497    }
498    return NULL;
499}
500
501/*
502 * removes samples with time equal or after meta
503 */
504void AnotherPacketSource::trimBuffersAfterMeta(
505        const sp<AMessage> &meta) {
506    if (meta == NULL) {
507        ALOGW("trimming with NULL meta, ignoring");
508        return;
509    }
510
511    Mutex::Autolock autoLock(mLock);
512    if (mBuffers.empty()) {
513        return;
514    }
515
516    HLSTime stopTime(meta);
517    ALOGV("trimBuffersAfterMeta: discontinuitySeq %zu, timeUs %lld",
518            stopTime.mSeq, (long long)stopTime.mTimeUs);
519
520    List<sp<ABuffer> >::iterator it;
521    sp<AMessage> newLatestEnqueuedMeta = NULL;
522    int64_t newLastQueuedTimeUs = 0;
523    size_t newDiscontinuityCount = 0;
524    for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
525        const sp<ABuffer> &buffer = *it;
526        int32_t discontinuity;
527        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
528            newDiscontinuityCount++;
529            continue;
530        }
531
532        HLSTime curTime(buffer->meta());
533        if (!(curTime < stopTime)) {
534            ALOGV("trimming from %lld (inclusive) to end",
535                    (long long)curTime.mTimeUs);
536            break;
537        }
538        newLatestEnqueuedMeta = buffer->meta();
539        newLastQueuedTimeUs = curTime.mTimeUs;
540    }
541    mBuffers.erase(it, mBuffers.end());
542    mLatestEnqueuedMeta = newLatestEnqueuedMeta;
543    mLastQueuedTimeUs = newLastQueuedTimeUs;
544    mQueuedDiscontinuityCount = newDiscontinuityCount;
545}
546
547/*
548 * removes samples with time equal or before meta;
549 * returns first sample left in the queue.
550 *
551 * (for AVC, if trim happens, the samples left will always start
552 * at next IDR.)
553 */
554sp<AMessage> AnotherPacketSource::trimBuffersBeforeMeta(
555        const sp<AMessage> &meta) {
556    HLSTime startTime(meta);
557    ALOGV("trimBuffersBeforeMeta: discontinuitySeq %zu, timeUs %lld",
558            startTime.mSeq, (long long)startTime.mTimeUs);
559
560    sp<AMessage> firstMeta;
561    Mutex::Autolock autoLock(mLock);
562    if (mBuffers.empty()) {
563        return NULL;
564    }
565
566    sp<MetaData> format;
567    bool isAvc = false;
568
569    List<sp<ABuffer> >::iterator it;
570    size_t discontinuityCount = 0;
571    for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
572        const sp<ABuffer> &buffer = *it;
573        int32_t discontinuity;
574        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
575            format = NULL;
576            isAvc = false;
577            discontinuityCount++;
578            continue;
579        }
580        if (format == NULL) {
581            sp<RefBase> object;
582            if (buffer->meta()->findObject("format", &object)) {
583                const char* mime;
584                format = static_cast<MetaData*>(object.get());
585                isAvc = format != NULL
586                        && format->findCString(kKeyMIMEType, &mime)
587                        && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
588            }
589        }
590        if (isAvc && !IsIDR(buffer)) {
591            continue;
592        }
593
594        HLSTime curTime(buffer->meta());
595        if (startTime < curTime) {
596            ALOGV("trimming from beginning to %lld (not inclusive)",
597                    (long long)curTime.mTimeUs);
598            firstMeta = buffer->meta();
599            break;
600        }
601    }
602    mBuffers.erase(mBuffers.begin(), it);
603    mQueuedDiscontinuityCount -= discontinuityCount;
604    mLatestDequeuedMeta = NULL;
605    return firstMeta;
606}
607
608}  // namespace android
609