MediaCodecSource.cpp revision 901ca36bf02726ca07d83820c93c76d696549a03
1/*
2 * Copyright 2014, The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "MediaCodecSource"
19#define DEBUG_DRIFT_TIME 0
20
21#include <inttypes.h>
22
23#include <gui/IGraphicBufferConsumer.h>
24#include <gui/IGraphicBufferProducer.h>
25#include <gui/Surface.h>
26#include <media/ICrypto.h>
27#include <media/stagefright/foundation/ABuffer.h>
28#include <media/stagefright/foundation/ADebug.h>
29#include <media/stagefright/foundation/ALooper.h>
30#include <media/stagefright/foundation/AMessage.h>
31#include <media/stagefright/MediaBuffer.h>
32#include <media/stagefright/MediaCodec.h>
33#include <media/stagefright/MediaCodecList.h>
34#include <media/stagefright/MediaCodecSource.h>
35#include <media/stagefright/MediaErrors.h>
36#include <media/stagefright/MediaSource.h>
37#include <media/stagefright/MetaData.h>
38#include <media/stagefright/PersistentSurface.h>
39#include <media/stagefright/Utils.h>
40
41namespace android {
42
const int32_t kDefaultSwVideoEncoderFormat = HAL_PIXEL_FORMAT_YCbCr_420_888;
const int32_t kDefaultHwVideoEncoderFormat = HAL_PIXEL_FORMAT_IMPLEMENTATION_DEFINED;
const int32_t kDefaultVideoEncoderDataSpace = HAL_DATASPACE_V0_BT709;

const int kStopTimeoutUs = 300000; // allow 300 ms for the encoder to shut down before forcing completion
48
// Pulls buffers from a MediaSource on a dedicated looper thread and queues
// them until MediaCodecSource feeds them to the encoder. Running the
// (potentially blocking) MediaSource::read() on its own looper keeps the
// MediaCodecSource looper responsive.
struct MediaCodecSource::Puller : public AHandler {
    Puller(const sp<MediaSource> &source);

    // Calls mSource->stop() directly from the caller's thread; used to break
    // a read() that the puller looper is blocked on.
    void interruptSource();
    // Starts the looper and (synchronously) the source; |notify| is posted
    // whenever a pulled buffer (or EOS) becomes available.
    status_t start(const sp<MetaData> &meta, const sp<AMessage> &notify);
    // Marks pulling as stopped and flushes already-queued buffers.
    void stop();
    // Synchronously stops the source on the puller looper.
    void stopSource();
    void pause();
    void resume();

    // Pops the oldest pulled buffer; returns false when none is queued.
    bool readBuffer(MediaBuffer **buffer);

protected:
    virtual void onMessageReceived(const sp<AMessage> &msg);
    virtual ~Puller();

private:
    enum {
        kWhatStart = 'msta',
        kWhatStop,
        kWhatPull,
    };

    sp<MediaSource> mSource;
    sp<AMessage> mNotify;
    sp<ALooper> mLooper;
    bool mIsAudio;  // derived from the source MIME type; used for logging only

    // Pull state shared between the puller looper and client threads;
    // only accessed through Mutexed<Queue>::Locked.
    struct Queue {
        Queue()
            : mReadPendingSince(0),
              mPaused(false),
              mPulling(false) { }
        int64_t mReadPendingSince;  // when the current source read started; 0 if none pending
        bool mPaused;               // while paused, pulled buffers are dropped
        bool mPulling;              // cleared by stop() to end the pull loop
        Vector<MediaBuffer *> mReadBuffers;

        void flush();
        // if queue is empty, return false and set *|buffer| to NULL. Otherwise, pop
        // buffer from front of the queue, place it into *|buffer| and return true.
        bool readBuffer(MediaBuffer **buffer);
        // add a buffer to the back of the queue
        void pushBuffer(MediaBuffer *mbuf);
    };
    Mutexed<Queue> mQueue;

    status_t postSynchronouslyAndReturnError(const sp<AMessage> &msg);
    void schedulePull();
    void handleEOS();

    DISALLOW_EVIL_CONSTRUCTORS(Puller);
};
102
103MediaCodecSource::Puller::Puller(const sp<MediaSource> &source)
104    : mSource(source),
105      mLooper(new ALooper()),
106      mIsAudio(false)
107{
108    sp<MetaData> meta = source->getFormat();
109    const char *mime;
110    CHECK(meta->findCString(kKeyMIMEType, &mime));
111
112    mIsAudio = !strncasecmp(mime, "audio/", 6);
113
114    mLooper->setName("pull_looper");
115}
116
MediaCodecSource::Puller::~Puller() {
    // Detach from and stop the private looper before this handler goes away.
    mLooper->unregisterHandler(id());
    mLooper->stop();
}
121
// Appends |mbuf| to the back of the queue. Callers access Queue members only
// under Mutexed<Queue>::Locked, which provides the necessary locking.
void MediaCodecSource::Puller::Queue::pushBuffer(MediaBuffer *mbuf) {
    mReadBuffers.push_back(mbuf);
}
125
126bool MediaCodecSource::Puller::Queue::readBuffer(MediaBuffer **mbuf) {
127    if (mReadBuffers.empty()) {
128        *mbuf = NULL;
129        return false;
130    }
131    *mbuf = *mReadBuffers.begin();
132    mReadBuffers.erase(mReadBuffers.begin());
133    return true;
134}
135
136void MediaCodecSource::Puller::Queue::flush() {
137    MediaBuffer *mbuf;
138    while (readBuffer(&mbuf)) {
139        // there are no null buffers in the queue
140        mbuf->release();
141    }
142}
143
// Thread-safe pop of the oldest pulled buffer; returns false (with *mbuf set
// to NULL by Queue::readBuffer) when nothing is queued.
bool MediaCodecSource::Puller::readBuffer(MediaBuffer **mbuf) {
    Mutexed<Queue>::Locked queue(mQueue);
    return queue->readBuffer(mbuf);
}
148
149status_t MediaCodecSource::Puller::postSynchronouslyAndReturnError(
150        const sp<AMessage> &msg) {
151    sp<AMessage> response;
152    status_t err = msg->postAndAwaitResponse(&response);
153
154    if (err != OK) {
155        return err;
156    }
157
158    if (!response->findInt32("err", &err)) {
159        err = OK;
160    }
161
162    return err;
163}
164
// Starts the private looper and, synchronously via kWhatStart, the underlying
// source. |notify| is retained and posted each time a pulled buffer is queued
// (with "eos" set once pulling ends). Returns the source's start() result.
status_t MediaCodecSource::Puller::start(const sp<MetaData> &meta, const sp<AMessage> &notify) {
    ALOGV("puller (%s) start", mIsAudio ? "audio" : "video");
    // PRIORITY_AUDIO keeps pulling responsive even under load.
    mLooper->start(
            false /* runOnCallingThread */,
            false /* canCallJava */,
            PRIORITY_AUDIO);
    mLooper->registerHandler(this);
    mNotify = notify;

    sp<AMessage> msg = new AMessage(kWhatStart, this);
    msg->setObject("meta", meta);
    return postSynchronouslyAndReturnError(msg);
}
178
void MediaCodecSource::Puller::stop() {
    bool interrupt = false;
    {
        // mark stopping before actually reaching kWhatStop on the looper, so the pulling will
        // stop.
        Mutexed<Queue>::Locked queue(mQueue);
        queue->mPulling = false;
        // A read outstanding for more than one second means the looper is
        // stuck inside mSource->read(); interrupt the source from this thread.
        interrupt = queue->mReadPendingSince && (queue->mReadPendingSince < ALooper::GetNowUs() - 1000000);
        queue->flush(); // flush any unprocessed pulled buffers
    }

    if (interrupt) {
        interruptSource();
    }
}
194
void MediaCodecSource::Puller::interruptSource() {
    // call source->stop if read has been pending for over a second
    // We have to call this outside the looper as looper is pending on the read.
    mSource->stop();
}
200
201void MediaCodecSource::Puller::stopSource() {
202    sp<AMessage> msg = new AMessage(kWhatStop, this);
203    (void)postSynchronouslyAndReturnError(msg);
204}
205
// While paused the looper keeps reading the source but drops each buffer
// (see the kWhatPull handler), so the source itself keeps running.
void MediaCodecSource::Puller::pause() {
    Mutexed<Queue>::Locked queue(mQueue);
    queue->mPaused = true;
}
210
// Re-enables queueing of pulled buffers after pause().
void MediaCodecSource::Puller::resume() {
    Mutexed<Queue>::Locked queue(mQueue);
    queue->mPaused = false;
}
215
216void MediaCodecSource::Puller::schedulePull() {
217    (new AMessage(kWhatPull, this))->post();
218}
219
220void MediaCodecSource::Puller::handleEOS() {
221    ALOGV("puller (%s) posting EOS", mIsAudio ? "audio" : "video");
222    sp<AMessage> msg = mNotify->dup();
223    msg->setInt32("eos", 1);
224    msg->post();
225}
226
// Looper-thread dispatcher: starts/stops the source (replying to the blocked
// caller) and runs the self-reposting kWhatPull read loop.
void MediaCodecSource::Puller::onMessageReceived(const sp<AMessage> &msg) {
    switch (msg->what()) {
        case kWhatStart:
        {
            sp<RefBase> obj;
            CHECK(msg->findObject("meta", &obj));

            {
                Mutexed<Queue>::Locked queue(mQueue);
                queue->mPulling = true;
            }

            status_t err = mSource->start(static_cast<MetaData *>(obj.get()));

            // Only kick off the pull loop if the source actually started.
            if (err == OK) {
                schedulePull();
            }

            sp<AMessage> response = new AMessage;
            response->setInt32("err", err);

            sp<AReplyToken> replyID;
            CHECK(msg->senderAwaitsResponse(&replyID));
            response->postReply(replyID);
            break;
        }

        case kWhatStop:
        {
            mSource->stop();

            sp<AMessage> response = new AMessage;
            response->setInt32("err", OK);

            sp<AReplyToken> replyID;
            CHECK(msg->senderAwaitsResponse(&replyID));
            response->postReply(replyID);
            break;
        }

        case kWhatPull:
        {
            Mutexed<Queue>::Locked queue(mQueue);
            // Record when this read started so stop() can detect a stuck read.
            queue->mReadPendingSince = ALooper::GetNowUs();
            if (!queue->mPulling) {
                handleEOS();
                break;
            }

            // Drop the lock across the (possibly blocking) source read.
            queue.unlock();
            MediaBuffer *mbuf = NULL;
            status_t err = mSource->read(&mbuf);
            queue.lock();

            queue->mReadPendingSince = 0;
            // if we need to discard buffer
            if (!queue->mPulling || queue->mPaused || err != OK) {
                if (mbuf != NULL) {
                    mbuf->release();
                    mbuf = NULL;
                }
                if (queue->mPulling && err == OK) {
                    msg->post(); // if simply paused, keep pulling source
                    break;
                } else if (err == ERROR_END_OF_STREAM) {
                    ALOGV("stream ended, mbuf %p", mbuf);
                } else if (err != OK) {
                    ALOGE("error %d reading stream.", err);
                }
            }

            if (mbuf != NULL) {
                queue->pushBuffer(mbuf);
            }

            queue.unlock();

            // Notify outside the lock; a NULL buffer here means pulling is over.
            if (mbuf != NULL) {
                mNotify->post();
                msg->post();
            } else {
                handleEOS();
            }
            break;
        }

        default:
            TRESPASS();
    }
}
317
// Output state starts "not at EOS" with no recorded error.
MediaCodecSource::Output::Output()
    : mEncoderReachedEOS(false),
      mErrorCode(OK) {
}
322
323// static
324sp<MediaCodecSource> MediaCodecSource::Create(
325        const sp<ALooper> &looper,
326        const sp<AMessage> &format,
327        const sp<MediaSource> &source,
328        const sp<IGraphicBufferConsumer> &consumer,
329        uint32_t flags) {
330    sp<MediaCodecSource> mediaSource =
331            new MediaCodecSource(looper, format, source, consumer, flags);
332
333    if (mediaSource->init() == OK) {
334        return mediaSource;
335    }
336    return NULL;
337}
338
// Applies |timeOffsetUs| to every subsequent input buffer timestamp. Runs
// synchronously on the looper; the handler's status is discarded because
// this method returns void.
void MediaCodecSource::setInputBufferTimeOffset(int64_t timeOffsetUs) {
    sp<AMessage> msg = new AMessage(kWhatSetInputBufferTimeOffset, mReflector);
    msg->setInt64("time-offset-us", timeOffsetUs);
    postSynchronouslyAndReturnError(msg);
}
344
// Synchronously starts (or, if already started, resumes) encoding; see
// onStart(). |params| may carry a start time (kKeyTime) and is forwarded
// to the source.
status_t MediaCodecSource::start(MetaData* params) {
    sp<AMessage> msg = new AMessage(kWhatStart, mReflector);
    msg->setObject("meta", params);
    return postSynchronouslyAndReturnError(msg);
}
350
// Synchronously stops encoding; blocks until EOS is signaled or the
// kWhatStopStalled timeout forces completion (see onMessageReceived).
status_t MediaCodecSource::stop() {
    sp<AMessage> msg = new AMessage(kWhatStop, mReflector);
    return postSynchronouslyAndReturnError(msg);
}
355
// Asynchronously pauses encoding (kWhatPause); always reports OK.
status_t MediaCodecSource::pause() {
    (new AMessage(kWhatPause, mReflector))->post();
    return OK;
}
360
// Returns the encoder's current output format (kept up to date on
// CB_OUTPUT_FORMAT_CHANGED); thread-safe via Mutexed.
sp<MetaData> MediaCodecSource::getFormat() {
    Mutexed<sp<MetaData>>::Locked meta(mMeta);
    return *meta;
}
365
// Only valid in surface-input mode; returns the producer end of the
// encoder's input surface (set up in initEncoder()).
sp<IGraphicBufferProducer> MediaCodecSource::getGraphicBufferProducer() {
    CHECK(mFlags & FLAG_USE_SURFACE_INPUT);
    return mGraphicBufferProducer;
}
370
// Blocks until an encoded buffer is available or the encoder reached EOS.
// On success ownership of *buffer passes to the caller, whose release()
// routes back through signalBufferReturned(). At EOS, returns the recorded
// error code instead (ERROR_END_OF_STREAM for a normal stop).
status_t MediaCodecSource::read(
        MediaBuffer** buffer, const ReadOptions* /* options */) {
    Mutexed<Output>::Locked output(mOutput);

    *buffer = NULL;
    // mCond is signaled whenever a buffer is queued or EOS is recorded.
    while (output->mBufferQueue.size() == 0 && !output->mEncoderReachedEOS) {
        output.waitForCondition(output->mCond);
    }
    if (!output->mEncoderReachedEOS) {
        *buffer = *output->mBufferQueue.begin();
        output->mBufferQueue.erase(output->mBufferQueue.begin());
        return OK;
    }
    return output->mErrorCode;
}
386
// MediaBufferObserver callback: a consumer returned a buffer handed out by
// read(); detach ourselves as observer and free it.
void MediaCodecSource::signalBufferReturned(MediaBuffer *buffer) {
    buffer->setObserver(0);
    buffer->release();
}
391
MediaCodecSource::MediaCodecSource(
        const sp<ALooper> &looper,
        const sp<AMessage> &outputFormat,
        const sp<MediaSource> &source,
        const sp<IGraphicBufferConsumer> &consumer,
        uint32_t flags)
    : mLooper(looper),
      mOutputFormat(outputFormat),
      mMeta(new MetaData),
      mFlags(flags),
      mIsVideo(false),
      mStarted(false),
      mStopping(false),
      mDoMoreWorkPending(false),
      mSetEncoderFormat(false),
      mEncoderFormat(0),
      mEncoderDataSpace(0),
      mGraphicBufferConsumer(consumer),
      mInputBufferTimeOffsetUs(0),
      mFirstSampleTimeUs(-1ll),
      mGeneration(0) {
    CHECK(mLooper != NULL);

    // Classify the track from the requested output MIME type.
    AString mime;
    CHECK(mOutputFormat->findString("mime", &mime));

    if (!strncasecmp("video/", mime.c_str(), 6)) {
        mIsVideo = true;
    }

    // With surface input the encoder pulls frames itself; otherwise a Puller
    // thread reads the source and feeds buffers to us. mPuller stays NULL in
    // the surface-input case.
    if (!(mFlags & FLAG_USE_SURFACE_INPUT)) {
        mPuller = new Puller(source);
    }
}
426
MediaCodecSource::~MediaCodecSource() {
    releaseEncoder();

    // NOTE(review): assumes init()/initEncoder() ran (Create() guarantees
    // this), so mCodecLooper and mReflector are non-NULL here — confirm no
    // other construction path exists.
    mCodecLooper->stop();
    mLooper->unregisterHandler(mReflector->id());
}
433
434status_t MediaCodecSource::init() {
435    status_t err = initEncoder();
436
437    if (err != OK) {
438        releaseEncoder();
439    }
440
441    return err;
442}
443
// Creates, configures and starts the MediaCodec encoder described by
// mOutputFormat. On success mEncoder is running in asynchronous callback
// mode and mMeta reflects the encoder's negotiated output format; on
// failure the caller (init()) invokes releaseEncoder() to clean up.
status_t MediaCodecSource::initEncoder() {
    mReflector = new AHandlerReflector<MediaCodecSource>(this);
    mLooper->registerHandler(mReflector);

    // The codec gets its own looper so encoder callbacks don't contend with
    // this object's message handling.
    mCodecLooper = new ALooper;
    mCodecLooper->setName("codec_looper");
    mCodecLooper->start();

    if (mFlags & FLAG_USE_SURFACE_INPUT) {
        // Start with input frames suspended; onStart()/resume() lifts this.
        mOutputFormat->setInt32("create-input-buffers-suspended", 1);
    }

    AString outputMIME;
    CHECK(mOutputFormat->findString("mime", &outputMIME));

    Vector<AString> matchingCodecs;
    MediaCodecList::findMatchingCodecs(
            outputMIME.c_str(), true /* encoder */,
            ((mFlags & FLAG_PREFER_SOFTWARE_CODEC) ? MediaCodecList::kPreferSoftwareCodecs : 0),
            &matchingCodecs);

    // Try each matching encoder until one configures successfully.
    status_t err = NO_INIT;
    for (size_t ix = 0; ix < matchingCodecs.size(); ++ix) {
        mEncoder = MediaCodec::CreateByComponentName(
                mCodecLooper, matchingCodecs[ix]);

        if (mEncoder == NULL) {
            continue;
        }

        ALOGV("output format is '%s'", mOutputFormat->debugString(0).c_str());

        mEncoderActivityNotify = new AMessage(kWhatEncoderActivity, mReflector);
        mEncoder->setCallback(mEncoderActivityNotify);

        err = mEncoder->configure(
                    mOutputFormat,
                    NULL /* nativeWindow */,
                    NULL /* crypto */,
                    MediaCodec::CONFIGURE_FLAG_ENCODE);

        if (err == OK) {
            break;
        }
        mEncoder->release();
        mEncoder = NULL;
    }

    if (err != OK) {
        return err;
    }

    // Publish the negotiated output format through getFormat().
    mEncoder->getOutputFormat(&mOutputFormat);
    sp<MetaData> meta = new MetaData;
    convertMessageToMetaData(mOutputFormat, meta);
    mMeta.lock().set(meta);

    if (mFlags & FLAG_USE_SURFACE_INPUT) {
        CHECK(mIsVideo);

        if (mGraphicBufferConsumer != NULL) {
            // When using persistent surface, we are only interested in the
            // consumer, but have to use PersistentSurface as a wrapper to
            // pass consumer over messages (similar to BufferProducerWrapper)
            err = mEncoder->setInputSurface(
                    new PersistentSurface(NULL, mGraphicBufferConsumer));
        } else {
            err = mEncoder->createInputSurface(&mGraphicBufferProducer);
        }

        if (err != OK) {
            return err;
        }
    }

    // Decide which pixel format / dataspace the source should produce, based
    // on whether the chosen encoder reads its input buffers with the CPU.
    sp<AMessage> inputFormat;
    int32_t usingSwReadOften;
    mSetEncoderFormat = false;
    if (mEncoder->getInputFormat(&inputFormat) == OK) {
        mSetEncoderFormat = true;
        if (inputFormat->findInt32("using-sw-read-often", &usingSwReadOften)
                && usingSwReadOften) {
            // this is a SW encoder; signal source to allocate SW readable buffers
            mEncoderFormat = kDefaultSwVideoEncoderFormat;
        } else {
            mEncoderFormat = kDefaultHwVideoEncoderFormat;
        }
        if (!inputFormat->findInt32("android._dataspace", &mEncoderDataSpace)) {
            mEncoderDataSpace = kDefaultVideoEncoderDataSpace;
        }
        ALOGV("setting dataspace %#x, format %#x", mEncoderDataSpace, mEncoderFormat);
    }

    err = mEncoder->start();

    if (err != OK) {
        return err;
    }

    // Reset the output side for a fresh encode session.
    {
        Mutexed<Output>::Locked output(mOutput);
        output->mEncoderReachedEOS = false;
        output->mErrorCode = OK;
    }

    return OK;
}
551
552void MediaCodecSource::releaseEncoder() {
553    if (mEncoder == NULL) {
554        return;
555    }
556
557    mEncoder->release();
558    mEncoder.clear();
559}
560
561status_t MediaCodecSource::postSynchronouslyAndReturnError(
562        const sp<AMessage> &msg) {
563    sp<AMessage> response;
564    status_t err = msg->postAndAwaitResponse(&response);
565
566    if (err != OK) {
567        return err;
568    }
569
570    if (!response->findInt32("err", &err)) {
571        err = OK;
572    }
573
574    return err;
575}
576
577void MediaCodecSource::signalEOS(status_t err) {
578    bool reachedEOS = false;
579    {
580        Mutexed<Output>::Locked output(mOutput);
581        reachedEOS = output->mEncoderReachedEOS;
582        if (!reachedEOS) {
583            ALOGV("encoder (%s) reached EOS", mIsVideo ? "video" : "audio");
584            // release all unread media buffers
585            for (List<MediaBuffer*>::iterator it = output->mBufferQueue.begin();
586                    it != output->mBufferQueue.end(); it++) {
587                (*it)->release();
588            }
589            output->mBufferQueue.clear();
590            output->mEncoderReachedEOS = true;
591            output->mErrorCode = err;
592            output->mCond.signal();
593
594            reachedEOS = true;
595            output.unlock();
596            releaseEncoder();
597        }
598    }
599
600    if (mStopping && reachedEOS) {
601        ALOGI("encoder (%s) stopped", mIsVideo ? "video" : "audio");
602        mPuller->stopSource();
603        ALOGV("source (%s) stopped", mIsVideo ? "video" : "audio");
604        // posting reply to everyone that's waiting
605        List<sp<AReplyToken>>::iterator it;
606        for (it = mStopReplyIDQueue.begin();
607                it != mStopReplyIDQueue.end(); it++) {
608            (new AMessage)->postReply(*it);
609        }
610        mStopReplyIDQueue.clear();
611        mStopping = false;
612        ++mGeneration;
613    }
614}
615
616void MediaCodecSource::suspend() {
617    CHECK(mFlags & FLAG_USE_SURFACE_INPUT);
618    if (mEncoder != NULL) {
619        sp<AMessage> params = new AMessage;
620        params->setInt32("drop-input-frames", true);
621        mEncoder->setParameters(params);
622    }
623}
624
625void MediaCodecSource::resume(int64_t skipFramesBeforeUs) {
626    CHECK(mFlags & FLAG_USE_SURFACE_INPUT);
627    if (mEncoder != NULL) {
628        sp<AMessage> params = new AMessage;
629        params->setInt32("drop-input-frames", false);
630        if (skipFramesBeforeUs > 0) {
631            params->setInt64("skip-frames-before", skipFramesBeforeUs);
632        }
633        mEncoder->setParameters(params);
634    }
635}
636
// Moves as many pulled source buffers as possible into free encoder input
// slots. A NULL buffer from the puller marks end-of-stream and is queued with
// BUFFER_FLAG_EOS. Returns the first queueInputBuffer() error, or OK.
status_t MediaCodecSource::feedEncoderInputBuffers() {
    MediaBuffer* mbuf = NULL;
    while (!mAvailEncoderInputIndices.empty() && mPuller->readBuffer(&mbuf)) {
        size_t bufferIndex = *mAvailEncoderInputIndices.begin();
        mAvailEncoderInputIndices.erase(mAvailEncoderInputIndices.begin());

        int64_t timeUs = 0ll;
        uint32_t flags = 0;
        size_t size = 0;

        if (mbuf != NULL) {
            CHECK(mbuf->meta_data()->findInt64(kKeyTime, &timeUs));
            timeUs += mInputBufferTimeOffsetUs;

            // Due to the extra delay adjustment at the beginning of start/resume,
            // the adjusted timeUs may be negative if MediaCodecSource goes into pause
            // state before feeding any buffers to the encoder. Drop the buffer in this
            // case.
            // NOTE(review): this early return discards |bufferIndex| without
            // queueing or restoring it, so that encoder input slot is never
            // reused — verify repeated drops cannot starve the encoder.
            if (timeUs < 0) {
                mbuf->release();
                return OK;
            }

            // push decoding time for video, or drift time for audio
            if (mIsVideo) {
                mDecodingTimeQueue.push_back(timeUs);
            } else {
#if DEBUG_DRIFT_TIME
                if (mFirstSampleTimeUs < 0ll) {
                    mFirstSampleTimeUs = timeUs;
                }

                int64_t driftTimeUs = 0;
                if (mbuf->meta_data()->findInt64(kKeyDriftTime, &driftTimeUs)
                        && driftTimeUs) {
                    driftTimeUs = timeUs - mFirstSampleTimeUs - driftTimeUs;
                }
                mDriftTimeQueue.push_back(driftTimeUs);
#endif // DEBUG_DRIFT_TIME
            }

            sp<ABuffer> inbuf;
            status_t err = mEncoder->getInputBuffer(bufferIndex, &inbuf);
            if (err != OK || inbuf == NULL) {
                mbuf->release();
                signalEOS();
                break;
            }

            size = mbuf->size();

            memcpy(inbuf->data(), mbuf->data(), size);

            if (mIsVideo) {
                // video encoder will release MediaBuffer when done
                // with underlying data.
                inbuf->setMediaBufferBase(mbuf);
            } else {
                mbuf->release();
            }
        } else {
            flags = MediaCodec::BUFFER_FLAG_EOS;
        }

        status_t err = mEncoder->queueInputBuffer(
                bufferIndex, 0, size, timeUs, flags);

        if (err != OK) {
            return err;
        }
    }

    return OK;
}
711
// Handles kWhatStart on the looper. The first call starts the source (or the
// input surface); subsequent calls while started act as resume. |params| may
// carry a start time (kKeyTime) for the surface-input path.
status_t MediaCodecSource::onStart(MetaData *params) {
    if (mStopping) {
        ALOGE("Failed to start while we're stopping");
        return INVALID_OPERATION;
    }

    if (mStarted) {
        ALOGI("MediaCodecSource (%s) resuming", mIsVideo ? "video" : "audio");
        if (mIsVideo) {
            // Resume the stream on a sync frame.
            mEncoder->requestIDRFrame();
        }
        if (mFlags & FLAG_USE_SURFACE_INPUT) {
            resume();
        } else {
            CHECK(mPuller != NULL);
            mPuller->resume();
        }
        return OK;
    }

    ALOGI("MediaCodecSource (%s) starting", mIsVideo ? "video" : "audio");

    status_t err = OK;

    if (mFlags & FLAG_USE_SURFACE_INPUT) {
        int64_t startTimeUs;
        if (!params || !params->findInt64(kKeyTime, &startTimeUs)) {
            startTimeUs = -1ll;
        }
        resume(startTimeUs);
    } else {
        CHECK(mPuller != NULL);
        sp<MetaData> meta = params;
        if (mSetEncoderFormat) {
            // Tell the source which pixel format / dataspace the encoder
            // expects (decided in initEncoder()).
            if (meta == NULL) {
                meta = new MetaData;
            }
            meta->setInt32(kKeyPixelFormat, mEncoderFormat);
            meta->setInt32(kKeyColorSpace, mEncoderDataSpace);
        }

        sp<AMessage> notify = new AMessage(kWhatPullerNotify, mReflector);
        err = mPuller->start(meta.get(), notify);
        if (err != OK) {
            return err;
        }
    }

    ALOGI("MediaCodecSource (%s) started", mIsVideo ? "video" : "audio");

    mStarted = true;
    return OK;
}
765
766void MediaCodecSource::onMessageReceived(const sp<AMessage> &msg) {
767    switch (msg->what()) {
768    case kWhatPullerNotify:
769    {
770        int32_t eos = 0;
771        if (msg->findInt32("eos", &eos) && eos) {
772            ALOGV("puller (%s) reached EOS", mIsVideo ? "video" : "audio");
773            signalEOS();
774            break;
775        }
776
777        if (mEncoder == NULL) {
778            ALOGV("got msg '%s' after encoder shutdown.", msg->debugString().c_str());
779            break;
780        }
781
782        feedEncoderInputBuffers();
783        break;
784    }
785    case kWhatEncoderActivity:
786    {
787        if (mEncoder == NULL) {
788            break;
789        }
790
791        int32_t cbID;
792        CHECK(msg->findInt32("callbackID", &cbID));
793        if (cbID == MediaCodec::CB_INPUT_AVAILABLE) {
794            int32_t index;
795            CHECK(msg->findInt32("index", &index));
796
797            mAvailEncoderInputIndices.push_back(index);
798            feedEncoderInputBuffers();
799        } else if (cbID == MediaCodec::CB_OUTPUT_FORMAT_CHANGED) {
800            status_t err = mEncoder->getOutputFormat(&mOutputFormat);
801            if (err != OK) {
802                signalEOS(err);
803                break;
804            }
805            sp<MetaData> meta = new MetaData;
806            convertMessageToMetaData(mOutputFormat, meta);
807            mMeta.lock().set(meta);
808        } else if (cbID == MediaCodec::CB_OUTPUT_AVAILABLE) {
809            int32_t index;
810            size_t offset;
811            size_t size;
812            int64_t timeUs;
813            int32_t flags;
814
815            CHECK(msg->findInt32("index", &index));
816            CHECK(msg->findSize("offset", &offset));
817            CHECK(msg->findSize("size", &size));
818            CHECK(msg->findInt64("timeUs", &timeUs));
819            CHECK(msg->findInt32("flags", &flags));
820
821            if (flags & MediaCodec::BUFFER_FLAG_EOS) {
822                mEncoder->releaseOutputBuffer(index);
823                signalEOS();
824                break;
825            }
826
827            sp<ABuffer> outbuf;
828            status_t err = mEncoder->getOutputBuffer(index, &outbuf);
829            if (err != OK || outbuf == NULL) {
830                signalEOS();
831                break;
832            }
833
834            MediaBuffer *mbuf = new MediaBuffer(outbuf->size());
835            memcpy(mbuf->data(), outbuf->data(), outbuf->size());
836
837            if (!(flags & MediaCodec::BUFFER_FLAG_CODECCONFIG)) {
838                if (mIsVideo) {
839                    int64_t decodingTimeUs;
840                    if (mFlags & FLAG_USE_SURFACE_INPUT) {
841                        // Time offset is not applied at
842                        // feedEncoderInputBuffer() in surface input case.
843                        timeUs += mInputBufferTimeOffsetUs;
844
845                        // Due to the extra delay adjustment at the beginning of
846                        // start/resume, the adjusted timeUs may be negative if
847                        // MediaCodecSource goes into pause state before feeding
848                        // any buffers to the encoder. Drop the buffer in this case.
849                        if (timeUs < 0) {
850                            mEncoder->releaseOutputBuffer(index);
851                            break;
852                        }
853
854                        // TODO:
855                        // Decoding time for surface source is unavailable,
856                        // use presentation time for now. May need to move
857                        // this logic into MediaCodec.
858                        decodingTimeUs = timeUs;
859                    } else {
860                        CHECK(!mDecodingTimeQueue.empty());
861                        decodingTimeUs = *(mDecodingTimeQueue.begin());
862                        mDecodingTimeQueue.erase(mDecodingTimeQueue.begin());
863                    }
864                    mbuf->meta_data()->setInt64(kKeyDecodingTime, decodingTimeUs);
865
866                    ALOGV("[video] time %" PRId64 " us (%.2f secs), dts/pts diff %" PRId64,
867                            timeUs, timeUs / 1E6, decodingTimeUs - timeUs);
868                } else {
869                    int64_t driftTimeUs = 0;
870#if DEBUG_DRIFT_TIME
871                    CHECK(!mDriftTimeQueue.empty());
872                    driftTimeUs = *(mDriftTimeQueue.begin());
873                    mDriftTimeQueue.erase(mDriftTimeQueue.begin());
874                    mbuf->meta_data()->setInt64(kKeyDriftTime, driftTimeUs);
875#endif // DEBUG_DRIFT_TIME
876                    ALOGV("[audio] time %" PRId64 " us (%.2f secs), drift %" PRId64,
877                            timeUs, timeUs / 1E6, driftTimeUs);
878                }
879                mbuf->meta_data()->setInt64(kKeyTime, timeUs);
880            } else {
881                mbuf->meta_data()->setInt32(kKeyIsCodecConfig, true);
882            }
883            if (flags & MediaCodec::BUFFER_FLAG_SYNCFRAME) {
884                mbuf->meta_data()->setInt32(kKeyIsSyncFrame, true);
885            }
886            mbuf->setObserver(this);
887            mbuf->add_ref();
888
889            {
890                Mutexed<Output>::Locked output(mOutput);
891                output->mBufferQueue.push_back(mbuf);
892                output->mCond.signal();
893            }
894
895            mEncoder->releaseOutputBuffer(index);
896       } else if (cbID == MediaCodec::CB_ERROR) {
897            status_t err;
898            CHECK(msg->findInt32("err", &err));
899            ALOGE("Encoder (%s) reported error : 0x%x",
900                    mIsVideo ? "video" : "audio", err);
901            signalEOS();
902       }
903       break;
904    }
905    case kWhatStart:
906    {
907        sp<AReplyToken> replyID;
908        CHECK(msg->senderAwaitsResponse(&replyID));
909
910        sp<RefBase> obj;
911        CHECK(msg->findObject("meta", &obj));
912        MetaData *params = static_cast<MetaData *>(obj.get());
913
914        sp<AMessage> response = new AMessage;
915        response->setInt32("err", onStart(params));
916        response->postReply(replyID);
917        break;
918    }
919    case kWhatStop:
920    {
921        ALOGI("encoder (%s) stopping", mIsVideo ? "video" : "audio");
922
923        sp<AReplyToken> replyID;
924        CHECK(msg->senderAwaitsResponse(&replyID));
925
926        if (mOutput.lock()->mEncoderReachedEOS) {
927            // if we already reached EOS, reply and return now
928            ALOGI("encoder (%s) already stopped",
929                    mIsVideo ? "video" : "audio");
930            (new AMessage)->postReply(replyID);
931            break;
932        }
933
934        mStopReplyIDQueue.push_back(replyID);
935        if (mStopping) {
936            // nothing to do if we're already stopping, reply will be posted
937            // to all when we're stopped.
938            break;
939        }
940
941        mStopping = true;
942
943        // if using surface, signal source EOS and wait for EOS to come back.
944        // otherwise, stop puller (which also clears the input buffer queue)
945        // and wait for the EOS message. We cannot call source->stop() because
946        // the encoder may still be processing input buffers.
947        if (mFlags & FLAG_USE_SURFACE_INPUT) {
948            mEncoder->signalEndOfInputStream();
949        } else {
950            mPuller->stop();
951        }
952
953        // complete stop even if encoder/puller stalled
954        sp<AMessage> timeoutMsg = new AMessage(kWhatStopStalled, mReflector);
955        timeoutMsg->setInt32("generation", mGeneration);
956        timeoutMsg->post(kStopTimeoutUs);
957        break;
958    }
959
960    case kWhatStopStalled:
961    {
962        int32_t generation;
963        CHECK(msg->findInt32("generation", &generation));
964        if (generation != mGeneration) {
965             break;
966        }
967
968        if (!(mFlags & FLAG_USE_SURFACE_INPUT)) {
969            ALOGV("source (%s) stopping", mIsVideo ? "video" : "audio");
970            mPuller->interruptSource();
971            ALOGV("source (%s) stopped", mIsVideo ? "video" : "audio");
972        }
973        signalEOS();
974    }
975
976    case kWhatPause:
977    {
978        if (mFlags & FLAG_USE_SURFACE_INPUT) {
979            suspend();
980        } else {
981            CHECK(mPuller != NULL);
982            mPuller->pause();
983        }
984        break;
985    }
986    case kWhatSetInputBufferTimeOffset:
987    {
988        sp<AReplyToken> replyID;
989        CHECK(msg->senderAwaitsResponse(&replyID));
990
991        CHECK(msg->findInt64("time-offset-us", &mInputBufferTimeOffsetUs));
992
993        sp<AMessage> response = new AMessage;
994        response->postReply(replyID);
995        break;
996    }
997    default:
998        TRESPASS();
999    }
1000}
1001
1002} // namespace android
1003