1/*
2 * Copyright (C) 2009 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "BpMediaSource"
19#include <utils/Log.h>
20
21#include <inttypes.h>
22#include <stdint.h>
23#include <sys/types.h>
24
25#include <binder/Parcel.h>
26#include <media/IMediaSource.h>
27#include <media/stagefright/MediaBuffer.h>
28#include <media/stagefright/MediaBufferGroup.h>
29#include <media/stagefright/MediaSource.h>
30#include <media/stagefright/MetaData.h>
31
32namespace android {
33
// Binder transaction codes exchanged between BpMediaSource (which passes them
// to remote()->transact()) and BnMediaSource::onTransact() (which switches on
// them). Numbering starts at IBinder::FIRST_CALL_TRANSACTION and follows
// declaration order, so the order of the enumerators is part of the wire
// protocol.
enum {
    START = IBinder::FIRST_CALL_TRANSACTION,
    STOP,
    PAUSE,
    GETFORMAT,
    // READ, deprecated
    READMULTIPLE,
    // NOTE(review): RELEASE_BUFFER is neither sent nor handled by the code
    // visible in this file; presumably kept to preserve the numbering -- confirm.
    RELEASE_BUFFER,
    SUPPORT_NONBLOCKING_READ,
};
44
// Tags that prefix each buffer record in a READMULTIPLE reply parcel.
// The reply is a sequence of tagged records terminated by NULL_BUFFER, which
// is followed by the int32 read status.
enum {
    // End-of-list marker; no payload follows the tag.
    NULL_BUFFER,
    // uint64 cache index, IMemory binder, int32 offset, int32 length, metadata.
    SHARED_BUFFER,
    // Byte array (int32 length + bytes) followed by metadata.
    INLINE_BUFFER,
    // uint64 index of a previously sent IMemory, int32 offset, int32 length, metadata.
    SHARED_BUFFER_INDEX,
};
51
52class RemoteMediaBufferWrapper : public MediaBuffer {
53public:
54    RemoteMediaBufferWrapper(const sp<IMemory> &mem)
55        : MediaBuffer(mem) {
56        ALOGV("RemoteMediaBufferWrapper: creating %p", this);
57    }
58
59protected:
60    virtual ~RemoteMediaBufferWrapper() {
61        // Release our interest in the MediaBuffer's shared memory.
62        int32_t old = addRemoteRefcount(-1);
63        ALOGV("RemoteMediaBufferWrapper: releasing %p, refcount %d", this, old - 1);
64        mMemory.clear(); // don't set the dead object flag.
65    }
66};
67
68class BpMediaSource : public BpInterface<IMediaSource> {
69public:
70    BpMediaSource(const sp<IBinder>& impl)
71        : BpInterface<IMediaSource>(impl), mBuffersSinceStop(0)
72    {
73    }
74
75    virtual status_t start(MetaData *params) {
76        ALOGV("start");
77        Parcel data, reply;
78        data.writeInterfaceToken(BpMediaSource::getInterfaceDescriptor());
79        if (params) {
80            params->writeToParcel(data);
81        }
82        status_t ret = remote()->transact(START, data, &reply);
83        if (ret == NO_ERROR && params) {
84            ALOGW("ignoring potentially modified MetaData from start");
85            ALOGW("input:");
86            params->dumpToLog();
87            sp<MetaData> meta = MetaData::createFromParcel(reply);
88            ALOGW("output:");
89            meta->dumpToLog();
90        }
91        return ret;
92    }
93
94    virtual status_t stop() {
95        ALOGV("stop");
96        Parcel data, reply;
97        data.writeInterfaceToken(BpMediaSource::getInterfaceDescriptor());
98        status_t status = remote()->transact(STOP, data, &reply);
99        mMemoryCache.reset();
100        mBuffersSinceStop = 0;
101        return status;
102    }
103
104    virtual sp<MetaData> getFormat() {
105        ALOGV("getFormat");
106        Parcel data, reply;
107        data.writeInterfaceToken(BpMediaSource::getInterfaceDescriptor());
108        status_t ret = remote()->transact(GETFORMAT, data, &reply);
109        if (ret == NO_ERROR) {
110            mMetaData = MetaData::createFromParcel(reply);
111            return mMetaData;
112        }
113        return NULL;
114    }
115
116    virtual status_t read(MediaBuffer **buffer, const ReadOptions *options) {
117        Vector<MediaBuffer *> buffers;
118        status_t ret = readMultiple(&buffers, 1 /* maxNumBuffers */, options);
119        *buffer = buffers.size() == 0 ? nullptr : buffers[0];
120        ALOGV("read status %d, bufferCount %u, sinceStop %u",
121                ret, *buffer != nullptr, mBuffersSinceStop);
122        return ret;
123    }
124
125    virtual status_t readMultiple(
126            Vector<MediaBuffer *> *buffers, uint32_t maxNumBuffers, const ReadOptions *options) {
127        ALOGV("readMultiple");
128        if (buffers == NULL || !buffers->isEmpty()) {
129            return BAD_VALUE;
130        }
131        Parcel data, reply;
132        data.writeInterfaceToken(BpMediaSource::getInterfaceDescriptor());
133        data.writeUint32(maxNumBuffers);
134        if (options != nullptr) {
135            data.writeByteArray(sizeof(*options), (uint8_t*) options);
136        }
137        status_t ret = remote()->transact(READMULTIPLE, data, &reply);
138        mMemoryCache.gc();
139        if (ret != NO_ERROR) {
140            return ret;
141        }
142        // wrap the returned data in a vector of MediaBuffers
143        int32_t buftype;
144        uint32_t bufferCount = 0;
145        while ((buftype = reply.readInt32()) != NULL_BUFFER) {
146            LOG_ALWAYS_FATAL_IF(bufferCount >= maxNumBuffers,
147                    "Received %u+ buffers and requested %u buffers",
148                    bufferCount + 1, maxNumBuffers);
149            MediaBuffer *buf;
150            if (buftype == SHARED_BUFFER || buftype == SHARED_BUFFER_INDEX) {
151                uint64_t index = reply.readUint64();
152                ALOGV("Received %s index %llu",
153                        buftype == SHARED_BUFFER ? "SHARED_BUFFER" : "SHARED_BUFFER_INDEX",
154                        (unsigned long long) index);
155                sp<IMemory> mem;
156                if (buftype == SHARED_BUFFER) {
157                    sp<IBinder> binder = reply.readStrongBinder();
158                    mem = interface_cast<IMemory>(binder);
159                    LOG_ALWAYS_FATAL_IF(mem.get() == nullptr,
160                            "Received NULL IMemory for shared buffer");
161                    mMemoryCache.insert(index, mem);
162                } else {
163                    mem = mMemoryCache.lookup(index);
164                    LOG_ALWAYS_FATAL_IF(mem.get() == nullptr,
165                            "Received invalid IMemory index for shared buffer: %llu",
166                            (unsigned long long)index);
167                }
168                size_t offset = reply.readInt32();
169                size_t length = reply.readInt32();
170                buf = new RemoteMediaBufferWrapper(mem);
171                buf->set_range(offset, length);
172                buf->meta_data()->updateFromParcel(reply);
173            } else { // INLINE_BUFFER
174                int32_t len = reply.readInt32();
175                ALOGV("INLINE_BUFFER status %d and len %d", ret, len);
176                buf = new MediaBuffer(len);
177                reply.read(buf->data(), len);
178                buf->meta_data()->updateFromParcel(reply);
179            }
180            buffers->push_back(buf);
181            ++bufferCount;
182            ++mBuffersSinceStop;
183        }
184        ret = reply.readInt32();
185        ALOGV("readMultiple status %d, bufferCount %u, sinceStop %u",
186                ret, bufferCount, mBuffersSinceStop);
187        return ret;
188    }
189
190    // Binder proxy adds readMultiple support.
191    virtual bool supportReadMultiple() {
192        return true;
193    }
194
195    virtual bool supportNonblockingRead() {
196        ALOGV("supportNonblockingRead");
197        Parcel data, reply;
198        data.writeInterfaceToken(BpMediaSource::getInterfaceDescriptor());
199        status_t ret = remote()->transact(SUPPORT_NONBLOCKING_READ, data, &reply);
200        if (ret == NO_ERROR) {
201            return reply.readInt32() != 0;
202        }
203        return false;
204    }
205
206    virtual status_t pause() {
207        ALOGV("pause");
208        Parcel data, reply;
209        data.writeInterfaceToken(BpMediaSource::getInterfaceDescriptor());
210        return remote()->transact(PAUSE, data, &reply);
211    }
212
213    virtual status_t setBuffers(const Vector<MediaBuffer *> & buffers __unused) {
214        ALOGV("setBuffers NOT IMPLEMENTED");
215        return ERROR_UNSUPPORTED; // default
216    }
217
218private:
219
220    uint32_t mBuffersSinceStop; // Buffer tracking variable
221
222    // NuPlayer passes pointers-to-metadata around, so we use this to keep the metadata alive
223    // XXX: could we use this for caching, or does metadata change on the fly?
224    sp<MetaData> mMetaData;
225
226    // Cache all IMemory objects received from MediaExtractor.
227    // We gc IMemory objects that are no longer active (referenced by a MediaBuffer).
228
229    struct MemoryCache {
230        sp<IMemory> lookup(uint64_t index) {
231            auto p = mIndexToMemory.find(index);
232            if (p == mIndexToMemory.end()) {
233                ALOGE("cannot find index!");
234                return nullptr;
235            }
236            return p->second;
237        }
238
239        void insert(uint64_t index, const sp<IMemory> &mem) {
240            if (mIndexToMemory.find(index) != mIndexToMemory.end()) {
241                ALOGE("index %llu already present", (unsigned long long)index);
242                return;
243            }
244            (void)mIndexToMemory.emplace(index, mem);
245        }
246
247        void reset() {
248            mIndexToMemory.clear();
249        }
250
251        void gc() {
252            for (auto it = mIndexToMemory.begin(); it != mIndexToMemory.end(); ) {
253                if (MediaBuffer::isDeadObject(it->second)) {
254                    it = mIndexToMemory.erase(it);
255                } else {
256                    ++it;
257                }
258            }
259        }
260    private:
261        // C++14 unordered_map erase on iterator is stable; C++11 has no guarantee.
262        std::map<uint64_t, sp<IMemory>> mIndexToMemory;
263    } mMemoryCache;
264};
265
266IMPLEMENT_META_INTERFACE(MediaSource, "android.media.IMediaSource");
267
268#undef LOG_TAG
269#define LOG_TAG "BnMediaSource"
270
271BnMediaSource::BnMediaSource()
272    : mBuffersSinceStop(0)
273    , mGroup(new MediaBufferGroup(kBinderMediaBuffers /* growthLimit */)) {
274}
275
276BnMediaSource::~BnMediaSource() {
277}
278
279status_t BnMediaSource::onTransact(
280    uint32_t code, const Parcel& data, Parcel* reply, uint32_t flags)
281{
282    switch (code) {
283        case START: {
284            ALOGV("start");
285            CHECK_INTERFACE(IMediaSource, data, reply);
286            sp<MetaData> meta;
287            if (data.dataAvail()) {
288                meta = MetaData::createFromParcel(data);
289            }
290            status_t ret = start(meta.get());
291            if (ret == NO_ERROR && meta != NULL) {
292                meta->writeToParcel(*reply);
293            }
294            return ret;
295        }
296        case STOP: {
297            ALOGV("stop");
298            CHECK_INTERFACE(IMediaSource, data, reply);
299            mGroup->signalBufferReturned(nullptr);
300            status_t status = stop();
301            mIndexCache.reset();
302            mBuffersSinceStop = 0;
303            return status;
304        }
305        case PAUSE: {
306            ALOGV("pause");
307            CHECK_INTERFACE(IMediaSource, data, reply);
308            mGroup->signalBufferReturned(nullptr);
309            return pause();
310        }
311        case GETFORMAT: {
312            ALOGV("getFormat");
313            CHECK_INTERFACE(IMediaSource, data, reply);
314            sp<MetaData> meta = getFormat();
315            if (meta != NULL) {
316                meta->writeToParcel(*reply);
317                return NO_ERROR;
318            }
319            return UNKNOWN_ERROR;
320        }
321        case READMULTIPLE: {
322            ALOGV("readMultiple");
323            CHECK_INTERFACE(IMediaSource, data, reply);
324
325            // Get max number of buffers to read.
326            uint32_t maxNumBuffers;
327            data.readUint32(&maxNumBuffers);
328            if (maxNumBuffers > kMaxNumReadMultiple) {
329                maxNumBuffers = kMaxNumReadMultiple;
330            }
331
332            // Get read options, if any.
333            ReadOptions opts;
334            uint32_t len;
335            const bool useOptions =
336                    data.readUint32(&len) == NO_ERROR
337                    && len == sizeof(opts)
338                    && data.read((void *)&opts, len) == NO_ERROR;
339
340            mGroup->signalBufferReturned(nullptr);
341            mIndexCache.gc();
342            size_t inlineTransferSize = 0;
343            status_t ret = NO_ERROR;
344            uint32_t bufferCount = 0;
345            for (; bufferCount < maxNumBuffers; ++bufferCount, ++mBuffersSinceStop) {
346                MediaBuffer *buf = nullptr;
347                ret = read(&buf, useOptions ? &opts : nullptr);
348                opts.clearNonPersistent(); // Remove options that only apply to first buffer.
349                if (ret != NO_ERROR || buf == nullptr) {
350                    break;
351                }
352
353                // Even if we're using shared memory, we might not want to use it, since for small
354                // sizes it's faster to copy data through the Binder transaction
355                // On the other hand, if the data size is large enough, it's better to use shared
356                // memory. When data is too large, binder can't handle it.
357                //
358                // TODO: reduce MediaBuffer::kSharedMemThreshold
359                MediaBuffer *transferBuf = nullptr;
360                const size_t length = buf->range_length();
361                size_t offset = buf->range_offset();
362                if (length >= (supportNonblockingRead() && buf->mMemory != nullptr ?
363                        kTransferSharedAsSharedThreshold : kTransferInlineAsSharedThreshold)) {
364                    if (buf->mMemory != nullptr) {
365                        ALOGV("Use shared memory: %zu", length);
366                        transferBuf = buf;
367                    } else {
368                        ALOGD("Large buffer %zu without IMemory!", length);
369                        ret = mGroup->acquire_buffer(
370                                &transferBuf, false /* nonBlocking */, length);
371                        if (ret != OK
372                                || transferBuf == nullptr
373                                || transferBuf->mMemory == nullptr) {
374                            ALOGW("Failed to acquire shared memory, size %zu, ret %d",
375                                    length, ret);
376                            if (transferBuf != nullptr) {
377                                transferBuf->release();
378                                transferBuf = nullptr;
379                            }
380                            // Current buffer transmit inline; no more additional buffers.
381                            maxNumBuffers = 0;
382                        } else {
383                            memcpy(transferBuf->data(), (uint8_t*)buf->data() + offset, length);
384                            offset = 0;
385                            if (!mGroup->has_buffers()) {
386                                maxNumBuffers = 0; // No more MediaBuffers, stop readMultiple.
387                            }
388                        }
389                    }
390                }
391                if (transferBuf != nullptr) { // Using shared buffers.
392                    if (!transferBuf->isObserved()) {
393                        // Transfer buffer must be part of a MediaBufferGroup.
394                        ALOGV("adding shared memory buffer %p to local group", transferBuf);
395                        mGroup->add_buffer(transferBuf);
396                        transferBuf->add_ref(); // We have already acquired buffer.
397                    }
398                    uint64_t index = mIndexCache.lookup(transferBuf->mMemory);
399                    if (index == 0) {
400                        index = mIndexCache.insert(transferBuf->mMemory);
401                        reply->writeInt32(SHARED_BUFFER);
402                        reply->writeUint64(index);
403                        reply->writeStrongBinder(IInterface::asBinder(transferBuf->mMemory));
404                        ALOGV("SHARED_BUFFER(%p) %llu",
405                                transferBuf, (unsigned long long)index);
406                    } else {
407                        reply->writeInt32(SHARED_BUFFER_INDEX);
408                        reply->writeUint64(index);
409                        ALOGV("SHARED_BUFFER_INDEX(%p) %llu",
410                                transferBuf, (unsigned long long)index);
411                    }
412                    reply->writeInt32(offset);
413                    reply->writeInt32(length);
414                    buf->meta_data()->writeToParcel(*reply);
415                    transferBuf->addRemoteRefcount(1);
416                    if (transferBuf != buf) {
417                        transferBuf->release(); // release local ref
418                    } else if (!supportNonblockingRead()) {
419                        maxNumBuffers = 0; // stop readMultiple with one shared buffer.
420                    }
421                } else {
422                    ALOGV_IF(buf->mMemory != nullptr,
423                            "INLINE(%p) %zu shared mem available, but only %zu used",
424                            buf, buf->mMemory->size(), length);
425                    reply->writeInt32(INLINE_BUFFER);
426                    reply->writeByteArray(length, (uint8_t*)buf->data() + offset);
427                    buf->meta_data()->writeToParcel(*reply);
428                    inlineTransferSize += length;
429                    if (inlineTransferSize > kInlineMaxTransfer) {
430                        maxNumBuffers = 0; // stop readMultiple if inline transfer is too large.
431                    }
432                }
433                buf->release();
434            }
435            reply->writeInt32(NULL_BUFFER); // Indicate no more MediaBuffers.
436            reply->writeInt32(ret);
437            ALOGV("readMultiple status %d, bufferCount %u, sinceStop %u",
438                    ret, bufferCount, mBuffersSinceStop);
439            return NO_ERROR;
440        }
441        case SUPPORT_NONBLOCKING_READ: {
442            ALOGV("supportNonblockingRead");
443            CHECK_INTERFACE(IMediaSource, data, reply);
444            reply->writeInt32((int32_t)supportNonblockingRead());
445            return NO_ERROR;
446        }
447        default:
448            return BBinder::onTransact(code, data, reply, flags);
449    }
450}
451
452////////////////////////////////////////////////////////////////////////////////
453
454IMediaSource::ReadOptions::ReadOptions() {
455    reset();
456}
457
458void IMediaSource::ReadOptions::reset() {
459    mOptions = 0;
460    mSeekTimeUs = 0;
461    mLatenessUs = 0;
462    mNonBlocking = false;
463}
464
465void IMediaSource::ReadOptions::setNonBlocking() {
466    mNonBlocking = true;
467}
468
469void IMediaSource::ReadOptions::clearNonBlocking() {
470    mNonBlocking = false;
471}
472
473bool IMediaSource::ReadOptions::getNonBlocking() const {
474    return mNonBlocking;
475}
476
477void IMediaSource::ReadOptions::setSeekTo(int64_t time_us, SeekMode mode) {
478    mOptions |= kSeekTo_Option;
479    mSeekTimeUs = time_us;
480    mSeekMode = mode;
481}
482
483void IMediaSource::ReadOptions::clearSeekTo() {
484    mOptions &= ~kSeekTo_Option;
485    mSeekTimeUs = 0;
486    mSeekMode = SEEK_CLOSEST_SYNC;
487}
488
489bool IMediaSource::ReadOptions::getSeekTo(
490        int64_t *time_us, SeekMode *mode) const {
491    *time_us = mSeekTimeUs;
492    *mode = mSeekMode;
493    return (mOptions & kSeekTo_Option) != 0;
494}
495
496void IMediaSource::ReadOptions::setLateBy(int64_t lateness_us) {
497    mLatenessUs = lateness_us;
498}
499
500int64_t IMediaSource::ReadOptions::getLateBy() const {
501    return mLatenessUs;
502}
503
504
505}  // namespace android
506
507