NuCachedSource2.cpp revision 48296b792a8d68358de74141fa80bd5bd84d0307
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#include <inttypes.h> 18 19//#define LOG_NDEBUG 0 20#define LOG_TAG "NuCachedSource2" 21#include <utils/Log.h> 22 23#include "include/NuCachedSource2.h" 24#include "include/HTTPBase.h" 25 26#include <cutils/properties.h> 27#include <media/stagefright/foundation/ADebug.h> 28#include <media/stagefright/foundation/AMessage.h> 29#include <media/stagefright/MediaErrors.h> 30 31namespace android { 32 33struct PageCache { 34 PageCache(size_t pageSize); 35 ~PageCache(); 36 37 struct Page { 38 void *mData; 39 size_t mSize; 40 }; 41 42 Page *acquirePage(); 43 void releasePage(Page *page); 44 45 void appendPage(Page *page); 46 size_t releaseFromStart(size_t maxBytes); 47 48 size_t totalSize() const { 49 return mTotalSize; 50 } 51 52 void copy(size_t from, void *data, size_t size); 53 54private: 55 size_t mPageSize; 56 size_t mTotalSize; 57 58 List<Page *> mActivePages; 59 List<Page *> mFreePages; 60 61 void freePages(List<Page *> *list); 62 63 DISALLOW_EVIL_CONSTRUCTORS(PageCache); 64}; 65 66PageCache::PageCache(size_t pageSize) 67 : mPageSize(pageSize), 68 mTotalSize(0) { 69} 70 71PageCache::~PageCache() { 72 freePages(&mActivePages); 73 freePages(&mFreePages); 74} 75 76void PageCache::freePages(List<Page *> *list) { 77 List<Page *>::iterator it = list->begin(); 78 while (it != list->end()) { 79 Page *page = *it; 80 81 free(page->mData); 82 delete page; 83 page = NULL; 84 85 ++it; 86 } 87} 88 89PageCache::Page *PageCache::acquirePage() { 90 if (!mFreePages.empty()) { 91 List<Page *>::iterator it = mFreePages.begin(); 92 Page *page = *it; 93 mFreePages.erase(it); 94 95 return page; 96 } 97 98 Page *page = new Page; 99 page->mData = malloc(mPageSize); 100 page->mSize = 0; 101 102 return page; 103} 104 105void PageCache::releasePage(Page *page) { 106 page->mSize = 0; 107 mFreePages.push_back(page); 108} 109 110void PageCache::appendPage(Page *page) { 111 mTotalSize += page->mSize; 112 mActivePages.push_back(page); 113} 114 115size_t PageCache::releaseFromStart(size_t maxBytes) { 116 size_t bytesReleased = 0; 117 118 while (maxBytes > 0 && !mActivePages.empty()) { 119 List<Page *>::iterator it = mActivePages.begin(); 120 121 Page *page = *it; 122 123 if (maxBytes < page->mSize) { 124 break; 125 } 126 127 mActivePages.erase(it); 128 129 maxBytes -= page->mSize; 130 bytesReleased += page->mSize; 131 132 releasePage(page); 133 } 134 135 mTotalSize -= bytesReleased; 136 return bytesReleased; 137} 138 139void PageCache::copy(size_t from, void *data, size_t size) { 140 ALOGV("copy from %zu size %zu", from, size); 141 142 if (size == 0) { 143 return; 144 } 145 146 CHECK_LE(from + size, mTotalSize); 147 148 size_t offset = 0; 149 List<Page *>::iterator it = mActivePages.begin(); 150 while (from >= offset + (*it)->mSize) { 151 offset += (*it)->mSize; 152 ++it; 153 } 154 155 size_t delta = from - offset; 156 size_t avail = (*it)->mSize - delta; 157 158 if (avail >= size) { 159 memcpy(data, (const uint8_t *)(*it)->mData + delta, size); 160 return; 161 } 162 163 memcpy(data, (const uint8_t *)(*it)->mData + delta, avail); 164 ++it; 165 data = (uint8_t *)data + avail; 166 size -= avail; 167 168 while (size > 0) { 169 size_t copy = (*it)->mSize; 170 if (copy > size) { 171 copy = size; 172 } 173 memcpy(data, (*it)->mData, copy); 174 data = (uint8_t *)data + copy; 175 size -= copy; 176 ++it; 177 } 178} 179 180//////////////////////////////////////////////////////////////////////////////// 181 182NuCachedSource2::NuCachedSource2( 183 const sp<DataSource> &source, 184 const char *cacheConfig, 185 bool disconnectAtHighwatermark) 186 : mSource(source), 187 mReflector(new AHandlerReflector<NuCachedSource2>(this)), 188 mLooper(new ALooper), 189 mCache(new PageCache(kPageSize)), 190 mCacheOffset(0), 191 mFinalStatus(OK), 192 mLastAccessPos(0), 193 mFetching(true), 194 mDisconnecting(false), 195 mLastFetchTimeUs(-1), 196 mNumRetriesLeft(kMaxNumRetries), 197 mHighwaterThresholdBytes(kDefaultHighWaterThreshold), 198 mLowwaterThresholdBytes(kDefaultLowWaterThreshold), 199 mKeepAliveIntervalUs(kDefaultKeepAliveIntervalUs), 200 mDisconnectAtHighwatermark(disconnectAtHighwatermark) { 201 // We are NOT going to support disconnect-at-highwatermark indefinitely 202 // and we are not guaranteeing support for client-specified cache 203 // parameters. Both of these are temporary measures to solve a specific 204 // problem that will be solved in a better way going forward. 205 206 updateCacheParamsFromSystemProperty(); 207 208 if (cacheConfig != NULL) { 209 updateCacheParamsFromString(cacheConfig); 210 } 211 212 if (mDisconnectAtHighwatermark) { 213 // Makes no sense to disconnect and do keep-alives... 214 mKeepAliveIntervalUs = 0; 215 } 216 217 mLooper->setName("NuCachedSource2"); 218 mLooper->registerHandler(mReflector); 219 220 // Since it may not be obvious why our looper thread needs to be 221 // able to call into java since it doesn't appear to do so at all... 222 // IMediaHTTPConnection may be (and most likely is) implemented in JAVA 223 // and a local JAVA IBinder will call directly into JNI methods. 224 // So whenever we call DataSource::readAt it may end up in a call to 225 // IMediaHTTPConnection::readAt and therefore call back into JAVA. 226 mLooper->start(false /* runOnCallingThread */, true /* canCallJava */); 227 228 Mutex::Autolock autoLock(mLock); 229 (new AMessage(kWhatFetchMore, mReflector->id()))->post(); 230} 231 232NuCachedSource2::~NuCachedSource2() { 233 mLooper->stop(); 234 mLooper->unregisterHandler(mReflector->id()); 235 236 delete mCache; 237 mCache = NULL; 238} 239 240status_t NuCachedSource2::getEstimatedBandwidthKbps(int32_t *kbps) { 241 if (mSource->flags() & kIsHTTPBasedSource) { 242 HTTPBase* source = static_cast<HTTPBase *>(mSource.get()); 243 return source->getEstimatedBandwidthKbps(kbps); 244 } 245 return ERROR_UNSUPPORTED; 246} 247 248void NuCachedSource2::disconnect() { 249 if (mSource->flags() & kIsHTTPBasedSource) { 250 ALOGV("disconnecting HTTPBasedSource"); 251 252 { 253 Mutex::Autolock autoLock(mLock); 254 // set mDisconnecting to true, if a fetch returns after 255 // this, the source will be marked as EOS. 256 mDisconnecting = true; 257 } 258 259 // explicitly disconnect from the source, to allow any 260 // pending reads to return more promptly 261 static_cast<HTTPBase *>(mSource.get())->disconnect(); 262 } 263} 264 265status_t NuCachedSource2::setCacheStatCollectFreq(int32_t freqMs) { 266 if (mSource->flags() & kIsHTTPBasedSource) { 267 HTTPBase *source = static_cast<HTTPBase *>(mSource.get()); 268 return source->setBandwidthStatCollectFreq(freqMs); 269 } 270 return ERROR_UNSUPPORTED; 271} 272 273status_t NuCachedSource2::initCheck() const { 274 return mSource->initCheck(); 275} 276 277status_t NuCachedSource2::getSize(off64_t *size) { 278 return mSource->getSize(size); 279} 280 281uint32_t NuCachedSource2::flags() { 282 // Remove HTTP related flags since NuCachedSource2 is not HTTP-based. 283 uint32_t flags = mSource->flags() & ~(kWantsPrefetching | kIsHTTPBasedSource); 284 return (flags | kIsCachingDataSource); 285} 286 287void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) { 288 switch (msg->what()) { 289 case kWhatFetchMore: 290 { 291 onFetch(); 292 break; 293 } 294 295 case kWhatRead: 296 { 297 onRead(msg); 298 break; 299 } 300 301 default: 302 TRESPASS(); 303 } 304} 305 306void NuCachedSource2::fetchInternal() { 307 ALOGV("fetchInternal"); 308 309 bool reconnect = false; 310 311 { 312 Mutex::Autolock autoLock(mLock); 313 CHECK(mFinalStatus == OK || mNumRetriesLeft > 0); 314 315 if (mFinalStatus != OK) { 316 --mNumRetriesLeft; 317 318 reconnect = true; 319 } 320 } 321 322 if (reconnect) { 323 status_t err = 324 mSource->reconnectAtOffset(mCacheOffset + mCache->totalSize()); 325 326 Mutex::Autolock autoLock(mLock); 327 328 if (err == ERROR_UNSUPPORTED || err == -EPIPE) { 329 // These are errors that are not likely to go away even if we 330 // retry, i.e. the server doesn't support range requests or similar. 331 mNumRetriesLeft = 0; 332 return; 333 } else if (err != OK) { 334 ALOGI("The attempt to reconnect failed, %d retries remaining", 335 mNumRetriesLeft); 336 337 return; 338 } 339 } 340 341 PageCache::Page *page = mCache->acquirePage(); 342 343 ssize_t n = mSource->readAt( 344 mCacheOffset + mCache->totalSize(), page->mData, kPageSize); 345 346 Mutex::Autolock autoLock(mLock); 347 348 if (n == 0 || mDisconnecting) { 349 ALOGI("ERROR_END_OF_STREAM"); 350 351 mNumRetriesLeft = 0; 352 mFinalStatus = ERROR_END_OF_STREAM; 353 354 mCache->releasePage(page); 355 } else if (n < 0) { 356 mFinalStatus = n; 357 if (n == ERROR_UNSUPPORTED || n == -EPIPE) { 358 // These are errors that are not likely to go away even if we 359 // retry, i.e. the server doesn't support range requests or similar. 360 mNumRetriesLeft = 0; 361 } 362 363 ALOGE("source returned error %zd, %d retries left", n, mNumRetriesLeft); 364 mCache->releasePage(page); 365 } else { 366 if (mFinalStatus != OK) { 367 ALOGI("retrying a previously failed read succeeded."); 368 } 369 mNumRetriesLeft = kMaxNumRetries; 370 mFinalStatus = OK; 371 372 page->mSize = n; 373 mCache->appendPage(page); 374 } 375} 376 377void NuCachedSource2::onFetch() { 378 ALOGV("onFetch"); 379 380 if (mFinalStatus != OK && mNumRetriesLeft == 0) { 381 ALOGV("EOS reached, done prefetching for now"); 382 mFetching = false; 383 } 384 385 bool keepAlive = 386 !mFetching 387 && mFinalStatus == OK 388 && mKeepAliveIntervalUs > 0 389 && ALooper::GetNowUs() >= mLastFetchTimeUs + mKeepAliveIntervalUs; 390 391 if (mFetching || keepAlive) { 392 if (keepAlive) { 393 ALOGI("Keep alive"); 394 } 395 396 fetchInternal(); 397 398 mLastFetchTimeUs = ALooper::GetNowUs(); 399 400 if (mFetching && mCache->totalSize() >= mHighwaterThresholdBytes) { 401 ALOGI("Cache full, done prefetching for now"); 402 mFetching = false; 403 404 if (mDisconnectAtHighwatermark 405 && (mSource->flags() & DataSource::kIsHTTPBasedSource)) { 406 ALOGV("Disconnecting at high watermark"); 407 static_cast<HTTPBase *>(mSource.get())->disconnect(); 408 mFinalStatus = -EAGAIN; 409 } 410 } 411 } else { 412 Mutex::Autolock autoLock(mLock); 413 restartPrefetcherIfNecessary_l(); 414 } 415 416 int64_t delayUs; 417 if (mFetching) { 418 if (mFinalStatus != OK && mNumRetriesLeft > 0) { 419 // We failed this time and will try again in 3 seconds. 420 delayUs = 3000000ll; 421 } else { 422 delayUs = 0; 423 } 424 } else { 425 delayUs = 100000ll; 426 } 427 428 (new AMessage(kWhatFetchMore, mReflector->id()))->post(delayUs); 429} 430 431void NuCachedSource2::onRead(const sp<AMessage> &msg) { 432 ALOGV("onRead"); 433 434 int64_t offset; 435 CHECK(msg->findInt64("offset", &offset)); 436 437 void *data; 438 CHECK(msg->findPointer("data", &data)); 439 440 size_t size; 441 CHECK(msg->findSize("size", &size)); 442 443 ssize_t result = readInternal(offset, data, size); 444 445 if (result == -EAGAIN) { 446 msg->post(50000); 447 return; 448 } 449 450 Mutex::Autolock autoLock(mLock); 451 452 CHECK(mAsyncResult == NULL); 453 454 mAsyncResult = new AMessage; 455 mAsyncResult->setInt32("result", result); 456 457 mCondition.signal(); 458} 459 460void NuCachedSource2::restartPrefetcherIfNecessary_l( 461 bool ignoreLowWaterThreshold, bool force) { 462 static const size_t kGrayArea = 1024 * 1024; 463 464 if (mFetching || (mFinalStatus != OK && mNumRetriesLeft == 0)) { 465 return; 466 } 467 468 if (!ignoreLowWaterThreshold && !force 469 && mCacheOffset + mCache->totalSize() - mLastAccessPos 470 >= mLowwaterThresholdBytes) { 471 return; 472 } 473 474 size_t maxBytes = mLastAccessPos - mCacheOffset; 475 476 if (!force) { 477 if (maxBytes < kGrayArea) { 478 return; 479 } 480 481 maxBytes -= kGrayArea; 482 } 483 484 size_t actualBytes = mCache->releaseFromStart(maxBytes); 485 mCacheOffset += actualBytes; 486 487 ALOGI("restarting prefetcher, totalSize = %zu", mCache->totalSize()); 488 mFetching = true; 489} 490 491ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) { 492 Mutex::Autolock autoSerializer(mSerializer); 493 494 ALOGV("readAt offset %lld, size %zu", offset, size); 495 496 Mutex::Autolock autoLock(mLock); 497 498 // If the request can be completely satisfied from the cache, do so. 499 500 if (offset >= mCacheOffset 501 && offset + size <= mCacheOffset + mCache->totalSize()) { 502 size_t delta = offset - mCacheOffset; 503 mCache->copy(delta, data, size); 504 505 mLastAccessPos = offset + size; 506 507 return size; 508 } 509 510 sp<AMessage> msg = new AMessage(kWhatRead, mReflector->id()); 511 msg->setInt64("offset", offset); 512 msg->setPointer("data", data); 513 msg->setSize("size", size); 514 515 CHECK(mAsyncResult == NULL); 516 msg->post(); 517 518 while (mAsyncResult == NULL) { 519 mCondition.wait(mLock); 520 } 521 522 int32_t result; 523 CHECK(mAsyncResult->findInt32("result", &result)); 524 525 mAsyncResult.clear(); 526 527 if (result > 0) { 528 mLastAccessPos = offset + result; 529 } 530 531 return (ssize_t)result; 532} 533 534size_t NuCachedSource2::cachedSize() { 535 Mutex::Autolock autoLock(mLock); 536 return mCacheOffset + mCache->totalSize(); 537} 538 539size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) const { 540 Mutex::Autolock autoLock(mLock); 541 return approxDataRemaining_l(finalStatus); 542} 543 544size_t NuCachedSource2::approxDataRemaining_l(status_t *finalStatus) const { 545 *finalStatus = mFinalStatus; 546 547 if (mFinalStatus != OK && mNumRetriesLeft > 0) { 548 // Pretend that everything is fine until we're out of retries. 549 *finalStatus = OK; 550 } 551 552 off64_t lastBytePosCached = mCacheOffset + mCache->totalSize(); 553 if (mLastAccessPos < lastBytePosCached) { 554 return lastBytePosCached - mLastAccessPos; 555 } 556 return 0; 557} 558 559ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) { 560 CHECK_LE(size, (size_t)mHighwaterThresholdBytes); 561 562 ALOGV("readInternal offset %lld size %zu", offset, size); 563 564 Mutex::Autolock autoLock(mLock); 565 566 if (!mFetching) { 567 mLastAccessPos = offset; 568 restartPrefetcherIfNecessary_l( 569 false, // ignoreLowWaterThreshold 570 true); // force 571 } 572 573 if (offset < mCacheOffset 574 || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) { 575 static const off64_t kPadding = 256 * 1024; 576 577 // In the presence of multiple decoded streams, once of them will 578 // trigger this seek request, the other one will request data "nearby" 579 // soon, adjust the seek position so that that subsequent request 580 // does not trigger another seek. 581 off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0; 582 583 seekInternal_l(seekOffset); 584 } 585 586 size_t delta = offset - mCacheOffset; 587 588 if (mFinalStatus != OK && mNumRetriesLeft == 0) { 589 if (delta >= mCache->totalSize()) { 590 return mFinalStatus; 591 } 592 593 size_t avail = mCache->totalSize() - delta; 594 595 if (avail > size) { 596 avail = size; 597 } 598 599 mCache->copy(delta, data, avail); 600 601 return avail; 602 } 603 604 if (offset + size <= mCacheOffset + mCache->totalSize()) { 605 mCache->copy(delta, data, size); 606 607 return size; 608 } 609 610 ALOGV("deferring read"); 611 612 return -EAGAIN; 613} 614 615status_t NuCachedSource2::seekInternal_l(off64_t offset) { 616 mLastAccessPos = offset; 617 618 if (offset >= mCacheOffset 619 && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) { 620 return OK; 621 } 622 623 ALOGI("new range: offset= %lld", offset); 624 625 mCacheOffset = offset; 626 627 size_t totalSize = mCache->totalSize(); 628 CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize); 629 630 mNumRetriesLeft = kMaxNumRetries; 631 mFetching = true; 632 633 return OK; 634} 635 636void NuCachedSource2::resumeFetchingIfNecessary() { 637 Mutex::Autolock autoLock(mLock); 638 639 restartPrefetcherIfNecessary_l(true /* ignore low water threshold */); 640} 641 642sp<DecryptHandle> NuCachedSource2::DrmInitialization(const char* mime) { 643 return mSource->DrmInitialization(mime); 644} 645 646void NuCachedSource2::getDrmInfo(sp<DecryptHandle> &handle, DrmManagerClient **client) { 647 mSource->getDrmInfo(handle, client); 648} 649 650String8 NuCachedSource2::getUri() { 651 return mSource->getUri(); 652} 653 654String8 NuCachedSource2::getMIMEType() const { 655 return mSource->getMIMEType(); 656} 657 658void NuCachedSource2::updateCacheParamsFromSystemProperty() { 659 char value[PROPERTY_VALUE_MAX]; 660 if (!property_get("media.stagefright.cache-params", value, NULL)) { 661 return; 662 } 663 664 updateCacheParamsFromString(value); 665} 666 667void NuCachedSource2::updateCacheParamsFromString(const char *s) { 668 ssize_t lowwaterMarkKb, highwaterMarkKb; 669 int keepAliveSecs; 670 671 if (sscanf(s, "%zd/%zd/%d", 672 &lowwaterMarkKb, &highwaterMarkKb, &keepAliveSecs) != 3) { 673 ALOGE("Failed to parse cache parameters from '%s'.", s); 674 return; 675 } 676 677 if (lowwaterMarkKb >= 0) { 678 mLowwaterThresholdBytes = lowwaterMarkKb * 1024; 679 } else { 680 mLowwaterThresholdBytes = kDefaultLowWaterThreshold; 681 } 682 683 if (highwaterMarkKb >= 0) { 684 mHighwaterThresholdBytes = highwaterMarkKb * 1024; 685 } else { 686 mHighwaterThresholdBytes = kDefaultHighWaterThreshold; 687 } 688 689 if (mLowwaterThresholdBytes >= mHighwaterThresholdBytes) { 690 ALOGE("Illegal low/highwater marks specified, reverting to defaults."); 691 692 mLowwaterThresholdBytes = kDefaultLowWaterThreshold; 693 mHighwaterThresholdBytes = kDefaultHighWaterThreshold; 694 } 695 696 if (keepAliveSecs >= 0) { 697 mKeepAliveIntervalUs = keepAliveSecs * 1000000ll; 698 } else { 699 mKeepAliveIntervalUs = kDefaultKeepAliveIntervalUs; 700 } 701 702 ALOGV("lowwater = %zu bytes, highwater = %zu bytes, keepalive = %" PRId64 " us", 703 mLowwaterThresholdBytes, 704 mHighwaterThresholdBytes, 705 mKeepAliveIntervalUs); 706} 707 708// static 709void NuCachedSource2::RemoveCacheSpecificHeaders( 710 KeyedVector<String8, String8> *headers, 711 String8 *cacheConfig, 712 bool *disconnectAtHighwatermark) { 713 *cacheConfig = String8(); 714 *disconnectAtHighwatermark = false; 715 716 if (headers == NULL) { 717 return; 718 } 719 720 ssize_t index; 721 if ((index = headers->indexOfKey(String8("x-cache-config"))) >= 0) { 722 *cacheConfig = headers->valueAt(index); 723 724 headers->removeItemsAt(index); 725 726 ALOGV("Using special cache config '%s'", cacheConfig->string()); 727 } 728 729 if ((index = headers->indexOfKey( 730 String8("x-disconnect-at-highwatermark"))) >= 0) { 731 *disconnectAtHighwatermark = true; 732 headers->removeItemsAt(index); 733 734 ALOGV("Client requested disconnection at highwater mark"); 735 } 736} 737 738} // namespace android 739