NuCachedSource2.cpp revision b33d2ac90cfce0fe6db8c3e979e7ae2bbfc28163
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "NuCachedSource2" 19#include <utils/Log.h> 20 21#include "include/NuCachedSource2.h" 22#include "include/HTTPBase.h" 23 24#include <media/stagefright/foundation/ADebug.h> 25#include <media/stagefright/foundation/AMessage.h> 26#include <media/stagefright/MediaErrors.h> 27 28namespace android { 29 30struct PageCache { 31 PageCache(size_t pageSize); 32 ~PageCache(); 33 34 struct Page { 35 void *mData; 36 size_t mSize; 37 }; 38 39 Page *acquirePage(); 40 void releasePage(Page *page); 41 42 void appendPage(Page *page); 43 size_t releaseFromStart(size_t maxBytes); 44 45 size_t totalSize() const { 46 return mTotalSize; 47 } 48 49 void copy(size_t from, void *data, size_t size); 50 51private: 52 size_t mPageSize; 53 size_t mTotalSize; 54 55 List<Page *> mActivePages; 56 List<Page *> mFreePages; 57 58 void freePages(List<Page *> *list); 59 60 DISALLOW_EVIL_CONSTRUCTORS(PageCache); 61}; 62 63PageCache::PageCache(size_t pageSize) 64 : mPageSize(pageSize), 65 mTotalSize(0) { 66} 67 68PageCache::~PageCache() { 69 freePages(&mActivePages); 70 freePages(&mFreePages); 71} 72 73void PageCache::freePages(List<Page *> *list) { 74 List<Page *>::iterator it = list->begin(); 75 while (it != list->end()) { 76 Page *page = *it; 77 78 free(page->mData); 79 delete page; 80 page = NULL; 81 82 ++it; 83 } 84} 85 86PageCache::Page *PageCache::acquirePage() { 87 if (!mFreePages.empty()) { 88 List<Page *>::iterator it = mFreePages.begin(); 89 Page *page = *it; 90 mFreePages.erase(it); 91 92 return page; 93 } 94 95 Page *page = new Page; 96 page->mData = malloc(mPageSize); 97 page->mSize = 0; 98 99 return page; 100} 101 102void PageCache::releasePage(Page *page) { 103 page->mSize = 0; 104 mFreePages.push_back(page); 105} 106 107void PageCache::appendPage(Page *page) { 108 mTotalSize += page->mSize; 109 mActivePages.push_back(page); 110} 111 112size_t PageCache::releaseFromStart(size_t maxBytes) { 113 size_t bytesReleased = 0; 114 115 while (maxBytes > 0 && !mActivePages.empty()) { 116 List<Page *>::iterator it = mActivePages.begin(); 117 118 Page *page = *it; 119 120 if (maxBytes < page->mSize) { 121 break; 122 } 123 124 mActivePages.erase(it); 125 126 maxBytes -= page->mSize; 127 bytesReleased += page->mSize; 128 129 releasePage(page); 130 } 131 132 mTotalSize -= bytesReleased; 133 return bytesReleased; 134} 135 136void PageCache::copy(size_t from, void *data, size_t size) { 137 LOGV("copy from %d size %d", from, size); 138 139 if (size == 0) { 140 return; 141 } 142 143 CHECK_LE(from + size, mTotalSize); 144 145 size_t offset = 0; 146 List<Page *>::iterator it = mActivePages.begin(); 147 while (from >= offset + (*it)->mSize) { 148 offset += (*it)->mSize; 149 ++it; 150 } 151 152 size_t delta = from - offset; 153 size_t avail = (*it)->mSize - delta; 154 155 if (avail >= size) { 156 memcpy(data, (const uint8_t *)(*it)->mData + delta, size); 157 return; 158 } 159 160 memcpy(data, (const uint8_t *)(*it)->mData + delta, avail); 161 ++it; 162 data = (uint8_t *)data + avail; 163 size -= avail; 164 165 while (size > 0) { 166 size_t copy = (*it)->mSize; 167 if (copy > size) { 168 copy = size; 169 } 170 memcpy(data, (*it)->mData, copy); 171 data = (uint8_t *)data + copy; 172 size -= copy; 173 ++it; 174 } 175} 176 177//////////////////////////////////////////////////////////////////////////////// 178 179NuCachedSource2::NuCachedSource2(const sp<DataSource> &source) 180 : mSource(source), 181 mReflector(new AHandlerReflector<NuCachedSource2>(this)), 182 mLooper(new ALooper), 183 mCache(new PageCache(kPageSize)), 184 mCacheOffset(0), 185 mFinalStatus(OK), 186 mLastAccessPos(0), 187 mFetching(true), 188 mLastFetchTimeUs(-1) { 189 mLooper->setName("NuCachedSource2"); 190 mLooper->registerHandler(mReflector); 191 mLooper->start(); 192 193 Mutex::Autolock autoLock(mLock); 194 (new AMessage(kWhatFetchMore, mReflector->id()))->post(); 195} 196 197NuCachedSource2::~NuCachedSource2() { 198 mLooper->stop(); 199 mLooper->unregisterHandler(mReflector->id()); 200 201 delete mCache; 202 mCache = NULL; 203} 204 205status_t NuCachedSource2::getEstimatedBandwidthKbps(int32_t *kbps) { 206 if (mSource->flags() & kIsHTTPBasedSource) { 207 HTTPBase* source = static_cast<HTTPBase *>(mSource.get()); 208 return source->getEstimatedBandwidthKbps(kbps); 209 } 210 return ERROR_UNSUPPORTED; 211} 212 213status_t NuCachedSource2::setCacheStatCollectFreq(int32_t freqMs) { 214 if (mSource->flags() & kIsHTTPBasedSource) { 215 HTTPBase *source = static_cast<HTTPBase *>(mSource.get()); 216 return source->setBandwidthStatCollectFreq(freqMs); 217 } 218 return ERROR_UNSUPPORTED; 219} 220 221status_t NuCachedSource2::initCheck() const { 222 return mSource->initCheck(); 223} 224 225status_t NuCachedSource2::getSize(off64_t *size) { 226 return mSource->getSize(size); 227} 228 229uint32_t NuCachedSource2::flags() { 230 // Remove HTTP related flags since NuCachedSource2 is not HTTP-based. 231 uint32_t flags = mSource->flags() & ~(kWantsPrefetching | kIsHTTPBasedSource); 232 return (flags | kIsCachingDataSource); 233} 234 235void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) { 236 switch (msg->what()) { 237 case kWhatFetchMore: 238 { 239 onFetch(); 240 break; 241 } 242 243 case kWhatRead: 244 { 245 onRead(msg); 246 break; 247 } 248 249 default: 250 TRESPASS(); 251 } 252} 253 254void NuCachedSource2::fetchInternal() { 255 LOGV("fetchInternal"); 256 257 CHECK_EQ(mFinalStatus, (status_t)OK); 258 259 PageCache::Page *page = mCache->acquirePage(); 260 261 ssize_t n = mSource->readAt( 262 mCacheOffset + mCache->totalSize(), page->mData, kPageSize); 263 264 Mutex::Autolock autoLock(mLock); 265 266 if (n < 0) { 267 LOGE("source returned error %ld", n); 268 mFinalStatus = n; 269 mCache->releasePage(page); 270 } else if (n == 0) { 271 LOGI("ERROR_END_OF_STREAM"); 272 mFinalStatus = ERROR_END_OF_STREAM; 273 mCache->releasePage(page); 274 } else { 275 page->mSize = n; 276 mCache->appendPage(page); 277 } 278} 279 280void NuCachedSource2::onFetch() { 281 LOGV("onFetch"); 282 283 if (mFinalStatus != OK) { 284 LOGV("EOS reached, done prefetching for now"); 285 mFetching = false; 286 } 287 288 bool keepAlive = 289 !mFetching 290 && mFinalStatus == OK 291 && ALooper::GetNowUs() >= mLastFetchTimeUs + kKeepAliveIntervalUs; 292 293 if (mFetching || keepAlive) { 294 if (keepAlive) { 295 LOGI("Keep alive"); 296 } 297 298 fetchInternal(); 299 300 mLastFetchTimeUs = ALooper::GetNowUs(); 301 302 if (mFetching && mCache->totalSize() >= kHighWaterThreshold) { 303 LOGI("Cache full, done prefetching for now"); 304 mFetching = false; 305 } 306 } else { 307 Mutex::Autolock autoLock(mLock); 308 restartPrefetcherIfNecessary_l(); 309 } 310 311 (new AMessage(kWhatFetchMore, mReflector->id()))->post( 312 mFetching ? 0 : 100000ll); 313} 314 315void NuCachedSource2::onRead(const sp<AMessage> &msg) { 316 LOGV("onRead"); 317 318 int64_t offset; 319 CHECK(msg->findInt64("offset", &offset)); 320 321 void *data; 322 CHECK(msg->findPointer("data", &data)); 323 324 size_t size; 325 CHECK(msg->findSize("size", &size)); 326 327 ssize_t result = readInternal(offset, data, size); 328 329 if (result == -EAGAIN) { 330 msg->post(50000); 331 return; 332 } 333 334 Mutex::Autolock autoLock(mLock); 335 336 CHECK(mAsyncResult == NULL); 337 338 mAsyncResult = new AMessage; 339 mAsyncResult->setInt32("result", result); 340 341 mCondition.signal(); 342} 343 344void NuCachedSource2::restartPrefetcherIfNecessary_l( 345 bool ignoreLowWaterThreshold, bool force) { 346 static const size_t kGrayArea = 1024 * 1024; 347 348 if (mFetching || mFinalStatus != OK) { 349 return; 350 } 351 352 if (!ignoreLowWaterThreshold && !force 353 && mCacheOffset + mCache->totalSize() - mLastAccessPos 354 >= kLowWaterThreshold) { 355 return; 356 } 357 358 size_t maxBytes = mLastAccessPos - mCacheOffset; 359 360 if (!force) { 361 if (maxBytes < kGrayArea) { 362 return; 363 } 364 365 maxBytes -= kGrayArea; 366 } 367 368 size_t actualBytes = mCache->releaseFromStart(maxBytes); 369 mCacheOffset += actualBytes; 370 371 LOGI("restarting prefetcher, totalSize = %d", mCache->totalSize()); 372 mFetching = true; 373} 374 375ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) { 376 Mutex::Autolock autoSerializer(mSerializer); 377 378 LOGV("readAt offset %lld, size %d", offset, size); 379 380 Mutex::Autolock autoLock(mLock); 381 382 // If the request can be completely satisfied from the cache, do so. 383 384 if (offset >= mCacheOffset 385 && offset + size <= mCacheOffset + mCache->totalSize()) { 386 size_t delta = offset - mCacheOffset; 387 mCache->copy(delta, data, size); 388 389 mLastAccessPos = offset + size; 390 391 return size; 392 } 393 394 sp<AMessage> msg = new AMessage(kWhatRead, mReflector->id()); 395 msg->setInt64("offset", offset); 396 msg->setPointer("data", data); 397 msg->setSize("size", size); 398 399 CHECK(mAsyncResult == NULL); 400 msg->post(); 401 402 while (mAsyncResult == NULL) { 403 mCondition.wait(mLock); 404 } 405 406 int32_t result; 407 CHECK(mAsyncResult->findInt32("result", &result)); 408 409 mAsyncResult.clear(); 410 411 if (result > 0) { 412 mLastAccessPos = offset + result; 413 } 414 415 return (ssize_t)result; 416} 417 418size_t NuCachedSource2::cachedSize() { 419 Mutex::Autolock autoLock(mLock); 420 return mCacheOffset + mCache->totalSize(); 421} 422 423size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) { 424 Mutex::Autolock autoLock(mLock); 425 return approxDataRemaining_l(finalStatus); 426} 427 428size_t NuCachedSource2::approxDataRemaining_l(status_t *finalStatus) { 429 *finalStatus = mFinalStatus; 430 off64_t lastBytePosCached = mCacheOffset + mCache->totalSize(); 431 if (mLastAccessPos < lastBytePosCached) { 432 return lastBytePosCached - mLastAccessPos; 433 } 434 return 0; 435} 436 437ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) { 438 CHECK_LE(size, (size_t)kHighWaterThreshold); 439 440 LOGV("readInternal offset %lld size %d", offset, size); 441 442 Mutex::Autolock autoLock(mLock); 443 444 if (!mFetching) { 445 mLastAccessPos = offset; 446 restartPrefetcherIfNecessary_l( 447 false, // ignoreLowWaterThreshold 448 true); // force 449 } 450 451 if (offset < mCacheOffset 452 || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) { 453 static const off64_t kPadding = 256 * 1024; 454 455 // In the presence of multiple decoded streams, once of them will 456 // trigger this seek request, the other one will request data "nearby" 457 // soon, adjust the seek position so that that subsequent request 458 // does not trigger another seek. 459 off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0; 460 461 seekInternal_l(seekOffset); 462 } 463 464 size_t delta = offset - mCacheOffset; 465 466 if (mFinalStatus != OK) { 467 if (delta >= mCache->totalSize()) { 468 return mFinalStatus; 469 } 470 471 size_t avail = mCache->totalSize() - delta; 472 473 if (avail > size) { 474 avail = size; 475 } 476 477 mCache->copy(delta, data, avail); 478 479 return avail; 480 } 481 482 if (offset + size <= mCacheOffset + mCache->totalSize()) { 483 mCache->copy(delta, data, size); 484 485 return size; 486 } 487 488 LOGV("deferring read"); 489 490 return -EAGAIN; 491} 492 493status_t NuCachedSource2::seekInternal_l(off64_t offset) { 494 mLastAccessPos = offset; 495 496 if (offset >= mCacheOffset 497 && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) { 498 return OK; 499 } 500 501 LOGI("new range: offset= %lld", offset); 502 503 mCacheOffset = offset; 504 505 size_t totalSize = mCache->totalSize(); 506 CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize); 507 508 mFinalStatus = OK; 509 mFetching = true; 510 511 return OK; 512} 513 514void NuCachedSource2::resumeFetchingIfNecessary() { 515 Mutex::Autolock autoLock(mLock); 516 517 restartPrefetcherIfNecessary_l(true /* ignore low water threshold */); 518} 519 520sp<DecryptHandle> NuCachedSource2::DrmInitialization() { 521 return mSource->DrmInitialization(); 522} 523 524void NuCachedSource2::getDrmInfo(sp<DecryptHandle> &handle, DrmManagerClient **client) { 525 mSource->getDrmInfo(handle, client); 526} 527 528String8 NuCachedSource2::getUri() { 529 return mSource->getUri(); 530} 531 532String8 NuCachedSource2::getMIMEType() const { 533 return mSource->getMIMEType(); 534} 535 536} // namespace android 537