NuCachedSource2.cpp revision b33d2ac90cfce0fe6db8c3e979e7ae2bbfc28163
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "NuCachedSource2"
19#include <utils/Log.h>
20
21#include "include/NuCachedSource2.h"
22#include "include/HTTPBase.h"
23
24#include <media/stagefright/foundation/ADebug.h>
25#include <media/stagefright/foundation/AMessage.h>
26#include <media/stagefright/MediaErrors.h>
27
28namespace android {
29
30struct PageCache {
31    PageCache(size_t pageSize);
32    ~PageCache();
33
34    struct Page {
35        void *mData;
36        size_t mSize;
37    };
38
39    Page *acquirePage();
40    void releasePage(Page *page);
41
42    void appendPage(Page *page);
43    size_t releaseFromStart(size_t maxBytes);
44
45    size_t totalSize() const {
46        return mTotalSize;
47    }
48
49    void copy(size_t from, void *data, size_t size);
50
51private:
52    size_t mPageSize;
53    size_t mTotalSize;
54
55    List<Page *> mActivePages;
56    List<Page *> mFreePages;
57
58    void freePages(List<Page *> *list);
59
60    DISALLOW_EVIL_CONSTRUCTORS(PageCache);
61};
62
63PageCache::PageCache(size_t pageSize)
64    : mPageSize(pageSize),
65      mTotalSize(0) {
66}
67
68PageCache::~PageCache() {
69    freePages(&mActivePages);
70    freePages(&mFreePages);
71}
72
73void PageCache::freePages(List<Page *> *list) {
74    List<Page *>::iterator it = list->begin();
75    while (it != list->end()) {
76        Page *page = *it;
77
78        free(page->mData);
79        delete page;
80        page = NULL;
81
82        ++it;
83    }
84}
85
86PageCache::Page *PageCache::acquirePage() {
87    if (!mFreePages.empty()) {
88        List<Page *>::iterator it = mFreePages.begin();
89        Page *page = *it;
90        mFreePages.erase(it);
91
92        return page;
93    }
94
95    Page *page = new Page;
96    page->mData = malloc(mPageSize);
97    page->mSize = 0;
98
99    return page;
100}
101
102void PageCache::releasePage(Page *page) {
103    page->mSize = 0;
104    mFreePages.push_back(page);
105}
106
107void PageCache::appendPage(Page *page) {
108    mTotalSize += page->mSize;
109    mActivePages.push_back(page);
110}
111
112size_t PageCache::releaseFromStart(size_t maxBytes) {
113    size_t bytesReleased = 0;
114
115    while (maxBytes > 0 && !mActivePages.empty()) {
116        List<Page *>::iterator it = mActivePages.begin();
117
118        Page *page = *it;
119
120        if (maxBytes < page->mSize) {
121            break;
122        }
123
124        mActivePages.erase(it);
125
126        maxBytes -= page->mSize;
127        bytesReleased += page->mSize;
128
129        releasePage(page);
130    }
131
132    mTotalSize -= bytesReleased;
133    return bytesReleased;
134}
135
136void PageCache::copy(size_t from, void *data, size_t size) {
137    LOGV("copy from %d size %d", from, size);
138
139    if (size == 0) {
140        return;
141    }
142
143    CHECK_LE(from + size, mTotalSize);
144
145    size_t offset = 0;
146    List<Page *>::iterator it = mActivePages.begin();
147    while (from >= offset + (*it)->mSize) {
148        offset += (*it)->mSize;
149        ++it;
150    }
151
152    size_t delta = from - offset;
153    size_t avail = (*it)->mSize - delta;
154
155    if (avail >= size) {
156        memcpy(data, (const uint8_t *)(*it)->mData + delta, size);
157        return;
158    }
159
160    memcpy(data, (const uint8_t *)(*it)->mData + delta, avail);
161    ++it;
162    data = (uint8_t *)data + avail;
163    size -= avail;
164
165    while (size > 0) {
166        size_t copy = (*it)->mSize;
167        if (copy > size) {
168            copy = size;
169        }
170        memcpy(data, (*it)->mData, copy);
171        data = (uint8_t *)data + copy;
172        size -= copy;
173        ++it;
174    }
175}
176
177////////////////////////////////////////////////////////////////////////////////
178
179NuCachedSource2::NuCachedSource2(const sp<DataSource> &source)
180    : mSource(source),
181      mReflector(new AHandlerReflector<NuCachedSource2>(this)),
182      mLooper(new ALooper),
183      mCache(new PageCache(kPageSize)),
184      mCacheOffset(0),
185      mFinalStatus(OK),
186      mLastAccessPos(0),
187      mFetching(true),
188      mLastFetchTimeUs(-1) {
189    mLooper->setName("NuCachedSource2");
190    mLooper->registerHandler(mReflector);
191    mLooper->start();
192
193    Mutex::Autolock autoLock(mLock);
194    (new AMessage(kWhatFetchMore, mReflector->id()))->post();
195}
196
197NuCachedSource2::~NuCachedSource2() {
198    mLooper->stop();
199    mLooper->unregisterHandler(mReflector->id());
200
201    delete mCache;
202    mCache = NULL;
203}
204
205status_t NuCachedSource2::getEstimatedBandwidthKbps(int32_t *kbps) {
206    if (mSource->flags() & kIsHTTPBasedSource) {
207        HTTPBase* source = static_cast<HTTPBase *>(mSource.get());
208        return source->getEstimatedBandwidthKbps(kbps);
209    }
210    return ERROR_UNSUPPORTED;
211}
212
213status_t NuCachedSource2::setCacheStatCollectFreq(int32_t freqMs) {
214    if (mSource->flags() & kIsHTTPBasedSource) {
215        HTTPBase *source = static_cast<HTTPBase *>(mSource.get());
216        return source->setBandwidthStatCollectFreq(freqMs);
217    }
218    return ERROR_UNSUPPORTED;
219}
220
221status_t NuCachedSource2::initCheck() const {
222    return mSource->initCheck();
223}
224
225status_t NuCachedSource2::getSize(off64_t *size) {
226    return mSource->getSize(size);
227}
228
229uint32_t NuCachedSource2::flags() {
230    // Remove HTTP related flags since NuCachedSource2 is not HTTP-based.
231    uint32_t flags = mSource->flags() & ~(kWantsPrefetching | kIsHTTPBasedSource);
232    return (flags | kIsCachingDataSource);
233}
234
235void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) {
236    switch (msg->what()) {
237        case kWhatFetchMore:
238        {
239            onFetch();
240            break;
241        }
242
243        case kWhatRead:
244        {
245            onRead(msg);
246            break;
247        }
248
249        default:
250            TRESPASS();
251    }
252}
253
254void NuCachedSource2::fetchInternal() {
255    LOGV("fetchInternal");
256
257    CHECK_EQ(mFinalStatus, (status_t)OK);
258
259    PageCache::Page *page = mCache->acquirePage();
260
261    ssize_t n = mSource->readAt(
262            mCacheOffset + mCache->totalSize(), page->mData, kPageSize);
263
264    Mutex::Autolock autoLock(mLock);
265
266    if (n < 0) {
267        LOGE("source returned error %ld", n);
268        mFinalStatus = n;
269        mCache->releasePage(page);
270    } else if (n == 0) {
271        LOGI("ERROR_END_OF_STREAM");
272        mFinalStatus = ERROR_END_OF_STREAM;
273        mCache->releasePage(page);
274    } else {
275        page->mSize = n;
276        mCache->appendPage(page);
277    }
278}
279
280void NuCachedSource2::onFetch() {
281    LOGV("onFetch");
282
283    if (mFinalStatus != OK) {
284        LOGV("EOS reached, done prefetching for now");
285        mFetching = false;
286    }
287
288    bool keepAlive =
289        !mFetching
290            && mFinalStatus == OK
291            && ALooper::GetNowUs() >= mLastFetchTimeUs + kKeepAliveIntervalUs;
292
293    if (mFetching || keepAlive) {
294        if (keepAlive) {
295            LOGI("Keep alive");
296        }
297
298        fetchInternal();
299
300        mLastFetchTimeUs = ALooper::GetNowUs();
301
302        if (mFetching && mCache->totalSize() >= kHighWaterThreshold) {
303            LOGI("Cache full, done prefetching for now");
304            mFetching = false;
305        }
306    } else {
307        Mutex::Autolock autoLock(mLock);
308        restartPrefetcherIfNecessary_l();
309    }
310
311    (new AMessage(kWhatFetchMore, mReflector->id()))->post(
312            mFetching ? 0 : 100000ll);
313}
314
315void NuCachedSource2::onRead(const sp<AMessage> &msg) {
316    LOGV("onRead");
317
318    int64_t offset;
319    CHECK(msg->findInt64("offset", &offset));
320
321    void *data;
322    CHECK(msg->findPointer("data", &data));
323
324    size_t size;
325    CHECK(msg->findSize("size", &size));
326
327    ssize_t result = readInternal(offset, data, size);
328
329    if (result == -EAGAIN) {
330        msg->post(50000);
331        return;
332    }
333
334    Mutex::Autolock autoLock(mLock);
335
336    CHECK(mAsyncResult == NULL);
337
338    mAsyncResult = new AMessage;
339    mAsyncResult->setInt32("result", result);
340
341    mCondition.signal();
342}
343
344void NuCachedSource2::restartPrefetcherIfNecessary_l(
345        bool ignoreLowWaterThreshold, bool force) {
346    static const size_t kGrayArea = 1024 * 1024;
347
348    if (mFetching || mFinalStatus != OK) {
349        return;
350    }
351
352    if (!ignoreLowWaterThreshold && !force
353            && mCacheOffset + mCache->totalSize() - mLastAccessPos
354                >= kLowWaterThreshold) {
355        return;
356    }
357
358    size_t maxBytes = mLastAccessPos - mCacheOffset;
359
360    if (!force) {
361        if (maxBytes < kGrayArea) {
362            return;
363        }
364
365        maxBytes -= kGrayArea;
366    }
367
368    size_t actualBytes = mCache->releaseFromStart(maxBytes);
369    mCacheOffset += actualBytes;
370
371    LOGI("restarting prefetcher, totalSize = %d", mCache->totalSize());
372    mFetching = true;
373}
374
375ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) {
376    Mutex::Autolock autoSerializer(mSerializer);
377
378    LOGV("readAt offset %lld, size %d", offset, size);
379
380    Mutex::Autolock autoLock(mLock);
381
382    // If the request can be completely satisfied from the cache, do so.
383
384    if (offset >= mCacheOffset
385            && offset + size <= mCacheOffset + mCache->totalSize()) {
386        size_t delta = offset - mCacheOffset;
387        mCache->copy(delta, data, size);
388
389        mLastAccessPos = offset + size;
390
391        return size;
392    }
393
394    sp<AMessage> msg = new AMessage(kWhatRead, mReflector->id());
395    msg->setInt64("offset", offset);
396    msg->setPointer("data", data);
397    msg->setSize("size", size);
398
399    CHECK(mAsyncResult == NULL);
400    msg->post();
401
402    while (mAsyncResult == NULL) {
403        mCondition.wait(mLock);
404    }
405
406    int32_t result;
407    CHECK(mAsyncResult->findInt32("result", &result));
408
409    mAsyncResult.clear();
410
411    if (result > 0) {
412        mLastAccessPos = offset + result;
413    }
414
415    return (ssize_t)result;
416}
417
418size_t NuCachedSource2::cachedSize() {
419    Mutex::Autolock autoLock(mLock);
420    return mCacheOffset + mCache->totalSize();
421}
422
423size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) {
424    Mutex::Autolock autoLock(mLock);
425    return approxDataRemaining_l(finalStatus);
426}
427
428size_t NuCachedSource2::approxDataRemaining_l(status_t *finalStatus) {
429    *finalStatus = mFinalStatus;
430    off64_t lastBytePosCached = mCacheOffset + mCache->totalSize();
431    if (mLastAccessPos < lastBytePosCached) {
432        return lastBytePosCached - mLastAccessPos;
433    }
434    return 0;
435}
436
437ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) {
438    CHECK_LE(size, (size_t)kHighWaterThreshold);
439
440    LOGV("readInternal offset %lld size %d", offset, size);
441
442    Mutex::Autolock autoLock(mLock);
443
444    if (!mFetching) {
445        mLastAccessPos = offset;
446        restartPrefetcherIfNecessary_l(
447                false, // ignoreLowWaterThreshold
448                true); // force
449    }
450
451    if (offset < mCacheOffset
452            || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) {
453        static const off64_t kPadding = 256 * 1024;
454
455        // In the presence of multiple decoded streams, once of them will
456        // trigger this seek request, the other one will request data "nearby"
457        // soon, adjust the seek position so that that subsequent request
458        // does not trigger another seek.
459        off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0;
460
461        seekInternal_l(seekOffset);
462    }
463
464    size_t delta = offset - mCacheOffset;
465
466    if (mFinalStatus != OK) {
467        if (delta >= mCache->totalSize()) {
468            return mFinalStatus;
469        }
470
471        size_t avail = mCache->totalSize() - delta;
472
473        if (avail > size) {
474            avail = size;
475        }
476
477        mCache->copy(delta, data, avail);
478
479        return avail;
480    }
481
482    if (offset + size <= mCacheOffset + mCache->totalSize()) {
483        mCache->copy(delta, data, size);
484
485        return size;
486    }
487
488    LOGV("deferring read");
489
490    return -EAGAIN;
491}
492
493status_t NuCachedSource2::seekInternal_l(off64_t offset) {
494    mLastAccessPos = offset;
495
496    if (offset >= mCacheOffset
497            && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) {
498        return OK;
499    }
500
501    LOGI("new range: offset= %lld", offset);
502
503    mCacheOffset = offset;
504
505    size_t totalSize = mCache->totalSize();
506    CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize);
507
508    mFinalStatus = OK;
509    mFetching = true;
510
511    return OK;
512}
513
514void NuCachedSource2::resumeFetchingIfNecessary() {
515    Mutex::Autolock autoLock(mLock);
516
517    restartPrefetcherIfNecessary_l(true /* ignore low water threshold */);
518}
519
520sp<DecryptHandle> NuCachedSource2::DrmInitialization() {
521    return mSource->DrmInitialization();
522}
523
524void NuCachedSource2::getDrmInfo(sp<DecryptHandle> &handle, DrmManagerClient **client) {
525    mSource->getDrmInfo(handle, client);
526}
527
528String8 NuCachedSource2::getUri() {
529    return mSource->getUri();
530}
531
532String8 NuCachedSource2::getMIMEType() const {
533    return mSource->getMIMEType();
534}
535
536}  // namespace android
537