// NuCachedSource2.cpp revision b5ce361d19e69fe156f7188c9ee0f4734b259874
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "NuCachedSource2" 19#include <utils/Log.h> 20 21#include "include/NuCachedSource2.h" 22 23#include <media/stagefright/foundation/ADebug.h> 24#include <media/stagefright/foundation/AMessage.h> 25#include <media/stagefright/MediaErrors.h> 26 27namespace android { 28 29struct PageCache { 30 PageCache(size_t pageSize); 31 ~PageCache(); 32 33 struct Page { 34 void *mData; 35 size_t mSize; 36 }; 37 38 Page *acquirePage(); 39 void releasePage(Page *page); 40 41 void appendPage(Page *page); 42 size_t releaseFromStart(size_t maxBytes); 43 44 size_t totalSize() const { 45 return mTotalSize; 46 } 47 48 void copy(size_t from, void *data, size_t size); 49 50private: 51 size_t mPageSize; 52 size_t mTotalSize; 53 54 List<Page *> mActivePages; 55 List<Page *> mFreePages; 56 57 void freePages(List<Page *> *list); 58 59 DISALLOW_EVIL_CONSTRUCTORS(PageCache); 60}; 61 62PageCache::PageCache(size_t pageSize) 63 : mPageSize(pageSize), 64 mTotalSize(0) { 65} 66 67PageCache::~PageCache() { 68 freePages(&mActivePages); 69 freePages(&mFreePages); 70} 71 72void PageCache::freePages(List<Page *> *list) { 73 List<Page *>::iterator it = list->begin(); 74 while (it != list->end()) { 75 Page *page = *it; 76 77 free(page->mData); 78 delete page; 79 page = NULL; 80 81 ++it; 82 } 83} 84 85PageCache::Page *PageCache::acquirePage() 
{ 86 if (!mFreePages.empty()) { 87 List<Page *>::iterator it = mFreePages.begin(); 88 Page *page = *it; 89 mFreePages.erase(it); 90 91 return page; 92 } 93 94 Page *page = new Page; 95 page->mData = malloc(mPageSize); 96 page->mSize = 0; 97 98 return page; 99} 100 101void PageCache::releasePage(Page *page) { 102 page->mSize = 0; 103 mFreePages.push_back(page); 104} 105 106void PageCache::appendPage(Page *page) { 107 mTotalSize += page->mSize; 108 mActivePages.push_back(page); 109} 110 111size_t PageCache::releaseFromStart(size_t maxBytes) { 112 size_t bytesReleased = 0; 113 114 while (maxBytes > 0 && !mActivePages.empty()) { 115 List<Page *>::iterator it = mActivePages.begin(); 116 117 Page *page = *it; 118 119 if (maxBytes < page->mSize) { 120 break; 121 } 122 123 mActivePages.erase(it); 124 125 maxBytes -= page->mSize; 126 bytesReleased += page->mSize; 127 128 releasePage(page); 129 } 130 131 mTotalSize -= bytesReleased; 132 return bytesReleased; 133} 134 135void PageCache::copy(size_t from, void *data, size_t size) { 136 LOGV("copy from %d size %d", from, size); 137 138 CHECK_LE(from + size, mTotalSize); 139 140 size_t offset = 0; 141 List<Page *>::iterator it = mActivePages.begin(); 142 while (from >= offset + (*it)->mSize) { 143 offset += (*it)->mSize; 144 ++it; 145 } 146 147 size_t delta = from - offset; 148 size_t avail = (*it)->mSize - delta; 149 150 if (avail >= size) { 151 memcpy(data, (const uint8_t *)(*it)->mData + delta, size); 152 return; 153 } 154 155 memcpy(data, (const uint8_t *)(*it)->mData + delta, avail); 156 ++it; 157 data = (uint8_t *)data + avail; 158 size -= avail; 159 160 while (size > 0) { 161 size_t copy = (*it)->mSize; 162 if (copy > size) { 163 copy = size; 164 } 165 memcpy(data, (*it)->mData, copy); 166 data = (uint8_t *)data + copy; 167 size -= copy; 168 ++it; 169 } 170} 171 172//////////////////////////////////////////////////////////////////////////////// 173 174NuCachedSource2::NuCachedSource2(const sp<DataSource> &source) 175 : 
mSource(source), 176 mReflector(new AHandlerReflector<NuCachedSource2>(this)), 177 mLooper(new ALooper), 178 mCache(new PageCache(kPageSize)), 179 mCacheOffset(0), 180 mFinalStatus(OK), 181 mLastAccessPos(0), 182 mFetching(true), 183 mLastFetchTimeUs(-1) { 184 mLooper->setName("NuCachedSource2"); 185 mLooper->registerHandler(mReflector); 186 mLooper->start(); 187 188 Mutex::Autolock autoLock(mLock); 189 (new AMessage(kWhatFetchMore, mReflector->id()))->post(); 190} 191 192NuCachedSource2::~NuCachedSource2() { 193 mLooper->stop(); 194 mLooper->unregisterHandler(mReflector->id()); 195 196 delete mCache; 197 mCache = NULL; 198} 199 200status_t NuCachedSource2::initCheck() const { 201 return mSource->initCheck(); 202} 203 204status_t NuCachedSource2::getSize(off64_t *size) { 205 return mSource->getSize(size); 206} 207 208uint32_t NuCachedSource2::flags() { 209 return (mSource->flags() & ~kWantsPrefetching) | kIsCachingDataSource; 210} 211 212void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) { 213 switch (msg->what()) { 214 case kWhatFetchMore: 215 { 216 onFetch(); 217 break; 218 } 219 220 case kWhatRead: 221 { 222 onRead(msg); 223 break; 224 } 225 226 default: 227 TRESPASS(); 228 } 229} 230 231void NuCachedSource2::fetchInternal() { 232 LOGV("fetchInternal"); 233 234 CHECK_EQ(mFinalStatus, (status_t)OK); 235 236 PageCache::Page *page = mCache->acquirePage(); 237 238 ssize_t n = mSource->readAt( 239 mCacheOffset + mCache->totalSize(), page->mData, kPageSize); 240 241 Mutex::Autolock autoLock(mLock); 242 243 if (n < 0) { 244 LOGE("source returned error %ld", n); 245 mFinalStatus = n; 246 mCache->releasePage(page); 247 } else if (n == 0) { 248 LOGI("ERROR_END_OF_STREAM"); 249 mFinalStatus = ERROR_END_OF_STREAM; 250 mCache->releasePage(page); 251 } else { 252 page->mSize = n; 253 mCache->appendPage(page); 254 } 255} 256 257void NuCachedSource2::onFetch() { 258 LOGV("onFetch"); 259 260 if (mFinalStatus != OK) { 261 LOGV("EOS reached, done prefetching for 
now"); 262 mFetching = false; 263 } 264 265 bool keepAlive = 266 !mFetching 267 && mFinalStatus == OK 268 && ALooper::GetNowUs() >= mLastFetchTimeUs + kKeepAliveIntervalUs; 269 270 if (mFetching || keepAlive) { 271 if (keepAlive) { 272 LOGI("Keep alive"); 273 } 274 275 fetchInternal(); 276 277 mLastFetchTimeUs = ALooper::GetNowUs(); 278 279 if (mFetching && mCache->totalSize() >= kHighWaterThreshold) { 280 LOGI("Cache full, done prefetching for now"); 281 mFetching = false; 282 } 283 } else { 284 Mutex::Autolock autoLock(mLock); 285 restartPrefetcherIfNecessary_l(); 286 } 287 288 (new AMessage(kWhatFetchMore, mReflector->id()))->post( 289 mFetching ? 0 : 100000ll); 290} 291 292void NuCachedSource2::onRead(const sp<AMessage> &msg) { 293 LOGV("onRead"); 294 295 int64_t offset; 296 CHECK(msg->findInt64("offset", &offset)); 297 298 void *data; 299 CHECK(msg->findPointer("data", &data)); 300 301 size_t size; 302 CHECK(msg->findSize("size", &size)); 303 304 ssize_t result = readInternal(offset, data, size); 305 306 if (result == -EAGAIN) { 307 msg->post(50000); 308 return; 309 } 310 311 Mutex::Autolock autoLock(mLock); 312 313 CHECK(mAsyncResult == NULL); 314 315 mAsyncResult = new AMessage; 316 mAsyncResult->setInt32("result", result); 317 318 mCondition.signal(); 319} 320 321void NuCachedSource2::restartPrefetcherIfNecessary_l( 322 bool ignoreLowWaterThreshold) { 323 static const size_t kGrayArea = 1024 * 1024; 324 325 if (mFetching || mFinalStatus != OK) { 326 return; 327 } 328 329 if (!ignoreLowWaterThreshold 330 && mCacheOffset + mCache->totalSize() - mLastAccessPos 331 >= kLowWaterThreshold) { 332 return; 333 } 334 335 size_t maxBytes = mLastAccessPos - mCacheOffset; 336 if (maxBytes < kGrayArea) { 337 return; 338 } 339 340 maxBytes -= kGrayArea; 341 342 size_t actualBytes = mCache->releaseFromStart(maxBytes); 343 mCacheOffset += actualBytes; 344 345 LOGI("restarting prefetcher, totalSize = %d", mCache->totalSize()); 346 mFetching = true; 347} 348 349ssize_t 
NuCachedSource2::readAt(off64_t offset, void *data, size_t size) { 350 Mutex::Autolock autoSerializer(mSerializer); 351 352 LOGV("readAt offset %lld, size %d", offset, size); 353 354 Mutex::Autolock autoLock(mLock); 355 356 // If the request can be completely satisfied from the cache, do so. 357 358 if (offset >= mCacheOffset 359 && offset + size <= mCacheOffset + mCache->totalSize()) { 360 size_t delta = offset - mCacheOffset; 361 mCache->copy(delta, data, size); 362 363 mLastAccessPos = offset + size; 364 365 return size; 366 } 367 368 sp<AMessage> msg = new AMessage(kWhatRead, mReflector->id()); 369 msg->setInt64("offset", offset); 370 msg->setPointer("data", data); 371 msg->setSize("size", size); 372 373 CHECK(mAsyncResult == NULL); 374 msg->post(); 375 376 while (mAsyncResult == NULL) { 377 mCondition.wait(mLock); 378 } 379 380 int32_t result; 381 CHECK(mAsyncResult->findInt32("result", &result)); 382 383 mAsyncResult.clear(); 384 385 if (result > 0) { 386 mLastAccessPos = offset + result; 387 } 388 389 return (ssize_t)result; 390} 391 392size_t NuCachedSource2::cachedSize() { 393 Mutex::Autolock autoLock(mLock); 394 return mCacheOffset + mCache->totalSize(); 395} 396 397size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) { 398 Mutex::Autolock autoLock(mLock); 399 return approxDataRemaining_l(finalStatus); 400} 401 402size_t NuCachedSource2::approxDataRemaining_l(status_t *finalStatus) { 403 *finalStatus = mFinalStatus; 404 off64_t lastBytePosCached = mCacheOffset + mCache->totalSize(); 405 if (mLastAccessPos < lastBytePosCached) { 406 return lastBytePosCached - mLastAccessPos; 407 } 408 return 0; 409} 410 411ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) { 412 LOGV("readInternal offset %lld size %d", offset, size); 413 414 Mutex::Autolock autoLock(mLock); 415 416 if (offset < mCacheOffset 417 || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) { 418 static const off64_t kPadding = 256 * 1024; 419 420 // 
In the presence of multiple decoded streams, once of them will 421 // trigger this seek request, the other one will request data "nearby" 422 // soon, adjust the seek position so that that subsequent request 423 // does not trigger another seek. 424 off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0; 425 426 seekInternal_l(seekOffset); 427 } 428 429 size_t delta = offset - mCacheOffset; 430 431 if (mFinalStatus != OK) { 432 if (delta >= mCache->totalSize()) { 433 return mFinalStatus; 434 } 435 436 size_t avail = mCache->totalSize() - delta; 437 mCache->copy(delta, data, avail); 438 439 return avail; 440 } 441 442 if (offset + size <= mCacheOffset + mCache->totalSize()) { 443 mCache->copy(delta, data, size); 444 445 return size; 446 } 447 448 LOGV("deferring read"); 449 450 return -EAGAIN; 451} 452 453status_t NuCachedSource2::seekInternal_l(off64_t offset) { 454 mLastAccessPos = offset; 455 456 if (offset >= mCacheOffset 457 && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) { 458 return OK; 459 } 460 461 LOGI("new range: offset= %lld", offset); 462 463 mCacheOffset = offset; 464 465 size_t totalSize = mCache->totalSize(); 466 CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize); 467 468 mFinalStatus = OK; 469 mFetching = true; 470 471 return OK; 472} 473 474void NuCachedSource2::resumeFetchingIfNecessary() { 475 Mutex::Autolock autoLock(mLock); 476 477 restartPrefetcherIfNecessary_l(true /* ignore low water threshold */); 478} 479 480sp<DecryptHandle> NuCachedSource2::DrmInitialization() { 481 return mSource->DrmInitialization(); 482} 483 484void NuCachedSource2::getDrmInfo(sp<DecryptHandle> &handle, DrmManagerClient **client) { 485 mSource->getDrmInfo(handle, client); 486} 487 488String8 NuCachedSource2::getUri() { 489 return mSource->getUri(); 490} 491 492} // namespace android 493