1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#define LOG_TAG "Prefetcher" 18//#define LOG_NDEBUG 0 19#include <utils/Log.h> 20 21#include "include/Prefetcher.h" 22 23#include <media/stagefright/MediaBuffer.h> 24#include <media/stagefright/MediaDebug.h> 25#include <media/stagefright/MediaErrors.h> 26#include <media/stagefright/MediaSource.h> 27#include <media/stagefright/MetaData.h> 28#include <utils/List.h> 29 30namespace android { 31 32struct PrefetchedSource : public MediaSource { 33 PrefetchedSource( 34 size_t index, 35 const sp<MediaSource> &source); 36 37 virtual status_t start(MetaData *params); 38 virtual status_t stop(); 39 40 virtual status_t read( 41 MediaBuffer **buffer, const ReadOptions *options); 42 43 virtual sp<MetaData> getFormat(); 44 45protected: 46 virtual ~PrefetchedSource(); 47 48private: 49 friend struct Prefetcher; 50 51 Mutex mLock; 52 Condition mCondition; 53 54 sp<MediaSource> mSource; 55 size_t mIndex; 56 bool mStarted; 57 bool mReachedEOS; 58 status_t mFinalStatus; 59 int64_t mSeekTimeUs; 60 int64_t mCacheDurationUs; 61 size_t mCacheSizeBytes; 62 bool mPrefetcherStopped; 63 bool mCurrentlyPrefetching; 64 65 List<MediaBuffer *> mCachedBuffers; 66 67 // Returns true iff source is currently caching. 68 bool getCacheDurationUs(int64_t *durationUs, size_t *totalSize = NULL); 69 70 void updateCacheDuration_l(); 71 void clearCache_l(); 72 73 void cacheMore(); 74 void onPrefetcherStopped(); 75 76 PrefetchedSource(const PrefetchedSource &); 77 PrefetchedSource &operator=(const PrefetchedSource &); 78}; 79 80Prefetcher::Prefetcher() 81 : mDone(false), 82 mThreadExited(false) { 83 startThread(); 84} 85 86Prefetcher::~Prefetcher() { 87 stopThread(); 88} 89 90sp<MediaSource> Prefetcher::addSource(const sp<MediaSource> &source) { 91 Mutex::Autolock autoLock(mLock); 92 93 sp<PrefetchedSource> psource = 94 new PrefetchedSource(mSources.size(), source); 95 96 mSources.add(psource); 97 98 return psource; 99} 100 101void Prefetcher::startThread() { 102 mThreadExited = false; 103 mDone = false; 104 105 int res = androidCreateThreadEtc( 106 ThreadWrapper, this, "Prefetcher", 107 ANDROID_PRIORITY_DEFAULT, 0, &mThread); 108 109 CHECK_EQ(res, 1); 110} 111 112void Prefetcher::stopThread() { 113 Mutex::Autolock autoLock(mLock); 114 115 while (!mThreadExited) { 116 mDone = true; 117 mCondition.signal(); 118 mCondition.wait(mLock); 119 } 120} 121 122// static 123int Prefetcher::ThreadWrapper(void *me) { 124 static_cast<Prefetcher *>(me)->threadFunc(); 125 126 return 0; 127} 128 129// Cache at most 1 min for each source. 130static int64_t kMaxCacheDurationUs = 60 * 1000000ll; 131 132// At the same time cache at most 5MB per source. 133static size_t kMaxCacheSizeBytes = 5 * 1024 * 1024; 134 135// If the amount of cached data drops below this, 136// fill the cache up to the max duration again. 137static int64_t kLowWaterDurationUs = 5000000ll; 138 139void Prefetcher::threadFunc() { 140 bool fillingCache = false; 141 142 for (;;) { 143 sp<PrefetchedSource> minSource; 144 int64_t minCacheDurationUs = -1; 145 146 { 147 Mutex::Autolock autoLock(mLock); 148 if (mDone) { 149 break; 150 } 151 152 mCondition.waitRelative( 153 mLock, fillingCache ? 1ll : 1000000000ll); 154 155 156 ssize_t minIndex = -1; 157 for (size_t i = 0; i < mSources.size(); ++i) { 158 sp<PrefetchedSource> source = mSources[i].promote(); 159 160 if (source == NULL) { 161 continue; 162 } 163 164 int64_t cacheDurationUs; 165 size_t cacheSizeBytes; 166 if (!source->getCacheDurationUs(&cacheDurationUs, &cacheSizeBytes)) { 167 continue; 168 } 169 170 if (cacheSizeBytes > kMaxCacheSizeBytes) { 171 LOGI("max cache size reached"); 172 continue; 173 } 174 175 if (mSources.size() > 1 && cacheDurationUs >= kMaxCacheDurationUs) { 176 LOGI("max duration reached, size = %d bytes", cacheSizeBytes); 177 continue; 178 } 179 180 if (minIndex < 0 || cacheDurationUs < minCacheDurationUs) { 181 minCacheDurationUs = cacheDurationUs; 182 minIndex = i; 183 minSource = source; 184 } 185 } 186 187 if (minIndex < 0) { 188 if (fillingCache) { 189 LOGV("[%p] done filling the cache, above high water mark.", 190 this); 191 fillingCache = false; 192 } 193 continue; 194 } 195 } 196 197 if (!fillingCache && minCacheDurationUs < kLowWaterDurationUs) { 198 LOGI("[%p] cache below low water mark, filling cache.", this); 199 fillingCache = true; 200 } 201 202 if (fillingCache) { 203 // Make sure not to hold the lock while calling into the source. 204 // The lock guards the list of sources, not the individual sources 205 // themselves. 206 minSource->cacheMore(); 207 } 208 } 209 210 Mutex::Autolock autoLock(mLock); 211 for (size_t i = 0; i < mSources.size(); ++i) { 212 sp<PrefetchedSource> source = mSources[i].promote(); 213 214 if (source == NULL) { 215 continue; 216 } 217 218 source->onPrefetcherStopped(); 219 } 220 221 mThreadExited = true; 222 mCondition.signal(); 223} 224 225int64_t Prefetcher::getCachedDurationUs(bool *noMoreData) { 226 Mutex::Autolock autoLock(mLock); 227 228 int64_t minCacheDurationUs = -1; 229 ssize_t minIndex = -1; 230 bool anySourceActive = false; 231 for (size_t i = 0; i < mSources.size(); ++i) { 232 int64_t cacheDurationUs; 233 sp<PrefetchedSource> source = mSources[i].promote(); 234 if (source == NULL) { 235 continue; 236 } 237 238 if (source->getCacheDurationUs(&cacheDurationUs)) { 239 anySourceActive = true; 240 } 241 242 if (minIndex < 0 || cacheDurationUs < minCacheDurationUs) { 243 minCacheDurationUs = cacheDurationUs; 244 minIndex = i; 245 } 246 } 247 248 if (noMoreData) { 249 *noMoreData = !anySourceActive; 250 } 251 252 return minCacheDurationUs < 0 ? 0 : minCacheDurationUs; 253} 254 255status_t Prefetcher::prepare( 256 bool (*continueFunc)(void *cookie), void *cookie) { 257 // Fill the cache. 258 259 int64_t duration; 260 bool noMoreData; 261 do { 262 usleep(100000); 263 264 if (continueFunc && !(*continueFunc)(cookie)) { 265 return -EINTR; 266 } 267 268 duration = getCachedDurationUs(&noMoreData); 269 } while (!noMoreData && duration < 2000000ll); 270 271 return OK; 272} 273 274//////////////////////////////////////////////////////////////////////////////// 275 276PrefetchedSource::PrefetchedSource( 277 size_t index, 278 const sp<MediaSource> &source) 279 : mSource(source), 280 mIndex(index), 281 mStarted(false), 282 mReachedEOS(false), 283 mSeekTimeUs(0), 284 mCacheDurationUs(0), 285 mCacheSizeBytes(0), 286 mPrefetcherStopped(false), 287 mCurrentlyPrefetching(false) { 288} 289 290PrefetchedSource::~PrefetchedSource() { 291 if (mStarted) { 292 stop(); 293 } 294} 295 296status_t PrefetchedSource::start(MetaData *params) { 297 CHECK(!mStarted); 298 299 Mutex::Autolock autoLock(mLock); 300 301 status_t err = mSource->start(params); 302 303 if (err != OK) { 304 return err; 305 } 306 307 mStarted = true; 308 309 return OK; 310} 311 312status_t PrefetchedSource::stop() { 313 CHECK(mStarted); 314 315 Mutex::Autolock autoLock(mLock); 316 317 while (mCurrentlyPrefetching) { 318 mCondition.wait(mLock); 319 } 320 321 clearCache_l(); 322 323 status_t err = mSource->stop(); 324 325 mStarted = false; 326 327 return err; 328} 329 330status_t PrefetchedSource::read( 331 MediaBuffer **out, const ReadOptions *options) { 332 *out = NULL; 333 334 Mutex::Autolock autoLock(mLock); 335 336 CHECK(mStarted); 337 338 int64_t seekTimeUs; 339 if (options && options->getSeekTo(&seekTimeUs)) { 340 CHECK(seekTimeUs >= 0); 341 342 clearCache_l(); 343 344 mReachedEOS = false; 345 mSeekTimeUs = seekTimeUs; 346 } 347 348 while (!mPrefetcherStopped && !mReachedEOS && mCachedBuffers.empty()) { 349 mCondition.wait(mLock); 350 } 351 352 if (mCachedBuffers.empty()) { 353 return mReachedEOS ? mFinalStatus : ERROR_END_OF_STREAM; 354 } 355 356 *out = *mCachedBuffers.begin(); 357 mCachedBuffers.erase(mCachedBuffers.begin()); 358 updateCacheDuration_l(); 359 mCacheSizeBytes -= (*out)->size(); 360 361 return OK; 362} 363 364sp<MetaData> PrefetchedSource::getFormat() { 365 return mSource->getFormat(); 366} 367 368bool PrefetchedSource::getCacheDurationUs( 369 int64_t *durationUs, size_t *totalSize) { 370 Mutex::Autolock autoLock(mLock); 371 372 *durationUs = mCacheDurationUs; 373 if (totalSize != NULL) { 374 *totalSize = mCacheSizeBytes; 375 } 376 377 if (!mStarted || mReachedEOS) { 378 return false; 379 } 380 381 return true; 382} 383 384void PrefetchedSource::cacheMore() { 385 MediaSource::ReadOptions options; 386 387 Mutex::Autolock autoLock(mLock); 388 389 if (!mStarted) { 390 return; 391 } 392 393 mCurrentlyPrefetching = true; 394 395 if (mSeekTimeUs >= 0) { 396 options.setSeekTo(mSeekTimeUs); 397 mSeekTimeUs = -1; 398 } 399 400 // Ensure our object does not go away while we're not holding 401 // the lock. 402 sp<PrefetchedSource> me = this; 403 404 mLock.unlock(); 405 MediaBuffer *buffer; 406 status_t err = mSource->read(&buffer, &options); 407 mLock.lock(); 408 409 if (err != OK) { 410 mCurrentlyPrefetching = false; 411 mReachedEOS = true; 412 mFinalStatus = err; 413 mCondition.signal(); 414 415 return; 416 } 417 418 CHECK(buffer != NULL); 419 420 MediaBuffer *copy = new MediaBuffer(buffer->range_length()); 421 memcpy(copy->data(), 422 (const uint8_t *)buffer->data() + buffer->range_offset(), 423 buffer->range_length()); 424 425 sp<MetaData> from = buffer->meta_data(); 426 sp<MetaData> to = copy->meta_data(); 427 428 int64_t timeUs; 429 if (from->findInt64(kKeyTime, &timeUs)) { 430 to->setInt64(kKeyTime, timeUs); 431 } 432 433 buffer->release(); 434 buffer = NULL; 435 436 mCachedBuffers.push_back(copy); 437 updateCacheDuration_l(); 438 mCacheSizeBytes += copy->size(); 439 440 mCurrentlyPrefetching = false; 441 mCondition.signal(); 442} 443 444void PrefetchedSource::updateCacheDuration_l() { 445 if (mCachedBuffers.size() < 2) { 446 mCacheDurationUs = 0; 447 } else { 448 int64_t firstTimeUs, lastTimeUs; 449 CHECK((*mCachedBuffers.begin())->meta_data()->findInt64( 450 kKeyTime, &firstTimeUs)); 451 CHECK((*--mCachedBuffers.end())->meta_data()->findInt64( 452 kKeyTime, &lastTimeUs)); 453 454 mCacheDurationUs = lastTimeUs - firstTimeUs; 455 } 456} 457 458void PrefetchedSource::clearCache_l() { 459 List<MediaBuffer *>::iterator it = mCachedBuffers.begin(); 460 while (it != mCachedBuffers.end()) { 461 (*it)->release(); 462 463 it = mCachedBuffers.erase(it); 464 } 465 466 updateCacheDuration_l(); 467 mCacheSizeBytes = 0; 468} 469 470void PrefetchedSource::onPrefetcherStopped() { 471 Mutex::Autolock autoLock(mLock); 472 mPrefetcherStopped = true; 473 mCondition.signal(); 474} 475 476} // namespace android 477