1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define USE_LOG SLAndroidLogLevel_Verbose
18
19#include "sles_allinclusive.h"
20#include "android_StreamPlayer.h"
21
22#include <media/IStreamSource.h>
23#include <media/IMediaPlayerService.h>
24#include <media/stagefright/foundation/ADebug.h>
25#include <binder/IPCThreadState.h>
26
27#include "mpeg2ts/ATSParser.h"
28
29//--------------------------------------------------------------------------------------------------
30namespace android {
31
32StreamSourceAppProxy::StreamSourceAppProxy(
33        IAndroidBufferQueue *androidBufferQueue,
34        const sp<CallbackProtector> &callbackProtector,
35        // sp<StreamPlayer> would cause StreamPlayer's destructor to run during it's own
36        // construction.   If you pass in a sp<> to 'this' inside a constructor, then first the
37        // refcount is increased from 0 to 1, then decreased from 1 to 0, which causes the object's
38        // destructor to run from inside it's own constructor.
39        StreamPlayer * /* const sp<StreamPlayer> & */ player) :
40    mBuffersHasBeenSet(false),
41    mAndroidBufferQueue(androidBufferQueue),
42    mCallbackProtector(callbackProtector),
43    mPlayer(player)
44{
45    SL_LOGV("StreamSourceAppProxy::StreamSourceAppProxy()");
46}
47
48StreamSourceAppProxy::~StreamSourceAppProxy() {
49    SL_LOGV("StreamSourceAppProxy::~StreamSourceAppProxy()");
50    disconnect();
51}
52
53const SLuint32 StreamSourceAppProxy::kItemProcessed[NB_BUFFEREVENT_ITEM_FIELDS] = {
54        SL_ANDROID_ITEMKEY_BUFFERQUEUEEVENT, // item key
55        sizeof(SLuint32),                    // item size
56        SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED // item data
57};
58
59//--------------------------------------------------
60// IStreamSource implementation
61void StreamSourceAppProxy::setListener(const sp<IStreamListener> &listener) {
62    assert(listener != NULL);
63    Mutex::Autolock _l(mLock);
64    assert(mListener == NULL);
65    mListener = listener;
66}
67
68void StreamSourceAppProxy::setBuffers(const Vector<sp<IMemory> > &buffers) {
69    Mutex::Autolock _l(mLock);
70    assert(!mBuffersHasBeenSet);
71    mBuffers = buffers;
72    mBuffersHasBeenSet = true;
73}
74
75void StreamSourceAppProxy::onBufferAvailable(size_t index) {
76    //SL_LOGD("StreamSourceAppProxy::onBufferAvailable(%d)", index);
77
78    {
79        Mutex::Autolock _l(mLock);
80        if (!mBuffersHasBeenSet) {
81            // no buffers available to push data to from the buffer queue, bail
82            return;
83        }
84        CHECK_LT(index, mBuffers.size());
85#if 0   // enable if needed for debugging
86        sp<IMemory> mem = mBuffers.itemAt(index);
87        SLAint64 length = (SLAint64) mem->size();
88#endif
89        mAvailableBuffers.push_back(index);
90        //SL_LOGD("onBufferAvailable() now %d buffers available in queue",
91        //         mAvailableBuffers.size());
92    }
93
94    // a new shared mem buffer is available: let's try to fill immediately
95    pullFromBuffQueue();
96}
97
98void StreamSourceAppProxy::receivedCmd_l(IStreamListener::Command cmd, const sp<AMessage> &msg) {
99    if (mListener != 0) {
100        mListener->issueCommand(cmd, false /* synchronous */, msg);
101    }
102}
103
104void StreamSourceAppProxy::receivedBuffer_l(size_t buffIndex, size_t buffLength) {
105    if (mListener != 0) {
106        mListener->queueBuffer(buffIndex, buffLength);
107    }
108}
109
110void StreamSourceAppProxy::disconnect() {
111    Mutex::Autolock _l(mLock);
112    mListener.clear();
113    // Force binder to push the decremented reference count for sp<IStreamListener>.
114    // mediaserver and client both have sp<> to the other. When you decrement an sp<>
115    // reference count, binder doesn't push that to the other process immediately.
116    IPCThreadState::self()->flushCommands();
117    mBuffers.clear();
118    mBuffersHasBeenSet = false;
119    mAvailableBuffers.clear();
120}
121
122//--------------------------------------------------
123// consumption from ABQ: pull from the ABQ, and push to shared memory (media server)
124void StreamSourceAppProxy::pullFromBuffQueue() {
125
126  if (android::CallbackProtector::enterCbIfOk(mCallbackProtector)) {
127
128    size_t bufferId;
129    void* bufferLoc;
130    size_t buffSize;
131
132    slAndroidBufferQueueCallback callback = NULL;
133    void* pBufferContext, *pBufferData, *callbackPContext = NULL;
134    AdvancedBufferHeader *oldFront = NULL;
135    uint32_t dataSize /* , dataUsed */;
136
137    // retrieve data from the buffer queue
138    interface_lock_exclusive(mAndroidBufferQueue);
139
140    // can this read operation cause us to call the buffer queue callback
141    // (either because there was a command with no data, or all the data has been consumed)
142    bool queueCallbackCandidate = false;
143
144    if (mAndroidBufferQueue->mState.count != 0) {
145        // SL_LOGD("nbBuffers in ABQ = %u, buffSize=%u",abq->mState.count, buffSize);
146        assert(mAndroidBufferQueue->mFront != mAndroidBufferQueue->mRear);
147
148        oldFront = mAndroidBufferQueue->mFront;
149        AdvancedBufferHeader *newFront = &oldFront[1];
150
151        // consume events when starting to read data from a buffer for the first time
152        if (oldFront->mDataSizeConsumed == 0) {
153            // note this code assumes at most one event per buffer; see IAndroidBufferQueue_Enqueue
154            if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_EOS) {
155                receivedCmd_l(IStreamListener::EOS);
156                // EOS has no associated data
157                queueCallbackCandidate = true;
158            } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCONTINUITY) {
159                receivedCmd_l(IStreamListener::DISCONTINUITY);
160            } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCON_NEWPTS) {
161                sp<AMessage> msg = new AMessage();
162                msg->setInt64(IStreamListener::kKeyResumeAtPTS,
163                        (int64_t)oldFront->mItems.mTsCmdData.mPts);
164                receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
165            } else if (oldFront->mItems.mTsCmdData.mTsCmdCode
166                    & ANDROID_MP2TSEVENT_FORMAT_CHANGE_FULL) {
167                sp<AMessage> msg = new AMessage();
168                msg->setInt32(
169                        IStreamListener::kKeyDiscontinuityMask,
170                        ATSParser::DISCONTINUITY_FORMATCHANGE);
171                receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
172            } else if (oldFront->mItems.mTsCmdData.mTsCmdCode
173                    & ANDROID_MP2TSEVENT_FORMAT_CHANGE_VIDEO) {
174                sp<AMessage> msg = new AMessage();
175                msg->setInt32(
176                        IStreamListener::kKeyDiscontinuityMask,
177                        ATSParser::DISCONTINUITY_VIDEO_FORMAT);
178                receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
179            }
180            // note that here we are intentionally only supporting
181            //   ANDROID_MP2TSEVENT_FORMAT_CHANGE_VIDEO, see IAndroidBufferQueue.c
182
183            // some commands may introduce a time discontinuity, reevaluate position if needed
184            if (oldFront->mItems.mTsCmdData.mTsCmdCode & (ANDROID_MP2TSEVENT_DISCONTINUITY |
185                    ANDROID_MP2TSEVENT_DISCON_NEWPTS | ANDROID_MP2TSEVENT_FORMAT_CHANGE_FULL)) {
186                const sp<StreamPlayer> player(mPlayer.promote());
187                if (player != NULL) {
188                    // FIXME see note at onSeek
189                    player->seek(ANDROID_UNKNOWN_TIME);
190                }
191            }
192            oldFront->mItems.mTsCmdData.mTsCmdCode = ANDROID_MP2TSEVENT_NONE;
193        }
194
195        {
196            // we're going to change the shared mem buffer queue, so lock it
197            Mutex::Autolock _l(mLock);
198            if (!mAvailableBuffers.empty()) {
199                bufferId = *mAvailableBuffers.begin();
200                CHECK_LT(bufferId, mBuffers.size());
201                sp<IMemory> mem = mBuffers.itemAt(bufferId);
202                bufferLoc = mem->pointer();
203                buffSize = mem->size();
204
205                char *pSrc = ((char*)oldFront->mDataBuffer) + oldFront->mDataSizeConsumed;
206                if (oldFront->mDataSizeConsumed + buffSize < oldFront->mDataSize) {
207                    // more available than requested, copy as much as requested
208                    // consume data: 1/ copy to given destination
209                    memcpy(bufferLoc, pSrc, buffSize);
210                    //               2/ keep track of how much has been consumed
211                    oldFront->mDataSizeConsumed += buffSize;
212                    //               3/ notify shared mem listener that new data is available
213                    receivedBuffer_l(bufferId, buffSize);
214                    mAvailableBuffers.erase(mAvailableBuffers.begin());
215                } else {
216                    // requested as much available or more: consume the whole of the current
217                    //   buffer and move to the next
218                    size_t consumed = oldFront->mDataSize - oldFront->mDataSizeConsumed;
219                    //SL_LOGD("consuming rest of buffer: enqueueing=%u", consumed);
220                    oldFront->mDataSizeConsumed = oldFront->mDataSize;
221
222                    // move queue to next
223                    if (newFront == &mAndroidBufferQueue->
224                            mBufferArray[mAndroidBufferQueue->mNumBuffers + 1]) {
225                        // reached the end, circle back
226                        newFront = mAndroidBufferQueue->mBufferArray;
227                    }
228                    mAndroidBufferQueue->mFront = newFront;
229                    mAndroidBufferQueue->mState.count--;
230                    mAndroidBufferQueue->mState.index++;
231
232                    if (consumed > 0) {
233                        // consume data: 1/ copy to given destination
234                        memcpy(bufferLoc, pSrc, consumed);
235                        //               2/ keep track of how much has been consumed
236                        // here nothing to do because we are done with this buffer
237                        //               3/ notify StreamPlayer that new data is available
238                        receivedBuffer_l(bufferId, consumed);
239                        mAvailableBuffers.erase(mAvailableBuffers.begin());
240                    }
241
242                    // data has been consumed, and the buffer queue state has been updated
243                    // we will notify the client if applicable
244                    queueCallbackCandidate = true;
245                }
246            }
247
248            if (queueCallbackCandidate) {
249                if (mAndroidBufferQueue->mCallbackEventsMask &
250                        SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED) {
251                    callback = mAndroidBufferQueue->mCallback;
252                    // save callback data while under lock
253                    callbackPContext = mAndroidBufferQueue->mContext;
254                    pBufferContext = (void *)oldFront->mBufferContext;
255                    pBufferData    = (void *)oldFront->mDataBuffer;
256                    dataSize       = oldFront->mDataSize;
257                    // here a buffer is only dequeued when fully consumed
258                    //dataUsed     = oldFront->mDataSizeConsumed;
259                }
260            }
261            //SL_LOGD("%d buffers available after reading from queue", mAvailableBuffers.size());
262            if (!mAvailableBuffers.empty()) {
263                // there is still room in the shared memory, recheck later if we can pull
264                // data from the buffer queue and write it to shared memory
265                const sp<StreamPlayer> player(mPlayer.promote());
266                if (player != NULL) {
267                    player->queueRefilled();
268                }
269            }
270        }
271
272    } else { // empty queue
273        SL_LOGD("ABQ empty, starving!");
274    }
275
276    interface_unlock_exclusive(mAndroidBufferQueue);
277
278    // notify client of buffer processed
279    if (NULL != callback) {
280        SLresult result = (*callback)(&mAndroidBufferQueue->mItf, callbackPContext,
281                pBufferContext, pBufferData, dataSize,
282                dataSize, /* dataUsed  */
283                // no messages during playback other than marking the buffer as processed
284                (const SLAndroidBufferItem*)(&kItemProcessed) /* pItems */,
285                NB_BUFFEREVENT_ITEM_FIELDS *sizeof(SLuint32) /* itemsLength */ );
286        if (SL_RESULT_SUCCESS != result) {
287            // Reserved for future use
288            SL_LOGW("Unsuccessful result %d returned from AndroidBufferQueueCallback", result);
289        }
290    }
291
292    mCallbackProtector->exitCb();
293  } // enterCbIfOk
294}
295
296
297//--------------------------------------------------------------------------------------------------
298StreamPlayer::StreamPlayer(const AudioPlayback_Parameters* params, bool hasVideo,
299        IAndroidBufferQueue *androidBufferQueue, const sp<CallbackProtector> &callbackProtector) :
300        GenericMediaPlayer(params, hasVideo),
301        mAppProxy(new StreamSourceAppProxy(androidBufferQueue, callbackProtector, this)),
302        mStopForDestroyCompleted(false)
303{
304    SL_LOGD("StreamPlayer::StreamPlayer()");
305}
306
307StreamPlayer::~StreamPlayer() {
308    SL_LOGD("StreamPlayer::~StreamPlayer()");
309    mAppProxy->disconnect();
310}
311
312
313void StreamPlayer::onMessageReceived(const sp<AMessage> &msg) {
314    switch (msg->what()) {
315        case kWhatPullFromAbq:
316            onPullFromAndroidBufferQueue();
317            break;
318
319        case kWhatStopForDestroy:
320            onStopForDestroy();
321            break;
322
323        default:
324            GenericMediaPlayer::onMessageReceived(msg);
325            break;
326    }
327}
328
329
330void StreamPlayer::preDestroy() {
331    // FIXME NuPlayerDriver is currently not thread-safe, so stop() must be called by looper
332    (new AMessage(kWhatStopForDestroy, id()))->post();
333    {
334        Mutex::Autolock _l(mStopForDestroyLock);
335        while (!mStopForDestroyCompleted) {
336            mStopForDestroyCondition.wait(mStopForDestroyLock);
337        }
338    }
339    // GenericMediaPlayer::preDestroy will repeat some of what we've done, but that's benign
340    GenericMediaPlayer::preDestroy();
341}
342
343
344void StreamPlayer::onStopForDestroy() {
345    if (mPlayer != 0) {
346        mPlayer->stop();
347        // causes CHECK failure in Nuplayer
348        //mPlayer->setDataSource(NULL);
349        mPlayer->setVideoSurfaceTexture(NULL);
350        mPlayer->disconnect();
351        mPlayer.clear();
352        {
353            // FIXME ugh make this a method
354            Mutex::Autolock _l(mPreparedPlayerLock);
355            mPreparedPlayer.clear();
356        }
357    }
358    {
359        Mutex::Autolock _l(mStopForDestroyLock);
360        mStopForDestroyCompleted = true;
361    }
362    mStopForDestroyCondition.signal();
363}
364
365
366/**
367 * Asynchronously notify the player that the queue is ready to be pulled from.
368 */
369void StreamPlayer::queueRefilled() {
370    // async notification that the ABQ was refilled: the player should pull from the ABQ, and
371    //    and push to shared memory (to the media server)
372    (new AMessage(kWhatPullFromAbq, id()))->post();
373}
374
375
376void StreamPlayer::appClear_l() {
377    // the user of StreamPlayer has cleared its AndroidBufferQueue:
378    // there's no clear() for the shared memory queue, so this is a no-op
379}
380
381
382//--------------------------------------------------
383// Event handlers
384void StreamPlayer::onPrepare() {
385    SL_LOGD("StreamPlayer::onPrepare()");
386        sp<IMediaPlayerService> mediaPlayerService(getMediaPlayerService());
387        if (mediaPlayerService != NULL) {
388            mPlayer = mediaPlayerService->create(mPlayerClient /*IMediaPlayerClient*/,
389                    mPlaybackParams.sessionId);
390            if (mPlayer == NULL) {
391                SL_LOGE("media player service failed to create player by app proxy");
392            } else if (mPlayer->setDataSource(mAppProxy /*IStreamSource*/) != NO_ERROR) {
393                SL_LOGE("setDataSource failed");
394                mPlayer.clear();
395            }
396        }
397    if (mPlayer == NULL) {
398        mStateFlags |= kFlagPreparedUnsuccessfully;
399    }
400    GenericMediaPlayer::onPrepare();
401    SL_LOGD("StreamPlayer::onPrepare() done");
402}
403
404
405void StreamPlayer::onPlay() {
406    SL_LOGD("StreamPlayer::onPlay()");
407    // enqueue a message that will cause StreamAppProxy to consume from the queue (again if the
408    // player had starved the shared memory)
409    queueRefilled();
410
411    GenericMediaPlayer::onPlay();
412}
413
414
415void StreamPlayer::onPullFromAndroidBufferQueue() {
416    SL_LOGD("StreamPlayer::onPullFromAndroidBufferQueue()");
417    mAppProxy->pullFromBuffQueue();
418}
419
420} // namespace android
421