android_StreamPlayer.cpp revision e2e8fa36bd7448b59fbcdf141e0b6d21e5401d91
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define USE_LOG SLAndroidLogLevel_Verbose
18
19#include "sles_allinclusive.h"
20#include "android_StreamPlayer.h"
21
22#include <media/IStreamSource.h>
23#include <media/IMediaPlayerService.h>
24#include <media/stagefright/foundation/ADebug.h>
25#include <binder/IPCThreadState.h>
26
27
28//--------------------------------------------------------------------------------------------------
29namespace android {
30
31StreamSourceAppProxy::StreamSourceAppProxy(
32        IAndroidBufferQueue *androidBufferQueue,
33        const sp<CallbackProtector> &callbackProtector,
34        // sp<StreamPlayer> would cause StreamPlayer's destructor to run during it's own
35        // construction.   If you pass in a sp<> to 'this' inside a constructor, then first the
36        // refcount is increased from 0 to 1, then decreased from 1 to 0, which causes the object's
37        // destructor to run from inside it's own constructor.
38        StreamPlayer * /* const sp<StreamPlayer> & */ player) :
39    mBuffersHasBeenSet(false),
40    mAndroidBufferQueue(androidBufferQueue),
41    mCallbackProtector(callbackProtector),
42    mPlayer(player)
43{
44    SL_LOGV("StreamSourceAppProxy::StreamSourceAppProxy()");
45}
46
47StreamSourceAppProxy::~StreamSourceAppProxy() {
48    // FIXME make this an SL_LOGV later; this just proves that the bug is fixed
49    SL_LOGI("StreamSourceAppProxy::~StreamSourceAppProxy()");
50    disconnect();
51}
52
53const SLuint32 StreamSourceAppProxy::kItemProcessed[NB_BUFFEREVENT_ITEM_FIELDS] = {
54        SL_ANDROID_ITEMKEY_BUFFERQUEUEEVENT, // item key
55        sizeof(SLuint32),                    // item size
56        SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED // item data
57};
58
59//--------------------------------------------------
60// IStreamSource implementation
61void StreamSourceAppProxy::setListener(const sp<IStreamListener> &listener) {
62    assert(listener != NULL);
63    Mutex::Autolock _l(mLock);
64    assert(mListener == NULL);
65    mListener = listener;
66}
67
68void StreamSourceAppProxy::setBuffers(const Vector<sp<IMemory> > &buffers) {
69    Mutex::Autolock _l(mLock);
70    assert(!mBuffersHasBeenSet);
71    mBuffers = buffers;
72    mBuffersHasBeenSet = true;
73}
74
75void StreamSourceAppProxy::onBufferAvailable(size_t index) {
76    //SL_LOGD("StreamSourceAppProxy::onBufferAvailable(%d)", index);
77
78    {
79        Mutex::Autolock _l(mLock);
80        // assert not needed because if not set, size() will be zero and the CHECK_LT will also fail
81        // assert(mBuffersHasBeenSet);
82        CHECK_LT(index, mBuffers.size());
83#if 0   // enable if needed for debugging
84        sp<IMemory> mem = mBuffers.itemAt(index);
85        SLAint64 length = (SLAint64) mem->size();
86#endif
87        mAvailableBuffers.push_back(index);
88        //SL_LOGD("onBufferAvailable() now %d buffers available in queue",
89        //         mAvailableBuffers.size());
90    }
91
92    // a new shared mem buffer is available: let's try to fill immediately
93    pullFromBuffQueue();
94}
95
96void StreamSourceAppProxy::receivedCmd_l(IStreamListener::Command cmd, const sp<AMessage> &msg) {
97    if (mListener != 0) {
98        mListener->issueCommand(cmd, false /* synchronous */, msg);
99    }
100}
101
102void StreamSourceAppProxy::receivedBuffer_l(size_t buffIndex, size_t buffLength) {
103    if (mListener != 0) {
104        mListener->queueBuffer(buffIndex, buffLength);
105    }
106}
107
108void StreamSourceAppProxy::disconnect() {
109    Mutex::Autolock _l(mLock);
110    mListener.clear();
111    // Force binder to push the decremented reference count for sp<IStreamListener>.
112    // mediaserver and client both have sp<> to the other. When you decrement an sp<>
113    // reference count, binder doesn't push that to the other process immediately.
114    IPCThreadState::self()->flushCommands();
115    mBuffers.clear();
116    mBuffersHasBeenSet = false;
117    mAvailableBuffers.clear();
118}
119
120//--------------------------------------------------
121// consumption from ABQ: pull from the ABQ, and push to shared memory (media server)
122void StreamSourceAppProxy::pullFromBuffQueue() {
123
124  if (android::CallbackProtector::enterCbIfOk(mCallbackProtector)) {
125
126    size_t bufferId;
127    void* bufferLoc;
128    size_t buffSize;
129
130    slAndroidBufferQueueCallback callback = NULL;
131    void* pBufferContext, *pBufferData, *callbackPContext = NULL;
132    AdvancedBufferHeader *oldFront = NULL;
133    uint32_t dataSize /* , dataUsed */;
134
135    // retrieve data from the buffer queue
136    interface_lock_exclusive(mAndroidBufferQueue);
137
138    // can this read operation cause us to call the buffer queue callback
139    // (either because there was a command with no data, or all the data has been consumed)
140    bool queueCallbackCandidate = false;
141
142    if (mAndroidBufferQueue->mState.count != 0) {
143        // SL_LOGD("nbBuffers in ABQ = %u, buffSize=%u",abq->mState.count, buffSize);
144        assert(mAndroidBufferQueue->mFront != mAndroidBufferQueue->mRear);
145
146        oldFront = mAndroidBufferQueue->mFront;
147        AdvancedBufferHeader *newFront = &oldFront[1];
148
149        // consume events when starting to read data from a buffer for the first time
150        if (oldFront->mDataSizeConsumed == 0) {
151            // note this code assumes at most one event per buffer; see IAndroidBufferQueue_Enqueue
152            if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_EOS) {
153                receivedCmd_l(IStreamListener::EOS);
154                // EOS has no associated data
155                queueCallbackCandidate = true;
156            } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCONTINUITY) {
157                receivedCmd_l(IStreamListener::DISCONTINUITY);
158            } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCON_NEWPTS) {
159                sp<AMessage> msg = new AMessage();
160                msg->setInt64(IStreamListener::kKeyResumeAtPTS,
161                        (int64_t)oldFront->mItems.mTsCmdData.mPts);
162                receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
163            } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_FORMAT_CHANGE) {
164                sp<AMessage> msg = new AMessage();
165                // positive value for format change key makes the discontinuity "hard", see key def
166                msg->setInt32(IStreamListener::kKeyFormatChange, (int32_t) 1);
167                receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
168            }
169            if (oldFront->mItems.mTsCmdData.mTsCmdCode & (ANDROID_MP2TSEVENT_DISCONTINUITY |
170                    ANDROID_MP2TSEVENT_DISCON_NEWPTS | ANDROID_MP2TSEVENT_FORMAT_CHANGE)) {
171                const sp<StreamPlayer> player(mPlayer.promote());
172                if (player != NULL) {
173                    // FIXME see note at onSeek
174                    player->seek(ANDROID_UNKNOWN_TIME);
175                }
176            }
177            oldFront->mItems.mTsCmdData.mTsCmdCode = ANDROID_MP2TSEVENT_NONE;
178        }
179
180        {
181            // we're going to change the shared mem buffer queue, so lock it
182            Mutex::Autolock _l(mLock);
183            if (!mAvailableBuffers.empty()) {
184                bufferId = *mAvailableBuffers.begin();
185                CHECK_LT(bufferId, mBuffers.size());
186                sp<IMemory> mem = mBuffers.itemAt(bufferId);
187                bufferLoc = mem->pointer();
188                buffSize = mem->size();
189
190                char *pSrc = ((char*)oldFront->mDataBuffer) + oldFront->mDataSizeConsumed;
191                if (oldFront->mDataSizeConsumed + buffSize < oldFront->mDataSize) {
192                    // more available than requested, copy as much as requested
193                    // consume data: 1/ copy to given destination
194                    memcpy(bufferLoc, pSrc, buffSize);
195                    //               2/ keep track of how much has been consumed
196                    oldFront->mDataSizeConsumed += buffSize;
197                    //               3/ notify shared mem listener that new data is available
198                    receivedBuffer_l(bufferId, buffSize);
199                    mAvailableBuffers.erase(mAvailableBuffers.begin());
200                } else {
201                    // requested as much available or more: consume the whole of the current
202                    //   buffer and move to the next
203                    size_t consumed = oldFront->mDataSize - oldFront->mDataSizeConsumed;
204                    //SL_LOGD("consuming rest of buffer: enqueueing=%u", consumed);
205                    oldFront->mDataSizeConsumed = oldFront->mDataSize;
206
207                    // move queue to next
208                    if (newFront == &mAndroidBufferQueue->
209                            mBufferArray[mAndroidBufferQueue->mNumBuffers + 1]) {
210                        // reached the end, circle back
211                        newFront = mAndroidBufferQueue->mBufferArray;
212                    }
213                    mAndroidBufferQueue->mFront = newFront;
214                    mAndroidBufferQueue->mState.count--;
215                    mAndroidBufferQueue->mState.index++;
216
217                    if (consumed > 0) {
218                        // consume data: 1/ copy to given destination
219                        memcpy(bufferLoc, pSrc, consumed);
220                        //               2/ keep track of how much has been consumed
221                        // here nothing to do because we are done with this buffer
222                        //               3/ notify StreamPlayer that new data is available
223                        receivedBuffer_l(bufferId, consumed);
224                        mAvailableBuffers.erase(mAvailableBuffers.begin());
225                    }
226
227                    // data has been consumed, and the buffer queue state has been updated
228                    // we will notify the client if applicable
229                    queueCallbackCandidate = true;
230                }
231            }
232
233            if (queueCallbackCandidate) {
234                if (mAndroidBufferQueue->mCallbackEventsMask &
235                        SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED) {
236                    callback = mAndroidBufferQueue->mCallback;
237                    // save callback data while under lock
238                    callbackPContext = mAndroidBufferQueue->mContext;
239                    pBufferContext = (void *)oldFront->mBufferContext;
240                    pBufferData    = (void *)oldFront->mDataBuffer;
241                    dataSize       = oldFront->mDataSize;
242                    // here a buffer is only dequeued when fully consumed
243                    //dataUsed     = oldFront->mDataSizeConsumed;
244                }
245            }
246            //SL_LOGD("%d buffers available after reading from queue", mAvailableBuffers.size());
247            if (!mAvailableBuffers.empty()) {
248                // there is still room in the shared memory, recheck later if we can pull
249                // data from the buffer queue and write it to shared memory
250                const sp<StreamPlayer> player(mPlayer.promote());
251                if (player != NULL) {
252                    player->queueRefilled();
253                }
254            }
255        }
256
257    } else { // empty queue
258        SL_LOGD("ABQ empty, starving!");
259    }
260
261    interface_unlock_exclusive(mAndroidBufferQueue);
262
263    // notify client of buffer processed
264    if (NULL != callback) {
265        SLresult result = (*callback)(&mAndroidBufferQueue->mItf, callbackPContext,
266                pBufferContext, pBufferData, dataSize,
267                dataSize, /* dataUsed  */
268                // no messages during playback other than marking the buffer as processed
269                (const SLAndroidBufferItem*)(&kItemProcessed) /* pItems */,
270                NB_BUFFEREVENT_ITEM_FIELDS *sizeof(SLuint32) /* itemsLength */ );
271        if (SL_RESULT_SUCCESS != result) {
272            // Reserved for future use
273            SL_LOGW("Unsuccessful result %d returned from AndroidBufferQueueCallback", result);
274        }
275    }
276
277    mCallbackProtector->exitCb();
278  } // enterCbIfOk
279}
280
281
282//--------------------------------------------------------------------------------------------------
283StreamPlayer::StreamPlayer(const AudioPlayback_Parameters* params, bool hasVideo,
284        IAndroidBufferQueue *androidBufferQueue, const sp<CallbackProtector> &callbackProtector) :
285        GenericMediaPlayer(params, hasVideo),
286        mAppProxy(new StreamSourceAppProxy(androidBufferQueue, callbackProtector, this)),
287        mStopForDestroyCompleted(false)
288{
289    SL_LOGD("StreamPlayer::StreamPlayer()");
290}
291
292StreamPlayer::~StreamPlayer() {
293    SL_LOGD("StreamPlayer::~StreamPlayer()");
294    mAppProxy->disconnect();
295}
296
297
298void StreamPlayer::onMessageReceived(const sp<AMessage> &msg) {
299    switch (msg->what()) {
300        case kWhatPullFromAbq:
301            onPullFromAndroidBufferQueue();
302            break;
303
304        case kWhatStopForDestroy:
305            onStopForDestroy();
306            break;
307
308        default:
309            GenericMediaPlayer::onMessageReceived(msg);
310            break;
311    }
312}
313
314
315void StreamPlayer::preDestroy() {
316    // FIXME NuPlayerDriver is currently not thread-safe, so stop() must be called by looper
317    (new AMessage(kWhatStopForDestroy, id()))->post();
318    {
319        Mutex::Autolock _l(mStopForDestroyLock);
320        while (!mStopForDestroyCompleted) {
321            mStopForDestroyCondition.wait(mStopForDestroyLock);
322        }
323    }
324    // GenericMediaPlayer::preDestroy will repeat some of what we've done, but that's benign
325    GenericMediaPlayer::preDestroy();
326}
327
328
329void StreamPlayer::onStopForDestroy() {
330    if (mPlayer != 0) {
331        mPlayer->stop();
332        // causes CHECK failure in Nuplayer
333        //mPlayer->setDataSource(NULL);
334        mPlayer->setVideoSurface(NULL);
335        mPlayer->disconnect();
336        mPlayer.clear();
337        {
338            // FIXME ugh make this a method
339            Mutex::Autolock _l(mPreparedPlayerLock);
340            mPreparedPlayer.clear();
341        }
342    }
343    mStopForDestroyCompleted = true;
344    mStopForDestroyCondition.signal();
345}
346
347
348/**
349 * Asynchronously notify the player that the queue is ready to be pulled from.
350 */
351void StreamPlayer::queueRefilled() {
352    // async notification that the ABQ was refilled: the player should pull from the ABQ, and
353    //    and push to shared memory (to the media server)
354    (new AMessage(kWhatPullFromAbq, id()))->post();
355}
356
357
358void StreamPlayer::appClear_l() {
359    // the user of StreamPlayer has cleared its AndroidBufferQueue:
360    // there's no clear() for the shared memory queue, so this is a no-op
361}
362
363
364//--------------------------------------------------
365// Event handlers
366void StreamPlayer::onPrepare() {
367    SL_LOGD("StreamPlayer::onPrepare()");
368        sp<IMediaPlayerService> mediaPlayerService(getMediaPlayerService());
369        if (mediaPlayerService != NULL) {
370            mPlayer = mediaPlayerService->create(getpid(), mPlayerClient /*IMediaPlayerClient*/,
371                    mPlaybackParams.sessionId);
372            if (mPlayer == NULL) {
373                SL_LOGE("media player service failed to create player by app proxy");
374            } else if (mPlayer->setDataSource(mAppProxy /*IStreamSource*/) != NO_ERROR) {
375                SL_LOGE("setDataSource failed");
376                mPlayer.clear();
377            }
378        }
379    if (mPlayer == NULL) {
380        mStateFlags |= kFlagPreparedUnsuccessfully;
381    }
382    GenericMediaPlayer::onPrepare();
383    SL_LOGD("StreamPlayer::onPrepare() done");
384}
385
386
387void StreamPlayer::onPlay() {
388    SL_LOGD("StreamPlayer::onPlay()");
389    // enqueue a message that will cause StreamAppProxy to consume from the queue (again if the
390    // player had starved the shared memory)
391    queueRefilled();
392
393    GenericMediaPlayer::onPlay();
394}
395
396
397void StreamPlayer::onPullFromAndroidBufferQueue() {
398    SL_LOGD("StreamPlayer::onPullFromAndroidBufferQueue()");
399    mAppProxy->pullFromBuffQueue();
400}
401
402} // namespace android
403