1/*
2 * Copyright 2014, The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "MediaCodecSource"
19#define DEBUG_DRIFT_TIME 0
20
21#include <inttypes.h>
22
23#include <gui/IGraphicBufferProducer.h>
24#include <gui/Surface.h>
25#include <media/ICrypto.h>
26#include <media/MediaCodecBuffer.h>
27#include <media/stagefright/foundation/ABuffer.h>
28#include <media/stagefright/foundation/ADebug.h>
29#include <media/stagefright/foundation/ALooper.h>
30#include <media/stagefright/foundation/AMessage.h>
31#include <media/stagefright/MediaBuffer.h>
32#include <media/stagefright/MediaCodec.h>
33#include <media/stagefright/MediaCodecList.h>
34#include <media/stagefright/MediaCodecSource.h>
35#include <media/stagefright/MediaErrors.h>
36#include <media/stagefright/MediaSource.h>
37#include <media/stagefright/MetaData.h>
38#include <media/stagefright/Utils.h>
39
40namespace android {
41
// Pixel format requested from the source when the encoder reports
// "using-sw-read-often" in its input format (i.e. a software encoder).
const int32_t kDefaultSwVideoEncoderFormat = HAL_PIXEL_FORMAT_YCbCr_420_888;
// Pixel format requested from the source for every other encoder.
const int32_t kDefaultHwVideoEncoderFormat = HAL_PIXEL_FORMAT_IMPLEMENTATION_DEFINED;
// Dataspace used when the encoder input format carries no "android._dataspace".
const int32_t kDefaultVideoEncoderDataSpace = HAL_DATASPACE_V0_BT709;

// Delay of the kWhatStopStalled timeout message posted while stopping.
// The value is in microseconds: 300000 us = 300 ms (not a full second).
const int kStopTimeoutUs = 300000;
47
// Handler that reads buffers from a MediaSource on its own looper
// ("pull_looper"), stores them in a mutex-protected queue and posts a copy of
// the |notify| message whenever a buffer has been queued or the stream ended
// (the latter carries "eos" = 1).
struct MediaCodecSource::Puller : public AHandler {
    explicit Puller(const sp<MediaSource> &source);

    // Calls stop() on the source directly from the calling thread.
    void interruptSource();
    // Starts the looper, registers this handler and synchronously posts
    // kWhatStart; returns the status reported by the handler.
    status_t start(const sp<MetaData> &meta, const sp<AMessage> &notify);
    // Clears the pulling flag, flushes queued buffers and interrupts the
    // source if a read has been pending for more than one second.
    void stop();
    // Synchronously posts kWhatStop, whose handler stops the source.
    void stopSource();
    // While paused, buffers read from the source are released, not queued.
    void pause();
    void resume();

    // Thread-safe wrapper around Queue::readBuffer().
    bool readBuffer(MediaBuffer **buffer);

protected:
    virtual void onMessageReceived(const sp<AMessage> &msg);
    virtual ~Puller();

private:
    enum {
        kWhatStart = 'msta',
        kWhatStop,
        kWhatPull,
    };

    sp<MediaSource> mSource;
    sp<AMessage> mNotify;   // template for notifications; set in start()
    sp<ALooper> mLooper;    // private looper the pulling runs on
    bool mIsAudio;          // true if the source MIME type starts with "audio/"

    struct Queue {
        Queue()
            : mReadPendingSince(0),
              mPaused(false),
              mPulling(false) { }
        // ALooper::GetNowUs() timestamp recorded by the kWhatPull handler
        // before it reads from the source; reset to 0 once the read returns.
        int64_t mReadPendingSince;
        bool mPaused;
        bool mPulling;
        Vector<MediaBuffer *> mReadBuffers;

        // release every queued buffer and leave the queue empty
        void flush();
        // if queue is empty, return false and set *|buffer| to NULL . Otherwise, pop
        // buffer from front of the queue, place it into *|buffer| and return true.
        bool readBuffer(MediaBuffer **buffer);
        // add a buffer to the back of the queue
        void pushBuffer(MediaBuffer *mbuf);
    };
    Mutexed<Queue> mQueue;

    // Posts |msg|, waits for the reply and returns its "err" value (OK if the
    // reply has none), or the posting error if the post itself failed.
    status_t postSynchronouslyAndReturnError(const sp<AMessage> &msg);
    // Posts a kWhatPull message to this handler.
    void schedulePull();
    // Posts a copy of mNotify with "eos" set to 1.
    void handleEOS();

    DISALLOW_EVIL_CONSTRUCTORS(Puller);
};
101
102MediaCodecSource::Puller::Puller(const sp<MediaSource> &source)
103    : mSource(source),
104      mLooper(new ALooper()),
105      mIsAudio(false)
106{
107    sp<MetaData> meta = source->getFormat();
108    const char *mime;
109    CHECK(meta->findCString(kKeyMIMEType, &mime));
110
111    mIsAudio = !strncasecmp(mime, "audio/", 6);
112
113    mLooper->setName("pull_looper");
114}
115
116MediaCodecSource::Puller::~Puller() {
117    mLooper->unregisterHandler(id());
118    mLooper->stop();
119}
120
121void MediaCodecSource::Puller::Queue::pushBuffer(MediaBuffer *mbuf) {
122    mReadBuffers.push_back(mbuf);
123}
124
125bool MediaCodecSource::Puller::Queue::readBuffer(MediaBuffer **mbuf) {
126    if (mReadBuffers.empty()) {
127        *mbuf = NULL;
128        return false;
129    }
130    *mbuf = *mReadBuffers.begin();
131    mReadBuffers.erase(mReadBuffers.begin());
132    return true;
133}
134
135void MediaCodecSource::Puller::Queue::flush() {
136    MediaBuffer *mbuf;
137    while (readBuffer(&mbuf)) {
138        // there are no null buffers in the queue
139        mbuf->release();
140    }
141}
142
143bool MediaCodecSource::Puller::readBuffer(MediaBuffer **mbuf) {
144    Mutexed<Queue>::Locked queue(mQueue);
145    return queue->readBuffer(mbuf);
146}
147
148status_t MediaCodecSource::Puller::postSynchronouslyAndReturnError(
149        const sp<AMessage> &msg) {
150    sp<AMessage> response;
151    status_t err = msg->postAndAwaitResponse(&response);
152
153    if (err != OK) {
154        return err;
155    }
156
157    if (!response->findInt32("err", &err)) {
158        err = OK;
159    }
160
161    return err;
162}
163
164status_t MediaCodecSource::Puller::start(const sp<MetaData> &meta, const sp<AMessage> &notify) {
165    ALOGV("puller (%s) start", mIsAudio ? "audio" : "video");
166    mLooper->start(
167            false /* runOnCallingThread */,
168            false /* canCallJava */,
169            PRIORITY_AUDIO);
170    mLooper->registerHandler(this);
171    mNotify = notify;
172
173    sp<AMessage> msg = new AMessage(kWhatStart, this);
174    msg->setObject("meta", meta);
175    return postSynchronouslyAndReturnError(msg);
176}
177
178void MediaCodecSource::Puller::stop() {
179    bool interrupt = false;
180    {
181        // mark stopping before actually reaching kWhatStop on the looper, so the pulling will
182        // stop.
183        Mutexed<Queue>::Locked queue(mQueue);
184        queue->mPulling = false;
185        interrupt = queue->mReadPendingSince && (queue->mReadPendingSince < ALooper::GetNowUs() - 1000000);
186        queue->flush(); // flush any unprocessed pulled buffers
187    }
188
189    if (interrupt) {
190        interruptSource();
191    }
192}
193
194void MediaCodecSource::Puller::interruptSource() {
195    // call source->stop if read has been pending for over a second
196    // We have to call this outside the looper as looper is pending on the read.
197    mSource->stop();
198}
199
200void MediaCodecSource::Puller::stopSource() {
201    sp<AMessage> msg = new AMessage(kWhatStop, this);
202    (void)postSynchronouslyAndReturnError(msg);
203}
204
205void MediaCodecSource::Puller::pause() {
206    Mutexed<Queue>::Locked queue(mQueue);
207    queue->mPaused = true;
208}
209
210void MediaCodecSource::Puller::resume() {
211    Mutexed<Queue>::Locked queue(mQueue);
212    queue->mPaused = false;
213}
214
215void MediaCodecSource::Puller::schedulePull() {
216    (new AMessage(kWhatPull, this))->post();
217}
218
219void MediaCodecSource::Puller::handleEOS() {
220    ALOGV("puller (%s) posting EOS", mIsAudio ? "audio" : "video");
221    sp<AMessage> msg = mNotify->dup();
222    msg->setInt32("eos", 1);
223    msg->post();
224}
225
226void MediaCodecSource::Puller::onMessageReceived(const sp<AMessage> &msg) {
227    switch (msg->what()) {
228        case kWhatStart:
229        {
230            sp<RefBase> obj;
231            CHECK(msg->findObject("meta", &obj));
232
233            {
234                Mutexed<Queue>::Locked queue(mQueue);
235                queue->mPulling = true;
236            }
237
238            status_t err = mSource->start(static_cast<MetaData *>(obj.get()));
239
240            if (err == OK) {
241                schedulePull();
242            }
243
244            sp<AMessage> response = new AMessage;
245            response->setInt32("err", err);
246
247            sp<AReplyToken> replyID;
248            CHECK(msg->senderAwaitsResponse(&replyID));
249            response->postReply(replyID);
250            break;
251        }
252
253        case kWhatStop:
254        {
255            mSource->stop();
256
257            sp<AMessage> response = new AMessage;
258            response->setInt32("err", OK);
259
260            sp<AReplyToken> replyID;
261            CHECK(msg->senderAwaitsResponse(&replyID));
262            response->postReply(replyID);
263            break;
264        }
265
266        case kWhatPull:
267        {
268            Mutexed<Queue>::Locked queue(mQueue);
269            queue->mReadPendingSince = ALooper::GetNowUs();
270            if (!queue->mPulling) {
271                handleEOS();
272                break;
273            }
274
275            queue.unlock();
276            MediaBuffer *mbuf = NULL;
277            status_t err = mSource->read(&mbuf);
278            queue.lock();
279
280            queue->mReadPendingSince = 0;
281            // if we need to discard buffer
282            if (!queue->mPulling || queue->mPaused || err != OK) {
283                if (mbuf != NULL) {
284                    mbuf->release();
285                    mbuf = NULL;
286                }
287                if (queue->mPulling && err == OK) {
288                    msg->post(); // if simply paused, keep pulling source
289                    break;
290                } else if (err == ERROR_END_OF_STREAM) {
291                    ALOGV("stream ended, mbuf %p", mbuf);
292                } else if (err != OK) {
293                    ALOGE("error %d reading stream.", err);
294                }
295            }
296
297            if (mbuf != NULL) {
298                queue->pushBuffer(mbuf);
299            }
300
301            queue.unlock();
302
303            if (mbuf != NULL) {
304                mNotify->post();
305                msg->post();
306            } else {
307                handleEOS();
308            }
309            break;
310        }
311
312        default:
313            TRESPASS();
314    }
315}
316
// Initial output state: the encoder has not reached EOS and no error is
// recorded.
MediaCodecSource::Output::Output()
    : mEncoderReachedEOS(false),
      mErrorCode(OK) {
}
321
322// static
323sp<MediaCodecSource> MediaCodecSource::Create(
324        const sp<ALooper> &looper,
325        const sp<AMessage> &format,
326        const sp<MediaSource> &source,
327        const sp<PersistentSurface> &persistentSurface,
328        uint32_t flags) {
329    sp<MediaCodecSource> mediaSource = new MediaCodecSource(
330            looper, format, source, persistentSurface, flags);
331
332    if (mediaSource->init() == OK) {
333        return mediaSource;
334    }
335    return NULL;
336}
337
338status_t MediaCodecSource::setInputBufferTimeOffset(int64_t timeOffsetUs) {
339    sp<AMessage> msg = new AMessage(kWhatSetInputBufferTimeOffset, mReflector);
340    msg->setInt64("time-offset-us", timeOffsetUs);
341    return postSynchronouslyAndReturnError(msg);
342}
343
344int64_t MediaCodecSource::getFirstSampleSystemTimeUs() {
345    sp<AMessage> msg = new AMessage(kWhatGetFirstSampleSystemTimeUs, mReflector);
346    sp<AMessage> response;
347    msg->postAndAwaitResponse(&response);
348    int64_t timeUs;
349    if (!response->findInt64("time-us", &timeUs)) {
350        timeUs = -1ll;
351    }
352    return timeUs;
353}
354
355status_t MediaCodecSource::start(MetaData* params) {
356    sp<AMessage> msg = new AMessage(kWhatStart, mReflector);
357    msg->setObject("meta", params);
358    return postSynchronouslyAndReturnError(msg);
359}
360
361status_t MediaCodecSource::stop() {
362    sp<AMessage> msg = new AMessage(kWhatStop, mReflector);
363    return postSynchronouslyAndReturnError(msg);
364}
365
366
367status_t MediaCodecSource::setStopStimeUs(int64_t stopTimeUs) {
368    if (!(mFlags & FLAG_USE_SURFACE_INPUT)) {
369        return OK;
370    }
371    sp<AMessage> msg = new AMessage(kWhatSetStopTimeOffset, mReflector);
372    msg->setInt64("stop-time-us", stopTimeUs);
373    return postSynchronouslyAndReturnError(msg);
374}
375
376status_t MediaCodecSource::pause(MetaData* params) {
377    sp<AMessage> msg = new AMessage(kWhatPause, mReflector);
378    msg->setObject("meta", params);
379    msg->post();
380    return OK;
381}
382
383sp<MetaData> MediaCodecSource::getFormat() {
384    Mutexed<sp<MetaData>>::Locked meta(mMeta);
385    return *meta;
386}
387
388sp<IGraphicBufferProducer> MediaCodecSource::getGraphicBufferProducer() {
389    CHECK(mFlags & FLAG_USE_SURFACE_INPUT);
390    return mGraphicBufferProducer;
391}
392
393status_t MediaCodecSource::read(
394        MediaBuffer** buffer, const ReadOptions* /* options */) {
395    Mutexed<Output>::Locked output(mOutput);
396
397    *buffer = NULL;
398    while (output->mBufferQueue.size() == 0 && !output->mEncoderReachedEOS) {
399        output.waitForCondition(output->mCond);
400    }
401    if (!output->mEncoderReachedEOS) {
402        *buffer = *output->mBufferQueue.begin();
403        output->mBufferQueue.erase(output->mBufferQueue.begin());
404        return OK;
405    }
406    return output->mErrorCode;
407}
408
409void MediaCodecSource::signalBufferReturned(MediaBuffer *buffer) {
410    buffer->setObserver(0);
411    buffer->release();
412}
413
414MediaCodecSource::MediaCodecSource(
415        const sp<ALooper> &looper,
416        const sp<AMessage> &outputFormat,
417        const sp<MediaSource> &source,
418        const sp<PersistentSurface> &persistentSurface,
419        uint32_t flags)
420    : mLooper(looper),
421      mOutputFormat(outputFormat),
422      mMeta(new MetaData),
423      mFlags(flags),
424      mIsVideo(false),
425      mStarted(false),
426      mStopping(false),
427      mDoMoreWorkPending(false),
428      mSetEncoderFormat(false),
429      mEncoderFormat(0),
430      mEncoderDataSpace(0),
431      mPersistentSurface(persistentSurface),
432      mInputBufferTimeOffsetUs(0),
433      mFirstSampleSystemTimeUs(-1ll),
434      mPausePending(false),
435      mFirstSampleTimeUs(-1ll),
436      mGeneration(0) {
437    CHECK(mLooper != NULL);
438
439    AString mime;
440    CHECK(mOutputFormat->findString("mime", &mime));
441
442    if (!strncasecmp("video/", mime.c_str(), 6)) {
443        mIsVideo = true;
444    }
445
446    if (!(mFlags & FLAG_USE_SURFACE_INPUT)) {
447        mPuller = new Puller(source);
448    }
449}
450
451MediaCodecSource::~MediaCodecSource() {
452    releaseEncoder();
453
454    mCodecLooper->stop();
455    mLooper->unregisterHandler(mReflector->id());
456}
457
458status_t MediaCodecSource::init() {
459    status_t err = initEncoder();
460
461    if (err != OK) {
462        releaseEncoder();
463    }
464
465    return err;
466}
467
// Creates, configures and starts the encoder.
//
// Registers the reflector handler, starts the codec looper, then tries each
// codec matching the output MIME type until one configures successfully.
// Returns OK once the encoder is started; otherwise the first error hit
// (NO_INIT if no matching codec could be created).
status_t MediaCodecSource::initEncoder() {
    mReflector = new AHandlerReflector<MediaCodecSource>(this);
    mLooper->registerHandler(mReflector);

    mCodecLooper = new ALooper;
    mCodecLooper->setName("codec_looper");
    mCodecLooper->start();

    if (mFlags & FLAG_USE_SURFACE_INPUT) {
        mOutputFormat->setInt32("create-input-buffers-suspended", 1);
    }

    AString outputMIME;
    CHECK(mOutputFormat->findString("mime", &outputMIME));

    Vector<AString> matchingCodecs;
    MediaCodecList::findMatchingCodecs(
            outputMIME.c_str(), true /* encoder */,
            ((mFlags & FLAG_PREFER_SOFTWARE_CODEC) ? MediaCodecList::kPreferSoftwareCodecs : 0),
            &matchingCodecs);

    // Try the candidates in order; stop at the first one that configures.
    status_t err = NO_INIT;
    for (size_t ix = 0; ix < matchingCodecs.size(); ++ix) {
        mEncoder = MediaCodec::CreateByComponentName(
                mCodecLooper, matchingCodecs[ix]);

        if (mEncoder == NULL) {
            continue;
        }

        ALOGV("output format is '%s'", mOutputFormat->debugString(0).c_str());

        // encoder callbacks arrive as kWhatEncoderActivity messages
        mEncoderActivityNotify = new AMessage(kWhatEncoderActivity, mReflector);
        mEncoder->setCallback(mEncoderActivityNotify);

        err = mEncoder->configure(
                    mOutputFormat,
                    NULL /* nativeWindow */,
                    NULL /* crypto */,
                    MediaCodec::CONFIGURE_FLAG_ENCODE);

        if (err == OK) {
            break;
        }
        // this candidate failed to configure; drop it and try the next one
        mEncoder->release();
        mEncoder = NULL;
    }

    if (err != OK) {
        return err;
    }

    // Publish the encoder's output format as MetaData (see getFormat()).
    mEncoder->getOutputFormat(&mOutputFormat);
    sp<MetaData> meta = new MetaData;
    convertMessageToMetaData(mOutputFormat, meta);
    mMeta.lock().set(meta);

    if (mFlags & FLAG_USE_SURFACE_INPUT) {
        CHECK(mIsVideo);

        if (mPersistentSurface != NULL) {
            // When using persistent surface, we are only interested in the
            // consumer, but have to use PersistentSurface as a wrapper to
            // pass consumer over messages (similar to BufferProducerWrapper)
            err = mEncoder->setInputSurface(mPersistentSurface);
        } else {
            err = mEncoder->createInputSurface(&mGraphicBufferProducer);
        }

        if (err != OK) {
            return err;
        }
    }

    // Pick the pixel format and dataspace that onStart() passes to the source.
    sp<AMessage> inputFormat;
    int32_t usingSwReadOften;
    mSetEncoderFormat = false;
    if (mEncoder->getInputFormat(&inputFormat) == OK) {
        mSetEncoderFormat = true;
        if (inputFormat->findInt32("using-sw-read-often", &usingSwReadOften)
                && usingSwReadOften) {
            // this is a SW encoder; signal source to allocate SW readable buffers
            mEncoderFormat = kDefaultSwVideoEncoderFormat;
        } else {
            mEncoderFormat = kDefaultHwVideoEncoderFormat;
        }
        if (!inputFormat->findInt32("android._dataspace", &mEncoderDataSpace)) {
            mEncoderDataSpace = kDefaultVideoEncoderDataSpace;
        }
        ALOGV("setting dataspace %#x, format %#x", mEncoderDataSpace, mEncoderFormat);
    }

    err = mEncoder->start();

    if (err != OK) {
        return err;
    }

    {
        // reset the output state for the freshly started encoder
        Mutexed<Output>::Locked output(mOutput);
        output->mEncoderReachedEOS = false;
        output->mErrorCode = OK;
    }

    return OK;
}
574
575void MediaCodecSource::releaseEncoder() {
576    if (mEncoder == NULL) {
577        return;
578    }
579
580    mEncoder->release();
581    mEncoder.clear();
582}
583
584status_t MediaCodecSource::postSynchronouslyAndReturnError(
585        const sp<AMessage> &msg) {
586    sp<AMessage> response;
587    status_t err = msg->postAndAwaitResponse(&response);
588
589    if (err != OK) {
590        return err;
591    }
592
593    if (!response->findInt32("err", &err)) {
594        err = OK;
595    }
596
597    return err;
598}
599
600void MediaCodecSource::signalEOS(status_t err) {
601    bool reachedEOS = false;
602    {
603        Mutexed<Output>::Locked output(mOutput);
604        reachedEOS = output->mEncoderReachedEOS;
605        if (!reachedEOS) {
606            ALOGV("encoder (%s) reached EOS", mIsVideo ? "video" : "audio");
607            // release all unread media buffers
608            for (List<MediaBuffer*>::iterator it = output->mBufferQueue.begin();
609                    it != output->mBufferQueue.end(); it++) {
610                (*it)->release();
611            }
612            output->mBufferQueue.clear();
613            output->mEncoderReachedEOS = true;
614            output->mErrorCode = err;
615            output->mCond.signal();
616
617            reachedEOS = true;
618            output.unlock();
619            releaseEncoder();
620        }
621    }
622
623    if (mStopping && reachedEOS) {
624        ALOGI("encoder (%s) stopped", mIsVideo ? "video" : "audio");
625        mPuller->stopSource();
626        ALOGV("source (%s) stopped", mIsVideo ? "video" : "audio");
627        // posting reply to everyone that's waiting
628        List<sp<AReplyToken>>::iterator it;
629        for (it = mStopReplyIDQueue.begin();
630                it != mStopReplyIDQueue.end(); it++) {
631            (new AMessage)->postReply(*it);
632        }
633        mStopReplyIDQueue.clear();
634        mStopping = false;
635        ++mGeneration;
636    }
637}
638
639void MediaCodecSource::resume(int64_t resumeStartTimeUs) {
640    CHECK(mFlags & FLAG_USE_SURFACE_INPUT);
641    if (mEncoder != NULL) {
642        sp<AMessage> params = new AMessage;
643        params->setInt32("drop-input-frames", false);
644        if (resumeStartTimeUs > 0) {
645            params->setInt64("drop-start-time-us", resumeStartTimeUs);
646        }
647        mEncoder->setParameters(params);
648    }
649}
650
651status_t MediaCodecSource::feedEncoderInputBuffers() {
652    MediaBuffer* mbuf = NULL;
653    while (!mAvailEncoderInputIndices.empty() && mPuller->readBuffer(&mbuf)) {
654        size_t bufferIndex = *mAvailEncoderInputIndices.begin();
655        mAvailEncoderInputIndices.erase(mAvailEncoderInputIndices.begin());
656
657        int64_t timeUs = 0ll;
658        uint32_t flags = 0;
659        size_t size = 0;
660
661        if (mbuf != NULL) {
662            CHECK(mbuf->meta_data()->findInt64(kKeyTime, &timeUs));
663            if (mFirstSampleSystemTimeUs < 0ll) {
664                mFirstSampleSystemTimeUs = systemTime() / 1000;
665                if (mPausePending) {
666                    mPausePending = false;
667                    onPause(mFirstSampleSystemTimeUs);
668                    mbuf->release();
669                    mAvailEncoderInputIndices.push_back(bufferIndex);
670                    return OK;
671                }
672            }
673
674            timeUs += mInputBufferTimeOffsetUs;
675
676            // push decoding time for video, or drift time for audio
677            if (mIsVideo) {
678                mDecodingTimeQueue.push_back(timeUs);
679            } else {
680#if DEBUG_DRIFT_TIME
681                if (mFirstSampleTimeUs < 0ll) {
682                    mFirstSampleTimeUs = timeUs;
683                }
684                int64_t driftTimeUs = 0;
685                if (mbuf->meta_data()->findInt64(kKeyDriftTime, &driftTimeUs)
686                        && driftTimeUs) {
687                    driftTimeUs = timeUs - mFirstSampleTimeUs - driftTimeUs;
688                }
689                mDriftTimeQueue.push_back(driftTimeUs);
690#endif // DEBUG_DRIFT_TIME
691            }
692
693            sp<MediaCodecBuffer> inbuf;
694            status_t err = mEncoder->getInputBuffer(bufferIndex, &inbuf);
695
696            if (err != OK || inbuf == NULL || inbuf->data() == NULL
697                    || mbuf->data() == NULL || mbuf->size() == 0) {
698                mbuf->release();
699                signalEOS();
700                break;
701            }
702
703            size = mbuf->size();
704
705            memcpy(inbuf->data(), mbuf->data(), size);
706
707            if (mIsVideo) {
708                // video encoder will release MediaBuffer when done
709                // with underlying data.
710                inbuf->setMediaBufferBase(mbuf);
711            } else {
712                mbuf->release();
713            }
714        } else {
715            flags = MediaCodec::BUFFER_FLAG_EOS;
716        }
717
718        status_t err = mEncoder->queueInputBuffer(
719                bufferIndex, 0, size, timeUs, flags);
720
721        if (err != OK) {
722            return err;
723        }
724    }
725
726    return OK;
727}
728
729status_t MediaCodecSource::onStart(MetaData *params) {
730    if (mStopping) {
731        ALOGE("Failed to start while we're stopping");
732        return INVALID_OPERATION;
733    }
734    int64_t startTimeUs;
735    if (params == NULL || !params->findInt64(kKeyTime, &startTimeUs)) {
736        startTimeUs = -1ll;
737    }
738
739    if (mStarted) {
740        ALOGI("MediaCodecSource (%s) resuming", mIsVideo ? "video" : "audio");
741        if (mPausePending) {
742            mPausePending = false;
743            return OK;
744        }
745        if (mIsVideo) {
746            mEncoder->requestIDRFrame();
747        }
748        if (mFlags & FLAG_USE_SURFACE_INPUT) {
749            resume(startTimeUs);
750        } else {
751            CHECK(mPuller != NULL);
752            mPuller->resume();
753        }
754        return OK;
755    }
756
757    ALOGI("MediaCodecSource (%s) starting", mIsVideo ? "video" : "audio");
758
759    status_t err = OK;
760
761    if (mFlags & FLAG_USE_SURFACE_INPUT) {
762        if (mEncoder != NULL) {
763            sp<AMessage> params = new AMessage;
764            params->setInt32("drop-input-frames", false);
765            if (startTimeUs >= 0) {
766                params->setInt64("skip-frames-before", startTimeUs);
767            }
768            mEncoder->setParameters(params);
769        }
770    } else {
771        CHECK(mPuller != NULL);
772        sp<MetaData> meta = params;
773        if (mSetEncoderFormat) {
774            if (meta == NULL) {
775                meta = new MetaData;
776            }
777            meta->setInt32(kKeyPixelFormat, mEncoderFormat);
778            meta->setInt32(kKeyColorSpace, mEncoderDataSpace);
779        }
780
781        sp<AMessage> notify = new AMessage(kWhatPullerNotify, mReflector);
782        err = mPuller->start(meta.get(), notify);
783        if (err != OK) {
784            return err;
785        }
786    }
787
788    ALOGI("MediaCodecSource (%s) started", mIsVideo ? "video" : "audio");
789
790    mStarted = true;
791    return OK;
792}
793
794void MediaCodecSource::onPause(int64_t pauseStartTimeUs) {
795    if ((mFlags & FLAG_USE_SURFACE_INPUT) && (mEncoder != NULL)) {
796        sp<AMessage> params = new AMessage;
797        params->setInt32("drop-input-frames", true);
798        params->setInt64("drop-start-time-us", pauseStartTimeUs);
799        mEncoder->setParameters(params);
800    } else {
801        CHECK(mPuller != NULL);
802        mPuller->pause();
803    }
804}
805
806void MediaCodecSource::onMessageReceived(const sp<AMessage> &msg) {
807    switch (msg->what()) {
808    case kWhatPullerNotify:
809    {
810        int32_t eos = 0;
811        if (msg->findInt32("eos", &eos) && eos) {
812            ALOGV("puller (%s) reached EOS", mIsVideo ? "video" : "audio");
813            signalEOS();
814            break;
815        }
816
817        if (mEncoder == NULL) {
818            ALOGV("got msg '%s' after encoder shutdown.", msg->debugString().c_str());
819            break;
820        }
821
822        feedEncoderInputBuffers();
823        break;
824    }
825    case kWhatEncoderActivity:
826    {
827        if (mEncoder == NULL) {
828            break;
829        }
830
831        int32_t cbID;
832        CHECK(msg->findInt32("callbackID", &cbID));
833        if (cbID == MediaCodec::CB_INPUT_AVAILABLE) {
834            int32_t index;
835            CHECK(msg->findInt32("index", &index));
836
837            mAvailEncoderInputIndices.push_back(index);
838            feedEncoderInputBuffers();
839        } else if (cbID == MediaCodec::CB_OUTPUT_FORMAT_CHANGED) {
840            status_t err = mEncoder->getOutputFormat(&mOutputFormat);
841            if (err != OK) {
842                signalEOS(err);
843                break;
844            }
845            sp<MetaData> meta = new MetaData;
846            convertMessageToMetaData(mOutputFormat, meta);
847            mMeta.lock().set(meta);
848        } else if (cbID == MediaCodec::CB_OUTPUT_AVAILABLE) {
849            int32_t index;
850            size_t offset;
851            size_t size;
852            int64_t timeUs;
853            int32_t flags;
854
855            CHECK(msg->findInt32("index", &index));
856            CHECK(msg->findSize("offset", &offset));
857            CHECK(msg->findSize("size", &size));
858            CHECK(msg->findInt64("timeUs", &timeUs));
859            CHECK(msg->findInt32("flags", &flags));
860
861            if (flags & MediaCodec::BUFFER_FLAG_EOS) {
862                mEncoder->releaseOutputBuffer(index);
863                signalEOS();
864                break;
865            }
866
867            sp<MediaCodecBuffer> outbuf;
868            status_t err = mEncoder->getOutputBuffer(index, &outbuf);
869            if (err != OK || outbuf == NULL || outbuf->data() == NULL
870                || outbuf->size() == 0) {
871                signalEOS();
872                break;
873            }
874
875            MediaBuffer *mbuf = new MediaBuffer(outbuf->size());
876            mbuf->setObserver(this);
877            mbuf->add_ref();
878
879            if (!(flags & MediaCodec::BUFFER_FLAG_CODECCONFIG)) {
880                if (mIsVideo) {
881                    int64_t decodingTimeUs;
882                    if (mFlags & FLAG_USE_SURFACE_INPUT) {
883                        if (mFirstSampleSystemTimeUs < 0ll) {
884                            mFirstSampleSystemTimeUs = systemTime() / 1000;
885                            if (mPausePending) {
886                                mPausePending = false;
887                                onPause(mFirstSampleSystemTimeUs);
888                                mbuf->release();
889                                break;
890                            }
891                        }
892                        // Timestamp offset is already adjusted in GraphicBufferSource.
893                        // GraphicBufferSource is supposed to discard samples
894                        // queued before start, and offset timeUs by start time
895                        CHECK_GE(timeUs, 0ll);
896                        // TODO:
897                        // Decoding time for surface source is unavailable,
898                        // use presentation time for now. May need to move
899                        // this logic into MediaCodec.
900                        decodingTimeUs = timeUs;
901                    } else {
902                        CHECK(!mDecodingTimeQueue.empty());
903                        decodingTimeUs = *(mDecodingTimeQueue.begin());
904                        mDecodingTimeQueue.erase(mDecodingTimeQueue.begin());
905                    }
906                    mbuf->meta_data()->setInt64(kKeyDecodingTime, decodingTimeUs);
907
908                    ALOGV("[video] time %" PRId64 " us (%.2f secs), dts/pts diff %" PRId64,
909                            timeUs, timeUs / 1E6, decodingTimeUs - timeUs);
910                } else {
911                    int64_t driftTimeUs = 0;
912#if DEBUG_DRIFT_TIME
913                    CHECK(!mDriftTimeQueue.empty());
914                    driftTimeUs = *(mDriftTimeQueue.begin());
915                    mDriftTimeQueue.erase(mDriftTimeQueue.begin());
916                    mbuf->meta_data()->setInt64(kKeyDriftTime, driftTimeUs);
917#endif // DEBUG_DRIFT_TIME
918                    ALOGV("[audio] time %" PRId64 " us (%.2f secs), drift %" PRId64,
919                            timeUs, timeUs / 1E6, driftTimeUs);
920                }
921                mbuf->meta_data()->setInt64(kKeyTime, timeUs);
922            } else {
923                mbuf->meta_data()->setInt64(kKeyTime, 0ll);
924                mbuf->meta_data()->setInt32(kKeyIsCodecConfig, true);
925            }
926            if (flags & MediaCodec::BUFFER_FLAG_SYNCFRAME) {
927                mbuf->meta_data()->setInt32(kKeyIsSyncFrame, true);
928            }
929            memcpy(mbuf->data(), outbuf->data(), outbuf->size());
930
931            {
932                Mutexed<Output>::Locked output(mOutput);
933                output->mBufferQueue.push_back(mbuf);
934                output->mCond.signal();
935            }
936
937            mEncoder->releaseOutputBuffer(index);
938       } else if (cbID == MediaCodec::CB_ERROR) {
939            status_t err;
940            CHECK(msg->findInt32("err", &err));
941            ALOGE("Encoder (%s) reported error : 0x%x",
942                    mIsVideo ? "video" : "audio", err);
943            if (!(mFlags & FLAG_USE_SURFACE_INPUT)) {
944                mStopping = true;
945                mPuller->stop();
946            }
947            signalEOS();
948       }
949       break;
950    }
951    case kWhatStart:
952    {
953        sp<AReplyToken> replyID;
954        CHECK(msg->senderAwaitsResponse(&replyID));
955
956        sp<RefBase> obj;
957        CHECK(msg->findObject("meta", &obj));
958        MetaData *params = static_cast<MetaData *>(obj.get());
959
960        sp<AMessage> response = new AMessage;
961        response->setInt32("err", onStart(params));
962        response->postReply(replyID);
963        break;
964    }
965    case kWhatStop:
966    {
967        ALOGI("encoder (%s) stopping", mIsVideo ? "video" : "audio");
968
969        sp<AReplyToken> replyID;
970        CHECK(msg->senderAwaitsResponse(&replyID));
971
972        if (mOutput.lock()->mEncoderReachedEOS) {
973            // if we already reached EOS, reply and return now
974            ALOGI("encoder (%s) already stopped",
975                    mIsVideo ? "video" : "audio");
976            (new AMessage)->postReply(replyID);
977            break;
978        }
979
980        mStopReplyIDQueue.push_back(replyID);
981        if (mStopping) {
982            // nothing to do if we're already stopping, reply will be posted
983            // to all when we're stopped.
984            break;
985        }
986
987        mStopping = true;
988
989        // if using surface, signal source EOS and wait for EOS to come back.
990        // otherwise, stop puller (which also clears the input buffer queue)
991        // and wait for the EOS message. We cannot call source->stop() because
992        // the encoder may still be processing input buffers.
993        if (mFlags & FLAG_USE_SURFACE_INPUT) {
994            mEncoder->signalEndOfInputStream();
995        } else {
996            mPuller->stop();
997        }
998
999        // complete stop even if encoder/puller stalled
1000        sp<AMessage> timeoutMsg = new AMessage(kWhatStopStalled, mReflector);
1001        timeoutMsg->setInt32("generation", mGeneration);
1002        timeoutMsg->post(kStopTimeoutUs);
1003        break;
1004    }
1005
1006    case kWhatStopStalled:
1007    {
1008        int32_t generation;
1009        CHECK(msg->findInt32("generation", &generation));
1010        if (generation != mGeneration) {
1011             break;
1012        }
1013
1014        if (!(mFlags & FLAG_USE_SURFACE_INPUT)) {
1015            ALOGV("source (%s) stopping", mIsVideo ? "video" : "audio");
1016            mPuller->interruptSource();
1017            ALOGV("source (%s) stopped", mIsVideo ? "video" : "audio");
1018        }
1019        signalEOS();
1020        break;
1021    }
1022
1023    case kWhatPause:
1024    {
1025        if (mFirstSampleSystemTimeUs < 0) {
1026            mPausePending = true;
1027        } else {
1028            sp<RefBase> obj;
1029            CHECK(msg->findObject("meta", &obj));
1030            MetaData *params = static_cast<MetaData *>(obj.get());
1031            int64_t pauseStartTimeUs = -1;
1032            if (params == NULL || !params->findInt64(kKeyTime, &pauseStartTimeUs)) {
1033                pauseStartTimeUs = -1ll;
1034            }
1035            onPause(pauseStartTimeUs);
1036        }
1037        break;
1038    }
1039    case kWhatSetInputBufferTimeOffset:
1040    {
1041        sp<AReplyToken> replyID;
1042        CHECK(msg->senderAwaitsResponse(&replyID));
1043        status_t err = OK;
1044        CHECK(msg->findInt64("time-offset-us", &mInputBufferTimeOffsetUs));
1045
1046        // Propagate the timestamp offset to GraphicBufferSource.
1047        if (mFlags & FLAG_USE_SURFACE_INPUT) {
1048            sp<AMessage> params = new AMessage;
1049            params->setInt64("time-offset-us", mInputBufferTimeOffsetUs);
1050            err = mEncoder->setParameters(params);
1051        }
1052
1053        sp<AMessage> response = new AMessage;
1054        response->setInt32("err", err);
1055        response->postReply(replyID);
1056        break;
1057    }
1058    case kWhatSetStopTimeOffset:
1059    {
1060        sp<AReplyToken> replyID;
1061        CHECK(msg->senderAwaitsResponse(&replyID));
1062        status_t err = OK;
1063        int64_t stopTimeUs;
1064        CHECK(msg->findInt64("stop-time-us", &stopTimeUs));
1065
1066        // Propagate the stop time to GraphicBufferSource.
1067        if (mFlags & FLAG_USE_SURFACE_INPUT) {
1068            sp<AMessage> params = new AMessage;
1069            params->setInt64("stop-time-us", stopTimeUs);
1070            err = mEncoder->setParameters(params);
1071        }
1072
1073        sp<AMessage> response = new AMessage;
1074        response->setInt32("err", err);
1075        response->postReply(replyID);
1076        break;
1077    }
1078    case kWhatGetFirstSampleSystemTimeUs:
1079    {
1080        sp<AReplyToken> replyID;
1081        CHECK(msg->senderAwaitsResponse(&replyID));
1082
1083        sp<AMessage> response = new AMessage;
1084        response->setInt64("time-us", mFirstSampleSystemTimeUs);
1085        response->postReply(replyID);
1086        break;
1087    }
1088    default:
1089        TRESPASS();
1090    }
1091}
1092
1093} // namespace android
1094