android_StreamPlayer.cpp revision a9f22e6f5f53e90daa779e38b22f88e4faa35c95
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define USE_LOG SLAndroidLogLevel_Verbose
18
19#include "sles_allinclusive.h"
20#include "android_StreamPlayer.h"
21
22#include <media/IStreamSource.h>
23#include <media/IMediaPlayerService.h>
24#include <media/stagefright/foundation/ADebug.h>
25
26
27//--------------------------------------------------------------------------------------------------
28namespace android {
29
30StreamSourceAppProxy::StreamSourceAppProxy(
31        IAndroidBufferQueue *androidBufferQueue,
32        const sp<CallbackProtector> &callbackProtector,
33        const sp<StreamPlayer> &player) :
34    mAndroidBufferQueue(androidBufferQueue),
35    mCallbackProtector(callbackProtector),
36    mPlayer(player)
37{
38    SL_LOGV("StreamSourceAppProxy::StreamSourceAppProxy()");
39}
40
41StreamSourceAppProxy::~StreamSourceAppProxy() {
42    SL_LOGD("StreamSourceAppProxy::~StreamSourceAppProxy()");
43    mListener.clear();
44    mBuffers.clear();
45}
46
47const SLuint32 StreamSourceAppProxy::kItemProcessed[NB_BUFFEREVENT_ITEM_FIELDS] = {
48        SL_ANDROID_ITEMKEY_BUFFERQUEUEEVENT, // item key
49        sizeof(SLuint32),                    // item size
50        SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED // item data
51};
52
53//--------------------------------------------------
54// IStreamSource implementation
55void StreamSourceAppProxy::setListener(const sp<IStreamListener> &listener) {
56    Mutex::Autolock _l(mLock);
57    mListener = listener;
58}
59
60void StreamSourceAppProxy::setBuffers(const Vector<sp<IMemory> > &buffers) {
61    mBuffers = buffers;
62}
63
64void StreamSourceAppProxy::onBufferAvailable(size_t index) {
65    //SL_LOGD("StreamSourceAppProxy::onBufferAvailable(%d)", index);
66
67    CHECK_LT(index, mBuffers.size());
68    sp<IMemory> mem = mBuffers.itemAt(index);
69    SLAint64 length = (SLAint64) mem->size();
70
71    {
72        Mutex::Autolock _l(mLock);
73        mAvailableBuffers.push_back(index);
74    }
75    //SL_LOGD("onBufferAvailable() now %d buffers available in queue", mAvailableBuffers.size());
76
77    // a new shared mem buffer is available: let's try to fill immediately
78    pullFromBuffQueue();
79}
80
81void StreamSourceAppProxy::receivedCmd_l(IStreamListener::Command cmd, const sp<AMessage> &msg) {
82    if (mListener != 0) {
83        mListener->issueCommand(cmd, false /* synchronous */, msg);
84    }
85}
86
87void StreamSourceAppProxy::receivedBuffer_l(size_t buffIndex, size_t buffLength) {
88    if (mListener != 0) {
89        mListener->queueBuffer(buffIndex, buffLength);
90    }
91}
92
93//--------------------------------------------------
94// consumption from ABQ: pull from the ABQ, and push to shared memory (media server)
95void StreamSourceAppProxy::pullFromBuffQueue() {
96
97  if (android::CallbackProtector::enterCbIfOk(mCallbackProtector)) {
98
99    size_t bufferId;
100    void* bufferLoc;
101    size_t buffSize;
102
103    slAndroidBufferQueueCallback callback = NULL;
104    void* pBufferContext, *pBufferData, *callbackPContext = NULL;
105    AdvancedBufferHeader *oldFront = NULL;
106    uint32_t dataSize /* , dataUsed */;
107
108    // retrieve data from the buffer queue
109    interface_lock_exclusive(mAndroidBufferQueue);
110
111    // can this read operation cause us to call the buffer queue callback
112    // (either because there was a command with no data, or all the data has been consumed)
113    bool queueCallbackCandidate = false;
114
115    if (mAndroidBufferQueue->mState.count != 0) {
116        // SL_LOGD("nbBuffers in ABQ = %u, buffSize=%u",abq->mState.count, buffSize);
117        assert(mAndroidBufferQueue->mFront != mAndroidBufferQueue->mRear);
118
119        oldFront = mAndroidBufferQueue->mFront;
120        AdvancedBufferHeader *newFront = &oldFront[1];
121
122        // consume events when starting to read data from a buffer for the first time
123        if (oldFront->mDataSizeConsumed == 0) {
124            // note this code assumes at most one event per buffer; see IAndroidBufferQueue_Enqueue
125            if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_EOS) {
126                receivedCmd_l(IStreamListener::EOS);
127                // EOS has no associated data
128                queueCallbackCandidate = true;
129            } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCONTINUITY) {
130                receivedCmd_l(IStreamListener::DISCONTINUITY);
131            } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCON_NEWPTS) {
132                sp<AMessage> msg = new AMessage();
133                msg->setInt64(IStreamListener::kKeyResumeAtPTS,
134                        (int64_t)oldFront->mItems.mTsCmdData.mPts);
135                receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
136            } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_FORMAT_CHANGE) {
137                sp<AMessage> msg = new AMessage();
138                // positive value for format change key makes the discontinuity "hard", see key def
139                msg->setInt32(IStreamListener::kKeyFormatChange, (int32_t) 1);
140                receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
141            }
142            if (oldFront->mItems.mTsCmdData.mTsCmdCode & (ANDROID_MP2TSEVENT_DISCONTINUITY |
143                    ANDROID_MP2TSEVENT_DISCON_NEWPTS | ANDROID_MP2TSEVENT_FORMAT_CHANGE)) {
144                // FIXME see note at onSeek
145                mPlayer->seek(ANDROID_UNKNOWN_TIME);
146            }
147            oldFront->mItems.mTsCmdData.mTsCmdCode = ANDROID_MP2TSEVENT_NONE;
148        }
149
150        {
151            // we're going to change the shared mem buffer queue, so lock it
152            Mutex::Autolock _l(mLock);
153            if (!mAvailableBuffers.empty()) {
154                bufferId = *mAvailableBuffers.begin();
155                CHECK_LT(bufferId, mBuffers.size());
156                sp<IMemory> mem = mBuffers.itemAt(bufferId);
157                bufferLoc = mem->pointer();
158                buffSize = mem->size();
159
160                char *pSrc = ((char*)oldFront->mDataBuffer) + oldFront->mDataSizeConsumed;
161                if (oldFront->mDataSizeConsumed + buffSize < oldFront->mDataSize) {
162                    // more available than requested, copy as much as requested
163                    // consume data: 1/ copy to given destination
164                    memcpy(bufferLoc, pSrc, buffSize);
165                    //               2/ keep track of how much has been consumed
166                    oldFront->mDataSizeConsumed += buffSize;
167                    //               3/ notify shared mem listener that new data is available
168                    receivedBuffer_l(bufferId, buffSize);
169                    mAvailableBuffers.erase(mAvailableBuffers.begin());
170                } else {
171                    // requested as much available or more: consume the whole of the current
172                    //   buffer and move to the next
173                    size_t consumed = oldFront->mDataSize - oldFront->mDataSizeConsumed;
174                    //SL_LOGD("consuming rest of buffer: enqueueing=%u", consumed);
175                    oldFront->mDataSizeConsumed = oldFront->mDataSize;
176
177                    // move queue to next
178                    if (newFront == &mAndroidBufferQueue->
179                            mBufferArray[mAndroidBufferQueue->mNumBuffers + 1]) {
180                        // reached the end, circle back
181                        newFront = mAndroidBufferQueue->mBufferArray;
182                    }
183                    mAndroidBufferQueue->mFront = newFront;
184                    mAndroidBufferQueue->mState.count--;
185                    mAndroidBufferQueue->mState.index++;
186
187                    if (consumed > 0) {
188                        // consume data: 1/ copy to given destination
189                        memcpy(bufferLoc, pSrc, consumed);
190                        //               2/ keep track of how much has been consumed
191                        // here nothing to do because we are done with this buffer
192                        //               3/ notify StreamPlayer that new data is available
193                        receivedBuffer_l(bufferId, consumed);
194                        mAvailableBuffers.erase(mAvailableBuffers.begin());
195                    }
196
197                    // data has been consumed, and the buffer queue state has been updated
198                    // we will notify the client if applicable
199                    queueCallbackCandidate = true;
200                }
201            }
202
203            if (queueCallbackCandidate) {
204                if (mAndroidBufferQueue->mCallbackEventsMask &
205                        SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED) {
206                    callback = mAndroidBufferQueue->mCallback;
207                    // save callback data while under lock
208                    callbackPContext = mAndroidBufferQueue->mContext;
209                    pBufferContext = (void *)oldFront->mBufferContext;
210                    pBufferData    = (void *)oldFront->mDataBuffer;
211                    dataSize       = oldFront->mDataSize;
212                    // here a buffer is only dequeued when fully consumed
213                    //dataUsed     = oldFront->mDataSizeConsumed;
214                }
215            }
216            //SL_LOGD("%d buffers available after reading from queue", mAvailableBuffers.size());
217            if (!mAvailableBuffers.empty()) {
218                // there is still room in the shared memory, recheck later if we can pull
219                // data from the buffer queue and write it to shared memory
220                mPlayer->queueRefilled();
221            }
222        }
223
224    } else { // empty queue
225        SL_LOGD("ABQ empty, starving!");
226    }
227
228    interface_unlock_exclusive(mAndroidBufferQueue);
229
230    // notify client of buffer processed
231    if (NULL != callback) {
232        SLresult result = (*callback)(&mAndroidBufferQueue->mItf, callbackPContext,
233                pBufferContext, pBufferData, dataSize,
234                dataSize, /* dataUsed  */
235                // no messages during playback other than marking the buffer as processed
236                (const SLAndroidBufferItem*)(&kItemProcessed) /* pItems */,
237                NB_BUFFEREVENT_ITEM_FIELDS *sizeof(SLuint32) /* itemsLength */ );
238        if (SL_RESULT_SUCCESS != result) {
239            // Reserved for future use
240            SL_LOGW("Unsuccessful result %d returned from AndroidBufferQueueCallback", result);
241        }
242    }
243
244    mCallbackProtector->exitCb();
245  } // enterCbIfOk
246}
247
248
249//--------------------------------------------------------------------------------------------------
250StreamPlayer::StreamPlayer(AudioPlayback_Parameters* params, bool hasVideo,
251        IAndroidBufferQueue *androidBufferQueue, const sp<CallbackProtector> &callbackProtector) :
252        GenericMediaPlayer(params, hasVideo),
253        mAppProxy(new StreamSourceAppProxy(androidBufferQueue, callbackProtector, this)),
254        mStopForDestroyCompleted(false)
255{
256    SL_LOGD("StreamPlayer::StreamPlayer()");
257
258    mPlaybackParams = *params;
259
260}
261
262StreamPlayer::~StreamPlayer() {
263    SL_LOGD("StreamPlayer::~StreamPlayer()");
264}
265
266
267void StreamPlayer::onMessageReceived(const sp<AMessage> &msg) {
268    switch (msg->what()) {
269        case kWhatPullFromAbq:
270            onPullFromAndroidBufferQueue();
271            break;
272
273        case kWhatStopForDestroy:
274            onStopForDestroy();
275            break;
276
277        default:
278            GenericMediaPlayer::onMessageReceived(msg);
279            break;
280    }
281}
282
283
284void StreamPlayer::preDestroy() {
285    // FIXME NuPlayerDriver is currently not thread-safe, so stop() must be called by looper
286    (new AMessage(kWhatStopForDestroy, id()))->post();
287    {
288        Mutex::Autolock _l(mStopForDestroyLock);
289        while (!mStopForDestroyCompleted) {
290            mStopForDestroyCondition.wait(mStopForDestroyLock);
291        }
292    }
293    // skipping past GenericMediaPlayer::preDestroy
294    GenericPlayer::preDestroy();
295}
296
297
298void StreamPlayer::onStopForDestroy() {
299    if (mPlayer != 0) {
300        mPlayer->stop();
301    }
302    mStopForDestroyCompleted = true;
303    mStopForDestroyCondition.signal();
304}
305
306
307/**
308 * Asynchronously notify the player that the queue is ready to be pulled from.
309 */
310void StreamPlayer::queueRefilled() {
311    // async notification that the ABQ was refilled: the player should pull from the ABQ, and
312    //    and push to shared memory (to the media server)
313    (new AMessage(kWhatPullFromAbq, id()))->post();
314}
315
316
317void StreamPlayer::appClear_l() {
318    // the user of StreamPlayer has cleared its AndroidBufferQueue:
319    // there's no clear() for the shared memory queue, so this is a no-op
320}
321
322
323//--------------------------------------------------
324// Event handlers
325void StreamPlayer::onPrepare() {
326    SL_LOGD("StreamPlayer::onPrepare()");
327        sp<IMediaPlayerService> mediaPlayerService(getMediaPlayerService());
328        if (mediaPlayerService != NULL) {
329            mPlayer = mediaPlayerService->create(getpid(), mPlayerClient /*IMediaPlayerClient*/,
330                    mPlaybackParams.sessionId);
331            if (mPlayer == NULL) {
332                SL_LOGE("media player service failed to create player by app proxy");
333            } else if (mPlayer->setDataSource(mAppProxy /*IStreamSource*/) != NO_ERROR) {
334                SL_LOGE("setDataSource failed");
335                mPlayer.clear();
336            }
337        }
338    if (mPlayer == NULL) {
339        mStateFlags |= kFlagPreparedUnsuccessfully;
340    }
341    GenericMediaPlayer::onPrepare();
342    SL_LOGD("StreamPlayer::onPrepare() done");
343}
344
345
346void StreamPlayer::onPlay() {
347    SL_LOGD("StreamPlayer::onPlay()");
348    // enqueue a message that will cause StreamAppProxy to consume from the queue (again if the
349    // player had starved the shared memory)
350    queueRefilled();
351
352    GenericMediaPlayer::onPlay();
353}
354
355
356void StreamPlayer::onPullFromAndroidBufferQueue() {
357    SL_LOGD("StreamPlayer::onPullFromAndroidBufferQueue()");
358    mAppProxy->pullFromBuffQueue();
359}
360
361} // namespace android
362