NuCachedSource2.cpp revision a045cb0e77097120e86e367e1cab5494ce2a5d5e
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "NuCachedSource2"
19#include <utils/Log.h>
20
21#include "include/NuCachedSource2.h"
22#include "include/HTTPBase.h"
23
24#include <cutils/properties.h>
25#include <media/stagefright/foundation/ADebug.h>
26#include <media/stagefright/foundation/AMessage.h>
27#include <media/stagefright/MediaErrors.h>
28
29namespace android {
30
31struct PageCache {
32    PageCache(size_t pageSize);
33    ~PageCache();
34
35    struct Page {
36        void *mData;
37        size_t mSize;
38    };
39
40    Page *acquirePage();
41    void releasePage(Page *page);
42
43    void appendPage(Page *page);
44    size_t releaseFromStart(size_t maxBytes);
45
46    size_t totalSize() const {
47        return mTotalSize;
48    }
49
50    void copy(size_t from, void *data, size_t size);
51
52private:
53    size_t mPageSize;
54    size_t mTotalSize;
55
56    List<Page *> mActivePages;
57    List<Page *> mFreePages;
58
59    void freePages(List<Page *> *list);
60
61    DISALLOW_EVIL_CONSTRUCTORS(PageCache);
62};
63
64PageCache::PageCache(size_t pageSize)
65    : mPageSize(pageSize),
66      mTotalSize(0) {
67}
68
69PageCache::~PageCache() {
70    freePages(&mActivePages);
71    freePages(&mFreePages);
72}
73
74void PageCache::freePages(List<Page *> *list) {
75    List<Page *>::iterator it = list->begin();
76    while (it != list->end()) {
77        Page *page = *it;
78
79        free(page->mData);
80        delete page;
81        page = NULL;
82
83        ++it;
84    }
85}
86
87PageCache::Page *PageCache::acquirePage() {
88    if (!mFreePages.empty()) {
89        List<Page *>::iterator it = mFreePages.begin();
90        Page *page = *it;
91        mFreePages.erase(it);
92
93        return page;
94    }
95
96    Page *page = new Page;
97    page->mData = malloc(mPageSize);
98    page->mSize = 0;
99
100    return page;
101}
102
103void PageCache::releasePage(Page *page) {
104    page->mSize = 0;
105    mFreePages.push_back(page);
106}
107
108void PageCache::appendPage(Page *page) {
109    mTotalSize += page->mSize;
110    mActivePages.push_back(page);
111}
112
113size_t PageCache::releaseFromStart(size_t maxBytes) {
114    size_t bytesReleased = 0;
115
116    while (maxBytes > 0 && !mActivePages.empty()) {
117        List<Page *>::iterator it = mActivePages.begin();
118
119        Page *page = *it;
120
121        if (maxBytes < page->mSize) {
122            break;
123        }
124
125        mActivePages.erase(it);
126
127        maxBytes -= page->mSize;
128        bytesReleased += page->mSize;
129
130        releasePage(page);
131    }
132
133    mTotalSize -= bytesReleased;
134    return bytesReleased;
135}
136
137void PageCache::copy(size_t from, void *data, size_t size) {
138    LOGV("copy from %d size %d", from, size);
139
140    if (size == 0) {
141        return;
142    }
143
144    CHECK_LE(from + size, mTotalSize);
145
146    size_t offset = 0;
147    List<Page *>::iterator it = mActivePages.begin();
148    while (from >= offset + (*it)->mSize) {
149        offset += (*it)->mSize;
150        ++it;
151    }
152
153    size_t delta = from - offset;
154    size_t avail = (*it)->mSize - delta;
155
156    if (avail >= size) {
157        memcpy(data, (const uint8_t *)(*it)->mData + delta, size);
158        return;
159    }
160
161    memcpy(data, (const uint8_t *)(*it)->mData + delta, avail);
162    ++it;
163    data = (uint8_t *)data + avail;
164    size -= avail;
165
166    while (size > 0) {
167        size_t copy = (*it)->mSize;
168        if (copy > size) {
169            copy = size;
170        }
171        memcpy(data, (*it)->mData, copy);
172        data = (uint8_t *)data + copy;
173        size -= copy;
174        ++it;
175    }
176}
177
178////////////////////////////////////////////////////////////////////////////////
179
180NuCachedSource2::NuCachedSource2(const sp<DataSource> &source)
181    : mSource(source),
182      mReflector(new AHandlerReflector<NuCachedSource2>(this)),
183      mLooper(new ALooper),
184      mCache(new PageCache(kPageSize)),
185      mCacheOffset(0),
186      mFinalStatus(OK),
187      mLastAccessPos(0),
188      mFetching(true),
189      mLastFetchTimeUs(-1),
190      mNumRetriesLeft(kMaxNumRetries),
191      mHighwaterThresholdBytes(kDefaultHighWaterThreshold),
192      mLowwaterThresholdBytes(kDefaultLowWaterThreshold),
193      mKeepAliveIntervalUs(kDefaultKeepAliveIntervalUs) {
194    updateCacheParamsFromSystemProperty();
195
196    mLooper->setName("NuCachedSource2");
197    mLooper->registerHandler(mReflector);
198    mLooper->start();
199
200    Mutex::Autolock autoLock(mLock);
201    (new AMessage(kWhatFetchMore, mReflector->id()))->post();
202}
203
204NuCachedSource2::~NuCachedSource2() {
205    mLooper->stop();
206    mLooper->unregisterHandler(mReflector->id());
207
208    delete mCache;
209    mCache = NULL;
210}
211
212status_t NuCachedSource2::getEstimatedBandwidthKbps(int32_t *kbps) {
213    if (mSource->flags() & kIsHTTPBasedSource) {
214        HTTPBase* source = static_cast<HTTPBase *>(mSource.get());
215        return source->getEstimatedBandwidthKbps(kbps);
216    }
217    return ERROR_UNSUPPORTED;
218}
219
220status_t NuCachedSource2::setCacheStatCollectFreq(int32_t freqMs) {
221    if (mSource->flags() & kIsHTTPBasedSource) {
222        HTTPBase *source = static_cast<HTTPBase *>(mSource.get());
223        return source->setBandwidthStatCollectFreq(freqMs);
224    }
225    return ERROR_UNSUPPORTED;
226}
227
228status_t NuCachedSource2::initCheck() const {
229    return mSource->initCheck();
230}
231
232status_t NuCachedSource2::getSize(off64_t *size) {
233    return mSource->getSize(size);
234}
235
236uint32_t NuCachedSource2::flags() {
237    // Remove HTTP related flags since NuCachedSource2 is not HTTP-based.
238    uint32_t flags = mSource->flags() & ~(kWantsPrefetching | kIsHTTPBasedSource);
239    return (flags | kIsCachingDataSource);
240}
241
242void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) {
243    switch (msg->what()) {
244        case kWhatFetchMore:
245        {
246            onFetch();
247            break;
248        }
249
250        case kWhatRead:
251        {
252            onRead(msg);
253            break;
254        }
255
256        default:
257            TRESPASS();
258    }
259}
260
261void NuCachedSource2::fetchInternal() {
262    LOGV("fetchInternal");
263
264    {
265        Mutex::Autolock autoLock(mLock);
266        CHECK(mFinalStatus == OK || mNumRetriesLeft > 0);
267
268        if (mFinalStatus != OK) {
269            --mNumRetriesLeft;
270
271            status_t err =
272                mSource->reconnectAtOffset(mCacheOffset + mCache->totalSize());
273
274            if (err == ERROR_UNSUPPORTED) {
275                mNumRetriesLeft = 0;
276                return;
277            } else if (err != OK) {
278                LOGI("The attempt to reconnect failed, %d retries remaining",
279                     mNumRetriesLeft);
280
281                return;
282            }
283        }
284    }
285
286    PageCache::Page *page = mCache->acquirePage();
287
288    ssize_t n = mSource->readAt(
289            mCacheOffset + mCache->totalSize(), page->mData, kPageSize);
290
291    Mutex::Autolock autoLock(mLock);
292
293    if (n < 0) {
294        LOGE("source returned error %ld, %d retries left", n, mNumRetriesLeft);
295        mFinalStatus = n;
296        mCache->releasePage(page);
297    } else if (n == 0) {
298        LOGI("ERROR_END_OF_STREAM");
299
300        mNumRetriesLeft = 0;
301        mFinalStatus = ERROR_END_OF_STREAM;
302
303        mCache->releasePage(page);
304    } else {
305        if (mFinalStatus != OK) {
306            LOGI("retrying a previously failed read succeeded.");
307        }
308        mNumRetriesLeft = kMaxNumRetries;
309        mFinalStatus = OK;
310
311        page->mSize = n;
312        mCache->appendPage(page);
313    }
314}
315
316void NuCachedSource2::onFetch() {
317    LOGV("onFetch");
318
319    if (mFinalStatus != OK && mNumRetriesLeft == 0) {
320        LOGV("EOS reached, done prefetching for now");
321        mFetching = false;
322    }
323
324    bool keepAlive =
325        !mFetching
326            && mFinalStatus == OK
327            && mKeepAliveIntervalUs > 0
328            && ALooper::GetNowUs() >= mLastFetchTimeUs + mKeepAliveIntervalUs;
329
330    if (mFetching || keepAlive) {
331        if (keepAlive) {
332            LOGI("Keep alive");
333        }
334
335        fetchInternal();
336
337        mLastFetchTimeUs = ALooper::GetNowUs();
338
339        if (mFetching && mCache->totalSize() >= mHighwaterThresholdBytes) {
340            LOGI("Cache full, done prefetching for now");
341            mFetching = false;
342        }
343    } else {
344        Mutex::Autolock autoLock(mLock);
345        restartPrefetcherIfNecessary_l();
346    }
347
348    int64_t delayUs;
349    if (mFetching) {
350        if (mFinalStatus != OK && mNumRetriesLeft > 0) {
351            // We failed this time and will try again in 3 seconds.
352            delayUs = 3000000ll;
353        } else {
354            delayUs = 0;
355        }
356    } else {
357        delayUs = 100000ll;
358    }
359
360    (new AMessage(kWhatFetchMore, mReflector->id()))->post(delayUs);
361}
362
363void NuCachedSource2::onRead(const sp<AMessage> &msg) {
364    LOGV("onRead");
365
366    int64_t offset;
367    CHECK(msg->findInt64("offset", &offset));
368
369    void *data;
370    CHECK(msg->findPointer("data", &data));
371
372    size_t size;
373    CHECK(msg->findSize("size", &size));
374
375    ssize_t result = readInternal(offset, data, size);
376
377    if (result == -EAGAIN) {
378        msg->post(50000);
379        return;
380    }
381
382    Mutex::Autolock autoLock(mLock);
383
384    CHECK(mAsyncResult == NULL);
385
386    mAsyncResult = new AMessage;
387    mAsyncResult->setInt32("result", result);
388
389    mCondition.signal();
390}
391
392void NuCachedSource2::restartPrefetcherIfNecessary_l(
393        bool ignoreLowWaterThreshold, bool force) {
394    static const size_t kGrayArea = 1024 * 1024;
395
396    if (mFetching || (mFinalStatus != OK && mNumRetriesLeft == 0)) {
397        return;
398    }
399
400    if (!ignoreLowWaterThreshold && !force
401            && mCacheOffset + mCache->totalSize() - mLastAccessPos
402                >= mLowwaterThresholdBytes) {
403        return;
404    }
405
406    size_t maxBytes = mLastAccessPos - mCacheOffset;
407
408    if (!force) {
409        if (maxBytes < kGrayArea) {
410            return;
411        }
412
413        maxBytes -= kGrayArea;
414    }
415
416    size_t actualBytes = mCache->releaseFromStart(maxBytes);
417    mCacheOffset += actualBytes;
418
419    LOGI("restarting prefetcher, totalSize = %d", mCache->totalSize());
420    mFetching = true;
421}
422
423ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) {
424    Mutex::Autolock autoSerializer(mSerializer);
425
426    LOGV("readAt offset %lld, size %d", offset, size);
427
428    Mutex::Autolock autoLock(mLock);
429
430    // If the request can be completely satisfied from the cache, do so.
431
432    if (offset >= mCacheOffset
433            && offset + size <= mCacheOffset + mCache->totalSize()) {
434        size_t delta = offset - mCacheOffset;
435        mCache->copy(delta, data, size);
436
437        mLastAccessPos = offset + size;
438
439        return size;
440    }
441
442    sp<AMessage> msg = new AMessage(kWhatRead, mReflector->id());
443    msg->setInt64("offset", offset);
444    msg->setPointer("data", data);
445    msg->setSize("size", size);
446
447    CHECK(mAsyncResult == NULL);
448    msg->post();
449
450    while (mAsyncResult == NULL) {
451        mCondition.wait(mLock);
452    }
453
454    int32_t result;
455    CHECK(mAsyncResult->findInt32("result", &result));
456
457    mAsyncResult.clear();
458
459    if (result > 0) {
460        mLastAccessPos = offset + result;
461    }
462
463    return (ssize_t)result;
464}
465
466size_t NuCachedSource2::cachedSize() {
467    Mutex::Autolock autoLock(mLock);
468    return mCacheOffset + mCache->totalSize();
469}
470
471size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) {
472    Mutex::Autolock autoLock(mLock);
473    return approxDataRemaining_l(finalStatus);
474}
475
476size_t NuCachedSource2::approxDataRemaining_l(status_t *finalStatus) {
477    *finalStatus = mFinalStatus;
478
479    if (mFinalStatus != OK && mNumRetriesLeft > 0) {
480        // Pretend that everything is fine until we're out of retries.
481        *finalStatus = OK;
482    }
483
484    off64_t lastBytePosCached = mCacheOffset + mCache->totalSize();
485    if (mLastAccessPos < lastBytePosCached) {
486        return lastBytePosCached - mLastAccessPos;
487    }
488    return 0;
489}
490
491ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) {
492    CHECK_LE(size, (size_t)mHighwaterThresholdBytes);
493
494    LOGV("readInternal offset %lld size %d", offset, size);
495
496    Mutex::Autolock autoLock(mLock);
497
498    if (!mFetching) {
499        mLastAccessPos = offset;
500        restartPrefetcherIfNecessary_l(
501                false, // ignoreLowWaterThreshold
502                true); // force
503    }
504
505    if (offset < mCacheOffset
506            || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) {
507        static const off64_t kPadding = 256 * 1024;
508
509        // In the presence of multiple decoded streams, once of them will
510        // trigger this seek request, the other one will request data "nearby"
511        // soon, adjust the seek position so that that subsequent request
512        // does not trigger another seek.
513        off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0;
514
515        seekInternal_l(seekOffset);
516    }
517
518    size_t delta = offset - mCacheOffset;
519
520    if (mFinalStatus != OK) {
521        if (delta >= mCache->totalSize()) {
522            return mFinalStatus;
523        }
524
525        size_t avail = mCache->totalSize() - delta;
526
527        if (avail > size) {
528            avail = size;
529        }
530
531        mCache->copy(delta, data, avail);
532
533        return avail;
534    }
535
536    if (offset + size <= mCacheOffset + mCache->totalSize()) {
537        mCache->copy(delta, data, size);
538
539        return size;
540    }
541
542    LOGV("deferring read");
543
544    return -EAGAIN;
545}
546
547status_t NuCachedSource2::seekInternal_l(off64_t offset) {
548    mLastAccessPos = offset;
549
550    if (offset >= mCacheOffset
551            && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) {
552        return OK;
553    }
554
555    LOGI("new range: offset= %lld", offset);
556
557    mCacheOffset = offset;
558
559    size_t totalSize = mCache->totalSize();
560    CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize);
561
562    mFinalStatus = OK;
563    mFetching = true;
564
565    return OK;
566}
567
568void NuCachedSource2::resumeFetchingIfNecessary() {
569    Mutex::Autolock autoLock(mLock);
570
571    restartPrefetcherIfNecessary_l(true /* ignore low water threshold */);
572}
573
574sp<DecryptHandle> NuCachedSource2::DrmInitialization() {
575    return mSource->DrmInitialization();
576}
577
578void NuCachedSource2::getDrmInfo(sp<DecryptHandle> &handle, DrmManagerClient **client) {
579    mSource->getDrmInfo(handle, client);
580}
581
582String8 NuCachedSource2::getUri() {
583    return mSource->getUri();
584}
585
586String8 NuCachedSource2::getMIMEType() const {
587    return mSource->getMIMEType();
588}
589
590void NuCachedSource2::updateCacheParamsFromSystemProperty() {
591    char value[PROPERTY_VALUE_MAX];
592    if (!property_get("media.stagefright.cache-params", value, NULL)) {
593        return;
594    }
595
596    updateCacheParamsFromString(value);
597}
598
599void NuCachedSource2::updateCacheParamsFromString(const char *s) {
600    ssize_t lowwaterMarkKb, highwaterMarkKb;
601    unsigned keepAliveSecs;
602
603    if (sscanf(s, "%ld/%ld/%u",
604               &lowwaterMarkKb, &highwaterMarkKb, &keepAliveSecs) != 3
605        || lowwaterMarkKb >= highwaterMarkKb) {
606        LOGE("Failed to parse cache parameters from '%s'.", s);
607        return;
608    }
609
610    if (lowwaterMarkKb >= 0) {
611        mLowwaterThresholdBytes = lowwaterMarkKb * 1024;
612    } else {
613        mLowwaterThresholdBytes = kDefaultLowWaterThreshold;
614    }
615
616    if (highwaterMarkKb >= 0) {
617        mHighwaterThresholdBytes = highwaterMarkKb * 1024;
618    } else {
619        mHighwaterThresholdBytes = kDefaultHighWaterThreshold;
620    }
621
622    mKeepAliveIntervalUs = keepAliveSecs * 1000000ll;
623
624    LOGV("lowwater = %d bytes, highwater = %d bytes, keepalive = %lld us",
625         mLowwaterThresholdBytes,
626         mHighwaterThresholdBytes,
627         mKeepAliveIntervalUs);
628}
629
630}  // namespace android
631