// NuCachedSource2.cpp revision 34ef0f32c8fc0186236a27e07405328cc1f7c56d
/*
 * Copyright (C) 2010 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17#define LOG_TAG "NuCachedSource2"
18#include <utils/Log.h>
19
20#include "include/NuCachedSource2.h"
21
22#include <media/stagefright/foundation/ADebug.h>
23#include <media/stagefright/foundation/AMessage.h>
24#include <media/stagefright/MediaErrors.h>
25
26namespace android {
27
28struct PageCache {
29    PageCache(size_t pageSize);
30    ~PageCache();
31
32    struct Page {
33        void *mData;
34        size_t mSize;
35    };
36
37    Page *acquirePage();
38    void releasePage(Page *page);
39
40    void appendPage(Page *page);
41    size_t releaseFromStart(size_t maxBytes);
42
43    size_t totalSize() const {
44        return mTotalSize;
45    }
46
47    void copy(size_t from, void *data, size_t size);
48
49private:
50    size_t mPageSize;
51    size_t mTotalSize;
52
53    List<Page *> mActivePages;
54    List<Page *> mFreePages;
55
56    void freePages(List<Page *> *list);
57
58    DISALLOW_EVIL_CONSTRUCTORS(PageCache);
59};
60
61PageCache::PageCache(size_t pageSize)
62    : mPageSize(pageSize),
63      mTotalSize(0) {
64}
65
66PageCache::~PageCache() {
67    freePages(&mActivePages);
68    freePages(&mFreePages);
69}
70
71void PageCache::freePages(List<Page *> *list) {
72    List<Page *>::iterator it = list->begin();
73    while (it != list->end()) {
74        Page *page = *it;
75
76        free(page->mData);
77        delete page;
78        page = NULL;
79
80        ++it;
81    }
82}
83
84PageCache::Page *PageCache::acquirePage() {
85    if (!mFreePages.empty()) {
86        List<Page *>::iterator it = mFreePages.begin();
87        Page *page = *it;
88        mFreePages.erase(it);
89
90        return page;
91    }
92
93    Page *page = new Page;
94    page->mData = malloc(mPageSize);
95    page->mSize = 0;
96
97    return page;
98}
99
100void PageCache::releasePage(Page *page) {
101    page->mSize = 0;
102    mFreePages.push_back(page);
103}
104
105void PageCache::appendPage(Page *page) {
106    mTotalSize += page->mSize;
107    mActivePages.push_back(page);
108}
109
110size_t PageCache::releaseFromStart(size_t maxBytes) {
111    size_t bytesReleased = 0;
112
113    while (maxBytes > 0 && !mActivePages.empty()) {
114        List<Page *>::iterator it = mActivePages.begin();
115
116        Page *page = *it;
117
118        if (maxBytes < page->mSize) {
119            break;
120        }
121
122        mActivePages.erase(it);
123
124        maxBytes -= page->mSize;
125        bytesReleased += page->mSize;
126
127        releasePage(page);
128    }
129
130    mTotalSize -= bytesReleased;
131    return bytesReleased;
132}
133
134void PageCache::copy(size_t from, void *data, size_t size) {
135    LOGV("copy from %d size %d", from, size);
136
137    CHECK_LE(from + size, mTotalSize);
138
139    size_t offset = 0;
140    List<Page *>::iterator it = mActivePages.begin();
141    while (from >= offset + (*it)->mSize) {
142        offset += (*it)->mSize;
143        ++it;
144    }
145
146    size_t delta = from - offset;
147    size_t avail = (*it)->mSize - delta;
148
149    if (avail >= size) {
150        memcpy(data, (const uint8_t *)(*it)->mData + delta, size);
151        return;
152    }
153
154    memcpy(data, (const uint8_t *)(*it)->mData + delta, avail);
155    ++it;
156    data = (uint8_t *)data + avail;
157    size -= avail;
158
159    while (size > 0) {
160        size_t copy = (*it)->mSize;
161        if (copy > size) {
162            copy = size;
163        }
164        memcpy(data, (*it)->mData, copy);
165        data = (uint8_t *)data + copy;
166        size -= copy;
167        ++it;
168    }
169}
170
171////////////////////////////////////////////////////////////////////////////////
172
173NuCachedSource2::NuCachedSource2(const sp<DataSource> &source)
174    : mSource(source),
175      mReflector(new AHandlerReflector<NuCachedSource2>(this)),
176      mLooper(new ALooper),
177      mCache(new PageCache(kPageSize)),
178      mCacheOffset(0),
179      mFinalStatus(OK),
180      mLastAccessPos(0),
181      mFetching(true),
182      mLastFetchTimeUs(-1),
183      mSuspended(false) {
184    mLooper->setName("NuCachedSource2");
185    mLooper->registerHandler(mReflector);
186    mLooper->start();
187
188    Mutex::Autolock autoLock(mLock);
189    (new AMessage(kWhatFetchMore, mReflector->id()))->post();
190}
191
192NuCachedSource2::~NuCachedSource2() {
193    mLooper->stop();
194    mLooper->unregisterHandler(mReflector->id());
195
196    delete mCache;
197    mCache = NULL;
198}
199
200status_t NuCachedSource2::initCheck() const {
201    return mSource->initCheck();
202}
203
204status_t NuCachedSource2::getSize(off_t *size) {
205    return mSource->getSize(size);
206}
207
208uint32_t NuCachedSource2::flags() {
209    return (mSource->flags() & ~kWantsPrefetching) | kIsCachingDataSource;
210}
211
212void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) {
213    switch (msg->what()) {
214        case kWhatFetchMore:
215        {
216            onFetch();
217            break;
218        }
219
220        case kWhatRead:
221        {
222            onRead(msg);
223            break;
224        }
225
226        case kWhatSuspend:
227        {
228            onSuspend();
229            break;
230        }
231
232        default:
233            TRESPASS();
234    }
235}
236
237void NuCachedSource2::fetchInternal() {
238    LOGV("fetchInternal");
239
240    CHECK_EQ(mFinalStatus, (status_t)OK);
241
242    PageCache::Page *page = mCache->acquirePage();
243
244    ssize_t n = mSource->readAt(
245            mCacheOffset + mCache->totalSize(), page->mData, kPageSize);
246
247    Mutex::Autolock autoLock(mLock);
248
249    if (n < 0) {
250        LOGE("source returned error %ld", n);
251        mFinalStatus = n;
252        mCache->releasePage(page);
253    } else if (n == 0) {
254        LOGI("ERROR_END_OF_STREAM");
255        mFinalStatus = ERROR_END_OF_STREAM;
256        mCache->releasePage(page);
257    } else {
258        page->mSize = n;
259        mCache->appendPage(page);
260    }
261}
262
263void NuCachedSource2::onFetch() {
264    LOGV("onFetch");
265
266    if (mFinalStatus != OK) {
267        LOGV("EOS reached, done prefetching for now");
268        mFetching = false;
269    }
270
271    bool keepAlive =
272        !mFetching
273            && !mSuspended
274            && mFinalStatus == OK
275            && ALooper::GetNowUs() >= mLastFetchTimeUs + kKeepAliveIntervalUs;
276
277    if (mFetching || keepAlive) {
278        if (keepAlive) {
279            LOGI("Keep alive");
280        }
281
282        fetchInternal();
283
284        mLastFetchTimeUs = ALooper::GetNowUs();
285
286        if (mFetching && mCache->totalSize() >= kHighWaterThreshold) {
287            LOGI("Cache full, done prefetching for now");
288            mFetching = false;
289        }
290    } else if (!mSuspended) {
291        Mutex::Autolock autoLock(mLock);
292        restartPrefetcherIfNecessary_l();
293    }
294
295    (new AMessage(kWhatFetchMore, mReflector->id()))->post(
296            mFetching ? 0 : 100000ll);
297}
298
299void NuCachedSource2::onRead(const sp<AMessage> &msg) {
300    LOGV("onRead");
301
302    int64_t offset;
303    CHECK(msg->findInt64("offset", &offset));
304
305    void *data;
306    CHECK(msg->findPointer("data", &data));
307
308    size_t size;
309    CHECK(msg->findSize("size", &size));
310
311    ssize_t result = readInternal(offset, data, size);
312
313    if (result == -EAGAIN) {
314        msg->post(50000);
315        return;
316    }
317
318    Mutex::Autolock autoLock(mLock);
319
320    CHECK(mAsyncResult == NULL);
321
322    mAsyncResult = new AMessage;
323    mAsyncResult->setInt32("result", result);
324
325    mCondition.signal();
326}
327
328void NuCachedSource2::restartPrefetcherIfNecessary_l(
329        bool ignoreLowWaterThreshold) {
330    static const size_t kGrayArea = 256 * 1024;
331
332    if (mFetching || mFinalStatus != OK) {
333        return;
334    }
335
336    if (!ignoreLowWaterThreshold
337            && mCacheOffset + mCache->totalSize() - mLastAccessPos
338                >= kLowWaterThreshold) {
339        return;
340    }
341
342    size_t maxBytes = mLastAccessPos - mCacheOffset;
343    if (maxBytes < kGrayArea) {
344        return;
345    }
346
347    maxBytes -= kGrayArea;
348
349    size_t actualBytes = mCache->releaseFromStart(maxBytes);
350    mCacheOffset += actualBytes;
351
352    LOGI("restarting prefetcher, totalSize = %d", mCache->totalSize());
353    mFetching = true;
354}
355
356ssize_t NuCachedSource2::readAt(off_t offset, void *data, size_t size) {
357    Mutex::Autolock autoSerializer(mSerializer);
358
359    LOGV("readAt offset %ld, size %d", offset, size);
360
361    Mutex::Autolock autoLock(mLock);
362
363    // If the request can be completely satisfied from the cache, do so.
364
365    if (offset >= mCacheOffset
366            && offset + size <= mCacheOffset + mCache->totalSize()) {
367        size_t delta = offset - mCacheOffset;
368        mCache->copy(delta, data, size);
369
370        mLastAccessPos = offset + size;
371
372        return size;
373    }
374
375    sp<AMessage> msg = new AMessage(kWhatRead, mReflector->id());
376    msg->setInt64("offset", offset);
377    msg->setPointer("data", data);
378    msg->setSize("size", size);
379
380    CHECK(mAsyncResult == NULL);
381    msg->post();
382
383    while (mAsyncResult == NULL) {
384        mCondition.wait(mLock);
385    }
386
387    int32_t result;
388    CHECK(mAsyncResult->findInt32("result", &result));
389
390    mAsyncResult.clear();
391
392    if (result > 0) {
393        mLastAccessPos = offset + result;
394    }
395
396    return (ssize_t)result;
397}
398
399size_t NuCachedSource2::cachedSize() {
400    Mutex::Autolock autoLock(mLock);
401    return mCacheOffset + mCache->totalSize();
402}
403
404size_t NuCachedSource2::approxDataRemaining(bool *eos) {
405    Mutex::Autolock autoLock(mLock);
406    return approxDataRemaining_l(eos);
407}
408
409size_t NuCachedSource2::approxDataRemaining_l(bool *eos) {
410    *eos = (mFinalStatus != OK);
411    off_t lastBytePosCached = mCacheOffset + mCache->totalSize();
412    if (mLastAccessPos < lastBytePosCached) {
413        return lastBytePosCached - mLastAccessPos;
414    }
415    return 0;
416}
417
418ssize_t NuCachedSource2::readInternal(off_t offset, void *data, size_t size) {
419    LOGV("readInternal offset %ld size %d", offset, size);
420
421    Mutex::Autolock autoLock(mLock);
422
423    if (offset < mCacheOffset
424            || offset >= (off_t)(mCacheOffset + mCache->totalSize())) {
425        static const off_t kPadding = 32768;
426
427        // In the presence of multiple decoded streams, once of them will
428        // trigger this seek request, the other one will request data "nearby"
429        // soon, adjust the seek position so that that subsequent request
430        // does not trigger another seek.
431        off_t seekOffset = (offset > kPadding) ? offset - kPadding : 0;
432
433        seekInternal_l(seekOffset);
434    }
435
436    size_t delta = offset - mCacheOffset;
437
438    if (mFinalStatus != OK) {
439        if (delta >= mCache->totalSize()) {
440            return mFinalStatus;
441        }
442
443        size_t avail = mCache->totalSize() - delta;
444        mCache->copy(delta, data, avail);
445
446        return avail;
447    }
448
449    if (offset + size <= mCacheOffset + mCache->totalSize()) {
450        mCache->copy(delta, data, size);
451
452        return size;
453    }
454
455    LOGV("deferring read");
456
457    return -EAGAIN;
458}
459
460status_t NuCachedSource2::seekInternal_l(off_t offset) {
461    mLastAccessPos = offset;
462
463    if (offset >= mCacheOffset
464            && offset <= (off_t)(mCacheOffset + mCache->totalSize())) {
465        return OK;
466    }
467
468    LOGI("new range: offset= %ld", offset);
469
470    mCacheOffset = offset;
471
472    size_t totalSize = mCache->totalSize();
473    CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize);
474
475    mFinalStatus = OK;
476    mFetching = true;
477
478    return OK;
479}
480
481void NuCachedSource2::clearCacheAndResume() {
482    LOGV("clearCacheAndResume");
483
484    Mutex::Autolock autoLock(mLock);
485
486    CHECK(mSuspended);
487
488    mCacheOffset = 0;
489    mFinalStatus = OK;
490    mLastAccessPos = 0;
491    mLastFetchTimeUs = -1;
492
493    size_t totalSize = mCache->totalSize();
494    CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize);
495
496    mFetching = true;
497    mSuspended = false;
498}
499
500void NuCachedSource2::suspend() {
501    (new AMessage(kWhatSuspend, mReflector->id()))->post();
502
503    while (!mSuspended) {
504        usleep(10000);
505    }
506}
507
508void NuCachedSource2::onSuspend() {
509    Mutex::Autolock autoLock(mLock);
510
511    mFetching = false;
512    mSuspended = true;
513}
514
515void NuCachedSource2::resumeFetchingIfNecessary() {
516    Mutex::Autolock autoLock(mLock);
517
518    restartPrefetcherIfNecessary_l(true /* ignore low water threshold */);
519}
520
521DecryptHandle* NuCachedSource2::DrmInitialization(DrmManagerClient* client) {
522    return mSource->DrmInitialization(client);
523}
524
525void NuCachedSource2::getDrmInfo(DecryptHandle **handle, DrmManagerClient **client) {
526    mSource->getDrmInfo(handle, client);
527}
528
529}  // namespace android
530
531