NuCachedSource2.cpp revision 5b1b8a93a07326f1cbc627f09e02988375189e0a
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "NuCachedSource2"
19#include <utils/Log.h>
20
21#include "include/NuCachedSource2.h"
22#include "include/HTTPBase.h"
23
24#include <media/stagefright/foundation/ADebug.h>
25#include <media/stagefright/foundation/AMessage.h>
26#include <media/stagefright/MediaErrors.h>
27
28namespace android {
29
30struct PageCache {
31    PageCache(size_t pageSize);
32    ~PageCache();
33
34    struct Page {
35        void *mData;
36        size_t mSize;
37    };
38
39    Page *acquirePage();
40    void releasePage(Page *page);
41
42    void appendPage(Page *page);
43    size_t releaseFromStart(size_t maxBytes);
44
45    size_t totalSize() const {
46        return mTotalSize;
47    }
48
49    void copy(size_t from, void *data, size_t size);
50
51private:
52    size_t mPageSize;
53    size_t mTotalSize;
54
55    List<Page *> mActivePages;
56    List<Page *> mFreePages;
57
58    void freePages(List<Page *> *list);
59
60    DISALLOW_EVIL_CONSTRUCTORS(PageCache);
61};
62
63PageCache::PageCache(size_t pageSize)
64    : mPageSize(pageSize),
65      mTotalSize(0) {
66}
67
68PageCache::~PageCache() {
69    freePages(&mActivePages);
70    freePages(&mFreePages);
71}
72
73void PageCache::freePages(List<Page *> *list) {
74    List<Page *>::iterator it = list->begin();
75    while (it != list->end()) {
76        Page *page = *it;
77
78        free(page->mData);
79        delete page;
80        page = NULL;
81
82        ++it;
83    }
84}
85
86PageCache::Page *PageCache::acquirePage() {
87    if (!mFreePages.empty()) {
88        List<Page *>::iterator it = mFreePages.begin();
89        Page *page = *it;
90        mFreePages.erase(it);
91
92        return page;
93    }
94
95    Page *page = new Page;
96    page->mData = malloc(mPageSize);
97    page->mSize = 0;
98
99    return page;
100}
101
102void PageCache::releasePage(Page *page) {
103    page->mSize = 0;
104    mFreePages.push_back(page);
105}
106
107void PageCache::appendPage(Page *page) {
108    mTotalSize += page->mSize;
109    mActivePages.push_back(page);
110}
111
112size_t PageCache::releaseFromStart(size_t maxBytes) {
113    size_t bytesReleased = 0;
114
115    while (maxBytes > 0 && !mActivePages.empty()) {
116        List<Page *>::iterator it = mActivePages.begin();
117
118        Page *page = *it;
119
120        if (maxBytes < page->mSize) {
121            break;
122        }
123
124        mActivePages.erase(it);
125
126        maxBytes -= page->mSize;
127        bytesReleased += page->mSize;
128
129        releasePage(page);
130    }
131
132    mTotalSize -= bytesReleased;
133    return bytesReleased;
134}
135
136void PageCache::copy(size_t from, void *data, size_t size) {
137    LOGV("copy from %d size %d", from, size);
138
139    if (size == 0) {
140        return;
141    }
142
143    CHECK_LE(from + size, mTotalSize);
144
145    size_t offset = 0;
146    List<Page *>::iterator it = mActivePages.begin();
147    while (from >= offset + (*it)->mSize) {
148        offset += (*it)->mSize;
149        ++it;
150    }
151
152    size_t delta = from - offset;
153    size_t avail = (*it)->mSize - delta;
154
155    if (avail >= size) {
156        memcpy(data, (const uint8_t *)(*it)->mData + delta, size);
157        return;
158    }
159
160    memcpy(data, (const uint8_t *)(*it)->mData + delta, avail);
161    ++it;
162    data = (uint8_t *)data + avail;
163    size -= avail;
164
165    while (size > 0) {
166        size_t copy = (*it)->mSize;
167        if (copy > size) {
168            copy = size;
169        }
170        memcpy(data, (*it)->mData, copy);
171        data = (uint8_t *)data + copy;
172        size -= copy;
173        ++it;
174    }
175}
176
177////////////////////////////////////////////////////////////////////////////////
178
179NuCachedSource2::NuCachedSource2(const sp<DataSource> &source)
180    : mSource(source),
181      mReflector(new AHandlerReflector<NuCachedSource2>(this)),
182      mLooper(new ALooper),
183      mCache(new PageCache(kPageSize)),
184      mCacheOffset(0),
185      mFinalStatus(OK),
186      mLastAccessPos(0),
187      mFetching(true),
188      mLastFetchTimeUs(-1) {
189    mLooper->setName("NuCachedSource2");
190    mLooper->registerHandler(mReflector);
191    mLooper->start();
192
193    Mutex::Autolock autoLock(mLock);
194    (new AMessage(kWhatFetchMore, mReflector->id()))->post();
195}
196
197NuCachedSource2::~NuCachedSource2() {
198    mLooper->stop();
199    mLooper->unregisterHandler(mReflector->id());
200
201    delete mCache;
202    mCache = NULL;
203}
204
205status_t NuCachedSource2::getEstimatedBandwidthKbps(int32_t *kbps) {
206    HTTPBase* source = static_cast<HTTPBase *>(mSource.get());
207    return source->getEstimatedBandwidthKbps(kbps);
208}
209
210status_t NuCachedSource2::setCacheStatCollectFreq(int32_t freqMs) {
211    HTTPBase *source = static_cast<HTTPBase *>(mSource.get());
212    return source->setBandwidthStatCollectFreq(freqMs);
213}
214
215status_t NuCachedSource2::initCheck() const {
216    return mSource->initCheck();
217}
218
219status_t NuCachedSource2::getSize(off64_t *size) {
220    return mSource->getSize(size);
221}
222
223uint32_t NuCachedSource2::flags() {
224    return (mSource->flags() & ~kWantsPrefetching) | kIsCachingDataSource;
225}
226
227void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) {
228    switch (msg->what()) {
229        case kWhatFetchMore:
230        {
231            onFetch();
232            break;
233        }
234
235        case kWhatRead:
236        {
237            onRead(msg);
238            break;
239        }
240
241        default:
242            TRESPASS();
243    }
244}
245
246void NuCachedSource2::fetchInternal() {
247    LOGV("fetchInternal");
248
249    CHECK_EQ(mFinalStatus, (status_t)OK);
250
251    PageCache::Page *page = mCache->acquirePage();
252
253    ssize_t n = mSource->readAt(
254            mCacheOffset + mCache->totalSize(), page->mData, kPageSize);
255
256    Mutex::Autolock autoLock(mLock);
257
258    if (n < 0) {
259        LOGE("source returned error %ld", n);
260        mFinalStatus = n;
261        mCache->releasePage(page);
262    } else if (n == 0) {
263        LOGI("ERROR_END_OF_STREAM");
264        mFinalStatus = ERROR_END_OF_STREAM;
265        mCache->releasePage(page);
266    } else {
267        page->mSize = n;
268        mCache->appendPage(page);
269    }
270}
271
272void NuCachedSource2::onFetch() {
273    LOGV("onFetch");
274
275    if (mFinalStatus != OK) {
276        LOGV("EOS reached, done prefetching for now");
277        mFetching = false;
278    }
279
280    bool keepAlive =
281        !mFetching
282            && mFinalStatus == OK
283            && ALooper::GetNowUs() >= mLastFetchTimeUs + kKeepAliveIntervalUs;
284
285    if (mFetching || keepAlive) {
286        if (keepAlive) {
287            LOGI("Keep alive");
288        }
289
290        fetchInternal();
291
292        mLastFetchTimeUs = ALooper::GetNowUs();
293
294        if (mFetching && mCache->totalSize() >= kHighWaterThreshold) {
295            LOGI("Cache full, done prefetching for now");
296            mFetching = false;
297        }
298    } else {
299        Mutex::Autolock autoLock(mLock);
300        restartPrefetcherIfNecessary_l();
301    }
302
303    (new AMessage(kWhatFetchMore, mReflector->id()))->post(
304            mFetching ? 0 : 100000ll);
305}
306
307void NuCachedSource2::onRead(const sp<AMessage> &msg) {
308    LOGV("onRead");
309
310    int64_t offset;
311    CHECK(msg->findInt64("offset", &offset));
312
313    void *data;
314    CHECK(msg->findPointer("data", &data));
315
316    size_t size;
317    CHECK(msg->findSize("size", &size));
318
319    ssize_t result = readInternal(offset, data, size);
320
321    if (result == -EAGAIN) {
322        msg->post(50000);
323        return;
324    }
325
326    Mutex::Autolock autoLock(mLock);
327
328    CHECK(mAsyncResult == NULL);
329
330    mAsyncResult = new AMessage;
331    mAsyncResult->setInt32("result", result);
332
333    mCondition.signal();
334}
335
336void NuCachedSource2::restartPrefetcherIfNecessary_l(
337        bool ignoreLowWaterThreshold, bool force) {
338    static const size_t kGrayArea = 1024 * 1024;
339
340    if (mFetching || mFinalStatus != OK) {
341        return;
342    }
343
344    if (!ignoreLowWaterThreshold && !force
345            && mCacheOffset + mCache->totalSize() - mLastAccessPos
346                >= kLowWaterThreshold) {
347        return;
348    }
349
350    size_t maxBytes = mLastAccessPos - mCacheOffset;
351
352    if (!force) {
353        if (maxBytes < kGrayArea) {
354            return;
355        }
356
357        maxBytes -= kGrayArea;
358    }
359
360    size_t actualBytes = mCache->releaseFromStart(maxBytes);
361    mCacheOffset += actualBytes;
362
363    LOGI("restarting prefetcher, totalSize = %d", mCache->totalSize());
364    mFetching = true;
365}
366
367ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) {
368    Mutex::Autolock autoSerializer(mSerializer);
369
370    LOGV("readAt offset %lld, size %d", offset, size);
371
372    Mutex::Autolock autoLock(mLock);
373
374    // If the request can be completely satisfied from the cache, do so.
375
376    if (offset >= mCacheOffset
377            && offset + size <= mCacheOffset + mCache->totalSize()) {
378        size_t delta = offset - mCacheOffset;
379        mCache->copy(delta, data, size);
380
381        mLastAccessPos = offset + size;
382
383        return size;
384    }
385
386    sp<AMessage> msg = new AMessage(kWhatRead, mReflector->id());
387    msg->setInt64("offset", offset);
388    msg->setPointer("data", data);
389    msg->setSize("size", size);
390
391    CHECK(mAsyncResult == NULL);
392    msg->post();
393
394    while (mAsyncResult == NULL) {
395        mCondition.wait(mLock);
396    }
397
398    int32_t result;
399    CHECK(mAsyncResult->findInt32("result", &result));
400
401    mAsyncResult.clear();
402
403    if (result > 0) {
404        mLastAccessPos = offset + result;
405    }
406
407    return (ssize_t)result;
408}
409
410size_t NuCachedSource2::cachedSize() {
411    Mutex::Autolock autoLock(mLock);
412    return mCacheOffset + mCache->totalSize();
413}
414
415size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) {
416    Mutex::Autolock autoLock(mLock);
417    return approxDataRemaining_l(finalStatus);
418}
419
420size_t NuCachedSource2::approxDataRemaining_l(status_t *finalStatus) {
421    *finalStatus = mFinalStatus;
422    off64_t lastBytePosCached = mCacheOffset + mCache->totalSize();
423    if (mLastAccessPos < lastBytePosCached) {
424        return lastBytePosCached - mLastAccessPos;
425    }
426    return 0;
427}
428
429ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) {
430    CHECK_LE(size, (size_t)kHighWaterThreshold);
431
432    LOGV("readInternal offset %lld size %d", offset, size);
433
434    Mutex::Autolock autoLock(mLock);
435
436    if (!mFetching) {
437        mLastAccessPos = offset;
438        restartPrefetcherIfNecessary_l(
439                false, // ignoreLowWaterThreshold
440                true); // force
441    }
442
443    if (offset < mCacheOffset
444            || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) {
445        static const off64_t kPadding = 256 * 1024;
446
447        // In the presence of multiple decoded streams, once of them will
448        // trigger this seek request, the other one will request data "nearby"
449        // soon, adjust the seek position so that that subsequent request
450        // does not trigger another seek.
451        off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0;
452
453        seekInternal_l(seekOffset);
454    }
455
456    size_t delta = offset - mCacheOffset;
457
458    if (mFinalStatus != OK) {
459        if (delta >= mCache->totalSize()) {
460            return mFinalStatus;
461        }
462
463        size_t avail = mCache->totalSize() - delta;
464
465        if (avail > size) {
466            avail = size;
467        }
468
469        mCache->copy(delta, data, avail);
470
471        return avail;
472    }
473
474    if (offset + size <= mCacheOffset + mCache->totalSize()) {
475        mCache->copy(delta, data, size);
476
477        return size;
478    }
479
480    LOGV("deferring read");
481
482    return -EAGAIN;
483}
484
485status_t NuCachedSource2::seekInternal_l(off64_t offset) {
486    mLastAccessPos = offset;
487
488    if (offset >= mCacheOffset
489            && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) {
490        return OK;
491    }
492
493    LOGI("new range: offset= %lld", offset);
494
495    mCacheOffset = offset;
496
497    size_t totalSize = mCache->totalSize();
498    CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize);
499
500    mFinalStatus = OK;
501    mFetching = true;
502
503    return OK;
504}
505
506void NuCachedSource2::resumeFetchingIfNecessary() {
507    Mutex::Autolock autoLock(mLock);
508
509    restartPrefetcherIfNecessary_l(true /* ignore low water threshold */);
510}
511
512sp<DecryptHandle> NuCachedSource2::DrmInitialization() {
513    return mSource->DrmInitialization();
514}
515
516void NuCachedSource2::getDrmInfo(sp<DecryptHandle> &handle, DrmManagerClient **client) {
517    mSource->getDrmInfo(handle, client);
518}
519
520String8 NuCachedSource2::getUri() {
521    return mSource->getUri();
522}
523
524String8 NuCachedSource2::getMIMEType() const {
525    return mSource->getMIMEType();
526}
527
528}  // namespace android
529