android_StreamPlayer.cpp revision 665ca3f1b0fc90cd5980a435d164354b2529c0b5
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define USE_LOG SLAndroidLogLevel_Verbose 18 19#include "sles_allinclusive.h" 20#include "android_StreamPlayer.h" 21 22#include <media/IStreamSource.h> 23#include <media/IMediaPlayerService.h> 24#include <media/stagefright/foundation/ADebug.h> 25#include <binder/IPCThreadState.h> 26 27#include "mpeg2ts/ATSParser.h" 28 29//-------------------------------------------------------------------------------------------------- 30namespace android { 31 32StreamSourceAppProxy::StreamSourceAppProxy( 33 IAndroidBufferQueue *androidBufferQueue, 34 const sp<CallbackProtector> &callbackProtector, 35 // sp<StreamPlayer> would cause StreamPlayer's destructor to run during it's own 36 // construction. If you pass in a sp<> to 'this' inside a constructor, then first the 37 // refcount is increased from 0 to 1, then decreased from 1 to 0, which causes the object's 38 // destructor to run from inside it's own constructor. 39 StreamPlayer * /* const sp<StreamPlayer> & */ player) : 40 mBuffersHasBeenSet(false), 41 mAndroidBufferQueue(androidBufferQueue), 42 mCallbackProtector(callbackProtector), 43 mPlayer(player) 44{ 45 SL_LOGV("StreamSourceAppProxy::StreamSourceAppProxy()"); 46} 47 48StreamSourceAppProxy::~StreamSourceAppProxy() { 49 SL_LOGV("StreamSourceAppProxy::~StreamSourceAppProxy()"); 50 disconnect(); 51} 52 53const SLuint32 StreamSourceAppProxy::kItemProcessed[NB_BUFFEREVENT_ITEM_FIELDS] = { 54 SL_ANDROID_ITEMKEY_BUFFERQUEUEEVENT, // item key 55 sizeof(SLuint32), // item size 56 SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED // item data 57}; 58 59//-------------------------------------------------- 60// IStreamSource implementation 61void StreamSourceAppProxy::setListener(const sp<IStreamListener> &listener) { 62 assert(listener != NULL); 63 Mutex::Autolock _l(mLock); 64 assert(mListener == NULL); 65 mListener = listener; 66} 67 68void StreamSourceAppProxy::setBuffers(const Vector<sp<IMemory> > &buffers) { 69 Mutex::Autolock _l(mLock); 70 assert(!mBuffersHasBeenSet); 71 mBuffers = buffers; 72 mBuffersHasBeenSet = true; 73} 74 75void StreamSourceAppProxy::onBufferAvailable(size_t index) { 76 //SL_LOGD("StreamSourceAppProxy::onBufferAvailable(%d)", index); 77 78 { 79 Mutex::Autolock _l(mLock); 80 // assert not needed because if not set, size() will be zero and the CHECK_LT will also fail 81 // assert(mBuffersHasBeenSet); 82 CHECK_LT(index, mBuffers.size()); 83#if 0 // enable if needed for debugging 84 sp<IMemory> mem = mBuffers.itemAt(index); 85 SLAint64 length = (SLAint64) mem->size(); 86#endif 87 mAvailableBuffers.push_back(index); 88 //SL_LOGD("onBufferAvailable() now %d buffers available in queue", mAvailableBuffers.size()); 89 } 90 91 // a new shared mem buffer is available: let's try to fill immediately 92 pullFromBuffQueue(); 93} 94 95void StreamSourceAppProxy::receivedCmd_l(IStreamListener::Command cmd, const sp<AMessage> &msg) { 96 if (mListener != 0) { 97 mListener->issueCommand(cmd, false /* synchronous */, msg); 98 } 99} 100 101void StreamSourceAppProxy::receivedBuffer_l(size_t buffIndex, size_t buffLength) { 102 if (mListener != 0) { 103 mListener->queueBuffer(buffIndex, buffLength); 104 } 105} 106 107void StreamSourceAppProxy::disconnect() { 108 Mutex::Autolock _l(mLock); 109 mListener.clear(); 110 // Force binder to push the decremented reference count for sp<IStreamListener>. 111 // mediaserver and client both have sp<> to the other. When you decrement an sp<> 112 // reference count, binder doesn't push that to the other process immediately. 113 IPCThreadState::self()->flushCommands(); 114 mBuffers.clear(); 115 mBuffersHasBeenSet = false; 116 mAvailableBuffers.clear(); 117} 118 119//-------------------------------------------------- 120// consumption from ABQ: pull from the ABQ, and push to shared memory (media server) 121void StreamSourceAppProxy::pullFromBuffQueue() { 122 123 if (android::CallbackProtector::enterCbIfOk(mCallbackProtector)) { 124 125 size_t bufferId; 126 void* bufferLoc; 127 size_t buffSize; 128 129 slAndroidBufferQueueCallback callback = NULL; 130 void* pBufferContext, *pBufferData, *callbackPContext = NULL; 131 AdvancedBufferHeader *oldFront = NULL; 132 uint32_t dataSize /* , dataUsed */; 133 134 // retrieve data from the buffer queue 135 interface_lock_exclusive(mAndroidBufferQueue); 136 137 // can this read operation cause us to call the buffer queue callback 138 // (either because there was a command with no data, or all the data has been consumed) 139 bool queueCallbackCandidate = false; 140 141 if (mAndroidBufferQueue->mState.count != 0) { 142 // SL_LOGD("nbBuffers in ABQ = %u, buffSize=%u",abq->mState.count, buffSize); 143 assert(mAndroidBufferQueue->mFront != mAndroidBufferQueue->mRear); 144 145 oldFront = mAndroidBufferQueue->mFront; 146 AdvancedBufferHeader *newFront = &oldFront[1]; 147 148 // consume events when starting to read data from a buffer for the first time 149 if (oldFront->mDataSizeConsumed == 0) { 150 // note this code assumes at most one event per buffer; see IAndroidBufferQueue_Enqueue 151 if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_EOS) { 152 receivedCmd_l(IStreamListener::EOS); 153 // EOS has no associated data 154 queueCallbackCandidate = true; 155 } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCONTINUITY) { 156 receivedCmd_l(IStreamListener::DISCONTINUITY); 157 } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCON_NEWPTS) { 158 sp<AMessage> msg = new AMessage(); 159 msg->setInt64(IStreamListener::kKeyResumeAtPTS, 160 (int64_t)oldFront->mItems.mTsCmdData.mPts); 161 receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/); 162 } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_FORMAT_CHANGE) { 163 sp<AMessage> msg = new AMessage(); 164 // positive value for format change key makes the discontinuity "hard", see key def 165 msg->setInt32( 166 IStreamListener::kKeyDiscontinuityMask, 167 ATSParser::DISCONTINUITY_FORMATCHANGE); 168 169 receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/); 170 } 171 if (oldFront->mItems.mTsCmdData.mTsCmdCode & (ANDROID_MP2TSEVENT_DISCONTINUITY | 172 ANDROID_MP2TSEVENT_DISCON_NEWPTS | ANDROID_MP2TSEVENT_FORMAT_CHANGE)) { 173 const sp<StreamPlayer> player(mPlayer.promote()); 174 if (player != NULL) { 175 // FIXME see note at onSeek 176 player->seek(ANDROID_UNKNOWN_TIME); 177 } 178 } 179 oldFront->mItems.mTsCmdData.mTsCmdCode = ANDROID_MP2TSEVENT_NONE; 180 } 181 182 { 183 // we're going to change the shared mem buffer queue, so lock it 184 Mutex::Autolock _l(mLock); 185 if (!mAvailableBuffers.empty()) { 186 bufferId = *mAvailableBuffers.begin(); 187 CHECK_LT(bufferId, mBuffers.size()); 188 sp<IMemory> mem = mBuffers.itemAt(bufferId); 189 bufferLoc = mem->pointer(); 190 buffSize = mem->size(); 191 192 char *pSrc = ((char*)oldFront->mDataBuffer) + oldFront->mDataSizeConsumed; 193 if (oldFront->mDataSizeConsumed + buffSize < oldFront->mDataSize) { 194 // more available than requested, copy as much as requested 195 // consume data: 1/ copy to given destination 196 memcpy(bufferLoc, pSrc, buffSize); 197 // 2/ keep track of how much has been consumed 198 oldFront->mDataSizeConsumed += buffSize; 199 // 3/ notify shared mem listener that new data is available 200 receivedBuffer_l(bufferId, buffSize); 201 mAvailableBuffers.erase(mAvailableBuffers.begin()); 202 } else { 203 // requested as much available or more: consume the whole of the current 204 // buffer and move to the next 205 size_t consumed = oldFront->mDataSize - oldFront->mDataSizeConsumed; 206 //SL_LOGD("consuming rest of buffer: enqueueing=%u", consumed); 207 oldFront->mDataSizeConsumed = oldFront->mDataSize; 208 209 // move queue to next 210 if (newFront == &mAndroidBufferQueue-> 211 mBufferArray[mAndroidBufferQueue->mNumBuffers + 1]) { 212 // reached the end, circle back 213 newFront = mAndroidBufferQueue->mBufferArray; 214 } 215 mAndroidBufferQueue->mFront = newFront; 216 mAndroidBufferQueue->mState.count--; 217 mAndroidBufferQueue->mState.index++; 218 219 if (consumed > 0) { 220 // consume data: 1/ copy to given destination 221 memcpy(bufferLoc, pSrc, consumed); 222 // 2/ keep track of how much has been consumed 223 // here nothing to do because we are done with this buffer 224 // 3/ notify StreamPlayer that new data is available 225 receivedBuffer_l(bufferId, consumed); 226 mAvailableBuffers.erase(mAvailableBuffers.begin()); 227 } 228 229 // data has been consumed, and the buffer queue state has been updated 230 // we will notify the client if applicable 231 queueCallbackCandidate = true; 232 } 233 } 234 235 if (queueCallbackCandidate) { 236 if (mAndroidBufferQueue->mCallbackEventsMask & 237 SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED) { 238 callback = mAndroidBufferQueue->mCallback; 239 // save callback data while under lock 240 callbackPContext = mAndroidBufferQueue->mContext; 241 pBufferContext = (void *)oldFront->mBufferContext; 242 pBufferData = (void *)oldFront->mDataBuffer; 243 dataSize = oldFront->mDataSize; 244 // here a buffer is only dequeued when fully consumed 245 //dataUsed = oldFront->mDataSizeConsumed; 246 } 247 } 248 //SL_LOGD("%d buffers available after reading from queue", mAvailableBuffers.size()); 249 if (!mAvailableBuffers.empty()) { 250 // there is still room in the shared memory, recheck later if we can pull 251 // data from the buffer queue and write it to shared memory 252 const sp<StreamPlayer> player(mPlayer.promote()); 253 if (player != NULL) { 254 player->queueRefilled(); 255 } 256 } 257 } 258 259 } else { // empty queue 260 SL_LOGD("ABQ empty, starving!"); 261 } 262 263 interface_unlock_exclusive(mAndroidBufferQueue); 264 265 // notify client of buffer processed 266 if (NULL != callback) { 267 SLresult result = (*callback)(&mAndroidBufferQueue->mItf, callbackPContext, 268 pBufferContext, pBufferData, dataSize, 269 dataSize, /* dataUsed */ 270 // no messages during playback other than marking the buffer as processed 271 (const SLAndroidBufferItem*)(&kItemProcessed) /* pItems */, 272 NB_BUFFEREVENT_ITEM_FIELDS *sizeof(SLuint32) /* itemsLength */ ); 273 if (SL_RESULT_SUCCESS != result) { 274 // Reserved for future use 275 SL_LOGW("Unsuccessful result %d returned from AndroidBufferQueueCallback", result); 276 } 277 } 278 279 mCallbackProtector->exitCb(); 280 } // enterCbIfOk 281} 282 283 284//-------------------------------------------------------------------------------------------------- 285StreamPlayer::StreamPlayer(AudioPlayback_Parameters* params, bool hasVideo, 286 IAndroidBufferQueue *androidBufferQueue, const sp<CallbackProtector> &callbackProtector) : 287 GenericMediaPlayer(params, hasVideo), 288 mAppProxy(new StreamSourceAppProxy(androidBufferQueue, callbackProtector, this)), 289 mStopForDestroyCompleted(false) 290{ 291 SL_LOGD("StreamPlayer::StreamPlayer()"); 292 293 mPlaybackParams = *params; 294 295} 296 297StreamPlayer::~StreamPlayer() { 298 SL_LOGD("StreamPlayer::~StreamPlayer()"); 299 mAppProxy->disconnect(); 300} 301 302 303void StreamPlayer::onMessageReceived(const sp<AMessage> &msg) { 304 switch (msg->what()) { 305 case kWhatPullFromAbq: 306 onPullFromAndroidBufferQueue(); 307 break; 308 309 case kWhatStopForDestroy: 310 onStopForDestroy(); 311 break; 312 313 default: 314 GenericMediaPlayer::onMessageReceived(msg); 315 break; 316 } 317} 318 319 320void StreamPlayer::preDestroy() { 321 // FIXME NuPlayerDriver is currently not thread-safe, so stop() must be called by looper 322 (new AMessage(kWhatStopForDestroy, id()))->post(); 323 { 324 Mutex::Autolock _l(mStopForDestroyLock); 325 while (!mStopForDestroyCompleted) { 326 mStopForDestroyCondition.wait(mStopForDestroyLock); 327 } 328 } 329 // GenericMediaPlayer::preDestroy will repeat some of what we've done, but that's benign 330 GenericMediaPlayer::preDestroy(); 331} 332 333 334void StreamPlayer::onStopForDestroy() { 335 if (mPlayer != 0) { 336 mPlayer->stop(); 337 // causes CHECK failure in Nuplayer 338 //mPlayer->setDataSource(NULL); 339 mPlayer->setVideoSurfaceTexture(NULL); 340 mPlayer->disconnect(); 341 mPlayer.clear(); 342 { 343 // FIXME ugh make this a method 344 Mutex::Autolock _l(mPreparedPlayerLock); 345 mPreparedPlayer.clear(); 346 } 347 } 348 { 349 Mutex::Autolock _l(mStopForDestroyLock); 350 mStopForDestroyCompleted = true; 351 } 352 mStopForDestroyCondition.signal(); 353} 354 355 356/** 357 * Asynchronously notify the player that the queue is ready to be pulled from. 358 */ 359void StreamPlayer::queueRefilled() { 360 // async notification that the ABQ was refilled: the player should pull from the ABQ, and 361 // and push to shared memory (to the media server) 362 (new AMessage(kWhatPullFromAbq, id()))->post(); 363} 364 365 366void StreamPlayer::appClear_l() { 367 // the user of StreamPlayer has cleared its AndroidBufferQueue: 368 // there's no clear() for the shared memory queue, so this is a no-op 369} 370 371 372//-------------------------------------------------- 373// Event handlers 374void StreamPlayer::onPrepare() { 375 SL_LOGD("StreamPlayer::onPrepare()"); 376 sp<IMediaPlayerService> mediaPlayerService(getMediaPlayerService()); 377 if (mediaPlayerService != NULL) { 378 mPlayer = mediaPlayerService->create(getpid(), mPlayerClient /*IMediaPlayerClient*/, 379 mPlaybackParams.sessionId); 380 if (mPlayer == NULL) { 381 SL_LOGE("media player service failed to create player by app proxy"); 382 } else if (mPlayer->setDataSource(mAppProxy /*IStreamSource*/) != NO_ERROR) { 383 SL_LOGE("setDataSource failed"); 384 mPlayer.clear(); 385 } 386 } 387 if (mPlayer == NULL) { 388 mStateFlags |= kFlagPreparedUnsuccessfully; 389 } 390 GenericMediaPlayer::onPrepare(); 391 SL_LOGD("StreamPlayer::onPrepare() done"); 392} 393 394 395void StreamPlayer::onPlay() { 396 SL_LOGD("StreamPlayer::onPlay()"); 397 // enqueue a message that will cause StreamAppProxy to consume from the queue (again if the 398 // player had starved the shared memory) 399 queueRefilled(); 400 401 GenericMediaPlayer::onPlay(); 402} 403 404 405void StreamPlayer::onPullFromAndroidBufferQueue() { 406 SL_LOGD("StreamPlayer::onPullFromAndroidBufferQueue()"); 407 mAppProxy->pullFromBuffQueue(); 408} 409 410} // namespace android 411