AnotherPacketSource.cpp revision 4123d6db0642cd13e69230705b12d6b6fee6f73f
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "AnotherPacketSource"
19
20#include "AnotherPacketSource.h"
21
22#include "include/avc_utils.h"
23
24#include <media/stagefright/foundation/ABuffer.h>
25#include <media/stagefright/foundation/ADebug.h>
26#include <media/stagefright/foundation/AMessage.h>
27#include <media/stagefright/foundation/AString.h>
28#include <media/stagefright/foundation/hexdump.h>
29#include <media/stagefright/MediaBuffer.h>
30#include <media/stagefright/MediaDefs.h>
31#include <media/stagefright/MetaData.h>
32#include <media/stagefright/Utils.h>
33#include <utils/Vector.h>
34
35#include <inttypes.h>
36
37namespace android {
38
// Half-width, in microseconds, of the window around the stream duration
// inside which isFinished() reports end-of-stream.
const int64_t kNearEOSMarkUs = 2 * 1000000ll;  // 2 seconds
40
41AnotherPacketSource::AnotherPacketSource(const sp<MetaData> &meta)
42    : mIsAudio(false),
43      mIsVideo(false),
44      mEnabled(true),
45      mFormat(NULL),
46      mLastQueuedTimeUs(0),
47      mEOSResult(OK),
48      mLatestEnqueuedMeta(NULL),
49      mLatestDequeuedMeta(NULL),
50      mQueuedDiscontinuityCount(0) {
51    setFormat(meta);
52}
53
54void AnotherPacketSource::setFormat(const sp<MetaData> &meta) {
55    if (mFormat != NULL) {
56        // Only allowed to be set once. Requires explicit clear to reset.
57        return;
58    }
59
60    mIsAudio = false;
61    mIsVideo = false;
62
63    if (meta == NULL) {
64        return;
65    }
66
67    mFormat = meta;
68    const char *mime;
69    CHECK(meta->findCString(kKeyMIMEType, &mime));
70
71    if (!strncasecmp("audio/", mime, 6)) {
72        mIsAudio = true;
73    } else  if (!strncasecmp("video/", mime, 6)) {
74        mIsVideo = true;
75    } else {
76        CHECK(!strncasecmp("text/", mime, 5));
77    }
78}
79
80AnotherPacketSource::~AnotherPacketSource() {
81}
82
83status_t AnotherPacketSource::start(MetaData * /* params */) {
84    return OK;
85}
86
87status_t AnotherPacketSource::stop() {
88    return OK;
89}
90
91sp<MetaData> AnotherPacketSource::getFormat() {
92    Mutex::Autolock autoLock(mLock);
93    if (mFormat != NULL) {
94        return mFormat;
95    }
96
97    List<sp<ABuffer> >::iterator it = mBuffers.begin();
98    while (it != mBuffers.end()) {
99        sp<ABuffer> buffer = *it;
100        int32_t discontinuity;
101        if (!buffer->meta()->findInt32("discontinuity", &discontinuity)) {
102            sp<RefBase> object;
103            if (buffer->meta()->findObject("format", &object)) {
104                setFormat(static_cast<MetaData*>(object.get()));
105                return mFormat;
106            }
107        }
108
109        ++it;
110    }
111    return NULL;
112}
113
114status_t AnotherPacketSource::dequeueAccessUnit(sp<ABuffer> *buffer) {
115    buffer->clear();
116
117    Mutex::Autolock autoLock(mLock);
118    while (mEOSResult == OK && mBuffers.empty()) {
119        mCondition.wait(mLock);
120    }
121
122    if (!mBuffers.empty()) {
123        *buffer = *mBuffers.begin();
124        mBuffers.erase(mBuffers.begin());
125
126        int32_t discontinuity;
127        if ((*buffer)->meta()->findInt32("discontinuity", &discontinuity)) {
128            if (wasFormatChange(discontinuity)) {
129                mFormat.clear();
130            }
131
132            --mQueuedDiscontinuityCount;
133            return INFO_DISCONTINUITY;
134        }
135
136        mLatestDequeuedMeta = (*buffer)->meta()->dup();
137
138        sp<RefBase> object;
139        if ((*buffer)->meta()->findObject("format", &object)) {
140            setFormat(static_cast<MetaData*>(object.get()));
141        }
142
143        return OK;
144    }
145
146    return mEOSResult;
147}
148
149status_t AnotherPacketSource::read(
150        MediaBuffer **out, const ReadOptions *) {
151    *out = NULL;
152
153    Mutex::Autolock autoLock(mLock);
154    while (mEOSResult == OK && mBuffers.empty()) {
155        mCondition.wait(mLock);
156    }
157
158    if (!mBuffers.empty()) {
159
160        const sp<ABuffer> buffer = *mBuffers.begin();
161        mBuffers.erase(mBuffers.begin());
162
163        int32_t discontinuity;
164        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
165            if (wasFormatChange(discontinuity)) {
166                mFormat.clear();
167            }
168
169            return INFO_DISCONTINUITY;
170        }
171
172        mLatestDequeuedMeta = buffer->meta()->dup();
173
174        sp<RefBase> object;
175        if (buffer->meta()->findObject("format", &object)) {
176            setFormat(static_cast<MetaData*>(object.get()));
177        }
178
179        int64_t timeUs;
180        CHECK(buffer->meta()->findInt64("timeUs", &timeUs));
181
182        MediaBuffer *mediaBuffer = new MediaBuffer(buffer);
183
184        mediaBuffer->meta_data()->setInt64(kKeyTime, timeUs);
185
186        int32_t isSync;
187        if (buffer->meta()->findInt32("isSync", &isSync)) {
188            mediaBuffer->meta_data()->setInt32(kKeyIsSyncFrame, isSync);
189        }
190
191        *out = mediaBuffer;
192        return OK;
193    }
194
195    return mEOSResult;
196}
197
198bool AnotherPacketSource::wasFormatChange(
199        int32_t discontinuityType) const {
200    if (mIsAudio) {
201        return (discontinuityType & ATSParser::DISCONTINUITY_AUDIO_FORMAT) != 0;
202    }
203
204    if (mIsVideo) {
205        return (discontinuityType & ATSParser::DISCONTINUITY_VIDEO_FORMAT) != 0;
206    }
207
208    return false;
209}
210
211void AnotherPacketSource::queueAccessUnit(const sp<ABuffer> &buffer) {
212    int32_t damaged;
213    if (buffer->meta()->findInt32("damaged", &damaged) && damaged) {
214        // LOG(VERBOSE) << "discarding damaged AU";
215        return;
216    }
217
218    Mutex::Autolock autoLock(mLock);
219    mBuffers.push_back(buffer);
220    mCondition.signal();
221
222    int32_t discontinuity;
223    if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
224        // discontinuity handling needs to be consistent with queueDiscontinuity()
225        ++mQueuedDiscontinuityCount;
226        mLastQueuedTimeUs = 0ll;
227        mEOSResult = OK;
228        mLatestEnqueuedMeta = NULL;
229        return;
230    }
231
232    int64_t lastQueuedTimeUs;
233    CHECK(buffer->meta()->findInt64("timeUs", &lastQueuedTimeUs));
234    mLastQueuedTimeUs = lastQueuedTimeUs;
235    ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)",
236            mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6);
237
238    if (mLatestEnqueuedMeta == NULL) {
239        mLatestEnqueuedMeta = buffer->meta()->dup();
240    } else {
241        int64_t latestTimeUs = 0;
242        int64_t frameDeltaUs = 0;
243        CHECK(mLatestEnqueuedMeta->findInt64("timeUs", &latestTimeUs));
244        if (lastQueuedTimeUs > latestTimeUs) {
245            mLatestEnqueuedMeta = buffer->meta()->dup();
246            frameDeltaUs = lastQueuedTimeUs - latestTimeUs;
247            mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
248        } else if (!mLatestEnqueuedMeta->findInt64("durationUs", &frameDeltaUs)) {
249            // For B frames
250            frameDeltaUs = latestTimeUs - lastQueuedTimeUs;
251            mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
252        }
253    }
254}
255
256void AnotherPacketSource::clear() {
257    Mutex::Autolock autoLock(mLock);
258
259    mBuffers.clear();
260    mEOSResult = OK;
261    mQueuedDiscontinuityCount = 0;
262
263    mFormat = NULL;
264    mLatestEnqueuedMeta = NULL;
265}
266
267void AnotherPacketSource::queueDiscontinuity(
268        ATSParser::DiscontinuityType type,
269        const sp<AMessage> &extra,
270        bool discard) {
271    Mutex::Autolock autoLock(mLock);
272
273    if (discard) {
274        // Leave only discontinuities in the queue.
275        List<sp<ABuffer> >::iterator it = mBuffers.begin();
276        while (it != mBuffers.end()) {
277            sp<ABuffer> oldBuffer = *it;
278
279            int32_t oldDiscontinuityType;
280            if (!oldBuffer->meta()->findInt32(
281                        "discontinuity", &oldDiscontinuityType)) {
282                it = mBuffers.erase(it);
283                continue;
284            }
285
286            ++it;
287        }
288    }
289
290    mEOSResult = OK;
291    mLastQueuedTimeUs = 0;
292    mLatestEnqueuedMeta = NULL;
293
294    if (type == ATSParser::DISCONTINUITY_NONE) {
295        return;
296    }
297
298    ++mQueuedDiscontinuityCount;
299    sp<ABuffer> buffer = new ABuffer(0);
300    buffer->meta()->setInt32("discontinuity", static_cast<int32_t>(type));
301    buffer->meta()->setMessage("extra", extra);
302
303    mBuffers.push_back(buffer);
304    mCondition.signal();
305}
306
307void AnotherPacketSource::signalEOS(status_t result) {
308    CHECK(result != OK);
309
310    Mutex::Autolock autoLock(mLock);
311    mEOSResult = result;
312    mCondition.signal();
313}
314
315bool AnotherPacketSource::hasBufferAvailable(status_t *finalResult) {
316    Mutex::Autolock autoLock(mLock);
317    *finalResult = OK;
318    if (!mEnabled) {
319        return false;
320    }
321    if (!mBuffers.empty()) {
322        return true;
323    }
324
325    *finalResult = mEOSResult;
326    return false;
327}
328
329bool AnotherPacketSource::hasDataBufferAvailable(status_t *finalResult) {
330    Mutex::Autolock autoLock(mLock);
331    *finalResult = OK;
332    if (!mEnabled) {
333        return false;
334    }
335    List<sp<ABuffer> >::iterator it;
336    for (it = mBuffers.begin(); it != mBuffers.end(); it++) {
337        int32_t discontinuity;
338        if (!(*it)->meta()->findInt32("discontinuity", &discontinuity)) {
339            return true;
340        }
341    }
342
343    *finalResult = mEOSResult;
344    return false;
345}
346
347int64_t AnotherPacketSource::getBufferedDurationUs(status_t *finalResult) {
348    Mutex::Autolock autoLock(mLock);
349    return getBufferedDurationUs_l(finalResult);
350}
351
352int64_t AnotherPacketSource::getBufferedDurationUs_l(status_t *finalResult) {
353    *finalResult = mEOSResult;
354
355    if (mBuffers.empty()) {
356        return 0;
357    }
358
359    int64_t time1 = -1;
360    int64_t time2 = -1;
361    int64_t durationUs = 0;
362
363    List<sp<ABuffer> >::iterator it;
364    for (it = mBuffers.begin(); it != mBuffers.end(); it++) {
365        const sp<ABuffer> &buffer = *it;
366
367        int32_t discard;
368        if (buffer->meta()->findInt32("discard", &discard) && discard) {
369            continue;
370        }
371
372        int64_t timeUs;
373        if (buffer->meta()->findInt64("timeUs", &timeUs)) {
374            if (time1 < 0 || timeUs < time1) {
375                time1 = timeUs;
376            }
377
378            if (time2 < 0 || timeUs > time2) {
379                time2 = timeUs;
380            }
381        } else {
382            // This is a discontinuity, reset everything.
383            durationUs += time2 - time1;
384            time1 = time2 = -1;
385        }
386    }
387
388    return durationUs + (time2 - time1);
389}
390
391// A cheaper but less precise version of getBufferedDurationUs that we would like to use in
392// LiveSession::dequeueAccessUnit to trigger downwards adaptation.
393int64_t AnotherPacketSource::getEstimatedDurationUs() {
394    Mutex::Autolock autoLock(mLock);
395    if (mBuffers.empty()) {
396        return 0;
397    }
398
399    if (mQueuedDiscontinuityCount > 0) {
400        status_t finalResult;
401        return getBufferedDurationUs_l(&finalResult);
402    }
403
404    sp<ABuffer> buffer;
405    int32_t discard;
406    int64_t startTimeUs = -1ll;
407    List<sp<ABuffer> >::iterator it;
408    for (it = mBuffers.begin(); it != mBuffers.end(); it++) {
409        buffer = *it;
410        if (buffer->meta()->findInt32("discard", &discard) && discard) {
411            continue;
412        }
413        buffer->meta()->findInt64("timeUs", &startTimeUs);
414        break;
415    }
416
417    if (startTimeUs < 0) {
418        return 0;
419    }
420
421    it = mBuffers.end();
422    --it;
423    buffer = *it;
424
425    int64_t endTimeUs;
426    buffer->meta()->findInt64("timeUs", &endTimeUs);
427    if (endTimeUs < 0) {
428        return 0;
429    }
430
431    int64_t diffUs;
432    if (endTimeUs > startTimeUs) {
433        diffUs = endTimeUs - startTimeUs;
434    } else {
435        diffUs = startTimeUs - endTimeUs;
436    }
437    return diffUs;
438}
439
440status_t AnotherPacketSource::nextBufferTime(int64_t *timeUs) {
441    *timeUs = 0;
442
443    Mutex::Autolock autoLock(mLock);
444
445    if (mBuffers.empty()) {
446        return mEOSResult != OK ? mEOSResult : -EWOULDBLOCK;
447    }
448
449    sp<ABuffer> buffer = *mBuffers.begin();
450    CHECK(buffer->meta()->findInt64("timeUs", timeUs));
451
452    return OK;
453}
454
455bool AnotherPacketSource::isFinished(int64_t duration) const {
456    if (duration > 0) {
457        int64_t diff = duration - mLastQueuedTimeUs;
458        if (diff < kNearEOSMarkUs && diff > -kNearEOSMarkUs) {
459            ALOGV("Detecting EOS due to near end");
460            return true;
461        }
462    }
463    return (mEOSResult != OK);
464}
465
466sp<AMessage> AnotherPacketSource::getLatestEnqueuedMeta() {
467    Mutex::Autolock autoLock(mLock);
468    return mLatestEnqueuedMeta;
469}
470
471sp<AMessage> AnotherPacketSource::getLatestDequeuedMeta() {
472    Mutex::Autolock autoLock(mLock);
473    return mLatestDequeuedMeta;
474}
475
476void AnotherPacketSource::enable(bool enable) {
477    Mutex::Autolock autoLock(mLock);
478    mEnabled = enable;
479}
480
481/*
482 * returns the sample meta that's delayUs after queue head
483 * (NULL if such sample is unavailable)
484 */
485sp<AMessage> AnotherPacketSource::getMetaAfterLastDequeued(int64_t delayUs) {
486    Mutex::Autolock autoLock(mLock);
487    int64_t firstUs = -1;
488    int64_t lastUs = -1;
489    int64_t durationUs = 0;
490
491    List<sp<ABuffer> >::iterator it;
492    for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
493        const sp<ABuffer> &buffer = *it;
494        int32_t discontinuity;
495        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
496            durationUs += lastUs - firstUs;
497            firstUs = -1;
498            lastUs = -1;
499            continue;
500        }
501        int64_t timeUs;
502        if (buffer->meta()->findInt64("timeUs", &timeUs)) {
503            if (firstUs < 0) {
504                firstUs = timeUs;
505            }
506            if (lastUs < 0 || timeUs > lastUs) {
507                lastUs = timeUs;
508            }
509            if (durationUs + (lastUs - firstUs) >= delayUs) {
510                return buffer->meta();
511            }
512        }
513    }
514    return NULL;
515}
516
517/*
518 * removes samples with time equal or after meta
519 */
520void AnotherPacketSource::trimBuffersAfterMeta(
521        const sp<AMessage> &meta) {
522    if (meta == NULL) {
523        ALOGW("trimming with NULL meta, ignoring");
524        return;
525    }
526
527    Mutex::Autolock autoLock(mLock);
528    if (mBuffers.empty()) {
529        return;
530    }
531
532    HLSTime stopTime(meta);
533    ALOGV("trimBuffersAfterMeta: discontinuitySeq %d, timeUs %lld",
534            stopTime.mSeq, (long long)stopTime.mTimeUs);
535
536    List<sp<ABuffer> >::iterator it;
537    sp<AMessage> newLatestEnqueuedMeta = NULL;
538    int64_t newLastQueuedTimeUs = 0;
539    size_t newDiscontinuityCount = 0;
540    for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
541        const sp<ABuffer> &buffer = *it;
542        int32_t discontinuity;
543        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
544            newDiscontinuityCount++;
545            continue;
546        }
547
548        HLSTime curTime(buffer->meta());
549        if (!(curTime < stopTime)) {
550            ALOGV("trimming from %lld (inclusive) to end",
551                    (long long)curTime.mTimeUs);
552            break;
553        }
554        newLatestEnqueuedMeta = buffer->meta();
555        newLastQueuedTimeUs = curTime.mTimeUs;
556    }
557    mBuffers.erase(it, mBuffers.end());
558    mLatestEnqueuedMeta = newLatestEnqueuedMeta;
559    mLastQueuedTimeUs = newLastQueuedTimeUs;
560    mQueuedDiscontinuityCount = newDiscontinuityCount;
561}
562
563/*
564 * removes samples with time equal or before meta;
565 * returns first sample left in the queue.
566 *
567 * (for AVC, if trim happens, the samples left will always start
568 * at next IDR.)
569 */
570sp<AMessage> AnotherPacketSource::trimBuffersBeforeMeta(
571        const sp<AMessage> &meta) {
572    HLSTime startTime(meta);
573    ALOGV("trimBuffersBeforeMeta: discontinuitySeq %d, timeUs %lld",
574            startTime.mSeq, (long long)startTime.mTimeUs);
575
576    sp<AMessage> firstMeta;
577    Mutex::Autolock autoLock(mLock);
578    if (mBuffers.empty()) {
579        return NULL;
580    }
581
582    sp<MetaData> format;
583    bool isAvc = false;
584
585    List<sp<ABuffer> >::iterator it;
586    size_t discontinuityCount = 0;
587    for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
588        const sp<ABuffer> &buffer = *it;
589        int32_t discontinuity;
590        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
591            format = NULL;
592            isAvc = false;
593            discontinuityCount++;
594            continue;
595        }
596        if (format == NULL) {
597            sp<RefBase> object;
598            if (buffer->meta()->findObject("format", &object)) {
599                const char* mime;
600                format = static_cast<MetaData*>(object.get());
601                isAvc = format != NULL
602                        && format->findCString(kKeyMIMEType, &mime)
603                        && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
604            }
605        }
606        if (isAvc && !IsIDR(buffer)) {
607            continue;
608        }
609
610        HLSTime curTime(buffer->meta());
611        if (startTime < curTime) {
612            ALOGV("trimming from beginning to %lld (not inclusive)",
613                    (long long)curTime.mTimeUs);
614            firstMeta = buffer->meta();
615            break;
616        }
617    }
618    mBuffers.erase(mBuffers.begin(), it);
619    mQueuedDiscontinuityCount -= discontinuityCount;
620    mLatestDequeuedMeta = NULL;
621    return firstMeta;
622}
623
624}  // namespace android
625