NuCachedSource2.cpp revision 67802977b6f0aa8d6f14f85dadcf32a3cadb9c07
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "NuCachedSource2"
19#include <utils/Log.h>
20
21#include "include/NuCachedSource2.h"
22
23#include <media/stagefright/foundation/ADebug.h>
24#include <media/stagefright/foundation/AMessage.h>
25#include <media/stagefright/MediaErrors.h>
26
27namespace android {
28
29struct PageCache {
30    PageCache(size_t pageSize);
31    ~PageCache();
32
33    struct Page {
34        void *mData;
35        size_t mSize;
36    };
37
38    Page *acquirePage();
39    void releasePage(Page *page);
40
41    void appendPage(Page *page);
42    size_t releaseFromStart(size_t maxBytes);
43
44    size_t totalSize() const {
45        return mTotalSize;
46    }
47
48    void copy(size_t from, void *data, size_t size);
49
50private:
51    size_t mPageSize;
52    size_t mTotalSize;
53
54    List<Page *> mActivePages;
55    List<Page *> mFreePages;
56
57    void freePages(List<Page *> *list);
58
59    DISALLOW_EVIL_CONSTRUCTORS(PageCache);
60};
61
62PageCache::PageCache(size_t pageSize)
63    : mPageSize(pageSize),
64      mTotalSize(0) {
65}
66
67PageCache::~PageCache() {
68    freePages(&mActivePages);
69    freePages(&mFreePages);
70}
71
72void PageCache::freePages(List<Page *> *list) {
73    List<Page *>::iterator it = list->begin();
74    while (it != list->end()) {
75        Page *page = *it;
76
77        free(page->mData);
78        delete page;
79        page = NULL;
80
81        ++it;
82    }
83}
84
85PageCache::Page *PageCache::acquirePage() {
86    if (!mFreePages.empty()) {
87        List<Page *>::iterator it = mFreePages.begin();
88        Page *page = *it;
89        mFreePages.erase(it);
90
91        return page;
92    }
93
94    Page *page = new Page;
95    page->mData = malloc(mPageSize);
96    page->mSize = 0;
97
98    return page;
99}
100
101void PageCache::releasePage(Page *page) {
102    page->mSize = 0;
103    mFreePages.push_back(page);
104}
105
106void PageCache::appendPage(Page *page) {
107    mTotalSize += page->mSize;
108    mActivePages.push_back(page);
109}
110
111size_t PageCache::releaseFromStart(size_t maxBytes) {
112    size_t bytesReleased = 0;
113
114    while (maxBytes > 0 && !mActivePages.empty()) {
115        List<Page *>::iterator it = mActivePages.begin();
116
117        Page *page = *it;
118
119        if (maxBytes < page->mSize) {
120            break;
121        }
122
123        mActivePages.erase(it);
124
125        maxBytes -= page->mSize;
126        bytesReleased += page->mSize;
127
128        releasePage(page);
129    }
130
131    mTotalSize -= bytesReleased;
132    return bytesReleased;
133}
134
135void PageCache::copy(size_t from, void *data, size_t size) {
136    LOGV("copy from %d size %d", from, size);
137
138    if (size == 0) {
139        return;
140    }
141
142    CHECK_LE(from + size, mTotalSize);
143
144    size_t offset = 0;
145    List<Page *>::iterator it = mActivePages.begin();
146    while (from >= offset + (*it)->mSize) {
147        offset += (*it)->mSize;
148        ++it;
149    }
150
151    size_t delta = from - offset;
152    size_t avail = (*it)->mSize - delta;
153
154    if (avail >= size) {
155        memcpy(data, (const uint8_t *)(*it)->mData + delta, size);
156        return;
157    }
158
159    memcpy(data, (const uint8_t *)(*it)->mData + delta, avail);
160    ++it;
161    data = (uint8_t *)data + avail;
162    size -= avail;
163
164    while (size > 0) {
165        size_t copy = (*it)->mSize;
166        if (copy > size) {
167            copy = size;
168        }
169        memcpy(data, (*it)->mData, copy);
170        data = (uint8_t *)data + copy;
171        size -= copy;
172        ++it;
173    }
174}
175
176////////////////////////////////////////////////////////////////////////////////
177
178NuCachedSource2::NuCachedSource2(const sp<DataSource> &source)
179    : mSource(source),
180      mReflector(new AHandlerReflector<NuCachedSource2>(this)),
181      mLooper(new ALooper),
182      mCache(new PageCache(kPageSize)),
183      mCacheOffset(0),
184      mFinalStatus(OK),
185      mLastAccessPos(0),
186      mFetching(true),
187      mLastFetchTimeUs(-1) {
188    mLooper->setName("NuCachedSource2");
189    mLooper->registerHandler(mReflector);
190    mLooper->start();
191
192    Mutex::Autolock autoLock(mLock);
193    (new AMessage(kWhatFetchMore, mReflector->id()))->post();
194}
195
196NuCachedSource2::~NuCachedSource2() {
197    mLooper->stop();
198    mLooper->unregisterHandler(mReflector->id());
199
200    delete mCache;
201    mCache = NULL;
202}
203
204status_t NuCachedSource2::initCheck() const {
205    return mSource->initCheck();
206}
207
208status_t NuCachedSource2::getSize(off64_t *size) {
209    return mSource->getSize(size);
210}
211
212uint32_t NuCachedSource2::flags() {
213    return (mSource->flags() & ~kWantsPrefetching) | kIsCachingDataSource;
214}
215
216void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) {
217    switch (msg->what()) {
218        case kWhatFetchMore:
219        {
220            onFetch();
221            break;
222        }
223
224        case kWhatRead:
225        {
226            onRead(msg);
227            break;
228        }
229
230        default:
231            TRESPASS();
232    }
233}
234
235void NuCachedSource2::fetchInternal() {
236    LOGV("fetchInternal");
237
238    CHECK_EQ(mFinalStatus, (status_t)OK);
239
240    PageCache::Page *page = mCache->acquirePage();
241
242    ssize_t n = mSource->readAt(
243            mCacheOffset + mCache->totalSize(), page->mData, kPageSize);
244
245    Mutex::Autolock autoLock(mLock);
246
247    if (n < 0) {
248        LOGE("source returned error %ld", n);
249        mFinalStatus = n;
250        mCache->releasePage(page);
251    } else if (n == 0) {
252        LOGI("ERROR_END_OF_STREAM");
253        mFinalStatus = ERROR_END_OF_STREAM;
254        mCache->releasePage(page);
255    } else {
256        page->mSize = n;
257        mCache->appendPage(page);
258    }
259}
260
261void NuCachedSource2::onFetch() {
262    LOGV("onFetch");
263
264    if (mFinalStatus != OK) {
265        LOGV("EOS reached, done prefetching for now");
266        mFetching = false;
267    }
268
269    bool keepAlive =
270        !mFetching
271            && mFinalStatus == OK
272            && ALooper::GetNowUs() >= mLastFetchTimeUs + kKeepAliveIntervalUs;
273
274    if (mFetching || keepAlive) {
275        if (keepAlive) {
276            LOGI("Keep alive");
277        }
278
279        fetchInternal();
280
281        mLastFetchTimeUs = ALooper::GetNowUs();
282
283        if (mFetching && mCache->totalSize() >= kHighWaterThreshold) {
284            LOGI("Cache full, done prefetching for now");
285            mFetching = false;
286        }
287    } else {
288        Mutex::Autolock autoLock(mLock);
289        restartPrefetcherIfNecessary_l();
290    }
291
292    (new AMessage(kWhatFetchMore, mReflector->id()))->post(
293            mFetching ? 0 : 100000ll);
294}
295
296void NuCachedSource2::onRead(const sp<AMessage> &msg) {
297    LOGV("onRead");
298
299    int64_t offset;
300    CHECK(msg->findInt64("offset", &offset));
301
302    void *data;
303    CHECK(msg->findPointer("data", &data));
304
305    size_t size;
306    CHECK(msg->findSize("size", &size));
307
308    ssize_t result = readInternal(offset, data, size);
309
310    if (result == -EAGAIN) {
311        msg->post(50000);
312        return;
313    }
314
315    Mutex::Autolock autoLock(mLock);
316
317    CHECK(mAsyncResult == NULL);
318
319    mAsyncResult = new AMessage;
320    mAsyncResult->setInt32("result", result);
321
322    mCondition.signal();
323}
324
325void NuCachedSource2::restartPrefetcherIfNecessary_l(
326        bool ignoreLowWaterThreshold, bool force) {
327    static const size_t kGrayArea = 1024 * 1024;
328
329    if (mFetching || mFinalStatus != OK) {
330        return;
331    }
332
333    if (!ignoreLowWaterThreshold && !force
334            && mCacheOffset + mCache->totalSize() - mLastAccessPos
335                >= kLowWaterThreshold) {
336        return;
337    }
338
339    size_t maxBytes = mLastAccessPos - mCacheOffset;
340
341    if (!force) {
342        if (maxBytes < kGrayArea) {
343            return;
344        }
345
346        maxBytes -= kGrayArea;
347    }
348
349    size_t actualBytes = mCache->releaseFromStart(maxBytes);
350    mCacheOffset += actualBytes;
351
352    LOGI("restarting prefetcher, totalSize = %d", mCache->totalSize());
353    mFetching = true;
354}
355
356ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) {
357    Mutex::Autolock autoSerializer(mSerializer);
358
359    LOGV("readAt offset %lld, size %d", offset, size);
360
361    Mutex::Autolock autoLock(mLock);
362
363    // If the request can be completely satisfied from the cache, do so.
364
365    if (offset >= mCacheOffset
366            && offset + size <= mCacheOffset + mCache->totalSize()) {
367        size_t delta = offset - mCacheOffset;
368        mCache->copy(delta, data, size);
369
370        mLastAccessPos = offset + size;
371
372        return size;
373    }
374
375    sp<AMessage> msg = new AMessage(kWhatRead, mReflector->id());
376    msg->setInt64("offset", offset);
377    msg->setPointer("data", data);
378    msg->setSize("size", size);
379
380    CHECK(mAsyncResult == NULL);
381    msg->post();
382
383    while (mAsyncResult == NULL) {
384        mCondition.wait(mLock);
385    }
386
387    int32_t result;
388    CHECK(mAsyncResult->findInt32("result", &result));
389
390    mAsyncResult.clear();
391
392    if (result > 0) {
393        mLastAccessPos = offset + result;
394    }
395
396    return (ssize_t)result;
397}
398
399size_t NuCachedSource2::cachedSize() {
400    Mutex::Autolock autoLock(mLock);
401    return mCacheOffset + mCache->totalSize();
402}
403
404size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) {
405    Mutex::Autolock autoLock(mLock);
406    return approxDataRemaining_l(finalStatus);
407}
408
409size_t NuCachedSource2::approxDataRemaining_l(status_t *finalStatus) {
410    *finalStatus = mFinalStatus;
411    off64_t lastBytePosCached = mCacheOffset + mCache->totalSize();
412    if (mLastAccessPos < lastBytePosCached) {
413        return lastBytePosCached - mLastAccessPos;
414    }
415    return 0;
416}
417
418ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) {
419    CHECK_LE(size, (size_t)kHighWaterThreshold);
420
421    LOGV("readInternal offset %lld size %d", offset, size);
422
423    Mutex::Autolock autoLock(mLock);
424
425    if (!mFetching) {
426        mLastAccessPos = offset;
427        restartPrefetcherIfNecessary_l(
428                false, // ignoreLowWaterThreshold
429                true); // force
430    }
431
432    if (offset < mCacheOffset
433            || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) {
434        static const off64_t kPadding = 256 * 1024;
435
436        // In the presence of multiple decoded streams, once of them will
437        // trigger this seek request, the other one will request data "nearby"
438        // soon, adjust the seek position so that that subsequent request
439        // does not trigger another seek.
440        off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0;
441
442        seekInternal_l(seekOffset);
443    }
444
445    size_t delta = offset - mCacheOffset;
446
447    if (mFinalStatus != OK) {
448        if (delta >= mCache->totalSize()) {
449            return mFinalStatus;
450        }
451
452        size_t avail = mCache->totalSize() - delta;
453
454        if (avail > size) {
455            avail = size;
456        }
457
458        mCache->copy(delta, data, avail);
459
460        return avail;
461    }
462
463    if (offset + size <= mCacheOffset + mCache->totalSize()) {
464        mCache->copy(delta, data, size);
465
466        return size;
467    }
468
469    LOGV("deferring read");
470
471    return -EAGAIN;
472}
473
474status_t NuCachedSource2::seekInternal_l(off64_t offset) {
475    mLastAccessPos = offset;
476
477    if (offset >= mCacheOffset
478            && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) {
479        return OK;
480    }
481
482    LOGI("new range: offset= %lld", offset);
483
484    mCacheOffset = offset;
485
486    size_t totalSize = mCache->totalSize();
487    CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize);
488
489    mFinalStatus = OK;
490    mFetching = true;
491
492    return OK;
493}
494
495void NuCachedSource2::resumeFetchingIfNecessary() {
496    Mutex::Autolock autoLock(mLock);
497
498    restartPrefetcherIfNecessary_l(true /* ignore low water threshold */);
499}
500
501sp<DecryptHandle> NuCachedSource2::DrmInitialization() {
502    return mSource->DrmInitialization();
503}
504
505void NuCachedSource2::getDrmInfo(sp<DecryptHandle> &handle, DrmManagerClient **client) {
506    mSource->getDrmInfo(handle, client);
507}
508
509String8 NuCachedSource2::getUri() {
510    return mSource->getUri();
511}
512
513String8 NuCachedSource2::getMIMEType() const {
514    return mSource->getMIMEType();
515}
516
517}  // namespace android
518