NuCachedSource2.cpp revision c7fc37a3dab9bd1f96713649f351b5990e6316ff
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#define LOG_TAG "NuCachedSource2" 18#include <utils/Log.h> 19 20#include "include/NuCachedSource2.h" 21 22#include <media/stagefright/foundation/ADebug.h> 23#include <media/stagefright/foundation/AMessage.h> 24#include <media/stagefright/MediaErrors.h> 25 26namespace android { 27 28struct PageCache { 29 PageCache(size_t pageSize); 30 ~PageCache(); 31 32 struct Page { 33 void *mData; 34 size_t mSize; 35 }; 36 37 Page *acquirePage(); 38 void releasePage(Page *page); 39 40 void appendPage(Page *page); 41 size_t releaseFromStart(size_t maxBytes); 42 43 size_t totalSize() const { 44 return mTotalSize; 45 } 46 47 void copy(size_t from, void *data, size_t size); 48 49private: 50 size_t mPageSize; 51 size_t mTotalSize; 52 53 List<Page *> mActivePages; 54 List<Page *> mFreePages; 55 56 void freePages(List<Page *> *list); 57 58 DISALLOW_EVIL_CONSTRUCTORS(PageCache); 59}; 60 61PageCache::PageCache(size_t pageSize) 62 : mPageSize(pageSize), 63 mTotalSize(0) { 64} 65 66PageCache::~PageCache() { 67 freePages(&mActivePages); 68 freePages(&mFreePages); 69} 70 71void PageCache::freePages(List<Page *> *list) { 72 List<Page *>::iterator it = list->begin(); 73 while (it != list->end()) { 74 Page *page = *it; 75 76 free(page->mData); 77 delete page; 78 page = NULL; 79 80 ++it; 81 } 82} 83 84PageCache::Page *PageCache::acquirePage() { 85 if (!mFreePages.empty()) { 86 List<Page *>::iterator it = mFreePages.begin(); 87 Page *page = *it; 88 mFreePages.erase(it); 89 90 return page; 91 } 92 93 Page *page = new Page; 94 page->mData = malloc(mPageSize); 95 page->mSize = 0; 96 97 return page; 98} 99 100void PageCache::releasePage(Page *page) { 101 page->mSize = 0; 102 mFreePages.push_back(page); 103} 104 105void PageCache::appendPage(Page *page) { 106 mTotalSize += page->mSize; 107 mActivePages.push_back(page); 108} 109 110size_t PageCache::releaseFromStart(size_t maxBytes) { 111 size_t bytesReleased = 0; 112 113 while (maxBytes > 0 && !mActivePages.empty()) { 114 List<Page *>::iterator it = mActivePages.begin(); 115 116 Page *page = *it; 117 118 if (maxBytes < page->mSize) { 119 break; 120 } 121 122 mActivePages.erase(it); 123 124 maxBytes -= page->mSize; 125 bytesReleased += page->mSize; 126 127 releasePage(page); 128 } 129 130 mTotalSize -= bytesReleased; 131 return bytesReleased; 132} 133 134void PageCache::copy(size_t from, void *data, size_t size) { 135 LOGV("copy from %d size %d", from, size); 136 137 CHECK_LE(from + size, mTotalSize); 138 139 size_t offset = 0; 140 List<Page *>::iterator it = mActivePages.begin(); 141 while (from >= offset + (*it)->mSize) { 142 offset += (*it)->mSize; 143 ++it; 144 } 145 146 size_t delta = from - offset; 147 size_t avail = (*it)->mSize - delta; 148 149 if (avail >= size) { 150 memcpy(data, (const uint8_t *)(*it)->mData + delta, size); 151 return; 152 } 153 154 memcpy(data, (const uint8_t *)(*it)->mData + delta, avail); 155 ++it; 156 data = (uint8_t *)data + avail; 157 size -= avail; 158 159 while (size > 0) { 160 size_t copy = (*it)->mSize; 161 if (copy > size) { 162 copy = size; 163 } 164 memcpy(data, (*it)->mData, copy); 165 data = (uint8_t *)data + copy; 166 size -= copy; 167 ++it; 168 } 169} 170 171//////////////////////////////////////////////////////////////////////////////// 172 173NuCachedSource2::NuCachedSource2(const sp<DataSource> &source) 174 : mSource(source), 175 mReflector(new AHandlerReflector<NuCachedSource2>(this)), 176 mLooper(new ALooper), 177 mCache(new PageCache(kPageSize)), 178 mCacheOffset(0), 179 mFinalStatus(OK), 180 mLastAccessPos(0), 181 mFetching(true), 182 mLastFetchTimeUs(-1), 183 mSuspended(false) { 184 mLooper->setName("NuCachedSource2"); 185 mLooper->registerHandler(mReflector); 186 mLooper->start(); 187 188 Mutex::Autolock autoLock(mLock); 189 (new AMessage(kWhatFetchMore, mReflector->id()))->post(); 190} 191 192NuCachedSource2::~NuCachedSource2() { 193 mLooper->stop(); 194 mLooper->unregisterHandler(mReflector->id()); 195 196 delete mCache; 197 mCache = NULL; 198} 199 200status_t NuCachedSource2::initCheck() const { 201 return mSource->initCheck(); 202} 203 204status_t NuCachedSource2::getSize(off64_t *size) { 205 return mSource->getSize(size); 206} 207 208uint32_t NuCachedSource2::flags() { 209 return (mSource->flags() & ~kWantsPrefetching) | kIsCachingDataSource; 210} 211 212void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) { 213 switch (msg->what()) { 214 case kWhatFetchMore: 215 { 216 onFetch(); 217 break; 218 } 219 220 case kWhatRead: 221 { 222 onRead(msg); 223 break; 224 } 225 226 case kWhatSuspend: 227 { 228 onSuspend(); 229 break; 230 } 231 232 default: 233 TRESPASS(); 234 } 235} 236 237void NuCachedSource2::fetchInternal() { 238 LOGV("fetchInternal"); 239 240 CHECK_EQ(mFinalStatus, (status_t)OK); 241 242 PageCache::Page *page = mCache->acquirePage(); 243 244 ssize_t n = mSource->readAt( 245 mCacheOffset + mCache->totalSize(), page->mData, kPageSize); 246 247 Mutex::Autolock autoLock(mLock); 248 249 if (n < 0) { 250 LOGE("source returned error %ld", n); 251 mFinalStatus = n; 252 mCache->releasePage(page); 253 } else if (n == 0) { 254 LOGI("ERROR_END_OF_STREAM"); 255 mFinalStatus = ERROR_END_OF_STREAM; 256 mCache->releasePage(page); 257 } else { 258 page->mSize = n; 259 mCache->appendPage(page); 260 } 261} 262 263void NuCachedSource2::onFetch() { 264 LOGV("onFetch"); 265 266 if (mFinalStatus != OK) { 267 LOGV("EOS reached, done prefetching for now"); 268 mFetching = false; 269 } 270 271 bool keepAlive = 272 !mFetching 273 && !mSuspended 274 && mFinalStatus == OK 275 && ALooper::GetNowUs() >= mLastFetchTimeUs + kKeepAliveIntervalUs; 276 277 if (mFetching || keepAlive) { 278 if (keepAlive) { 279 LOGI("Keep alive"); 280 } 281 282 fetchInternal(); 283 284 mLastFetchTimeUs = ALooper::GetNowUs(); 285 286 if (mFetching && mCache->totalSize() >= kHighWaterThreshold) { 287 LOGI("Cache full, done prefetching for now"); 288 mFetching = false; 289 } 290 } else if (!mSuspended) { 291 Mutex::Autolock autoLock(mLock); 292 restartPrefetcherIfNecessary_l(); 293 } 294 295 (new AMessage(kWhatFetchMore, mReflector->id()))->post( 296 mFetching ? 0 : 100000ll); 297} 298 299void NuCachedSource2::onRead(const sp<AMessage> &msg) { 300 LOGV("onRead"); 301 302 int64_t offset; 303 CHECK(msg->findInt64("offset", &offset)); 304 305 void *data; 306 CHECK(msg->findPointer("data", &data)); 307 308 size_t size; 309 CHECK(msg->findSize("size", &size)); 310 311 ssize_t result = readInternal(offset, data, size); 312 313 if (result == -EAGAIN) { 314 msg->post(50000); 315 return; 316 } 317 318 Mutex::Autolock autoLock(mLock); 319 320 CHECK(mAsyncResult == NULL); 321 322 mAsyncResult = new AMessage; 323 mAsyncResult->setInt32("result", result); 324 325 mCondition.signal(); 326} 327 328void NuCachedSource2::restartPrefetcherIfNecessary_l( 329 bool ignoreLowWaterThreshold) { 330 static const size_t kGrayArea = 256 * 1024; 331 332 if (mFetching || mFinalStatus != OK) { 333 return; 334 } 335 336 if (!ignoreLowWaterThreshold 337 && mCacheOffset + mCache->totalSize() - mLastAccessPos 338 >= kLowWaterThreshold) { 339 return; 340 } 341 342 size_t maxBytes = mLastAccessPos - mCacheOffset; 343 if (maxBytes < kGrayArea) { 344 return; 345 } 346 347 maxBytes -= kGrayArea; 348 349 size_t actualBytes = mCache->releaseFromStart(maxBytes); 350 mCacheOffset += actualBytes; 351 352 LOGI("restarting prefetcher, totalSize = %d", mCache->totalSize()); 353 mFetching = true; 354} 355 356ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) { 357 Mutex::Autolock autoSerializer(mSerializer); 358 359 LOGV("readAt offset %ld, size %d", offset, size); 360 361 Mutex::Autolock autoLock(mLock); 362 363 // If the request can be completely satisfied from the cache, do so. 364 365 if (offset >= mCacheOffset 366 && offset + size <= mCacheOffset + mCache->totalSize()) { 367 size_t delta = offset - mCacheOffset; 368 mCache->copy(delta, data, size); 369 370 mLastAccessPos = offset + size; 371 372 return size; 373 } 374 375 sp<AMessage> msg = new AMessage(kWhatRead, mReflector->id()); 376 msg->setInt64("offset", offset); 377 msg->setPointer("data", data); 378 msg->setSize("size", size); 379 380 CHECK(mAsyncResult == NULL); 381 msg->post(); 382 383 while (mAsyncResult == NULL) { 384 mCondition.wait(mLock); 385 } 386 387 int32_t result; 388 CHECK(mAsyncResult->findInt32("result", &result)); 389 390 mAsyncResult.clear(); 391 392 if (result > 0) { 393 mLastAccessPos = offset + result; 394 } 395 396 return (ssize_t)result; 397} 398 399size_t NuCachedSource2::cachedSize() { 400 Mutex::Autolock autoLock(mLock); 401 return mCacheOffset + mCache->totalSize(); 402} 403 404size_t NuCachedSource2::approxDataRemaining(bool *eos) { 405 Mutex::Autolock autoLock(mLock); 406 return approxDataRemaining_l(eos); 407} 408 409size_t NuCachedSource2::approxDataRemaining_l(bool *eos) { 410 *eos = (mFinalStatus != OK); 411 off64_t lastBytePosCached = mCacheOffset + mCache->totalSize(); 412 if (mLastAccessPos < lastBytePosCached) { 413 return lastBytePosCached - mLastAccessPos; 414 } 415 return 0; 416} 417 418ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) { 419 LOGV("readInternal offset %ld size %d", offset, size); 420 421 Mutex::Autolock autoLock(mLock); 422 423 if (offset < mCacheOffset 424 || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) { 425 static const off64_t kPadding = 32768; 426 427 // In the presence of multiple decoded streams, once of them will 428 // trigger this seek request, the other one will request data "nearby" 429 // soon, adjust the seek position so that that subsequent request 430 // does not trigger another seek. 431 off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0; 432 433 seekInternal_l(seekOffset); 434 } 435 436 size_t delta = offset - mCacheOffset; 437 438 if (mFinalStatus != OK) { 439 if (delta >= mCache->totalSize()) { 440 return mFinalStatus; 441 } 442 443 size_t avail = mCache->totalSize() - delta; 444 mCache->copy(delta, data, avail); 445 446 return avail; 447 } 448 449 if (offset + size <= mCacheOffset + mCache->totalSize()) { 450 mCache->copy(delta, data, size); 451 452 return size; 453 } 454 455 LOGV("deferring read"); 456 457 return -EAGAIN; 458} 459 460status_t NuCachedSource2::seekInternal_l(off64_t offset) { 461 mLastAccessPos = offset; 462 463 if (offset >= mCacheOffset 464 && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) { 465 return OK; 466 } 467 468 LOGI("new range: offset= %ld", offset); 469 470 mCacheOffset = offset; 471 472 size_t totalSize = mCache->totalSize(); 473 CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize); 474 475 mFinalStatus = OK; 476 mFetching = true; 477 478 return OK; 479} 480 481void NuCachedSource2::clearCacheAndResume() { 482 LOGV("clearCacheAndResume"); 483 484 Mutex::Autolock autoLock(mLock); 485 486 CHECK(mSuspended); 487 488 mCacheOffset = 0; 489 mFinalStatus = OK; 490 mLastAccessPos = 0; 491 mLastFetchTimeUs = -1; 492 493 size_t totalSize = mCache->totalSize(); 494 CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize); 495 496 mFetching = true; 497 mSuspended = false; 498} 499 500void NuCachedSource2::suspend() { 501 (new AMessage(kWhatSuspend, mReflector->id()))->post(); 502 503 while (!mSuspended) { 504 usleep(10000); 505 } 506} 507 508void NuCachedSource2::onSuspend() { 509 Mutex::Autolock autoLock(mLock); 510 511 mFetching = false; 512 mSuspended = true; 513} 514 515void NuCachedSource2::resumeFetchingIfNecessary() { 516 Mutex::Autolock autoLock(mLock); 517 518 restartPrefetcherIfNecessary_l(true /* ignore low water threshold */); 519} 520 521DecryptHandle* NuCachedSource2::DrmInitialization(DrmManagerClient* client) { 522 return mSource->DrmInitialization(client); 523} 524 525void NuCachedSource2::getDrmInfo(DecryptHandle **handle, DrmManagerClient **client) { 526 mSource->getDrmInfo(handle, client); 527} 528 529String8 NuCachedSource2::getUri() { 530 return mSource->getUri(); 531} 532} // namespace android 533 534