NuCachedSource2.cpp revision b5ce361d19e69fe156f7188c9ee0f4734b259874
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "NuCachedSource2"
19#include <utils/Log.h>
20
21#include "include/NuCachedSource2.h"
22
23#include <media/stagefright/foundation/ADebug.h>
24#include <media/stagefright/foundation/AMessage.h>
25#include <media/stagefright/MediaErrors.h>
26
27namespace android {
28
29struct PageCache {
30    PageCache(size_t pageSize);
31    ~PageCache();
32
33    struct Page {
34        void *mData;
35        size_t mSize;
36    };
37
38    Page *acquirePage();
39    void releasePage(Page *page);
40
41    void appendPage(Page *page);
42    size_t releaseFromStart(size_t maxBytes);
43
44    size_t totalSize() const {
45        return mTotalSize;
46    }
47
48    void copy(size_t from, void *data, size_t size);
49
50private:
51    size_t mPageSize;
52    size_t mTotalSize;
53
54    List<Page *> mActivePages;
55    List<Page *> mFreePages;
56
57    void freePages(List<Page *> *list);
58
59    DISALLOW_EVIL_CONSTRUCTORS(PageCache);
60};
61
62PageCache::PageCache(size_t pageSize)
63    : mPageSize(pageSize),
64      mTotalSize(0) {
65}
66
67PageCache::~PageCache() {
68    freePages(&mActivePages);
69    freePages(&mFreePages);
70}
71
72void PageCache::freePages(List<Page *> *list) {
73    List<Page *>::iterator it = list->begin();
74    while (it != list->end()) {
75        Page *page = *it;
76
77        free(page->mData);
78        delete page;
79        page = NULL;
80
81        ++it;
82    }
83}
84
85PageCache::Page *PageCache::acquirePage() {
86    if (!mFreePages.empty()) {
87        List<Page *>::iterator it = mFreePages.begin();
88        Page *page = *it;
89        mFreePages.erase(it);
90
91        return page;
92    }
93
94    Page *page = new Page;
95    page->mData = malloc(mPageSize);
96    page->mSize = 0;
97
98    return page;
99}
100
101void PageCache::releasePage(Page *page) {
102    page->mSize = 0;
103    mFreePages.push_back(page);
104}
105
106void PageCache::appendPage(Page *page) {
107    mTotalSize += page->mSize;
108    mActivePages.push_back(page);
109}
110
111size_t PageCache::releaseFromStart(size_t maxBytes) {
112    size_t bytesReleased = 0;
113
114    while (maxBytes > 0 && !mActivePages.empty()) {
115        List<Page *>::iterator it = mActivePages.begin();
116
117        Page *page = *it;
118
119        if (maxBytes < page->mSize) {
120            break;
121        }
122
123        mActivePages.erase(it);
124
125        maxBytes -= page->mSize;
126        bytesReleased += page->mSize;
127
128        releasePage(page);
129    }
130
131    mTotalSize -= bytesReleased;
132    return bytesReleased;
133}
134
135void PageCache::copy(size_t from, void *data, size_t size) {
136    LOGV("copy from %d size %d", from, size);
137
138    CHECK_LE(from + size, mTotalSize);
139
140    size_t offset = 0;
141    List<Page *>::iterator it = mActivePages.begin();
142    while (from >= offset + (*it)->mSize) {
143        offset += (*it)->mSize;
144        ++it;
145    }
146
147    size_t delta = from - offset;
148    size_t avail = (*it)->mSize - delta;
149
150    if (avail >= size) {
151        memcpy(data, (const uint8_t *)(*it)->mData + delta, size);
152        return;
153    }
154
155    memcpy(data, (const uint8_t *)(*it)->mData + delta, avail);
156    ++it;
157    data = (uint8_t *)data + avail;
158    size -= avail;
159
160    while (size > 0) {
161        size_t copy = (*it)->mSize;
162        if (copy > size) {
163            copy = size;
164        }
165        memcpy(data, (*it)->mData, copy);
166        data = (uint8_t *)data + copy;
167        size -= copy;
168        ++it;
169    }
170}
171
172////////////////////////////////////////////////////////////////////////////////
173
174NuCachedSource2::NuCachedSource2(const sp<DataSource> &source)
175    : mSource(source),
176      mReflector(new AHandlerReflector<NuCachedSource2>(this)),
177      mLooper(new ALooper),
178      mCache(new PageCache(kPageSize)),
179      mCacheOffset(0),
180      mFinalStatus(OK),
181      mLastAccessPos(0),
182      mFetching(true),
183      mLastFetchTimeUs(-1) {
184    mLooper->setName("NuCachedSource2");
185    mLooper->registerHandler(mReflector);
186    mLooper->start();
187
188    Mutex::Autolock autoLock(mLock);
189    (new AMessage(kWhatFetchMore, mReflector->id()))->post();
190}
191
192NuCachedSource2::~NuCachedSource2() {
193    mLooper->stop();
194    mLooper->unregisterHandler(mReflector->id());
195
196    delete mCache;
197    mCache = NULL;
198}
199
200status_t NuCachedSource2::initCheck() const {
201    return mSource->initCheck();
202}
203
204status_t NuCachedSource2::getSize(off64_t *size) {
205    return mSource->getSize(size);
206}
207
208uint32_t NuCachedSource2::flags() {
209    return (mSource->flags() & ~kWantsPrefetching) | kIsCachingDataSource;
210}
211
212void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) {
213    switch (msg->what()) {
214        case kWhatFetchMore:
215        {
216            onFetch();
217            break;
218        }
219
220        case kWhatRead:
221        {
222            onRead(msg);
223            break;
224        }
225
226        default:
227            TRESPASS();
228    }
229}
230
231void NuCachedSource2::fetchInternal() {
232    LOGV("fetchInternal");
233
234    CHECK_EQ(mFinalStatus, (status_t)OK);
235
236    PageCache::Page *page = mCache->acquirePage();
237
238    ssize_t n = mSource->readAt(
239            mCacheOffset + mCache->totalSize(), page->mData, kPageSize);
240
241    Mutex::Autolock autoLock(mLock);
242
243    if (n < 0) {
244        LOGE("source returned error %ld", n);
245        mFinalStatus = n;
246        mCache->releasePage(page);
247    } else if (n == 0) {
248        LOGI("ERROR_END_OF_STREAM");
249        mFinalStatus = ERROR_END_OF_STREAM;
250        mCache->releasePage(page);
251    } else {
252        page->mSize = n;
253        mCache->appendPage(page);
254    }
255}
256
257void NuCachedSource2::onFetch() {
258    LOGV("onFetch");
259
260    if (mFinalStatus != OK) {
261        LOGV("EOS reached, done prefetching for now");
262        mFetching = false;
263    }
264
265    bool keepAlive =
266        !mFetching
267            && mFinalStatus == OK
268            && ALooper::GetNowUs() >= mLastFetchTimeUs + kKeepAliveIntervalUs;
269
270    if (mFetching || keepAlive) {
271        if (keepAlive) {
272            LOGI("Keep alive");
273        }
274
275        fetchInternal();
276
277        mLastFetchTimeUs = ALooper::GetNowUs();
278
279        if (mFetching && mCache->totalSize() >= kHighWaterThreshold) {
280            LOGI("Cache full, done prefetching for now");
281            mFetching = false;
282        }
283    } else {
284        Mutex::Autolock autoLock(mLock);
285        restartPrefetcherIfNecessary_l();
286    }
287
288    (new AMessage(kWhatFetchMore, mReflector->id()))->post(
289            mFetching ? 0 : 100000ll);
290}
291
292void NuCachedSource2::onRead(const sp<AMessage> &msg) {
293    LOGV("onRead");
294
295    int64_t offset;
296    CHECK(msg->findInt64("offset", &offset));
297
298    void *data;
299    CHECK(msg->findPointer("data", &data));
300
301    size_t size;
302    CHECK(msg->findSize("size", &size));
303
304    ssize_t result = readInternal(offset, data, size);
305
306    if (result == -EAGAIN) {
307        msg->post(50000);
308        return;
309    }
310
311    Mutex::Autolock autoLock(mLock);
312
313    CHECK(mAsyncResult == NULL);
314
315    mAsyncResult = new AMessage;
316    mAsyncResult->setInt32("result", result);
317
318    mCondition.signal();
319}
320
321void NuCachedSource2::restartPrefetcherIfNecessary_l(
322        bool ignoreLowWaterThreshold) {
323    static const size_t kGrayArea = 1024 * 1024;
324
325    if (mFetching || mFinalStatus != OK) {
326        return;
327    }
328
329    if (!ignoreLowWaterThreshold
330            && mCacheOffset + mCache->totalSize() - mLastAccessPos
331                >= kLowWaterThreshold) {
332        return;
333    }
334
335    size_t maxBytes = mLastAccessPos - mCacheOffset;
336    if (maxBytes < kGrayArea) {
337        return;
338    }
339
340    maxBytes -= kGrayArea;
341
342    size_t actualBytes = mCache->releaseFromStart(maxBytes);
343    mCacheOffset += actualBytes;
344
345    LOGI("restarting prefetcher, totalSize = %d", mCache->totalSize());
346    mFetching = true;
347}
348
349ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) {
350    Mutex::Autolock autoSerializer(mSerializer);
351
352    LOGV("readAt offset %lld, size %d", offset, size);
353
354    Mutex::Autolock autoLock(mLock);
355
356    // If the request can be completely satisfied from the cache, do so.
357
358    if (offset >= mCacheOffset
359            && offset + size <= mCacheOffset + mCache->totalSize()) {
360        size_t delta = offset - mCacheOffset;
361        mCache->copy(delta, data, size);
362
363        mLastAccessPos = offset + size;
364
365        return size;
366    }
367
368    sp<AMessage> msg = new AMessage(kWhatRead, mReflector->id());
369    msg->setInt64("offset", offset);
370    msg->setPointer("data", data);
371    msg->setSize("size", size);
372
373    CHECK(mAsyncResult == NULL);
374    msg->post();
375
376    while (mAsyncResult == NULL) {
377        mCondition.wait(mLock);
378    }
379
380    int32_t result;
381    CHECK(mAsyncResult->findInt32("result", &result));
382
383    mAsyncResult.clear();
384
385    if (result > 0) {
386        mLastAccessPos = offset + result;
387    }
388
389    return (ssize_t)result;
390}
391
392size_t NuCachedSource2::cachedSize() {
393    Mutex::Autolock autoLock(mLock);
394    return mCacheOffset + mCache->totalSize();
395}
396
397size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) {
398    Mutex::Autolock autoLock(mLock);
399    return approxDataRemaining_l(finalStatus);
400}
401
402size_t NuCachedSource2::approxDataRemaining_l(status_t *finalStatus) {
403    *finalStatus = mFinalStatus;
404    off64_t lastBytePosCached = mCacheOffset + mCache->totalSize();
405    if (mLastAccessPos < lastBytePosCached) {
406        return lastBytePosCached - mLastAccessPos;
407    }
408    return 0;
409}
410
411ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) {
412    LOGV("readInternal offset %lld size %d", offset, size);
413
414    Mutex::Autolock autoLock(mLock);
415
416    if (offset < mCacheOffset
417            || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) {
418        static const off64_t kPadding = 256 * 1024;
419
420        // In the presence of multiple decoded streams, once of them will
421        // trigger this seek request, the other one will request data "nearby"
422        // soon, adjust the seek position so that that subsequent request
423        // does not trigger another seek.
424        off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0;
425
426        seekInternal_l(seekOffset);
427    }
428
429    size_t delta = offset - mCacheOffset;
430
431    if (mFinalStatus != OK) {
432        if (delta >= mCache->totalSize()) {
433            return mFinalStatus;
434        }
435
436        size_t avail = mCache->totalSize() - delta;
437        mCache->copy(delta, data, avail);
438
439        return avail;
440    }
441
442    if (offset + size <= mCacheOffset + mCache->totalSize()) {
443        mCache->copy(delta, data, size);
444
445        return size;
446    }
447
448    LOGV("deferring read");
449
450    return -EAGAIN;
451}
452
453status_t NuCachedSource2::seekInternal_l(off64_t offset) {
454    mLastAccessPos = offset;
455
456    if (offset >= mCacheOffset
457            && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) {
458        return OK;
459    }
460
461    LOGI("new range: offset= %lld", offset);
462
463    mCacheOffset = offset;
464
465    size_t totalSize = mCache->totalSize();
466    CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize);
467
468    mFinalStatus = OK;
469    mFetching = true;
470
471    return OK;
472}
473
474void NuCachedSource2::resumeFetchingIfNecessary() {
475    Mutex::Autolock autoLock(mLock);
476
477    restartPrefetcherIfNecessary_l(true /* ignore low water threshold */);
478}
479
480sp<DecryptHandle> NuCachedSource2::DrmInitialization() {
481    return mSource->DrmInitialization();
482}
483
484void NuCachedSource2::getDrmInfo(sp<DecryptHandle> &handle, DrmManagerClient **client) {
485    mSource->getDrmInfo(handle, client);
486}
487
488String8 NuCachedSource2::getUri() {
489    return mSource->getUri();
490}
491
492}  // namespace android
493