MediaCodecSource.cpp revision 16e79115e497386eaf010af388627f94314a55a3
/*
 * Copyright 2014, The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

//#define LOG_NDEBUG 0
#define LOG_TAG "MediaCodecSource"
#define DEBUG_DRIFT_TIME 0

#include <inttypes.h>

#include <gui/IGraphicBufferProducer.h>
#include <gui/Surface.h>
#include <media/ICrypto.h>
#include <media/stagefright/foundation/ABuffer.h>
#include <media/stagefright/foundation/ADebug.h>
#include <media/stagefright/foundation/ALooper.h>
#include <media/stagefright/foundation/AMessage.h>
#include <media/stagefright/MediaBuffer.h>
#include <media/stagefright/MediaCodec.h>
#include <media/stagefright/MetaData.h>
#include <media/stagefright/MediaErrors.h>
#include <media/stagefright/MediaSource.h>
#include <media/stagefright/MediaCodecSource.h>
#include <media/stagefright/Utils.h>

namespace android {

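// An encoder input ABuffer may carry a pointer to the MediaBuffer whose data
// it wraps (see the video path in feedEncoderInputBuffers()). This helper
// releases that MediaBuffer, if any, and clears the "mediaBuffer" meta entry.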
static void ReleaseMediaBufferReference(const sp<ABuffer> &accessUnit) {
    void *mbuf;
    if (accessUnit->meta()->findPointer("mediaBuffer", &mbuf)
            && mbuf != NULL) {
        ALOGV("releasing mbuf %p", mbuf);

        accessUnit->meta()->setPointer("mediaBuffer", NULL);

        static_cast<MediaBuffer *>(mbuf)->release();
        mbuf = NULL;
    }
}
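// Puller runs its own looper ("pull_looper") and repeatedly reads from the
// wrapped MediaSource, forwarding each MediaBuffer to MediaCodecSource via
// the notify message's "accessUnit" pointer; a NULL access unit means EOS.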
struct MediaCodecSource::Puller : public AHandler {
    Puller(const sp<MediaSource> &source);

    status_t start(const sp<MetaData> &meta, const sp<AMessage> &notify);
    void stop();

    void pause();
    void resume();

protected:
    virtual void onMessageReceived(const sp<AMessage> &msg);
    virtual ~Puller();

private:
    enum {
        kWhatStart = 'msta',
        kWhatStop,
        kWhatPull,
        kWhatPause,
        kWhatResume,
    };

    sp<MediaSource> mSource;
    sp<AMessage> mNotify;
    sp<ALooper> mLooper;
    int32_t mPullGeneration;
    bool mIsAudio;
    bool mPaused;
    bool mReachedEOS;

    status_t postSynchronouslyAndReturnError(const sp<AMessage> &msg);
    void schedulePull();
    void handleEOS();

    DISALLOW_EVIL_CONSTRUCTORS(Puller);
};

MediaCodecSource::Puller::Puller(const sp<MediaSource> &source)
    : mSource(source),
      mLooper(new ALooper()),
      mPullGeneration(0),
      mIsAudio(false),
      mPaused(false),
      mReachedEOS(false) {
    sp<MetaData> meta = source->getFormat();
    const char *mime;
    CHECK(meta->findCString(kKeyMIMEType, &mime));

    mIsAudio = !strncasecmp(mime, "audio/", 6);

    mLooper->setName("pull_looper");
}

MediaCodecSource::Puller::~Puller() {
    mLooper->unregisterHandler(id());
    mLooper->stop();
}

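// Posts a message to the puller's looper and blocks the calling thread until
// the handler replies; the reply's "err" entry (OK if absent) is returned.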
status_t MediaCodecSource::Puller::postSynchronouslyAndReturnError(
        const sp<AMessage> &msg) {
    sp<AMessage> response;
    status_t err = msg->postAndAwaitResponse(&response);

    if (err != OK) {
        return err;
    }

    if (!response->findInt32("err", &err)) {
        err = OK;
    }

    return err;
}

status_t MediaCodecSource::Puller::start(const sp<MetaData> &meta,
        const sp<AMessage> &notify) {
    ALOGV("puller (%s) start", mIsAudio ? "audio" : "video");
    mLooper->start(
            false /* runOnCallingThread */,
            false /* canCallJava */,
            PRIORITY_AUDIO);
    mLooper->registerHandler(this);
    mNotify = notify;

    sp<AMessage> msg = new AMessage(kWhatStart, id());
    msg->setObject("meta", meta);
    return postSynchronouslyAndReturnError(msg);
}

void MediaCodecSource::Puller::stop() {
    // Stop the source from the caller's thread instead of the puller's looper.
    // mSource->stop() is thread-safe; doing it outside the puller's looper
    // lets us stop even if the source gets stuck. If the source got stuck
    // in read(), the looper would never be able to process the stop(),
    // which could lead to an ANR.

    ALOGV("source (%s) stopping", mIsAudio ? "audio" : "video");
    mSource->stop();
    ALOGV("source (%s) stopped", mIsAudio ? "audio" : "video");

    (new AMessage(kWhatStop, id()))->post();
}

void MediaCodecSource::Puller::pause() {
    (new AMessage(kWhatPause, id()))->post();
}

void MediaCodecSource::Puller::resume() {
    (new AMessage(kWhatResume, id()))->post();
}

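// Each pull request carries the current mPullGeneration. kWhatStop bumps the
// generation, so a kWhatPull that was already queued behind the stop is
// recognized as stale and dropped instead of reading a stopped source.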
void MediaCodecSource::Puller::schedulePull() {
    sp<AMessage> msg = new AMessage(kWhatPull, id());
    msg->setInt32("generation", mPullGeneration);
    msg->post();
}

void MediaCodecSource::Puller::handleEOS() {
    if (!mReachedEOS) {
        ALOGV("puller (%s) posting EOS", mIsAudio ? "audio" : "video");
        mReachedEOS = true;
        sp<AMessage> notify = mNotify->dup();
        notify->setPointer("accessUnit", NULL);
        notify->post();
    }
}

void MediaCodecSource::Puller::onMessageReceived(const sp<AMessage> &msg) {
    switch (msg->what()) {
        case kWhatStart:
        {
            sp<RefBase> obj;
            CHECK(msg->findObject("meta", &obj));

            mReachedEOS = false;

            status_t err = mSource->start(static_cast<MetaData *>(obj.get()));

            if (err == OK) {
                schedulePull();
            }

            sp<AMessage> response = new AMessage;
            response->setInt32("err", err);

            uint32_t replyID;
            CHECK(msg->senderAwaitsResponse(&replyID));
            response->postReply(replyID);
            break;
        }

        case kWhatStop:
        {
            ++mPullGeneration;

            handleEOS();
            break;
        }

        case kWhatPull:
        {
            int32_t generation;
            CHECK(msg->findInt32("generation", &generation));

            if (generation != mPullGeneration) {
                break;
            }

            MediaBuffer *mbuf;
            status_t err = mSource->read(&mbuf);

            if (mPaused) {
                if (err == OK) {
                    mbuf->release();
                    mbuf = NULL;
                }

                msg->post();
                break;
            }

            if (err != OK) {
                if (err == ERROR_END_OF_STREAM) {
                    ALOGV("stream ended, mbuf %p", mbuf);
                } else {
                    ALOGE("error %d reading stream.", err);
                }
                handleEOS();
            } else {
                sp<AMessage> notify = mNotify->dup();

                notify->setPointer("accessUnit", mbuf);
                notify->post();

                msg->post();
            }
            break;
        }

        case kWhatPause:
        {
            mPaused = true;
            break;
        }

        case kWhatResume:
        {
            mPaused = false;
            break;
        }

        default:
            TRESPASS();
    }
}

// static
sp<MediaCodecSource> MediaCodecSource::Create(
        const sp<ALooper> &looper,
        const sp<AMessage> &format,
        const sp<MediaSource> &source,
        uint32_t flags) {
    sp<MediaCodecSource> mediaSource =
            new MediaCodecSource(looper, format, source, flags);

    if (mediaSource->init() == OK) {
        return mediaSource;
    }
    return NULL;
}
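
// Typical creation sequence (illustrative sketch, not part of this file):
// a client such as a recorder wraps an existing MediaSource in a
// MediaCodecSource. The format keys mirror MediaCodec's encoder keys;
// "videoSource" and the parameter values below are assumptions for the sake
// of the example. Byte-buffer video input typically also needs a
// "color-format" entry and, for metadata buffers, FLAG_USE_METADATA_INPUT.
//
//     sp<ALooper> looper = new ALooper;
//     looper->setName("recorder_looper");
//     looper->start();
//
//     sp<AMessage> format = new AMessage;
//     format->setString("mime", "video/avc");
//     format->setInt32("width", 1280);
//     format->setInt32("height", 720);
//     format->setInt32("bitrate", 2000000);
//     format->setInt32("frame-rate", 30);
//     format->setInt32("i-frame-interval", 1);
//
//     sp<MediaCodecSource> encoderSource = MediaCodecSource::Create(
//             looper, format, videoSource, 0 /* flags */);
//     if (encoderSource == NULL || encoderSource->start() != OK) {
//         // creation or start failed
//     }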

status_t MediaCodecSource::start(MetaData* params) {
    sp<AMessage> msg = new AMessage(kWhatStart, mReflector->id());
    msg->setObject("meta", params);
    return postSynchronouslyAndReturnError(msg);
}

status_t MediaCodecSource::stop() {
    sp<AMessage> msg = new AMessage(kWhatStop, mReflector->id());
    status_t err = postSynchronouslyAndReturnError(msg);

    // mPuller->stop() needs to be done outside MediaCodecSource's looper,
    // as it contains a synchronous call to stop the underlying MediaSource,
    // which often waits for all outstanding MediaBuffers to return, but
    // MediaBuffers are only returned when the MediaCodecSource looper gets
    // to process them.

    if (mPuller != NULL) {
        ALOGI("puller (%s) stopping", mIsVideo ? "video" : "audio");
        mPuller->stop();
        ALOGI("puller (%s) stopped", mIsVideo ? "video" : "audio");
    }

    return err;
}

status_t MediaCodecSource::pause() {
    (new AMessage(kWhatPause, mReflector->id()))->post();
    return OK;
}

sp<IGraphicBufferProducer> MediaCodecSource::getGraphicBufferProducer() {
    CHECK(mFlags & FLAG_USE_SURFACE_INPUT);
    return mGraphicBufferProducer;
}
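
// Surface-input sketch (assumptions noted inline): with FLAG_USE_SURFACE_INPUT
// no MediaSource/Puller is used; frames are produced into the encoder's input
// surface instead. Whether a given codec supports an input surface depends on
// the encoder, and the producer-side wiring shown here is illustrative only.
//
//     sp<MediaCodecSource> src = MediaCodecSource::Create(
//             looper, format, NULL /* source */,
//             MediaCodecSource::FLAG_USE_SURFACE_INPUT);
//     sp<Surface> inputSurface = new Surface(src->getGraphicBufferProducer());
//     // hand inputSurface to the frame producer (camera, GL, ...), then:
//     src->start();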

status_t MediaCodecSource::read(
        MediaBuffer** buffer, const ReadOptions* /* options */) {
    Mutex::Autolock autolock(mOutputBufferLock);

    *buffer = NULL;
    while (mOutputBufferQueue.size() == 0 && !mEncoderReachedEOS) {
        mOutputBufferCond.wait(mOutputBufferLock);
    }
    if (!mEncoderReachedEOS) {
        *buffer = *mOutputBufferQueue.begin();
        mOutputBufferQueue.erase(mOutputBufferQueue.begin());
        return OK;
    }
    return mErrorCode;
}
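
// read() blocks until the encoder produces an output buffer or reaches EOS,
// so it is meant to be called from a dedicated writer thread. A typical
// consumer loop (illustrative sketch; the writer logic is an assumption):
//
//     MediaBuffer *buffer;
//     status_t err;
//     while ((err = encoderSource->read(&buffer)) == OK) {
//         // write buffer->data() / buffer->range_length(), kKeyTime, ...
//         buffer->release();  // hands the buffer back via signalBufferReturned()
//     }
//     // err is ERROR_END_OF_STREAM after a clean stop, or the encoder error.
//     encoderSource->stop();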

void MediaCodecSource::signalBufferReturned(MediaBuffer *buffer) {
    buffer->setObserver(0);
    buffer->release();
}

MediaCodecSource::MediaCodecSource(
        const sp<ALooper> &looper,
        const sp<AMessage> &outputFormat,
        const sp<MediaSource> &source,
        uint32_t flags)
    : mLooper(looper),
      mOutputFormat(outputFormat),
      mMeta(new MetaData),
      mFlags(flags),
      mIsVideo(false),
      mStarted(false),
      mStopping(false),
      mDoMoreWorkPending(false),
      mFirstSampleTimeUs(-1ll),
      mEncoderReachedEOS(false),
      mErrorCode(OK) {
    CHECK(mLooper != NULL);

    AString mime;
    CHECK(mOutputFormat->findString("mime", &mime));

    if (!strncasecmp("video/", mime.c_str(), 6)) {
        mIsVideo = true;
    }

    if (!(mFlags & FLAG_USE_SURFACE_INPUT)) {
        mPuller = new Puller(source);
    }
}

MediaCodecSource::~MediaCodecSource() {
    releaseEncoder();

    mCodecLooper->stop();
    mLooper->unregisterHandler(mReflector->id());
}

status_t MediaCodecSource::init() {
    status_t err = initEncoder();

    if (err != OK) {
        releaseEncoder();
    }

    return err;
}

status_t MediaCodecSource::initEncoder() {
    mReflector = new AHandlerReflector<MediaCodecSource>(this);
    mLooper->registerHandler(mReflector);

    mCodecLooper = new ALooper;
    mCodecLooper->setName("codec_looper");
    mCodecLooper->start();

    if (mFlags & FLAG_USE_METADATA_INPUT) {
        mOutputFormat->setInt32("store-metadata-in-buffers", 1);
    }

    if (mFlags & FLAG_USE_SURFACE_INPUT) {
        mOutputFormat->setInt32("create-input-buffers-suspended", 1);
    }

    AString outputMIME;
    CHECK(mOutputFormat->findString("mime", &outputMIME));

    mEncoder = MediaCodec::CreateByType(
            mCodecLooper, outputMIME.c_str(), true /* encoder */);

    if (mEncoder == NULL) {
        return NO_INIT;
    }

    ALOGV("output format is '%s'", mOutputFormat->debugString(0).c_str());

    status_t err = mEncoder->configure(
                mOutputFormat,
                NULL /* nativeWindow */,
                NULL /* crypto */,
                MediaCodec::CONFIGURE_FLAG_ENCODE);

    if (err != OK) {
        return err;
    }

    mEncoder->getOutputFormat(&mOutputFormat);
    convertMessageToMetaData(mOutputFormat, mMeta);

    if (mFlags & FLAG_USE_SURFACE_INPUT) {
        CHECK(mIsVideo);

        err = mEncoder->createInputSurface(&mGraphicBufferProducer);

        if (err != OK) {
            return err;
        }
    }

    err = mEncoder->start();

    if (err != OK) {
        return err;
    }

    err = mEncoder->getInputBuffers(&mEncoderInputBuffers);

    if (err != OK) {
        return err;
    }

    err = mEncoder->getOutputBuffers(&mEncoderOutputBuffers);

    if (err != OK) {
        return err;
    }

    mEncoderReachedEOS = false;
    mErrorCode = OK;

    return OK;
}

void MediaCodecSource::releaseEncoder() {
    if (mEncoder == NULL) {
        return;
    }

    mEncoder->release();
    mEncoder.clear();

    while (!mInputBufferQueue.empty()) {
        MediaBuffer *mbuf = *mInputBufferQueue.begin();
        mInputBufferQueue.erase(mInputBufferQueue.begin());
        if (mbuf != NULL) {
            mbuf->release();
        }
    }

    for (size_t i = 0; i < mEncoderInputBuffers.size(); ++i) {
        sp<ABuffer> accessUnit = mEncoderInputBuffers.itemAt(i);
        ReleaseMediaBufferReference(accessUnit);
    }

    mEncoderInputBuffers.clear();
    mEncoderOutputBuffers.clear();
}

status_t MediaCodecSource::postSynchronouslyAndReturnError(
        const sp<AMessage> &msg) {
    sp<AMessage> response;
    status_t err = msg->postAndAwaitResponse(&response);

    if (err != OK) {
        return err;
    }

    if (!response->findInt32("err", &err)) {
        err = OK;
    }

    return err;
}

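// Marks encoder output as finished: drops any unread output buffers, records
// the error code, wakes up read(), releases the encoder, and replies to every
// stop() call that is still waiting.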
void MediaCodecSource::signalEOS(status_t err) {
    if (!mEncoderReachedEOS) {
        ALOGV("encoder (%s) reached EOS", mIsVideo ? "video" : "audio");
        {
            Mutex::Autolock autoLock(mOutputBufferLock);
            // release all unread media buffers
            for (List<MediaBuffer*>::iterator it = mOutputBufferQueue.begin();
                    it != mOutputBufferQueue.end(); it++) {
                (*it)->release();
            }
            mOutputBufferQueue.clear();
            mEncoderReachedEOS = true;
            mErrorCode = err;
            mOutputBufferCond.signal();
        }

        releaseEncoder();
    }
    if (mStopping && mEncoderReachedEOS) {
        ALOGI("encoder (%s) stopped", mIsVideo ? "video" : "audio");
        // posting reply to everyone that's waiting
        List<uint32_t>::iterator it;
        for (it = mStopReplyIDQueue.begin();
                it != mStopReplyIDQueue.end(); it++) {
            (new AMessage)->postReply(*it);
        }
        mStopReplyIDQueue.clear();
        mStopping = false;
    }
}

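// suspend()/resume() only apply to surface input: they toggle the encoder's
// "drop-input-frames" parameter so frames arriving on the input surface are
// dropped while suspended; resume() can additionally skip frames timestamped
// before skipFramesBeforeUs.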
void MediaCodecSource::suspend() {
    CHECK(mFlags & FLAG_USE_SURFACE_INPUT);
    if (mEncoder != NULL) {
        sp<AMessage> params = new AMessage;
        params->setInt32("drop-input-frames", true);
        mEncoder->setParameters(params);
    }
}

void MediaCodecSource::resume(int64_t skipFramesBeforeUs) {
    CHECK(mFlags & FLAG_USE_SURFACE_INPUT);
    if (mEncoder != NULL) {
        sp<AMessage> params = new AMessage;
        params->setInt32("drop-input-frames", false);
        if (skipFramesBeforeUs > 0) {
            params->setInt64("skip-frames-before", skipFramesBeforeUs);
        }
        mEncoder->setParameters(params);
    }
}

void MediaCodecSource::scheduleDoMoreWork() {
    if (mDoMoreWorkPending) {
        return;
    }

    mDoMoreWorkPending = true;

    if (mEncoderActivityNotify == NULL) {
        mEncoderActivityNotify = new AMessage(
                kWhatEncoderActivity, mReflector->id());
    }
    mEncoder->requestActivityNotification(mEncoderActivityNotify);
}

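// Pairs queued MediaBuffers with available encoder input buffer indices. For
// video, the MediaBuffer stays referenced through the input ABuffer's
// "mediaBuffer" meta pointer until the codec is done with it; for audio, the
// data is copied and the MediaBuffer released immediately. A NULL MediaBuffer
// queues an empty input buffer flagged with BUFFER_FLAG_EOS.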
status_t MediaCodecSource::feedEncoderInputBuffers() {
    while (!mInputBufferQueue.empty()
            && !mAvailEncoderInputIndices.empty()) {
        MediaBuffer* mbuf = *mInputBufferQueue.begin();
        mInputBufferQueue.erase(mInputBufferQueue.begin());

        size_t bufferIndex = *mAvailEncoderInputIndices.begin();
        mAvailEncoderInputIndices.erase(mAvailEncoderInputIndices.begin());

        int64_t timeUs = 0ll;
        uint32_t flags = 0;
        size_t size = 0;

        if (mbuf != NULL) {
            CHECK(mbuf->meta_data()->findInt64(kKeyTime, &timeUs));

            // push decoding time for video, or drift time for audio
            if (mIsVideo) {
                mDecodingTimeQueue.push_back(timeUs);
            } else {
#if DEBUG_DRIFT_TIME
                if (mFirstSampleTimeUs < 0ll) {
                    mFirstSampleTimeUs = timeUs;
                }

                int64_t driftTimeUs = 0;
                if (mbuf->meta_data()->findInt64(kKeyDriftTime, &driftTimeUs)
                        && driftTimeUs) {
                    driftTimeUs = timeUs - mFirstSampleTimeUs - driftTimeUs;
                }
                mDriftTimeQueue.push_back(driftTimeUs);
#endif // DEBUG_DRIFT_TIME
            }

            size = mbuf->size();

            memcpy(mEncoderInputBuffers.itemAt(bufferIndex)->data(),
                   mbuf->data(), size);

            if (mIsVideo) {
                // video encoder will release MediaBuffer when done
                // with underlying data.
                mEncoderInputBuffers.itemAt(bufferIndex)->meta()
                        ->setPointer("mediaBuffer", mbuf);
            } else {
                mbuf->release();
            }
        } else {
            flags = MediaCodec::BUFFER_FLAG_EOS;
        }

        status_t err = mEncoder->queueInputBuffer(
                bufferIndex, 0, size, timeUs, flags);

        if (err != OK) {
            return err;
        }
    }

    return OK;
}

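// One pass of the encoder work loop: harvests newly available input buffer
// indices (byte-buffer input only) and feeds pending MediaBuffers, then drains
// output buffers into MediaBuffers for read(). Returns OK when the codec
// simply has nothing more for now, and ERROR_END_OF_STREAM once the EOS
// output buffer has been seen.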
status_t MediaCodecSource::doMoreWork() {
    status_t err;

    if (!(mFlags & FLAG_USE_SURFACE_INPUT)) {
        for (;;) {
            size_t bufferIndex;
            err = mEncoder->dequeueInputBuffer(&bufferIndex);

            if (err != OK) {
                break;
            }

            mAvailEncoderInputIndices.push_back(bufferIndex);
        }

        feedEncoderInputBuffers();
    }

    for (;;) {
        size_t bufferIndex;
        size_t offset;
        size_t size;
        int64_t timeUs;
        uint32_t flags;
        err = mEncoder->dequeueOutputBuffer(
                &bufferIndex, &offset, &size, &timeUs, &flags);

        if (err != OK) {
            if (err == INFO_FORMAT_CHANGED) {
                continue;
            } else if (err == INFO_OUTPUT_BUFFERS_CHANGED) {
                mEncoder->getOutputBuffers(&mEncoderOutputBuffers);
                continue;
            }

            if (err == -EAGAIN) {
                err = OK;
            }
            break;
        }
        if (!(flags & MediaCodec::BUFFER_FLAG_EOS)) {
            sp<ABuffer> outbuf = mEncoderOutputBuffers.itemAt(bufferIndex);

            MediaBuffer *mbuf = new MediaBuffer(outbuf->size());
            memcpy(mbuf->data(), outbuf->data(), outbuf->size());

            if (!(flags & MediaCodec::BUFFER_FLAG_CODECCONFIG)) {
                if (mIsVideo) {
                    int64_t decodingTimeUs;
                    if (mFlags & FLAG_USE_SURFACE_INPUT) {
                        // GraphicBufferSource is supposed to discard samples
                        // queued before start, and offset timeUs by start time
                        CHECK_GE(timeUs, 0ll);
                        // TODO:
                        // Decoding time for surface source is unavailable,
                        // use presentation time for now. May need to move
                        // this logic into MediaCodec.
                        decodingTimeUs = timeUs;
                    } else {
                        CHECK(!mDecodingTimeQueue.empty());
                        decodingTimeUs = *(mDecodingTimeQueue.begin());
                        mDecodingTimeQueue.erase(mDecodingTimeQueue.begin());
                    }
                    mbuf->meta_data()->setInt64(kKeyDecodingTime, decodingTimeUs);

                    ALOGV("[video] time %" PRId64 " us (%.2f secs), dts/pts diff %" PRId64,
                            timeUs, timeUs / 1E6, decodingTimeUs - timeUs);
                } else {
                    int64_t driftTimeUs = 0;
#if DEBUG_DRIFT_TIME
                    CHECK(!mDriftTimeQueue.empty());
                    driftTimeUs = *(mDriftTimeQueue.begin());
                    mDriftTimeQueue.erase(mDriftTimeQueue.begin());
                    mbuf->meta_data()->setInt64(kKeyDriftTime, driftTimeUs);
#endif // DEBUG_DRIFT_TIME
                    ALOGV("[audio] time %" PRId64 " us (%.2f secs), drift %" PRId64,
                            timeUs, timeUs / 1E6, driftTimeUs);
                }
                mbuf->meta_data()->setInt64(kKeyTime, timeUs);
            } else {
                mbuf->meta_data()->setInt32(kKeyIsCodecConfig, true);
            }
            if (flags & MediaCodec::BUFFER_FLAG_SYNCFRAME) {
                mbuf->meta_data()->setInt32(kKeyIsSyncFrame, true);
            }
            mbuf->setObserver(this);
            mbuf->add_ref();

            {
                Mutex::Autolock autoLock(mOutputBufferLock);
                mOutputBufferQueue.push_back(mbuf);
                mOutputBufferCond.signal();
            }
        }

        mEncoder->releaseOutputBuffer(bufferIndex);

        if (flags & MediaCodec::BUFFER_FLAG_EOS) {
            err = ERROR_END_OF_STREAM;
            break;
        }
    }

    return err;
}

status_t MediaCodecSource::onStart(MetaData *params) {
    if (mStopping) {
        ALOGE("Failed to start while we're stopping");
        return INVALID_OPERATION;
    }

    if (mStarted) {
        ALOGI("MediaCodecSource (%s) resuming", mIsVideo ? "video" : "audio");
        if (mFlags & FLAG_USE_SURFACE_INPUT) {
            resume();
        } else {
            CHECK(mPuller != NULL);
            mPuller->resume();
        }
        return OK;
    }

    ALOGI("MediaCodecSource (%s) starting", mIsVideo ? "video" : "audio");

    status_t err = OK;

    if (mFlags & FLAG_USE_SURFACE_INPUT) {
        int64_t startTimeUs;
        if (!params || !params->findInt64(kKeyTime, &startTimeUs)) {
            startTimeUs = -1ll;
        }
        resume(startTimeUs);
        scheduleDoMoreWork();
    } else {
        CHECK(mPuller != NULL);
        sp<AMessage> notify = new AMessage(
                kWhatPullerNotify, mReflector->id());
        err = mPuller->start(params, notify);
        if (err != OK) {
            return err;
        }
    }

    ALOGI("MediaCodecSource (%s) started", mIsVideo ? "video" : "audio");

    mStarted = true;
    return OK;
}

void MediaCodecSource::onMessageReceived(const sp<AMessage> &msg) {
    switch (msg->what()) {
    case kWhatPullerNotify:
    {
        MediaBuffer *mbuf;
        CHECK(msg->findPointer("accessUnit", (void**)&mbuf));

        if (mbuf == NULL) {
            ALOGV("puller (%s) reached EOS",
                    mIsVideo ? "video" : "audio");
            signalEOS();
        }

        if (mEncoder == NULL) {
            ALOGV("got msg '%s' after encoder shutdown.",
                  msg->debugString().c_str());

            if (mbuf != NULL) {
                mbuf->release();
            }

            break;
        }

        mInputBufferQueue.push_back(mbuf);

        feedEncoderInputBuffers();
        scheduleDoMoreWork();

        break;
    }
    case kWhatEncoderActivity:
    {
        mDoMoreWorkPending = false;

        if (mEncoder == NULL) {
            break;
        }

        status_t err = doMoreWork();

        if (err == OK) {
            scheduleDoMoreWork();
        } else {
            // reached EOS, or error
            signalEOS(err);
        }

        break;
    }
    case kWhatStart:
    {
        uint32_t replyID;
        CHECK(msg->senderAwaitsResponse(&replyID));

        sp<RefBase> obj;
        CHECK(msg->findObject("meta", &obj));
        MetaData *params = static_cast<MetaData *>(obj.get());

        sp<AMessage> response = new AMessage;
        response->setInt32("err", onStart(params));
        response->postReply(replyID);
        break;
    }
    case kWhatStop:
    {
        ALOGI("encoder (%s) stopping", mIsVideo ? "video" : "audio");

        uint32_t replyID;
        CHECK(msg->senderAwaitsResponse(&replyID));

        if (mEncoderReachedEOS) {
            // if we already reached EOS, reply and return now
            ALOGI("encoder (%s) already stopped",
                    mIsVideo ? "video" : "audio");
            (new AMessage)->postReply(replyID);
            break;
        }

        mStopReplyIDQueue.push_back(replyID);
        if (mStopping) {
            // nothing to do if we're already stopping; the reply will be
            // posted to everyone once we're stopped.
            break;
        }

        mStopping = true;

        // if using surface input, signal the source EOS and wait for EOS to
        // come back; otherwise, release the encoder and post EOS if we
        // haven't done so already
        if (mFlags & FLAG_USE_SURFACE_INPUT) {
            mEncoder->signalEndOfInputStream();
        } else {
            signalEOS();
        }
        break;
    }
    case kWhatPause:
    {
        if (mFlags & FLAG_USE_SURFACE_INPUT) {
            suspend();
        } else {
            CHECK(mPuller != NULL);
            mPuller->pause();
        }
        break;
    }
    default:
        TRESPASS();
    }
}

} // namespace android
