android_StreamPlayer.cpp revision 513222822545c3e954176476b263df52a47f43a4
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define USE_LOG SLAndroidLogLevel_Verbose
18
19#include "sles_allinclusive.h"
20#include "android_StreamPlayer.h"
21
22#include <media/IStreamSource.h>
23#include <media/IMediaPlayerService.h>
24#include <media/stagefright/foundation/ADebug.h>
25#include <binder/IPCThreadState.h>
26
27
28//--------------------------------------------------------------------------------------------------
29namespace android {
30
31StreamSourceAppProxy::StreamSourceAppProxy(
32        IAndroidBufferQueue *androidBufferQueue,
33        const sp<CallbackProtector> &callbackProtector,
34        // sp<StreamPlayer> would cause StreamPlayer's destructor to run during it's own
35        // construction.   If you pass in a sp<> to 'this' inside a constructor, then first the
36        // refcount is increased from 0 to 1, then decreased from 1 to 0, which causes the object's
37        // destructor to run from inside it's own constructor.
38        StreamPlayer * /* const sp<StreamPlayer> & */ player) :
39    mBuffersHasBeenSet(false),
40    mAndroidBufferQueue(androidBufferQueue),
41    mCallbackProtector(callbackProtector),
42    mPlayer(player)
43{
44    SL_LOGV("StreamSourceAppProxy::StreamSourceAppProxy()");
45}
46
47StreamSourceAppProxy::~StreamSourceAppProxy() {
48    SL_LOGV("StreamSourceAppProxy::~StreamSourceAppProxy()");
49    disconnect();
50}
51
52const SLuint32 StreamSourceAppProxy::kItemProcessed[NB_BUFFEREVENT_ITEM_FIELDS] = {
53        SL_ANDROID_ITEMKEY_BUFFERQUEUEEVENT, // item key
54        sizeof(SLuint32),                    // item size
55        SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED // item data
56};
57
58//--------------------------------------------------
59// IStreamSource implementation
60void StreamSourceAppProxy::setListener(const sp<IStreamListener> &listener) {
61    assert(listener != NULL);
62    Mutex::Autolock _l(mLock);
63    assert(mListener == NULL);
64    mListener = listener;
65}
66
67void StreamSourceAppProxy::setBuffers(const Vector<sp<IMemory> > &buffers) {
68    Mutex::Autolock _l(mLock);
69    assert(!mBuffersHasBeenSet);
70    mBuffers = buffers;
71    mBuffersHasBeenSet = true;
72}
73
74void StreamSourceAppProxy::onBufferAvailable(size_t index) {
75    //SL_LOGD("StreamSourceAppProxy::onBufferAvailable(%d)", index);
76
77    {
78        Mutex::Autolock _l(mLock);
79        // assert not needed because if not set, size() will be zero and the CHECK_LT will also fail
80        // assert(mBuffersHasBeenSet);
81        CHECK_LT(index, mBuffers.size());
82#if 0   // enable if needed for debugging
83        sp<IMemory> mem = mBuffers.itemAt(index);
84        SLAint64 length = (SLAint64) mem->size();
85#endif
86        mAvailableBuffers.push_back(index);
87        //SL_LOGD("onBufferAvailable() now %d buffers available in queue", mAvailableBuffers.size());
88    }
89
90    // a new shared mem buffer is available: let's try to fill immediately
91    pullFromBuffQueue();
92}
93
94void StreamSourceAppProxy::receivedCmd_l(IStreamListener::Command cmd, const sp<AMessage> &msg) {
95    if (mListener != 0) {
96        mListener->issueCommand(cmd, false /* synchronous */, msg);
97    }
98}
99
100void StreamSourceAppProxy::receivedBuffer_l(size_t buffIndex, size_t buffLength) {
101    if (mListener != 0) {
102        mListener->queueBuffer(buffIndex, buffLength);
103    }
104}
105
106void StreamSourceAppProxy::disconnect() {
107    Mutex::Autolock _l(mLock);
108    mListener.clear();
109    // Force binder to push the decremented reference count for sp<IStreamListener>.
110    // mediaserver and client both have sp<> to the other. When you decrement an sp<>
111    // reference count, binder doesn't push that to the other process immediately.
112    IPCThreadState::self()->flushCommands();
113    mBuffers.clear();
114    mBuffersHasBeenSet = false;
115    mAvailableBuffers.clear();
116}
117
118//--------------------------------------------------
119// consumption from ABQ: pull from the ABQ, and push to shared memory (media server)
120void StreamSourceAppProxy::pullFromBuffQueue() {
121
122  if (android::CallbackProtector::enterCbIfOk(mCallbackProtector)) {
123
124    size_t bufferId;
125    void* bufferLoc;
126    size_t buffSize;
127
128    slAndroidBufferQueueCallback callback = NULL;
129    void* pBufferContext, *pBufferData, *callbackPContext = NULL;
130    AdvancedBufferHeader *oldFront = NULL;
131    uint32_t dataSize /* , dataUsed */;
132
133    // retrieve data from the buffer queue
134    interface_lock_exclusive(mAndroidBufferQueue);
135
136    // can this read operation cause us to call the buffer queue callback
137    // (either because there was a command with no data, or all the data has been consumed)
138    bool queueCallbackCandidate = false;
139
140    if (mAndroidBufferQueue->mState.count != 0) {
141        // SL_LOGD("nbBuffers in ABQ = %u, buffSize=%u",abq->mState.count, buffSize);
142        assert(mAndroidBufferQueue->mFront != mAndroidBufferQueue->mRear);
143
144        oldFront = mAndroidBufferQueue->mFront;
145        AdvancedBufferHeader *newFront = &oldFront[1];
146
147        // consume events when starting to read data from a buffer for the first time
148        if (oldFront->mDataSizeConsumed == 0) {
149            // note this code assumes at most one event per buffer; see IAndroidBufferQueue_Enqueue
150            if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_EOS) {
151                receivedCmd_l(IStreamListener::EOS);
152                // EOS has no associated data
153                queueCallbackCandidate = true;
154            } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCONTINUITY) {
155                receivedCmd_l(IStreamListener::DISCONTINUITY);
156            } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCON_NEWPTS) {
157                sp<AMessage> msg = new AMessage();
158                msg->setInt64(IStreamListener::kKeyResumeAtPTS,
159                        (int64_t)oldFront->mItems.mTsCmdData.mPts);
160                receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
161            } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_FORMAT_CHANGE) {
162                sp<AMessage> msg = new AMessage();
163                // positive value for format change key makes the discontinuity "hard", see key def
164                msg->setInt32(IStreamListener::kKeyFormatChange, (int32_t) 1);
165                receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
166            }
167            if (oldFront->mItems.mTsCmdData.mTsCmdCode & (ANDROID_MP2TSEVENT_DISCONTINUITY |
168                    ANDROID_MP2TSEVENT_DISCON_NEWPTS | ANDROID_MP2TSEVENT_FORMAT_CHANGE)) {
169                const sp<StreamPlayer> player(mPlayer.promote());
170                if (player != NULL) {
171                    // FIXME see note at onSeek
172                    player->seek(ANDROID_UNKNOWN_TIME);
173                }
174            }
175            oldFront->mItems.mTsCmdData.mTsCmdCode = ANDROID_MP2TSEVENT_NONE;
176        }
177
178        {
179            // we're going to change the shared mem buffer queue, so lock it
180            Mutex::Autolock _l(mLock);
181            if (!mAvailableBuffers.empty()) {
182                bufferId = *mAvailableBuffers.begin();
183                CHECK_LT(bufferId, mBuffers.size());
184                sp<IMemory> mem = mBuffers.itemAt(bufferId);
185                bufferLoc = mem->pointer();
186                buffSize = mem->size();
187
188                char *pSrc = ((char*)oldFront->mDataBuffer) + oldFront->mDataSizeConsumed;
189                if (oldFront->mDataSizeConsumed + buffSize < oldFront->mDataSize) {
190                    // more available than requested, copy as much as requested
191                    // consume data: 1/ copy to given destination
192                    memcpy(bufferLoc, pSrc, buffSize);
193                    //               2/ keep track of how much has been consumed
194                    oldFront->mDataSizeConsumed += buffSize;
195                    //               3/ notify shared mem listener that new data is available
196                    receivedBuffer_l(bufferId, buffSize);
197                    mAvailableBuffers.erase(mAvailableBuffers.begin());
198                } else {
199                    // requested as much available or more: consume the whole of the current
200                    //   buffer and move to the next
201                    size_t consumed = oldFront->mDataSize - oldFront->mDataSizeConsumed;
202                    //SL_LOGD("consuming rest of buffer: enqueueing=%u", consumed);
203                    oldFront->mDataSizeConsumed = oldFront->mDataSize;
204
205                    // move queue to next
206                    if (newFront == &mAndroidBufferQueue->
207                            mBufferArray[mAndroidBufferQueue->mNumBuffers + 1]) {
208                        // reached the end, circle back
209                        newFront = mAndroidBufferQueue->mBufferArray;
210                    }
211                    mAndroidBufferQueue->mFront = newFront;
212                    mAndroidBufferQueue->mState.count--;
213                    mAndroidBufferQueue->mState.index++;
214
215                    if (consumed > 0) {
216                        // consume data: 1/ copy to given destination
217                        memcpy(bufferLoc, pSrc, consumed);
218                        //               2/ keep track of how much has been consumed
219                        // here nothing to do because we are done with this buffer
220                        //               3/ notify StreamPlayer that new data is available
221                        receivedBuffer_l(bufferId, consumed);
222                        mAvailableBuffers.erase(mAvailableBuffers.begin());
223                    }
224
225                    // data has been consumed, and the buffer queue state has been updated
226                    // we will notify the client if applicable
227                    queueCallbackCandidate = true;
228                }
229            }
230
231            if (queueCallbackCandidate) {
232                if (mAndroidBufferQueue->mCallbackEventsMask &
233                        SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED) {
234                    callback = mAndroidBufferQueue->mCallback;
235                    // save callback data while under lock
236                    callbackPContext = mAndroidBufferQueue->mContext;
237                    pBufferContext = (void *)oldFront->mBufferContext;
238                    pBufferData    = (void *)oldFront->mDataBuffer;
239                    dataSize       = oldFront->mDataSize;
240                    // here a buffer is only dequeued when fully consumed
241                    //dataUsed     = oldFront->mDataSizeConsumed;
242                }
243            }
244            //SL_LOGD("%d buffers available after reading from queue", mAvailableBuffers.size());
245            if (!mAvailableBuffers.empty()) {
246                // there is still room in the shared memory, recheck later if we can pull
247                // data from the buffer queue and write it to shared memory
248                const sp<StreamPlayer> player(mPlayer.promote());
249                if (player != NULL) {
250                    player->queueRefilled();
251                }
252            }
253        }
254
255    } else { // empty queue
256        SL_LOGD("ABQ empty, starving!");
257    }
258
259    interface_unlock_exclusive(mAndroidBufferQueue);
260
261    // notify client of buffer processed
262    if (NULL != callback) {
263        SLresult result = (*callback)(&mAndroidBufferQueue->mItf, callbackPContext,
264                pBufferContext, pBufferData, dataSize,
265                dataSize, /* dataUsed  */
266                // no messages during playback other than marking the buffer as processed
267                (const SLAndroidBufferItem*)(&kItemProcessed) /* pItems */,
268                NB_BUFFEREVENT_ITEM_FIELDS *sizeof(SLuint32) /* itemsLength */ );
269        if (SL_RESULT_SUCCESS != result) {
270            // Reserved for future use
271            SL_LOGW("Unsuccessful result %d returned from AndroidBufferQueueCallback", result);
272        }
273    }
274
275    mCallbackProtector->exitCb();
276  } // enterCbIfOk
277}
278
279
280//--------------------------------------------------------------------------------------------------
281StreamPlayer::StreamPlayer(AudioPlayback_Parameters* params, bool hasVideo,
282        IAndroidBufferQueue *androidBufferQueue, const sp<CallbackProtector> &callbackProtector) :
283        GenericMediaPlayer(params, hasVideo),
284        mAppProxy(new StreamSourceAppProxy(androidBufferQueue, callbackProtector, this)),
285        mStopForDestroyCompleted(false)
286{
287    SL_LOGD("StreamPlayer::StreamPlayer()");
288
289    mPlaybackParams = *params;
290
291}
292
293StreamPlayer::~StreamPlayer() {
294    SL_LOGD("StreamPlayer::~StreamPlayer()");
295    mAppProxy->disconnect();
296}
297
298
299void StreamPlayer::onMessageReceived(const sp<AMessage> &msg) {
300    switch (msg->what()) {
301        case kWhatPullFromAbq:
302            onPullFromAndroidBufferQueue();
303            break;
304
305        case kWhatStopForDestroy:
306            onStopForDestroy();
307            break;
308
309        default:
310            GenericMediaPlayer::onMessageReceived(msg);
311            break;
312    }
313}
314
315
316void StreamPlayer::preDestroy() {
317    // FIXME NuPlayerDriver is currently not thread-safe, so stop() must be called by looper
318    (new AMessage(kWhatStopForDestroy, id()))->post();
319    {
320        Mutex::Autolock _l(mStopForDestroyLock);
321        while (!mStopForDestroyCompleted) {
322            mStopForDestroyCondition.wait(mStopForDestroyLock);
323        }
324    }
325    // GenericMediaPlayer::preDestroy will repeat some of what we've done, but that's benign
326    GenericMediaPlayer::preDestroy();
327}
328
329
330void StreamPlayer::onStopForDestroy() {
331    if (mPlayer != 0) {
332        mPlayer->stop();
333        // causes CHECK failure in Nuplayer
334        //mPlayer->setDataSource(NULL);
335        mPlayer->setVideoSurfaceTexture(NULL);
336        mPlayer->disconnect();
337        mPlayer.clear();
338        {
339            // FIXME ugh make this a method
340            Mutex::Autolock _l(mPreparedPlayerLock);
341            mPreparedPlayer.clear();
342        }
343    }
344    {
345        Mutex::Autolock _l(mStopForDestroyLock);
346        mStopForDestroyCompleted = true;
347    }
348    mStopForDestroyCondition.signal();
349}
350
351
352/**
353 * Asynchronously notify the player that the queue is ready to be pulled from.
354 */
355void StreamPlayer::queueRefilled() {
356    // async notification that the ABQ was refilled: the player should pull from the ABQ, and
357    //    and push to shared memory (to the media server)
358    (new AMessage(kWhatPullFromAbq, id()))->post();
359}
360
361
362void StreamPlayer::appClear_l() {
363    // the user of StreamPlayer has cleared its AndroidBufferQueue:
364    // there's no clear() for the shared memory queue, so this is a no-op
365}
366
367
368//--------------------------------------------------
369// Event handlers
370void StreamPlayer::onPrepare() {
371    SL_LOGD("StreamPlayer::onPrepare()");
372        sp<IMediaPlayerService> mediaPlayerService(getMediaPlayerService());
373        if (mediaPlayerService != NULL) {
374            mPlayer = mediaPlayerService->create(getpid(), mPlayerClient /*IMediaPlayerClient*/,
375                    mPlaybackParams.sessionId);
376            if (mPlayer == NULL) {
377                SL_LOGE("media player service failed to create player by app proxy");
378            } else if (mPlayer->setDataSource(mAppProxy /*IStreamSource*/) != NO_ERROR) {
379                SL_LOGE("setDataSource failed");
380                mPlayer.clear();
381            }
382        }
383    if (mPlayer == NULL) {
384        mStateFlags |= kFlagPreparedUnsuccessfully;
385    }
386    GenericMediaPlayer::onPrepare();
387    SL_LOGD("StreamPlayer::onPrepare() done");
388}
389
390
391void StreamPlayer::onPlay() {
392    SL_LOGD("StreamPlayer::onPlay()");
393    // enqueue a message that will cause StreamAppProxy to consume from the queue (again if the
394    // player had starved the shared memory)
395    queueRefilled();
396
397    GenericMediaPlayer::onPlay();
398}
399
400
401void StreamPlayer::onPullFromAndroidBufferQueue() {
402    SL_LOGD("StreamPlayer::onPullFromAndroidBufferQueue()");
403    mAppProxy->pullFromBuffQueue();
404}
405
406} // namespace android
407