NuCachedSource2.cpp revision 310962976d575c0a97ec7a768e9cca0b2361daea
/*
 * Copyright (C) 2010 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

//#define LOG_NDEBUG 0
#define LOG_TAG "NuCachedSource2"
#include <utils/Log.h>

#include "include/NuCachedSource2.h"

#include <media/stagefright/foundation/ADebug.h>
#include <media/stagefright/foundation/AMessage.h>
#include <media/stagefright/MediaErrors.h>

namespace android {

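// PageCache keeps the downloaded data in a list of fixed-size pages. Pages
// filled by the fetcher are appended to an "active" list in stream order;
// pages released from the front of that list are recycled onto a "free"
// list instead of being deallocated.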
struct PageCache {
    PageCache(size_t pageSize);
    ~PageCache();

    struct Page {
        void *mData;
        size_t mSize;
    };

    Page *acquirePage();
    void releasePage(Page *page);

    void appendPage(Page *page);
    size_t releaseFromStart(size_t maxBytes);

    size_t totalSize() const {
        return mTotalSize;
    }

    void copy(size_t from, void *data, size_t size);

private:
    size_t mPageSize;
    size_t mTotalSize;

    List<Page *> mActivePages;
    List<Page *> mFreePages;

    void freePages(List<Page *> *list);

    DISALLOW_EVIL_CONSTRUCTORS(PageCache);
};

PageCache::PageCache(size_t pageSize)
    : mPageSize(pageSize),
      mTotalSize(0) {
}

PageCache::~PageCache() {
    freePages(&mActivePages);
    freePages(&mFreePages);
}

void PageCache::freePages(List<Page *> *list) {
    List<Page *>::iterator it = list->begin();
    while (it != list->end()) {
        Page *page = *it;

        free(page->mData);
        delete page;
        page = NULL;

        ++it;
    }
}

PageCache::Page *PageCache::acquirePage() {
    if (!mFreePages.empty()) {
        List<Page *>::iterator it = mFreePages.begin();
        Page *page = *it;
        mFreePages.erase(it);

        return page;
    }

    Page *page = new Page;
    page->mData = malloc(mPageSize);
    page->mSize = 0;

    return page;
}

void PageCache::releasePage(Page *page) {
    page->mSize = 0;
    mFreePages.push_back(page);
}

void PageCache::appendPage(Page *page) {
    mTotalSize += page->mSize;
    mActivePages.push_back(page);
}

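// Releases whole pages from the start of the active list, stopping before a
// page that would exceed maxBytes. Returns the number of bytes actually
// released, which may be less than maxBytes.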
size_t PageCache::releaseFromStart(size_t maxBytes) {
    size_t bytesReleased = 0;

    while (maxBytes > 0 && !mActivePages.empty()) {
        List<Page *>::iterator it = mActivePages.begin();

        Page *page = *it;

        if (maxBytes < page->mSize) {
            break;
        }

        mActivePages.erase(it);

        maxBytes -= page->mSize;
        bytesReleased += page->mSize;

        releasePage(page);
    }

    mTotalSize -= bytesReleased;
    return bytesReleased;
}

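// Copies 'size' bytes starting at logical offset 'from' within the cached
// data into 'data'. The requested range must lie entirely within the active
// pages.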
void PageCache::copy(size_t from, void *data, size_t size) {
    LOGV("copy from %d size %d", from, size);

    if (size == 0) {
        return;
    }

    CHECK_LE(from + size, mTotalSize);

    size_t offset = 0;
    List<Page *>::iterator it = mActivePages.begin();
    while (from >= offset + (*it)->mSize) {
        offset += (*it)->mSize;
        ++it;
    }

    size_t delta = from - offset;
    size_t avail = (*it)->mSize - delta;

    if (avail >= size) {
        memcpy(data, (const uint8_t *)(*it)->mData + delta, size);
        return;
    }

    memcpy(data, (const uint8_t *)(*it)->mData + delta, avail);
    ++it;
    data = (uint8_t *)data + avail;
    size -= avail;

    while (size > 0) {
        size_t copy = (*it)->mSize;
        if (copy > size) {
            copy = size;
        }
        memcpy(data, (*it)->mData, copy);
        data = (uint8_t *)data + copy;
        size -= copy;
        ++it;
    }
}

////////////////////////////////////////////////////////////////////////////////

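// All fetching and deferred reads run on the looper thread owned by this
// object; the constructor starts that thread and posts the initial fetch
// request.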
NuCachedSource2::NuCachedSource2(const sp<DataSource> &source)
    : mSource(source),
      mReflector(new AHandlerReflector<NuCachedSource2>(this)),
      mLooper(new ALooper),
      mCache(new PageCache(kPageSize)),
      mCacheOffset(0),
      mFinalStatus(OK),
      mLastAccessPos(0),
      mFetching(true),
      mLastFetchTimeUs(-1) {
    mLooper->setName("NuCachedSource2");
    mLooper->registerHandler(mReflector);
    mLooper->start();

    Mutex::Autolock autoLock(mLock);
    (new AMessage(kWhatFetchMore, mReflector->id()))->post();
}

NuCachedSource2::~NuCachedSource2() {
    mLooper->stop();
    mLooper->unregisterHandler(mReflector->id());

    delete mCache;
    mCache = NULL;
}

status_t NuCachedSource2::initCheck() const {
    return mSource->initCheck();
}

status_t NuCachedSource2::getSize(off64_t *size) {
    return mSource->getSize(size);
}

uint32_t NuCachedSource2::flags() {
    return (mSource->flags() & ~kWantsPrefetching) | kIsCachingDataSource;
}

void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) {
    switch (msg->what()) {
        case kWhatFetchMore:
        {
            onFetch();
            break;
        }

        case kWhatRead:
        {
            onRead(msg);
            break;
        }

        default:
            TRESPASS();
    }
}

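// Reads one page from the wrapped source at the end of the cached range and
// appends it to the cache, recording EOS or a read error in mFinalStatus.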
void NuCachedSource2::fetchInternal() {
    LOGV("fetchInternal");

    CHECK_EQ(mFinalStatus, (status_t)OK);

    PageCache::Page *page = mCache->acquirePage();

    ssize_t n = mSource->readAt(
            mCacheOffset + mCache->totalSize(), page->mData, kPageSize);

    Mutex::Autolock autoLock(mLock);

    if (n < 0) {
        LOGE("source returned error %ld", n);
        mFinalStatus = n;
        mCache->releasePage(page);
    } else if (n == 0) {
        LOGI("ERROR_END_OF_STREAM");
        mFinalStatus = ERROR_END_OF_STREAM;
        mCache->releasePage(page);
    } else {
        page->mSize = n;
        mCache->appendPage(page);
    }
}

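// Prefetch loop: keep reading pages until the high water threshold is hit
// or EOS/an error is reached. While idle, issue an occasional keep-alive
// read and check whether prefetching should restart; the message reposts
// itself either immediately or after a 100 ms pause.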
void NuCachedSource2::onFetch() {
    LOGV("onFetch");

    if (mFinalStatus != OK) {
        LOGV("EOS reached, done prefetching for now");
        mFetching = false;
    }

    bool keepAlive =
        !mFetching
            && mFinalStatus == OK
            && ALooper::GetNowUs() >= mLastFetchTimeUs + kKeepAliveIntervalUs;

    if (mFetching || keepAlive) {
        if (keepAlive) {
            LOGI("Keep alive");
        }

        fetchInternal();

        mLastFetchTimeUs = ALooper::GetNowUs();

        if (mFetching && mCache->totalSize() >= kHighWaterThreshold) {
            LOGI("Cache full, done prefetching for now");
            mFetching = false;
        }
    } else {
        Mutex::Autolock autoLock(mLock);
        restartPrefetcherIfNecessary_l();
    }

    (new AMessage(kWhatFetchMore, mReflector->id()))->post(
            mFetching ? 0 : 100000ll);
}

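// Services a deferred read posted by readAt(). If the data still isn't
// cached (-EAGAIN), the request is reposted 50 ms later; otherwise the
// result is handed back to the waiting reader.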
void NuCachedSource2::onRead(const sp<AMessage> &msg) {
    LOGV("onRead");

    int64_t offset;
    CHECK(msg->findInt64("offset", &offset));

    void *data;
    CHECK(msg->findPointer("data", &data));

    size_t size;
    CHECK(msg->findSize("size", &size));

    ssize_t result = readInternal(offset, data, size);

    if (result == -EAGAIN) {
        msg->post(50000);
        return;
    }

    Mutex::Autolock autoLock(mLock);

    CHECK(mAsyncResult == NULL);

    mAsyncResult = new AMessage;
    mAsyncResult->setInt32("result", result);

    mCondition.signal();
}

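// Resumes prefetching once the amount of cached data ahead of the last
// access position falls below the low water threshold (unless told to
// ignore it). Already-consumed pages are discarded from the front of the
// cache first, keeping a ~1 MB "gray area" behind the read position.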
void NuCachedSource2::restartPrefetcherIfNecessary_l(
        bool ignoreLowWaterThreshold) {
    static const size_t kGrayArea = 1024 * 1024;

    if (mFetching || mFinalStatus != OK) {
        return;
    }

    if (!ignoreLowWaterThreshold
            && mCacheOffset + mCache->totalSize() - mLastAccessPos
                >= kLowWaterThreshold) {
        return;
    }

    size_t maxBytes = mLastAccessPos - mCacheOffset;
    if (maxBytes < kGrayArea) {
        return;
    }

    maxBytes -= kGrayArea;

    size_t actualBytes = mCache->releaseFromStart(maxBytes);
    mCacheOffset += actualBytes;

    LOGI("restarting prefetcher, totalSize = %d", mCache->totalSize());
    mFetching = true;
}

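// Reads are serialized. Requests that can be satisfied entirely from the
// cache are answered directly; anything else is posted to the looper thread
// and this call blocks until the deferred read completes.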
ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) {
    Mutex::Autolock autoSerializer(mSerializer);

    LOGV("readAt offset %lld, size %d", offset, size);

    Mutex::Autolock autoLock(mLock);

    // If the request can be completely satisfied from the cache, do so.

    if (offset >= mCacheOffset
            && offset + size <= mCacheOffset + mCache->totalSize()) {
        size_t delta = offset - mCacheOffset;
        mCache->copy(delta, data, size);

        mLastAccessPos = offset + size;

        return size;
    }

    sp<AMessage> msg = new AMessage(kWhatRead, mReflector->id());
    msg->setInt64("offset", offset);
    msg->setPointer("data", data);
    msg->setSize("size", size);

    CHECK(mAsyncResult == NULL);
    msg->post();

    while (mAsyncResult == NULL) {
        mCondition.wait(mLock);
    }

    int32_t result;
    CHECK(mAsyncResult->findInt32("result", &result));

    mAsyncResult.clear();

    if (result > 0) {
        mLastAccessPos = offset + result;
    }

    return (ssize_t)result;
}

size_t NuCachedSource2::cachedSize() {
    Mutex::Autolock autoLock(mLock);
    return mCacheOffset + mCache->totalSize();
}

size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) {
    Mutex::Autolock autoLock(mLock);
    return approxDataRemaining_l(finalStatus);
}

size_t NuCachedSource2::approxDataRemaining_l(status_t *finalStatus) {
    *finalStatus = mFinalStatus;
    off64_t lastBytePosCached = mCacheOffset + mCache->totalSize();
    if (mLastAccessPos < lastBytePosCached) {
        return lastBytePosCached - mLastAccessPos;
    }
    return 0;
}

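// Attempts to satisfy a deferred read from the cache. Reading outside the
// cached range triggers an internal seek; if the data simply isn't
// available yet, -EAGAIN tells the caller to retry later.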
ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) {
    LOGV("readInternal offset %lld size %d", offset, size);

    Mutex::Autolock autoLock(mLock);

    if (offset < mCacheOffset
            || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) {
        static const off64_t kPadding = 256 * 1024;

        // In the presence of multiple decoded streams, one of them will
        // trigger this seek request; the other will request data "nearby"
        // soon. Adjust the seek position so that the subsequent request
        // does not trigger another seek.
        off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0;

        seekInternal_l(seekOffset);
    }

    size_t delta = offset - mCacheOffset;

    if (mFinalStatus != OK) {
        if (delta >= mCache->totalSize()) {
            return mFinalStatus;
        }

        size_t avail = mCache->totalSize() - delta;
        mCache->copy(delta, data, avail);

        return avail;
    }

    if (offset + size <= mCacheOffset + mCache->totalSize()) {
        mCache->copy(delta, data, size);

        return size;
    }

    LOGV("deferring read");

    return -EAGAIN;
}

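// Repositions the cache. A seek within (or to the end of) the cached range
// only updates the last access position; anything else flushes the cache
// and restarts fetching at the new offset.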
status_t NuCachedSource2::seekInternal_l(off64_t offset) {
    mLastAccessPos = offset;

    if (offset >= mCacheOffset
            && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) {
        return OK;
    }

    LOGI("new range: offset= %lld", offset);

    mCacheOffset = offset;

    size_t totalSize = mCache->totalSize();
    CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize);

    mFinalStatus = OK;
    mFetching = true;

    return OK;
}

void NuCachedSource2::resumeFetchingIfNecessary() {
    Mutex::Autolock autoLock(mLock);

    restartPrefetcherIfNecessary_l(true /* ignore low water threshold */);
}

DecryptHandle* NuCachedSource2::DrmInitialization() {
    return mSource->DrmInitialization();
}

void NuCachedSource2::getDrmInfo(DecryptHandle **handle, DrmManagerClient **client) {
    mSource->getDrmInfo(handle, client);
}

String8 NuCachedSource2::getUri() {
    return mSource->getUri();
}

}  // namespace android