NuCachedSource2.cpp revision 6511c9755c3a3360ba869772600c7aae048a7ffc
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "NuCachedSource2" 19#include <utils/Log.h> 20 21#include "include/NuCachedSource2.h" 22 23#include <media/stagefright/foundation/ADebug.h> 24#include <media/stagefright/foundation/AMessage.h> 25#include <media/stagefright/MediaErrors.h> 26 27namespace android { 28 29struct PageCache { 30 PageCache(size_t pageSize); 31 ~PageCache(); 32 33 struct Page { 34 void *mData; 35 size_t mSize; 36 }; 37 38 Page *acquirePage(); 39 void releasePage(Page *page); 40 41 void appendPage(Page *page); 42 size_t releaseFromStart(size_t maxBytes); 43 44 size_t totalSize() const { 45 return mTotalSize; 46 } 47 48 void copy(size_t from, void *data, size_t size); 49 50private: 51 size_t mPageSize; 52 size_t mTotalSize; 53 54 List<Page *> mActivePages; 55 List<Page *> mFreePages; 56 57 void freePages(List<Page *> *list); 58 59 DISALLOW_EVIL_CONSTRUCTORS(PageCache); 60}; 61 62PageCache::PageCache(size_t pageSize) 63 : mPageSize(pageSize), 64 mTotalSize(0) { 65} 66 67PageCache::~PageCache() { 68 freePages(&mActivePages); 69 freePages(&mFreePages); 70} 71 72void PageCache::freePages(List<Page *> *list) { 73 List<Page *>::iterator it = list->begin(); 74 while (it != list->end()) { 75 Page *page = *it; 76 77 free(page->mData); 78 delete page; 79 page = NULL; 80 81 ++it; 82 } 83} 84 85PageCache::Page *PageCache::acquirePage() { 86 if (!mFreePages.empty()) { 87 List<Page *>::iterator it = mFreePages.begin(); 88 Page *page = *it; 89 mFreePages.erase(it); 90 91 return page; 92 } 93 94 Page *page = new Page; 95 page->mData = malloc(mPageSize); 96 page->mSize = 0; 97 98 return page; 99} 100 101void PageCache::releasePage(Page *page) { 102 page->mSize = 0; 103 mFreePages.push_back(page); 104} 105 106void PageCache::appendPage(Page *page) { 107 mTotalSize += page->mSize; 108 mActivePages.push_back(page); 109} 110 111size_t PageCache::releaseFromStart(size_t maxBytes) { 112 size_t bytesReleased = 0; 113 114 while (maxBytes > 0 && !mActivePages.empty()) { 115 List<Page *>::iterator it = mActivePages.begin(); 116 117 Page *page = *it; 118 119 if (maxBytes < page->mSize) { 120 break; 121 } 122 123 mActivePages.erase(it); 124 125 maxBytes -= page->mSize; 126 bytesReleased += page->mSize; 127 128 releasePage(page); 129 } 130 131 mTotalSize -= bytesReleased; 132 return bytesReleased; 133} 134 135void PageCache::copy(size_t from, void *data, size_t size) { 136 LOGV("copy from %d size %d", from, size); 137 138 if (size == 0) { 139 return; 140 } 141 142 CHECK_LE(from + size, mTotalSize); 143 144 size_t offset = 0; 145 List<Page *>::iterator it = mActivePages.begin(); 146 while (from >= offset + (*it)->mSize) { 147 offset += (*it)->mSize; 148 ++it; 149 } 150 151 size_t delta = from - offset; 152 size_t avail = (*it)->mSize - delta; 153 154 if (avail >= size) { 155 memcpy(data, (const uint8_t *)(*it)->mData + delta, size); 156 return; 157 } 158 159 memcpy(data, (const uint8_t *)(*it)->mData + delta, avail); 160 ++it; 161 data = (uint8_t *)data + avail; 162 size -= avail; 163 164 while (size > 0) { 165 size_t copy = (*it)->mSize; 166 if (copy > size) { 167 copy = size; 168 } 169 memcpy(data, (*it)->mData, copy); 170 data = (uint8_t *)data + copy; 171 size -= copy; 172 ++it; 173 } 174} 175 176//////////////////////////////////////////////////////////////////////////////// 177 178NuCachedSource2::NuCachedSource2(const sp<DataSource> &source) 179 : mSource(source), 180 mReflector(new AHandlerReflector<NuCachedSource2>(this)), 181 mLooper(new ALooper), 182 mCache(new PageCache(kPageSize)), 183 mCacheOffset(0), 184 mFinalStatus(OK), 185 mLastAccessPos(0), 186 mFetching(true), 187 mLastFetchTimeUs(-1) { 188 mLooper->setName("NuCachedSource2"); 189 mLooper->registerHandler(mReflector); 190 mLooper->start(); 191 192 Mutex::Autolock autoLock(mLock); 193 (new AMessage(kWhatFetchMore, mReflector->id()))->post(); 194} 195 196NuCachedSource2::~NuCachedSource2() { 197 mLooper->stop(); 198 mLooper->unregisterHandler(mReflector->id()); 199 200 delete mCache; 201 mCache = NULL; 202} 203 204status_t NuCachedSource2::initCheck() const { 205 return mSource->initCheck(); 206} 207 208status_t NuCachedSource2::getSize(off64_t *size) { 209 return mSource->getSize(size); 210} 211 212uint32_t NuCachedSource2::flags() { 213 return (mSource->flags() & ~kWantsPrefetching) | kIsCachingDataSource; 214} 215 216void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) { 217 switch (msg->what()) { 218 case kWhatFetchMore: 219 { 220 onFetch(); 221 break; 222 } 223 224 case kWhatRead: 225 { 226 onRead(msg); 227 break; 228 } 229 230 default: 231 TRESPASS(); 232 } 233} 234 235void NuCachedSource2::fetchInternal() { 236 LOGV("fetchInternal"); 237 238 CHECK_EQ(mFinalStatus, (status_t)OK); 239 240 PageCache::Page *page = mCache->acquirePage(); 241 242 ssize_t n = mSource->readAt( 243 mCacheOffset + mCache->totalSize(), page->mData, kPageSize); 244 245 Mutex::Autolock autoLock(mLock); 246 247 if (n < 0) { 248 LOGE("source returned error %ld", n); 249 mFinalStatus = n; 250 mCache->releasePage(page); 251 } else if (n == 0) { 252 LOGI("ERROR_END_OF_STREAM"); 253 mFinalStatus = ERROR_END_OF_STREAM; 254 mCache->releasePage(page); 255 } else { 256 page->mSize = n; 257 mCache->appendPage(page); 258 } 259} 260 261void NuCachedSource2::onFetch() { 262 LOGV("onFetch"); 263 264 if (mFinalStatus != OK) { 265 LOGV("EOS reached, done prefetching for now"); 266 mFetching = false; 267 } 268 269 bool keepAlive = 270 !mFetching 271 && mFinalStatus == OK 272 && ALooper::GetNowUs() >= mLastFetchTimeUs + kKeepAliveIntervalUs; 273 274 if (mFetching || keepAlive) { 275 if (keepAlive) { 276 LOGI("Keep alive"); 277 } 278 279 fetchInternal(); 280 281 mLastFetchTimeUs = ALooper::GetNowUs(); 282 283 if (mFetching && mCache->totalSize() >= kHighWaterThreshold) { 284 LOGI("Cache full, done prefetching for now"); 285 mFetching = false; 286 } 287 } else { 288 Mutex::Autolock autoLock(mLock); 289 restartPrefetcherIfNecessary_l(); 290 } 291 292 (new AMessage(kWhatFetchMore, mReflector->id()))->post( 293 mFetching ? 0 : 100000ll); 294} 295 296void NuCachedSource2::onRead(const sp<AMessage> &msg) { 297 LOGV("onRead"); 298 299 int64_t offset; 300 CHECK(msg->findInt64("offset", &offset)); 301 302 void *data; 303 CHECK(msg->findPointer("data", &data)); 304 305 size_t size; 306 CHECK(msg->findSize("size", &size)); 307 308 ssize_t result = readInternal(offset, data, size); 309 310 if (result == -EAGAIN) { 311 msg->post(50000); 312 return; 313 } 314 315 Mutex::Autolock autoLock(mLock); 316 317 CHECK(mAsyncResult == NULL); 318 319 mAsyncResult = new AMessage; 320 mAsyncResult->setInt32("result", result); 321 322 mCondition.signal(); 323} 324 325void NuCachedSource2::restartPrefetcherIfNecessary_l( 326 bool ignoreLowWaterThreshold) { 327 static const size_t kGrayArea = 1024 * 1024; 328 329 if (mFetching || mFinalStatus != OK) { 330 return; 331 } 332 333 if (!ignoreLowWaterThreshold 334 && mCacheOffset + mCache->totalSize() - mLastAccessPos 335 >= kLowWaterThreshold) { 336 return; 337 } 338 339 size_t maxBytes = mLastAccessPos - mCacheOffset; 340 if (maxBytes < kGrayArea) { 341 return; 342 } 343 344 maxBytes -= kGrayArea; 345 346 size_t actualBytes = mCache->releaseFromStart(maxBytes); 347 mCacheOffset += actualBytes; 348 349 LOGI("restarting prefetcher, totalSize = %d", mCache->totalSize()); 350 mFetching = true; 351} 352 353ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) { 354 Mutex::Autolock autoSerializer(mSerializer); 355 356 LOGV("readAt offset %lld, size %d", offset, size); 357 358 Mutex::Autolock autoLock(mLock); 359 360 // If the request can be completely satisfied from the cache, do so. 361 362 if (offset >= mCacheOffset 363 && offset + size <= mCacheOffset + mCache->totalSize()) { 364 size_t delta = offset - mCacheOffset; 365 mCache->copy(delta, data, size); 366 367 mLastAccessPos = offset + size; 368 369 return size; 370 } 371 372 sp<AMessage> msg = new AMessage(kWhatRead, mReflector->id()); 373 msg->setInt64("offset", offset); 374 msg->setPointer("data", data); 375 msg->setSize("size", size); 376 377 CHECK(mAsyncResult == NULL); 378 msg->post(); 379 380 while (mAsyncResult == NULL) { 381 mCondition.wait(mLock); 382 } 383 384 int32_t result; 385 CHECK(mAsyncResult->findInt32("result", &result)); 386 387 mAsyncResult.clear(); 388 389 if (result > 0) { 390 mLastAccessPos = offset + result; 391 } 392 393 return (ssize_t)result; 394} 395 396size_t NuCachedSource2::cachedSize() { 397 Mutex::Autolock autoLock(mLock); 398 return mCacheOffset + mCache->totalSize(); 399} 400 401size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) { 402 Mutex::Autolock autoLock(mLock); 403 return approxDataRemaining_l(finalStatus); 404} 405 406size_t NuCachedSource2::approxDataRemaining_l(status_t *finalStatus) { 407 *finalStatus = mFinalStatus; 408 off64_t lastBytePosCached = mCacheOffset + mCache->totalSize(); 409 if (mLastAccessPos < lastBytePosCached) { 410 return lastBytePosCached - mLastAccessPos; 411 } 412 return 0; 413} 414 415ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) { 416 LOGV("readInternal offset %lld size %d", offset, size); 417 418 Mutex::Autolock autoLock(mLock); 419 420 if (offset < mCacheOffset 421 || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) { 422 static const off64_t kPadding = 256 * 1024; 423 424 // In the presence of multiple decoded streams, once of them will 425 // trigger this seek request, the other one will request data "nearby" 426 // soon, adjust the seek position so that that subsequent request 427 // does not trigger another seek. 428 off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0; 429 430 seekInternal_l(seekOffset); 431 } 432 433 size_t delta = offset - mCacheOffset; 434 435 if (mFinalStatus != OK) { 436 if (delta >= mCache->totalSize()) { 437 return mFinalStatus; 438 } 439 440 size_t avail = mCache->totalSize() - delta; 441 mCache->copy(delta, data, avail); 442 443 return avail; 444 } 445 446 if (offset + size <= mCacheOffset + mCache->totalSize()) { 447 mCache->copy(delta, data, size); 448 449 return size; 450 } 451 452 LOGV("deferring read"); 453 454 return -EAGAIN; 455} 456 457status_t NuCachedSource2::seekInternal_l(off64_t offset) { 458 mLastAccessPos = offset; 459 460 if (offset >= mCacheOffset 461 && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) { 462 return OK; 463 } 464 465 LOGI("new range: offset= %lld", offset); 466 467 mCacheOffset = offset; 468 469 size_t totalSize = mCache->totalSize(); 470 CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize); 471 472 mFinalStatus = OK; 473 mFetching = true; 474 475 return OK; 476} 477 478void NuCachedSource2::resumeFetchingIfNecessary() { 479 Mutex::Autolock autoLock(mLock); 480 481 restartPrefetcherIfNecessary_l(true /* ignore low water threshold */); 482} 483 484sp<DecryptHandle> NuCachedSource2::DrmInitialization() { 485 return mSource->DrmInitialization(); 486} 487 488void NuCachedSource2::getDrmInfo(sp<DecryptHandle> &handle, DrmManagerClient **client) { 489 mSource->getDrmInfo(handle, client); 490} 491 492String8 NuCachedSource2::getUri() { 493 return mSource->getUri(); 494} 495 496String8 NuCachedSource2::getMIMEType() const { 497 return mSource->getMIMEType(); 498} 499 500} // namespace android 501