NuCachedSource2.cpp revision 1b86fe063badb5f28c467ade39be0f4008688947
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "NuCachedSource2" 19#include <utils/Log.h> 20 21#include "include/NuCachedSource2.h" 22#include "include/HTTPBase.h" 23 24#include <cutils/properties.h> 25#include <media/stagefright/foundation/ADebug.h> 26#include <media/stagefright/foundation/AMessage.h> 27#include <media/stagefright/MediaErrors.h> 28 29namespace android { 30 31struct PageCache { 32 PageCache(size_t pageSize); 33 ~PageCache(); 34 35 struct Page { 36 void *mData; 37 size_t mSize; 38 }; 39 40 Page *acquirePage(); 41 void releasePage(Page *page); 42 43 void appendPage(Page *page); 44 size_t releaseFromStart(size_t maxBytes); 45 46 size_t totalSize() const { 47 return mTotalSize; 48 } 49 50 void copy(size_t from, void *data, size_t size); 51 52private: 53 size_t mPageSize; 54 size_t mTotalSize; 55 56 List<Page *> mActivePages; 57 List<Page *> mFreePages; 58 59 void freePages(List<Page *> *list); 60 61 DISALLOW_EVIL_CONSTRUCTORS(PageCache); 62}; 63 64PageCache::PageCache(size_t pageSize) 65 : mPageSize(pageSize), 66 mTotalSize(0) { 67} 68 69PageCache::~PageCache() { 70 freePages(&mActivePages); 71 freePages(&mFreePages); 72} 73 74void PageCache::freePages(List<Page *> *list) { 75 List<Page *>::iterator it = list->begin(); 76 while (it != list->end()) { 77 Page *page = *it; 78 79 free(page->mData); 80 delete page; 81 page = NULL; 82 83 ++it; 84 } 85} 86 87PageCache::Page *PageCache::acquirePage() { 88 if (!mFreePages.empty()) { 89 List<Page *>::iterator it = mFreePages.begin(); 90 Page *page = *it; 91 mFreePages.erase(it); 92 93 return page; 94 } 95 96 Page *page = new Page; 97 page->mData = malloc(mPageSize); 98 page->mSize = 0; 99 100 return page; 101} 102 103void PageCache::releasePage(Page *page) { 104 page->mSize = 0; 105 mFreePages.push_back(page); 106} 107 108void PageCache::appendPage(Page *page) { 109 mTotalSize += page->mSize; 110 mActivePages.push_back(page); 111} 112 113size_t PageCache::releaseFromStart(size_t maxBytes) { 114 size_t bytesReleased = 0; 115 116 while (maxBytes > 0 && !mActivePages.empty()) { 117 List<Page *>::iterator it = mActivePages.begin(); 118 119 Page *page = *it; 120 121 if (maxBytes < page->mSize) { 122 break; 123 } 124 125 mActivePages.erase(it); 126 127 maxBytes -= page->mSize; 128 bytesReleased += page->mSize; 129 130 releasePage(page); 131 } 132 133 mTotalSize -= bytesReleased; 134 return bytesReleased; 135} 136 137void PageCache::copy(size_t from, void *data, size_t size) { 138 ALOGV("copy from %d size %d", from, size); 139 140 if (size == 0) { 141 return; 142 } 143 144 CHECK_LE(from + size, mTotalSize); 145 146 size_t offset = 0; 147 List<Page *>::iterator it = mActivePages.begin(); 148 while (from >= offset + (*it)->mSize) { 149 offset += (*it)->mSize; 150 ++it; 151 } 152 153 size_t delta = from - offset; 154 size_t avail = (*it)->mSize - delta; 155 156 if (avail >= size) { 157 memcpy(data, (const uint8_t *)(*it)->mData + delta, size); 158 return; 159 } 160 161 memcpy(data, (const uint8_t *)(*it)->mData + delta, avail); 162 ++it; 163 data = (uint8_t *)data + avail; 164 size -= avail; 165 166 while (size > 0) { 167 size_t copy = (*it)->mSize; 168 if (copy > size) { 169 copy = size; 170 } 171 memcpy(data, (*it)->mData, copy); 172 data = (uint8_t *)data + copy; 173 size -= copy; 174 ++it; 175 } 176} 177 178//////////////////////////////////////////////////////////////////////////////// 179 180NuCachedSource2::NuCachedSource2( 181 const sp<DataSource> &source, 182 const char *cacheConfig, 183 bool disconnectAtHighwatermark) 184 : mSource(source), 185 mReflector(new AHandlerReflector<NuCachedSource2>(this)), 186 mLooper(new ALooper), 187 mCache(new PageCache(kPageSize)), 188 mCacheOffset(0), 189 mFinalStatus(OK), 190 mLastAccessPos(0), 191 mFetching(true), 192 mLastFetchTimeUs(-1), 193 mNumRetriesLeft(kMaxNumRetries), 194 mHighwaterThresholdBytes(kDefaultHighWaterThreshold), 195 mLowwaterThresholdBytes(kDefaultLowWaterThreshold), 196 mKeepAliveIntervalUs(kDefaultKeepAliveIntervalUs), 197 mDisconnectAtHighwatermark(disconnectAtHighwatermark) { 198 // We are NOT going to support disconnect-at-highwatermark indefinitely 199 // and we are not guaranteeing support for client-specified cache 200 // parameters. Both of these are temporary measures to solve a specific 201 // problem that will be solved in a better way going forward. 202 203 updateCacheParamsFromSystemProperty(); 204 205 if (cacheConfig != NULL) { 206 updateCacheParamsFromString(cacheConfig); 207 } 208 209 if (mDisconnectAtHighwatermark) { 210 // Makes no sense to disconnect and do keep-alives... 211 mKeepAliveIntervalUs = 0; 212 } 213 214 mLooper->setName("NuCachedSource2"); 215 mLooper->registerHandler(mReflector); 216 217 // Since it may not be obvious why our looper thread needs to be 218 // able to call into java since it doesn't appear to do so at all... 219 // IMediaHTTPConnection may be (and most likely is) implemented in JAVA 220 // and a local JAVA IBinder will call directly into JNI methods. 221 // So whenever we call DataSource::readAt it may end up in a call to 222 // IMediaHTTPConnection::readAt and therefore call back into JAVA. 223 mLooper->start(false /* runOnCallingThread */, true /* canCallJava */); 224 225 Mutex::Autolock autoLock(mLock); 226 (new AMessage(kWhatFetchMore, mReflector->id()))->post(); 227} 228 229NuCachedSource2::~NuCachedSource2() { 230 mLooper->stop(); 231 mLooper->unregisterHandler(mReflector->id()); 232 233 delete mCache; 234 mCache = NULL; 235} 236 237status_t NuCachedSource2::getEstimatedBandwidthKbps(int32_t *kbps) { 238 if (mSource->flags() & kIsHTTPBasedSource) { 239 HTTPBase* source = static_cast<HTTPBase *>(mSource.get()); 240 return source->getEstimatedBandwidthKbps(kbps); 241 } 242 return ERROR_UNSUPPORTED; 243} 244 245status_t NuCachedSource2::setCacheStatCollectFreq(int32_t freqMs) { 246 if (mSource->flags() & kIsHTTPBasedSource) { 247 HTTPBase *source = static_cast<HTTPBase *>(mSource.get()); 248 return source->setBandwidthStatCollectFreq(freqMs); 249 } 250 return ERROR_UNSUPPORTED; 251} 252 253status_t NuCachedSource2::initCheck() const { 254 return mSource->initCheck(); 255} 256 257status_t NuCachedSource2::getSize(off64_t *size) { 258 return mSource->getSize(size); 259} 260 261uint32_t NuCachedSource2::flags() { 262 // Remove HTTP related flags since NuCachedSource2 is not HTTP-based. 263 uint32_t flags = mSource->flags() & ~(kWantsPrefetching | kIsHTTPBasedSource); 264 return (flags | kIsCachingDataSource); 265} 266 267void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) { 268 switch (msg->what()) { 269 case kWhatFetchMore: 270 { 271 onFetch(); 272 break; 273 } 274 275 case kWhatRead: 276 { 277 onRead(msg); 278 break; 279 } 280 281 default: 282 TRESPASS(); 283 } 284} 285 286void NuCachedSource2::fetchInternal() { 287 ALOGV("fetchInternal"); 288 289 bool reconnect = false; 290 291 { 292 Mutex::Autolock autoLock(mLock); 293 CHECK(mFinalStatus == OK || mNumRetriesLeft > 0); 294 295 if (mFinalStatus != OK) { 296 --mNumRetriesLeft; 297 298 reconnect = true; 299 } 300 } 301 302 if (reconnect) { 303 status_t err = 304 mSource->reconnectAtOffset(mCacheOffset + mCache->totalSize()); 305 306 Mutex::Autolock autoLock(mLock); 307 308 if (err == ERROR_UNSUPPORTED || err == -EPIPE) { 309 // These are errors that are not likely to go away even if we 310 // retry, i.e. the server doesn't support range requests or similar. 311 mNumRetriesLeft = 0; 312 return; 313 } else if (err != OK) { 314 ALOGI("The attempt to reconnect failed, %d retries remaining", 315 mNumRetriesLeft); 316 317 return; 318 } 319 } 320 321 PageCache::Page *page = mCache->acquirePage(); 322 323 ssize_t n = mSource->readAt( 324 mCacheOffset + mCache->totalSize(), page->mData, kPageSize); 325 326 Mutex::Autolock autoLock(mLock); 327 328 if (n < 0) { 329 mFinalStatus = n; 330 if (n == ERROR_UNSUPPORTED || n == -EPIPE) { 331 // These are errors that are not likely to go away even if we 332 // retry, i.e. the server doesn't support range requests or similar. 333 mNumRetriesLeft = 0; 334 } 335 336 ALOGE("source returned error %ld, %d retries left", n, mNumRetriesLeft); 337 mCache->releasePage(page); 338 } else if (n == 0) { 339 ALOGI("ERROR_END_OF_STREAM"); 340 341 mNumRetriesLeft = 0; 342 mFinalStatus = ERROR_END_OF_STREAM; 343 344 mCache->releasePage(page); 345 } else { 346 if (mFinalStatus != OK) { 347 ALOGI("retrying a previously failed read succeeded."); 348 } 349 mNumRetriesLeft = kMaxNumRetries; 350 mFinalStatus = OK; 351 352 page->mSize = n; 353 mCache->appendPage(page); 354 } 355} 356 357void NuCachedSource2::onFetch() { 358 ALOGV("onFetch"); 359 360 if (mFinalStatus != OK && mNumRetriesLeft == 0) { 361 ALOGV("EOS reached, done prefetching for now"); 362 mFetching = false; 363 } 364 365 bool keepAlive = 366 !mFetching 367 && mFinalStatus == OK 368 && mKeepAliveIntervalUs > 0 369 && ALooper::GetNowUs() >= mLastFetchTimeUs + mKeepAliveIntervalUs; 370 371 if (mFetching || keepAlive) { 372 if (keepAlive) { 373 ALOGI("Keep alive"); 374 } 375 376 fetchInternal(); 377 378 mLastFetchTimeUs = ALooper::GetNowUs(); 379 380 if (mFetching && mCache->totalSize() >= mHighwaterThresholdBytes) { 381 ALOGI("Cache full, done prefetching for now"); 382 mFetching = false; 383 384 if (mDisconnectAtHighwatermark 385 && (mSource->flags() & DataSource::kIsHTTPBasedSource)) { 386 ALOGV("Disconnecting at high watermark"); 387 static_cast<HTTPBase *>(mSource.get())->disconnect(); 388 mFinalStatus = -EAGAIN; 389 } 390 } 391 } else { 392 Mutex::Autolock autoLock(mLock); 393 restartPrefetcherIfNecessary_l(); 394 } 395 396 int64_t delayUs; 397 if (mFetching) { 398 if (mFinalStatus != OK && mNumRetriesLeft > 0) { 399 // We failed this time and will try again in 3 seconds. 400 delayUs = 3000000ll; 401 } else { 402 delayUs = 0; 403 } 404 } else { 405 delayUs = 100000ll; 406 } 407 408 (new AMessage(kWhatFetchMore, mReflector->id()))->post(delayUs); 409} 410 411void NuCachedSource2::onRead(const sp<AMessage> &msg) { 412 ALOGV("onRead"); 413 414 int64_t offset; 415 CHECK(msg->findInt64("offset", &offset)); 416 417 void *data; 418 CHECK(msg->findPointer("data", &data)); 419 420 size_t size; 421 CHECK(msg->findSize("size", &size)); 422 423 ssize_t result = readInternal(offset, data, size); 424 425 if (result == -EAGAIN) { 426 msg->post(50000); 427 return; 428 } 429 430 Mutex::Autolock autoLock(mLock); 431 432 CHECK(mAsyncResult == NULL); 433 434 mAsyncResult = new AMessage; 435 mAsyncResult->setInt32("result", result); 436 437 mCondition.signal(); 438} 439 440void NuCachedSource2::restartPrefetcherIfNecessary_l( 441 bool ignoreLowWaterThreshold, bool force) { 442 static const size_t kGrayArea = 1024 * 1024; 443 444 if (mFetching || (mFinalStatus != OK && mNumRetriesLeft == 0)) { 445 return; 446 } 447 448 if (!ignoreLowWaterThreshold && !force 449 && mCacheOffset + mCache->totalSize() - mLastAccessPos 450 >= mLowwaterThresholdBytes) { 451 return; 452 } 453 454 size_t maxBytes = mLastAccessPos - mCacheOffset; 455 456 if (!force) { 457 if (maxBytes < kGrayArea) { 458 return; 459 } 460 461 maxBytes -= kGrayArea; 462 } 463 464 size_t actualBytes = mCache->releaseFromStart(maxBytes); 465 mCacheOffset += actualBytes; 466 467 ALOGI("restarting prefetcher, totalSize = %d", mCache->totalSize()); 468 mFetching = true; 469} 470 471ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) { 472 Mutex::Autolock autoSerializer(mSerializer); 473 474 ALOGV("readAt offset %lld, size %d", offset, size); 475 476 Mutex::Autolock autoLock(mLock); 477 478 // If the request can be completely satisfied from the cache, do so. 479 480 if (offset >= mCacheOffset 481 && offset + size <= mCacheOffset + mCache->totalSize()) { 482 size_t delta = offset - mCacheOffset; 483 mCache->copy(delta, data, size); 484 485 mLastAccessPos = offset + size; 486 487 return size; 488 } 489 490 sp<AMessage> msg = new AMessage(kWhatRead, mReflector->id()); 491 msg->setInt64("offset", offset); 492 msg->setPointer("data", data); 493 msg->setSize("size", size); 494 495 CHECK(mAsyncResult == NULL); 496 msg->post(); 497 498 while (mAsyncResult == NULL) { 499 mCondition.wait(mLock); 500 } 501 502 int32_t result; 503 CHECK(mAsyncResult->findInt32("result", &result)); 504 505 mAsyncResult.clear(); 506 507 if (result > 0) { 508 mLastAccessPos = offset + result; 509 } 510 511 return (ssize_t)result; 512} 513 514size_t NuCachedSource2::cachedSize() { 515 Mutex::Autolock autoLock(mLock); 516 return mCacheOffset + mCache->totalSize(); 517} 518 519size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) const { 520 Mutex::Autolock autoLock(mLock); 521 return approxDataRemaining_l(finalStatus); 522} 523 524size_t NuCachedSource2::approxDataRemaining_l(status_t *finalStatus) const { 525 *finalStatus = mFinalStatus; 526 527 if (mFinalStatus != OK && mNumRetriesLeft > 0) { 528 // Pretend that everything is fine until we're out of retries. 529 *finalStatus = OK; 530 } 531 532 off64_t lastBytePosCached = mCacheOffset + mCache->totalSize(); 533 if (mLastAccessPos < lastBytePosCached) { 534 return lastBytePosCached - mLastAccessPos; 535 } 536 return 0; 537} 538 539ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) { 540 CHECK_LE(size, (size_t)mHighwaterThresholdBytes); 541 542 ALOGV("readInternal offset %lld size %d", offset, size); 543 544 Mutex::Autolock autoLock(mLock); 545 546 if (!mFetching) { 547 mLastAccessPos = offset; 548 restartPrefetcherIfNecessary_l( 549 false, // ignoreLowWaterThreshold 550 true); // force 551 } 552 553 if (offset < mCacheOffset 554 || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) { 555 static const off64_t kPadding = 256 * 1024; 556 557 // In the presence of multiple decoded streams, once of them will 558 // trigger this seek request, the other one will request data "nearby" 559 // soon, adjust the seek position so that that subsequent request 560 // does not trigger another seek. 561 off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0; 562 563 seekInternal_l(seekOffset); 564 } 565 566 size_t delta = offset - mCacheOffset; 567 568 if (mFinalStatus != OK && mNumRetriesLeft == 0) { 569 if (delta >= mCache->totalSize()) { 570 return mFinalStatus; 571 } 572 573 size_t avail = mCache->totalSize() - delta; 574 575 if (avail > size) { 576 avail = size; 577 } 578 579 mCache->copy(delta, data, avail); 580 581 return avail; 582 } 583 584 if (offset + size <= mCacheOffset + mCache->totalSize()) { 585 mCache->copy(delta, data, size); 586 587 return size; 588 } 589 590 ALOGV("deferring read"); 591 592 return -EAGAIN; 593} 594 595status_t NuCachedSource2::seekInternal_l(off64_t offset) { 596 mLastAccessPos = offset; 597 598 if (offset >= mCacheOffset 599 && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) { 600 return OK; 601 } 602 603 ALOGI("new range: offset= %lld", offset); 604 605 mCacheOffset = offset; 606 607 size_t totalSize = mCache->totalSize(); 608 CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize); 609 610 mNumRetriesLeft = kMaxNumRetries; 611 mFetching = true; 612 613 return OK; 614} 615 616void NuCachedSource2::resumeFetchingIfNecessary() { 617 Mutex::Autolock autoLock(mLock); 618 619 restartPrefetcherIfNecessary_l(true /* ignore low water threshold */); 620} 621 622sp<DecryptHandle> NuCachedSource2::DrmInitialization(const char* mime) { 623 return mSource->DrmInitialization(mime); 624} 625 626void NuCachedSource2::getDrmInfo(sp<DecryptHandle> &handle, DrmManagerClient **client) { 627 mSource->getDrmInfo(handle, client); 628} 629 630String8 NuCachedSource2::getUri() { 631 return mSource->getUri(); 632} 633 634String8 NuCachedSource2::getMIMEType() const { 635 return mSource->getMIMEType(); 636} 637 638void NuCachedSource2::updateCacheParamsFromSystemProperty() { 639 char value[PROPERTY_VALUE_MAX]; 640 if (!property_get("media.stagefright.cache-params", value, NULL)) { 641 return; 642 } 643 644 updateCacheParamsFromString(value); 645} 646 647void NuCachedSource2::updateCacheParamsFromString(const char *s) { 648 ssize_t lowwaterMarkKb, highwaterMarkKb; 649 int keepAliveSecs; 650 651 if (sscanf(s, "%ld/%ld/%d", 652 &lowwaterMarkKb, &highwaterMarkKb, &keepAliveSecs) != 3) { 653 ALOGE("Failed to parse cache parameters from '%s'.", s); 654 return; 655 } 656 657 if (lowwaterMarkKb >= 0) { 658 mLowwaterThresholdBytes = lowwaterMarkKb * 1024; 659 } else { 660 mLowwaterThresholdBytes = kDefaultLowWaterThreshold; 661 } 662 663 if (highwaterMarkKb >= 0) { 664 mHighwaterThresholdBytes = highwaterMarkKb * 1024; 665 } else { 666 mHighwaterThresholdBytes = kDefaultHighWaterThreshold; 667 } 668 669 if (mLowwaterThresholdBytes >= mHighwaterThresholdBytes) { 670 ALOGE("Illegal low/highwater marks specified, reverting to defaults."); 671 672 mLowwaterThresholdBytes = kDefaultLowWaterThreshold; 673 mHighwaterThresholdBytes = kDefaultHighWaterThreshold; 674 } 675 676 if (keepAliveSecs >= 0) { 677 mKeepAliveIntervalUs = keepAliveSecs * 1000000ll; 678 } else { 679 mKeepAliveIntervalUs = kDefaultKeepAliveIntervalUs; 680 } 681 682 ALOGV("lowwater = %d bytes, highwater = %d bytes, keepalive = %lld us", 683 mLowwaterThresholdBytes, 684 mHighwaterThresholdBytes, 685 mKeepAliveIntervalUs); 686} 687 688// static 689void NuCachedSource2::RemoveCacheSpecificHeaders( 690 KeyedVector<String8, String8> *headers, 691 String8 *cacheConfig, 692 bool *disconnectAtHighwatermark) { 693 *cacheConfig = String8(); 694 *disconnectAtHighwatermark = false; 695 696 if (headers == NULL) { 697 return; 698 } 699 700 ssize_t index; 701 if ((index = headers->indexOfKey(String8("x-cache-config"))) >= 0) { 702 *cacheConfig = headers->valueAt(index); 703 704 headers->removeItemsAt(index); 705 706 ALOGV("Using special cache config '%s'", cacheConfig->string()); 707 } 708 709 if ((index = headers->indexOfKey( 710 String8("x-disconnect-at-highwatermark"))) >= 0) { 711 *disconnectAtHighwatermark = true; 712 headers->removeItemsAt(index); 713 714 ALOGV("Client requested disconnection at highwater mark"); 715 } 716} 717 718} // namespace android 719