NuCachedSource2.cpp revision a045cb0e77097120e86e367e1cab5494ce2a5d5e
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "NuCachedSource2" 19#include <utils/Log.h> 20 21#include "include/NuCachedSource2.h" 22#include "include/HTTPBase.h" 23 24#include <cutils/properties.h> 25#include <media/stagefright/foundation/ADebug.h> 26#include <media/stagefright/foundation/AMessage.h> 27#include <media/stagefright/MediaErrors.h> 28 29namespace android { 30 31struct PageCache { 32 PageCache(size_t pageSize); 33 ~PageCache(); 34 35 struct Page { 36 void *mData; 37 size_t mSize; 38 }; 39 40 Page *acquirePage(); 41 void releasePage(Page *page); 42 43 void appendPage(Page *page); 44 size_t releaseFromStart(size_t maxBytes); 45 46 size_t totalSize() const { 47 return mTotalSize; 48 } 49 50 void copy(size_t from, void *data, size_t size); 51 52private: 53 size_t mPageSize; 54 size_t mTotalSize; 55 56 List<Page *> mActivePages; 57 List<Page *> mFreePages; 58 59 void freePages(List<Page *> *list); 60 61 DISALLOW_EVIL_CONSTRUCTORS(PageCache); 62}; 63 64PageCache::PageCache(size_t pageSize) 65 : mPageSize(pageSize), 66 mTotalSize(0) { 67} 68 69PageCache::~PageCache() { 70 freePages(&mActivePages); 71 freePages(&mFreePages); 72} 73 74void PageCache::freePages(List<Page *> *list) { 75 List<Page *>::iterator it = list->begin(); 76 while (it != list->end()) { 77 Page *page = *it; 78 79 free(page->mData); 80 delete page; 81 page = NULL; 82 83 ++it; 84 } 85} 86 87PageCache::Page *PageCache::acquirePage() { 88 if (!mFreePages.empty()) { 89 List<Page *>::iterator it = mFreePages.begin(); 90 Page *page = *it; 91 mFreePages.erase(it); 92 93 return page; 94 } 95 96 Page *page = new Page; 97 page->mData = malloc(mPageSize); 98 page->mSize = 0; 99 100 return page; 101} 102 103void PageCache::releasePage(Page *page) { 104 page->mSize = 0; 105 mFreePages.push_back(page); 106} 107 108void PageCache::appendPage(Page *page) { 109 mTotalSize += page->mSize; 110 mActivePages.push_back(page); 111} 112 113size_t PageCache::releaseFromStart(size_t maxBytes) { 114 size_t bytesReleased = 0; 115 116 while (maxBytes > 0 && !mActivePages.empty()) { 117 List<Page *>::iterator it = mActivePages.begin(); 118 119 Page *page = *it; 120 121 if (maxBytes < page->mSize) { 122 break; 123 } 124 125 mActivePages.erase(it); 126 127 maxBytes -= page->mSize; 128 bytesReleased += page->mSize; 129 130 releasePage(page); 131 } 132 133 mTotalSize -= bytesReleased; 134 return bytesReleased; 135} 136 137void PageCache::copy(size_t from, void *data, size_t size) { 138 LOGV("copy from %d size %d", from, size); 139 140 if (size == 0) { 141 return; 142 } 143 144 CHECK_LE(from + size, mTotalSize); 145 146 size_t offset = 0; 147 List<Page *>::iterator it = mActivePages.begin(); 148 while (from >= offset + (*it)->mSize) { 149 offset += (*it)->mSize; 150 ++it; 151 } 152 153 size_t delta = from - offset; 154 size_t avail = (*it)->mSize - delta; 155 156 if (avail >= size) { 157 memcpy(data, (const uint8_t *)(*it)->mData + delta, size); 158 return; 159 } 160 161 memcpy(data, (const uint8_t *)(*it)->mData + delta, avail); 162 ++it; 163 data = (uint8_t *)data + avail; 164 size -= avail; 165 166 while (size > 0) { 167 size_t copy = (*it)->mSize; 168 if (copy > size) { 169 copy = size; 170 } 171 memcpy(data, (*it)->mData, copy); 172 data = (uint8_t *)data + copy; 173 size -= copy; 174 ++it; 175 } 176} 177 178//////////////////////////////////////////////////////////////////////////////// 179 180NuCachedSource2::NuCachedSource2(const sp<DataSource> &source) 181 : mSource(source), 182 mReflector(new AHandlerReflector<NuCachedSource2>(this)), 183 mLooper(new ALooper), 184 mCache(new PageCache(kPageSize)), 185 mCacheOffset(0), 186 mFinalStatus(OK), 187 mLastAccessPos(0), 188 mFetching(true), 189 mLastFetchTimeUs(-1), 190 mNumRetriesLeft(kMaxNumRetries), 191 mHighwaterThresholdBytes(kDefaultHighWaterThreshold), 192 mLowwaterThresholdBytes(kDefaultLowWaterThreshold), 193 mKeepAliveIntervalUs(kDefaultKeepAliveIntervalUs) { 194 updateCacheParamsFromSystemProperty(); 195 196 mLooper->setName("NuCachedSource2"); 197 mLooper->registerHandler(mReflector); 198 mLooper->start(); 199 200 Mutex::Autolock autoLock(mLock); 201 (new AMessage(kWhatFetchMore, mReflector->id()))->post(); 202} 203 204NuCachedSource2::~NuCachedSource2() { 205 mLooper->stop(); 206 mLooper->unregisterHandler(mReflector->id()); 207 208 delete mCache; 209 mCache = NULL; 210} 211 212status_t NuCachedSource2::getEstimatedBandwidthKbps(int32_t *kbps) { 213 if (mSource->flags() & kIsHTTPBasedSource) { 214 HTTPBase* source = static_cast<HTTPBase *>(mSource.get()); 215 return source->getEstimatedBandwidthKbps(kbps); 216 } 217 return ERROR_UNSUPPORTED; 218} 219 220status_t NuCachedSource2::setCacheStatCollectFreq(int32_t freqMs) { 221 if (mSource->flags() & kIsHTTPBasedSource) { 222 HTTPBase *source = static_cast<HTTPBase *>(mSource.get()); 223 return source->setBandwidthStatCollectFreq(freqMs); 224 } 225 return ERROR_UNSUPPORTED; 226} 227 228status_t NuCachedSource2::initCheck() const { 229 return mSource->initCheck(); 230} 231 232status_t NuCachedSource2::getSize(off64_t *size) { 233 return mSource->getSize(size); 234} 235 236uint32_t NuCachedSource2::flags() { 237 // Remove HTTP related flags since NuCachedSource2 is not HTTP-based. 238 uint32_t flags = mSource->flags() & ~(kWantsPrefetching | kIsHTTPBasedSource); 239 return (flags | kIsCachingDataSource); 240} 241 242void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) { 243 switch (msg->what()) { 244 case kWhatFetchMore: 245 { 246 onFetch(); 247 break; 248 } 249 250 case kWhatRead: 251 { 252 onRead(msg); 253 break; 254 } 255 256 default: 257 TRESPASS(); 258 } 259} 260 261void NuCachedSource2::fetchInternal() { 262 LOGV("fetchInternal"); 263 264 { 265 Mutex::Autolock autoLock(mLock); 266 CHECK(mFinalStatus == OK || mNumRetriesLeft > 0); 267 268 if (mFinalStatus != OK) { 269 --mNumRetriesLeft; 270 271 status_t err = 272 mSource->reconnectAtOffset(mCacheOffset + mCache->totalSize()); 273 274 if (err == ERROR_UNSUPPORTED) { 275 mNumRetriesLeft = 0; 276 return; 277 } else if (err != OK) { 278 LOGI("The attempt to reconnect failed, %d retries remaining", 279 mNumRetriesLeft); 280 281 return; 282 } 283 } 284 } 285 286 PageCache::Page *page = mCache->acquirePage(); 287 288 ssize_t n = mSource->readAt( 289 mCacheOffset + mCache->totalSize(), page->mData, kPageSize); 290 291 Mutex::Autolock autoLock(mLock); 292 293 if (n < 0) { 294 LOGE("source returned error %ld, %d retries left", n, mNumRetriesLeft); 295 mFinalStatus = n; 296 mCache->releasePage(page); 297 } else if (n == 0) { 298 LOGI("ERROR_END_OF_STREAM"); 299 300 mNumRetriesLeft = 0; 301 mFinalStatus = ERROR_END_OF_STREAM; 302 303 mCache->releasePage(page); 304 } else { 305 if (mFinalStatus != OK) { 306 LOGI("retrying a previously failed read succeeded."); 307 } 308 mNumRetriesLeft = kMaxNumRetries; 309 mFinalStatus = OK; 310 311 page->mSize = n; 312 mCache->appendPage(page); 313 } 314} 315 316void NuCachedSource2::onFetch() { 317 LOGV("onFetch"); 318 319 if (mFinalStatus != OK && mNumRetriesLeft == 0) { 320 LOGV("EOS reached, done prefetching for now"); 321 mFetching = false; 322 } 323 324 bool keepAlive = 325 !mFetching 326 && mFinalStatus == OK 327 && mKeepAliveIntervalUs > 0 328 && ALooper::GetNowUs() >= mLastFetchTimeUs + mKeepAliveIntervalUs; 329 330 if (mFetching || keepAlive) { 331 if (keepAlive) { 332 LOGI("Keep alive"); 333 } 334 335 fetchInternal(); 336 337 mLastFetchTimeUs = ALooper::GetNowUs(); 338 339 if (mFetching && mCache->totalSize() >= mHighwaterThresholdBytes) { 340 LOGI("Cache full, done prefetching for now"); 341 mFetching = false; 342 } 343 } else { 344 Mutex::Autolock autoLock(mLock); 345 restartPrefetcherIfNecessary_l(); 346 } 347 348 int64_t delayUs; 349 if (mFetching) { 350 if (mFinalStatus != OK && mNumRetriesLeft > 0) { 351 // We failed this time and will try again in 3 seconds. 352 delayUs = 3000000ll; 353 } else { 354 delayUs = 0; 355 } 356 } else { 357 delayUs = 100000ll; 358 } 359 360 (new AMessage(kWhatFetchMore, mReflector->id()))->post(delayUs); 361} 362 363void NuCachedSource2::onRead(const sp<AMessage> &msg) { 364 LOGV("onRead"); 365 366 int64_t offset; 367 CHECK(msg->findInt64("offset", &offset)); 368 369 void *data; 370 CHECK(msg->findPointer("data", &data)); 371 372 size_t size; 373 CHECK(msg->findSize("size", &size)); 374 375 ssize_t result = readInternal(offset, data, size); 376 377 if (result == -EAGAIN) { 378 msg->post(50000); 379 return; 380 } 381 382 Mutex::Autolock autoLock(mLock); 383 384 CHECK(mAsyncResult == NULL); 385 386 mAsyncResult = new AMessage; 387 mAsyncResult->setInt32("result", result); 388 389 mCondition.signal(); 390} 391 392void NuCachedSource2::restartPrefetcherIfNecessary_l( 393 bool ignoreLowWaterThreshold, bool force) { 394 static const size_t kGrayArea = 1024 * 1024; 395 396 if (mFetching || (mFinalStatus != OK && mNumRetriesLeft == 0)) { 397 return; 398 } 399 400 if (!ignoreLowWaterThreshold && !force 401 && mCacheOffset + mCache->totalSize() - mLastAccessPos 402 >= mLowwaterThresholdBytes) { 403 return; 404 } 405 406 size_t maxBytes = mLastAccessPos - mCacheOffset; 407 408 if (!force) { 409 if (maxBytes < kGrayArea) { 410 return; 411 } 412 413 maxBytes -= kGrayArea; 414 } 415 416 size_t actualBytes = mCache->releaseFromStart(maxBytes); 417 mCacheOffset += actualBytes; 418 419 LOGI("restarting prefetcher, totalSize = %d", mCache->totalSize()); 420 mFetching = true; 421} 422 423ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) { 424 Mutex::Autolock autoSerializer(mSerializer); 425 426 LOGV("readAt offset %lld, size %d", offset, size); 427 428 Mutex::Autolock autoLock(mLock); 429 430 // If the request can be completely satisfied from the cache, do so. 431 432 if (offset >= mCacheOffset 433 && offset + size <= mCacheOffset + mCache->totalSize()) { 434 size_t delta = offset - mCacheOffset; 435 mCache->copy(delta, data, size); 436 437 mLastAccessPos = offset + size; 438 439 return size; 440 } 441 442 sp<AMessage> msg = new AMessage(kWhatRead, mReflector->id()); 443 msg->setInt64("offset", offset); 444 msg->setPointer("data", data); 445 msg->setSize("size", size); 446 447 CHECK(mAsyncResult == NULL); 448 msg->post(); 449 450 while (mAsyncResult == NULL) { 451 mCondition.wait(mLock); 452 } 453 454 int32_t result; 455 CHECK(mAsyncResult->findInt32("result", &result)); 456 457 mAsyncResult.clear(); 458 459 if (result > 0) { 460 mLastAccessPos = offset + result; 461 } 462 463 return (ssize_t)result; 464} 465 466size_t NuCachedSource2::cachedSize() { 467 Mutex::Autolock autoLock(mLock); 468 return mCacheOffset + mCache->totalSize(); 469} 470 471size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) { 472 Mutex::Autolock autoLock(mLock); 473 return approxDataRemaining_l(finalStatus); 474} 475 476size_t NuCachedSource2::approxDataRemaining_l(status_t *finalStatus) { 477 *finalStatus = mFinalStatus; 478 479 if (mFinalStatus != OK && mNumRetriesLeft > 0) { 480 // Pretend that everything is fine until we're out of retries. 481 *finalStatus = OK; 482 } 483 484 off64_t lastBytePosCached = mCacheOffset + mCache->totalSize(); 485 if (mLastAccessPos < lastBytePosCached) { 486 return lastBytePosCached - mLastAccessPos; 487 } 488 return 0; 489} 490 491ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) { 492 CHECK_LE(size, (size_t)mHighwaterThresholdBytes); 493 494 LOGV("readInternal offset %lld size %d", offset, size); 495 496 Mutex::Autolock autoLock(mLock); 497 498 if (!mFetching) { 499 mLastAccessPos = offset; 500 restartPrefetcherIfNecessary_l( 501 false, // ignoreLowWaterThreshold 502 true); // force 503 } 504 505 if (offset < mCacheOffset 506 || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) { 507 static const off64_t kPadding = 256 * 1024; 508 509 // In the presence of multiple decoded streams, once of them will 510 // trigger this seek request, the other one will request data "nearby" 511 // soon, adjust the seek position so that that subsequent request 512 // does not trigger another seek. 513 off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0; 514 515 seekInternal_l(seekOffset); 516 } 517 518 size_t delta = offset - mCacheOffset; 519 520 if (mFinalStatus != OK) { 521 if (delta >= mCache->totalSize()) { 522 return mFinalStatus; 523 } 524 525 size_t avail = mCache->totalSize() - delta; 526 527 if (avail > size) { 528 avail = size; 529 } 530 531 mCache->copy(delta, data, avail); 532 533 return avail; 534 } 535 536 if (offset + size <= mCacheOffset + mCache->totalSize()) { 537 mCache->copy(delta, data, size); 538 539 return size; 540 } 541 542 LOGV("deferring read"); 543 544 return -EAGAIN; 545} 546 547status_t NuCachedSource2::seekInternal_l(off64_t offset) { 548 mLastAccessPos = offset; 549 550 if (offset >= mCacheOffset 551 && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) { 552 return OK; 553 } 554 555 LOGI("new range: offset= %lld", offset); 556 557 mCacheOffset = offset; 558 559 size_t totalSize = mCache->totalSize(); 560 CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize); 561 562 mFinalStatus = OK; 563 mFetching = true; 564 565 return OK; 566} 567 568void NuCachedSource2::resumeFetchingIfNecessary() { 569 Mutex::Autolock autoLock(mLock); 570 571 restartPrefetcherIfNecessary_l(true /* ignore low water threshold */); 572} 573 574sp<DecryptHandle> NuCachedSource2::DrmInitialization() { 575 return mSource->DrmInitialization(); 576} 577 578void NuCachedSource2::getDrmInfo(sp<DecryptHandle> &handle, DrmManagerClient **client) { 579 mSource->getDrmInfo(handle, client); 580} 581 582String8 NuCachedSource2::getUri() { 583 return mSource->getUri(); 584} 585 586String8 NuCachedSource2::getMIMEType() const { 587 return mSource->getMIMEType(); 588} 589 590void NuCachedSource2::updateCacheParamsFromSystemProperty() { 591 char value[PROPERTY_VALUE_MAX]; 592 if (!property_get("media.stagefright.cache-params", value, NULL)) { 593 return; 594 } 595 596 updateCacheParamsFromString(value); 597} 598 599void NuCachedSource2::updateCacheParamsFromString(const char *s) { 600 ssize_t lowwaterMarkKb, highwaterMarkKb; 601 unsigned keepAliveSecs; 602 603 if (sscanf(s, "%ld/%ld/%u", 604 &lowwaterMarkKb, &highwaterMarkKb, &keepAliveSecs) != 3 605 || lowwaterMarkKb >= highwaterMarkKb) { 606 LOGE("Failed to parse cache parameters from '%s'.", s); 607 return; 608 } 609 610 if (lowwaterMarkKb >= 0) { 611 mLowwaterThresholdBytes = lowwaterMarkKb * 1024; 612 } else { 613 mLowwaterThresholdBytes = kDefaultLowWaterThreshold; 614 } 615 616 if (highwaterMarkKb >= 0) { 617 mHighwaterThresholdBytes = highwaterMarkKb * 1024; 618 } else { 619 mHighwaterThresholdBytes = kDefaultHighWaterThreshold; 620 } 621 622 mKeepAliveIntervalUs = keepAliveSecs * 1000000ll; 623 624 LOGV("lowwater = %d bytes, highwater = %d bytes, keepalive = %lld us", 625 mLowwaterThresholdBytes, 626 mHighwaterThresholdBytes, 627 mKeepAliveIntervalUs); 628} 629 630} // namespace android 631