NuCachedSource2.cpp revision 145e68fc778275963189b02a1adcbe27cce4d769
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#define LOG_TAG "NuCachedSource2" 18#include <utils/Log.h> 19 20#include "include/NuCachedSource2.h" 21 22#include <media/stagefright/foundation/ADebug.h> 23#include <media/stagefright/foundation/AMessage.h> 24#include <media/stagefright/MediaErrors.h> 25 26namespace android { 27 28struct PageCache { 29 PageCache(size_t pageSize); 30 ~PageCache(); 31 32 struct Page { 33 void *mData; 34 size_t mSize; 35 }; 36 37 Page *acquirePage(); 38 void releasePage(Page *page); 39 40 void appendPage(Page *page); 41 size_t releaseFromStart(size_t maxBytes); 42 43 size_t totalSize() const { 44 return mTotalSize; 45 } 46 47 void copy(size_t from, void *data, size_t size); 48 49private: 50 size_t mPageSize; 51 size_t mTotalSize; 52 53 List<Page *> mActivePages; 54 List<Page *> mFreePages; 55 56 void freePages(List<Page *> *list); 57 58 DISALLOW_EVIL_CONSTRUCTORS(PageCache); 59}; 60 61PageCache::PageCache(size_t pageSize) 62 : mPageSize(pageSize), 63 mTotalSize(0) { 64} 65 66PageCache::~PageCache() { 67 freePages(&mActivePages); 68 freePages(&mFreePages); 69} 70 71void PageCache::freePages(List<Page *> *list) { 72 List<Page *>::iterator it = list->begin(); 73 while (it != list->end()) { 74 Page *page = *it; 75 76 free(page->mData); 77 delete page; 78 page = NULL; 79 80 ++it; 81 } 82} 83 84PageCache::Page *PageCache::acquirePage() { 85 if (!mFreePages.empty()) { 86 List<Page *>::iterator it = mFreePages.begin(); 87 Page *page = *it; 88 mFreePages.erase(it); 89 90 return page; 91 } 92 93 Page *page = new Page; 94 page->mData = malloc(mPageSize); 95 page->mSize = 0; 96 97 return page; 98} 99 100void PageCache::releasePage(Page *page) { 101 page->mSize = 0; 102 mFreePages.push_back(page); 103} 104 105void PageCache::appendPage(Page *page) { 106 mTotalSize += page->mSize; 107 mActivePages.push_back(page); 108} 109 110size_t PageCache::releaseFromStart(size_t maxBytes) { 111 size_t bytesReleased = 0; 112 113 while (maxBytes > 0 && !mActivePages.empty()) { 114 List<Page *>::iterator it = mActivePages.begin(); 115 116 Page *page = *it; 117 118 if (maxBytes < page->mSize) { 119 break; 120 } 121 122 mActivePages.erase(it); 123 124 maxBytes -= page->mSize; 125 bytesReleased += page->mSize; 126 127 releasePage(page); 128 } 129 130 mTotalSize -= bytesReleased; 131 return bytesReleased; 132} 133 134void PageCache::copy(size_t from, void *data, size_t size) { 135 LOGV("copy from %d size %d", from, size); 136 137 CHECK_LE(from + size, mTotalSize); 138 139 size_t offset = 0; 140 List<Page *>::iterator it = mActivePages.begin(); 141 while (from >= offset + (*it)->mSize) { 142 offset += (*it)->mSize; 143 ++it; 144 } 145 146 size_t delta = from - offset; 147 size_t avail = (*it)->mSize - delta; 148 149 if (avail >= size) { 150 memcpy(data, (const uint8_t *)(*it)->mData + delta, size); 151 return; 152 } 153 154 memcpy(data, (const uint8_t *)(*it)->mData + delta, avail); 155 ++it; 156 data = (uint8_t *)data + avail; 157 size -= avail; 158 159 while (size > 0) { 160 size_t copy = (*it)->mSize; 161 if (copy > size) { 162 copy = size; 163 } 164 memcpy(data, (*it)->mData, copy); 165 data = (uint8_t *)data + copy; 166 size -= copy; 167 ++it; 168 } 169} 170 171//////////////////////////////////////////////////////////////////////////////// 172 173NuCachedSource2::NuCachedSource2(const sp<DataSource> &source) 174 : mSource(source), 175 mReflector(new AHandlerReflector<NuCachedSource2>(this)), 176 mLooper(new ALooper), 177 mCache(new PageCache(kPageSize)), 178 mCacheOffset(0), 179 mFinalStatus(OK), 180 mLastAccessPos(0), 181 mFetching(true), 182 mLastFetchTimeUs(-1) { 183 mLooper->setName("NuCachedSource2"); 184 mLooper->registerHandler(mReflector); 185 mLooper->start(); 186 187 Mutex::Autolock autoLock(mLock); 188 (new AMessage(kWhatFetchMore, mReflector->id()))->post(); 189} 190 191NuCachedSource2::~NuCachedSource2() { 192 mLooper->stop(); 193 mLooper->unregisterHandler(mReflector->id()); 194 195 delete mCache; 196 mCache = NULL; 197} 198 199status_t NuCachedSource2::initCheck() const { 200 return mSource->initCheck(); 201} 202 203status_t NuCachedSource2::getSize(off64_t *size) { 204 return mSource->getSize(size); 205} 206 207uint32_t NuCachedSource2::flags() { 208 return (mSource->flags() & ~kWantsPrefetching) | kIsCachingDataSource; 209} 210 211void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) { 212 switch (msg->what()) { 213 case kWhatFetchMore: 214 { 215 onFetch(); 216 break; 217 } 218 219 case kWhatRead: 220 { 221 onRead(msg); 222 break; 223 } 224 225 default: 226 TRESPASS(); 227 } 228} 229 230void NuCachedSource2::fetchInternal() { 231 LOGV("fetchInternal"); 232 233 CHECK_EQ(mFinalStatus, (status_t)OK); 234 235 PageCache::Page *page = mCache->acquirePage(); 236 237 ssize_t n = mSource->readAt( 238 mCacheOffset + mCache->totalSize(), page->mData, kPageSize); 239 240 Mutex::Autolock autoLock(mLock); 241 242 if (n < 0) { 243 LOGE("source returned error %ld", n); 244 mFinalStatus = n; 245 mCache->releasePage(page); 246 } else if (n == 0) { 247 LOGI("ERROR_END_OF_STREAM"); 248 mFinalStatus = ERROR_END_OF_STREAM; 249 mCache->releasePage(page); 250 } else { 251 page->mSize = n; 252 mCache->appendPage(page); 253 } 254} 255 256void NuCachedSource2::onFetch() { 257 LOGV("onFetch"); 258 259 if (mFinalStatus != OK) { 260 LOGV("EOS reached, done prefetching for now"); 261 mFetching = false; 262 } 263 264 bool keepAlive = 265 !mFetching 266 && mFinalStatus == OK 267 && ALooper::GetNowUs() >= mLastFetchTimeUs + kKeepAliveIntervalUs; 268 269 if (mFetching || keepAlive) { 270 if (keepAlive) { 271 LOGI("Keep alive"); 272 } 273 274 fetchInternal(); 275 276 mLastFetchTimeUs = ALooper::GetNowUs(); 277 278 if (mFetching && mCache->totalSize() >= kHighWaterThreshold) { 279 LOGI("Cache full, done prefetching for now"); 280 mFetching = false; 281 } 282 } else { 283 Mutex::Autolock autoLock(mLock); 284 restartPrefetcherIfNecessary_l(); 285 } 286 287 (new AMessage(kWhatFetchMore, mReflector->id()))->post( 288 mFetching ? 0 : 100000ll); 289} 290 291void NuCachedSource2::onRead(const sp<AMessage> &msg) { 292 LOGV("onRead"); 293 294 int64_t offset; 295 CHECK(msg->findInt64("offset", &offset)); 296 297 void *data; 298 CHECK(msg->findPointer("data", &data)); 299 300 size_t size; 301 CHECK(msg->findSize("size", &size)); 302 303 ssize_t result = readInternal(offset, data, size); 304 305 if (result == -EAGAIN) { 306 msg->post(50000); 307 return; 308 } 309 310 Mutex::Autolock autoLock(mLock); 311 312 CHECK(mAsyncResult == NULL); 313 314 mAsyncResult = new AMessage; 315 mAsyncResult->setInt32("result", result); 316 317 mCondition.signal(); 318} 319 320void NuCachedSource2::restartPrefetcherIfNecessary_l( 321 bool ignoreLowWaterThreshold) { 322 static const size_t kGrayArea = 256 * 1024; 323 324 if (mFetching || mFinalStatus != OK) { 325 return; 326 } 327 328 if (!ignoreLowWaterThreshold 329 && mCacheOffset + mCache->totalSize() - mLastAccessPos 330 >= kLowWaterThreshold) { 331 return; 332 } 333 334 size_t maxBytes = mLastAccessPos - mCacheOffset; 335 if (maxBytes < kGrayArea) { 336 return; 337 } 338 339 maxBytes -= kGrayArea; 340 341 size_t actualBytes = mCache->releaseFromStart(maxBytes); 342 mCacheOffset += actualBytes; 343 344 LOGI("restarting prefetcher, totalSize = %d", mCache->totalSize()); 345 mFetching = true; 346} 347 348ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) { 349 Mutex::Autolock autoSerializer(mSerializer); 350 351 LOGV("readAt offset %lld, size %d", offset, size); 352 353 Mutex::Autolock autoLock(mLock); 354 355 // If the request can be completely satisfied from the cache, do so. 356 357 if (offset >= mCacheOffset 358 && offset + size <= mCacheOffset + mCache->totalSize()) { 359 size_t delta = offset - mCacheOffset; 360 mCache->copy(delta, data, size); 361 362 mLastAccessPos = offset + size; 363 364 return size; 365 } 366 367 sp<AMessage> msg = new AMessage(kWhatRead, mReflector->id()); 368 msg->setInt64("offset", offset); 369 msg->setPointer("data", data); 370 msg->setSize("size", size); 371 372 CHECK(mAsyncResult == NULL); 373 msg->post(); 374 375 while (mAsyncResult == NULL) { 376 mCondition.wait(mLock); 377 } 378 379 int32_t result; 380 CHECK(mAsyncResult->findInt32("result", &result)); 381 382 mAsyncResult.clear(); 383 384 if (result > 0) { 385 mLastAccessPos = offset + result; 386 } 387 388 return (ssize_t)result; 389} 390 391size_t NuCachedSource2::cachedSize() { 392 Mutex::Autolock autoLock(mLock); 393 return mCacheOffset + mCache->totalSize(); 394} 395 396size_t NuCachedSource2::approxDataRemaining(bool *eos) { 397 Mutex::Autolock autoLock(mLock); 398 return approxDataRemaining_l(eos); 399} 400 401size_t NuCachedSource2::approxDataRemaining_l(bool *eos) { 402 *eos = (mFinalStatus != OK); 403 off64_t lastBytePosCached = mCacheOffset + mCache->totalSize(); 404 if (mLastAccessPos < lastBytePosCached) { 405 return lastBytePosCached - mLastAccessPos; 406 } 407 return 0; 408} 409 410ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) { 411 LOGV("readInternal offset %lld size %d", offset, size); 412 413 Mutex::Autolock autoLock(mLock); 414 415 if (offset < mCacheOffset 416 || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) { 417 static const off64_t kPadding = 32768; 418 419 // In the presence of multiple decoded streams, once of them will 420 // trigger this seek request, the other one will request data "nearby" 421 // soon, adjust the seek position so that that subsequent request 422 // does not trigger another seek. 423 off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0; 424 425 seekInternal_l(seekOffset); 426 } 427 428 size_t delta = offset - mCacheOffset; 429 430 if (mFinalStatus != OK) { 431 if (delta >= mCache->totalSize()) { 432 return mFinalStatus; 433 } 434 435 size_t avail = mCache->totalSize() - delta; 436 mCache->copy(delta, data, avail); 437 438 return avail; 439 } 440 441 if (offset + size <= mCacheOffset + mCache->totalSize()) { 442 mCache->copy(delta, data, size); 443 444 return size; 445 } 446 447 LOGV("deferring read"); 448 449 return -EAGAIN; 450} 451 452status_t NuCachedSource2::seekInternal_l(off64_t offset) { 453 mLastAccessPos = offset; 454 455 if (offset >= mCacheOffset 456 && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) { 457 return OK; 458 } 459 460 LOGI("new range: offset= %lld", offset); 461 462 mCacheOffset = offset; 463 464 size_t totalSize = mCache->totalSize(); 465 CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize); 466 467 mFinalStatus = OK; 468 mFetching = true; 469 470 return OK; 471} 472 473void NuCachedSource2::resumeFetchingIfNecessary() { 474 Mutex::Autolock autoLock(mLock); 475 476 restartPrefetcherIfNecessary_l(true /* ignore low water threshold */); 477} 478 479DecryptHandle* NuCachedSource2::DrmInitialization(DrmManagerClient* client) { 480 return mSource->DrmInitialization(client); 481} 482 483void NuCachedSource2::getDrmInfo(DecryptHandle **handle, DrmManagerClient **client) { 484 mSource->getDrmInfo(handle, client); 485} 486 487String8 NuCachedSource2::getUri() { 488 return mSource->getUri(); 489} 490} // namespace android 491 492