NuCachedSource2.cpp revision 1b86fe063badb5f28c467ade39be0f4008688947
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "NuCachedSource2"
19#include <utils/Log.h>
20
21#include "include/NuCachedSource2.h"
22#include "include/HTTPBase.h"
23
24#include <cutils/properties.h>
25#include <media/stagefright/foundation/ADebug.h>
26#include <media/stagefright/foundation/AMessage.h>
27#include <media/stagefright/MediaErrors.h>
28
29namespace android {
30
31struct PageCache {
32    PageCache(size_t pageSize);
33    ~PageCache();
34
35    struct Page {
36        void *mData;
37        size_t mSize;
38    };
39
40    Page *acquirePage();
41    void releasePage(Page *page);
42
43    void appendPage(Page *page);
44    size_t releaseFromStart(size_t maxBytes);
45
46    size_t totalSize() const {
47        return mTotalSize;
48    }
49
50    void copy(size_t from, void *data, size_t size);
51
52private:
53    size_t mPageSize;
54    size_t mTotalSize;
55
56    List<Page *> mActivePages;
57    List<Page *> mFreePages;
58
59    void freePages(List<Page *> *list);
60
61    DISALLOW_EVIL_CONSTRUCTORS(PageCache);
62};
63
64PageCache::PageCache(size_t pageSize)
65    : mPageSize(pageSize),
66      mTotalSize(0) {
67}
68
69PageCache::~PageCache() {
70    freePages(&mActivePages);
71    freePages(&mFreePages);
72}
73
74void PageCache::freePages(List<Page *> *list) {
75    List<Page *>::iterator it = list->begin();
76    while (it != list->end()) {
77        Page *page = *it;
78
79        free(page->mData);
80        delete page;
81        page = NULL;
82
83        ++it;
84    }
85}
86
87PageCache::Page *PageCache::acquirePage() {
88    if (!mFreePages.empty()) {
89        List<Page *>::iterator it = mFreePages.begin();
90        Page *page = *it;
91        mFreePages.erase(it);
92
93        return page;
94    }
95
96    Page *page = new Page;
97    page->mData = malloc(mPageSize);
98    page->mSize = 0;
99
100    return page;
101}
102
103void PageCache::releasePage(Page *page) {
104    page->mSize = 0;
105    mFreePages.push_back(page);
106}
107
108void PageCache::appendPage(Page *page) {
109    mTotalSize += page->mSize;
110    mActivePages.push_back(page);
111}
112
113size_t PageCache::releaseFromStart(size_t maxBytes) {
114    size_t bytesReleased = 0;
115
116    while (maxBytes > 0 && !mActivePages.empty()) {
117        List<Page *>::iterator it = mActivePages.begin();
118
119        Page *page = *it;
120
121        if (maxBytes < page->mSize) {
122            break;
123        }
124
125        mActivePages.erase(it);
126
127        maxBytes -= page->mSize;
128        bytesReleased += page->mSize;
129
130        releasePage(page);
131    }
132
133    mTotalSize -= bytesReleased;
134    return bytesReleased;
135}
136
137void PageCache::copy(size_t from, void *data, size_t size) {
138    ALOGV("copy from %d size %d", from, size);
139
140    if (size == 0) {
141        return;
142    }
143
144    CHECK_LE(from + size, mTotalSize);
145
146    size_t offset = 0;
147    List<Page *>::iterator it = mActivePages.begin();
148    while (from >= offset + (*it)->mSize) {
149        offset += (*it)->mSize;
150        ++it;
151    }
152
153    size_t delta = from - offset;
154    size_t avail = (*it)->mSize - delta;
155
156    if (avail >= size) {
157        memcpy(data, (const uint8_t *)(*it)->mData + delta, size);
158        return;
159    }
160
161    memcpy(data, (const uint8_t *)(*it)->mData + delta, avail);
162    ++it;
163    data = (uint8_t *)data + avail;
164    size -= avail;
165
166    while (size > 0) {
167        size_t copy = (*it)->mSize;
168        if (copy > size) {
169            copy = size;
170        }
171        memcpy(data, (*it)->mData, copy);
172        data = (uint8_t *)data + copy;
173        size -= copy;
174        ++it;
175    }
176}
177
178////////////////////////////////////////////////////////////////////////////////
179
180NuCachedSource2::NuCachedSource2(
181        const sp<DataSource> &source,
182        const char *cacheConfig,
183        bool disconnectAtHighwatermark)
184    : mSource(source),
185      mReflector(new AHandlerReflector<NuCachedSource2>(this)),
186      mLooper(new ALooper),
187      mCache(new PageCache(kPageSize)),
188      mCacheOffset(0),
189      mFinalStatus(OK),
190      mLastAccessPos(0),
191      mFetching(true),
192      mLastFetchTimeUs(-1),
193      mNumRetriesLeft(kMaxNumRetries),
194      mHighwaterThresholdBytes(kDefaultHighWaterThreshold),
195      mLowwaterThresholdBytes(kDefaultLowWaterThreshold),
196      mKeepAliveIntervalUs(kDefaultKeepAliveIntervalUs),
197      mDisconnectAtHighwatermark(disconnectAtHighwatermark) {
198    // We are NOT going to support disconnect-at-highwatermark indefinitely
199    // and we are not guaranteeing support for client-specified cache
200    // parameters. Both of these are temporary measures to solve a specific
201    // problem that will be solved in a better way going forward.
202
203    updateCacheParamsFromSystemProperty();
204
205    if (cacheConfig != NULL) {
206        updateCacheParamsFromString(cacheConfig);
207    }
208
209    if (mDisconnectAtHighwatermark) {
210        // Makes no sense to disconnect and do keep-alives...
211        mKeepAliveIntervalUs = 0;
212    }
213
214    mLooper->setName("NuCachedSource2");
215    mLooper->registerHandler(mReflector);
216
217    // Since it may not be obvious why our looper thread needs to be
218    // able to call into java since it doesn't appear to do so at all...
219    // IMediaHTTPConnection may be (and most likely is) implemented in JAVA
220    // and a local JAVA IBinder will call directly into JNI methods.
221    // So whenever we call DataSource::readAt it may end up in a call to
222    // IMediaHTTPConnection::readAt and therefore call back into JAVA.
223    mLooper->start(false /* runOnCallingThread */, true /* canCallJava */);
224
225    Mutex::Autolock autoLock(mLock);
226    (new AMessage(kWhatFetchMore, mReflector->id()))->post();
227}
228
229NuCachedSource2::~NuCachedSource2() {
230    mLooper->stop();
231    mLooper->unregisterHandler(mReflector->id());
232
233    delete mCache;
234    mCache = NULL;
235}
236
237status_t NuCachedSource2::getEstimatedBandwidthKbps(int32_t *kbps) {
238    if (mSource->flags() & kIsHTTPBasedSource) {
239        HTTPBase* source = static_cast<HTTPBase *>(mSource.get());
240        return source->getEstimatedBandwidthKbps(kbps);
241    }
242    return ERROR_UNSUPPORTED;
243}
244
245status_t NuCachedSource2::setCacheStatCollectFreq(int32_t freqMs) {
246    if (mSource->flags() & kIsHTTPBasedSource) {
247        HTTPBase *source = static_cast<HTTPBase *>(mSource.get());
248        return source->setBandwidthStatCollectFreq(freqMs);
249    }
250    return ERROR_UNSUPPORTED;
251}
252
253status_t NuCachedSource2::initCheck() const {
254    return mSource->initCheck();
255}
256
257status_t NuCachedSource2::getSize(off64_t *size) {
258    return mSource->getSize(size);
259}
260
261uint32_t NuCachedSource2::flags() {
262    // Remove HTTP related flags since NuCachedSource2 is not HTTP-based.
263    uint32_t flags = mSource->flags() & ~(kWantsPrefetching | kIsHTTPBasedSource);
264    return (flags | kIsCachingDataSource);
265}
266
267void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) {
268    switch (msg->what()) {
269        case kWhatFetchMore:
270        {
271            onFetch();
272            break;
273        }
274
275        case kWhatRead:
276        {
277            onRead(msg);
278            break;
279        }
280
281        default:
282            TRESPASS();
283    }
284}
285
286void NuCachedSource2::fetchInternal() {
287    ALOGV("fetchInternal");
288
289    bool reconnect = false;
290
291    {
292        Mutex::Autolock autoLock(mLock);
293        CHECK(mFinalStatus == OK || mNumRetriesLeft > 0);
294
295        if (mFinalStatus != OK) {
296            --mNumRetriesLeft;
297
298            reconnect = true;
299        }
300    }
301
302    if (reconnect) {
303        status_t err =
304            mSource->reconnectAtOffset(mCacheOffset + mCache->totalSize());
305
306        Mutex::Autolock autoLock(mLock);
307
308        if (err == ERROR_UNSUPPORTED || err == -EPIPE) {
309            // These are errors that are not likely to go away even if we
310            // retry, i.e. the server doesn't support range requests or similar.
311            mNumRetriesLeft = 0;
312            return;
313        } else if (err != OK) {
314            ALOGI("The attempt to reconnect failed, %d retries remaining",
315                 mNumRetriesLeft);
316
317            return;
318        }
319    }
320
321    PageCache::Page *page = mCache->acquirePage();
322
323    ssize_t n = mSource->readAt(
324            mCacheOffset + mCache->totalSize(), page->mData, kPageSize);
325
326    Mutex::Autolock autoLock(mLock);
327
328    if (n < 0) {
329        mFinalStatus = n;
330        if (n == ERROR_UNSUPPORTED || n == -EPIPE) {
331            // These are errors that are not likely to go away even if we
332            // retry, i.e. the server doesn't support range requests or similar.
333            mNumRetriesLeft = 0;
334        }
335
336        ALOGE("source returned error %ld, %d retries left", n, mNumRetriesLeft);
337        mCache->releasePage(page);
338    } else if (n == 0) {
339        ALOGI("ERROR_END_OF_STREAM");
340
341        mNumRetriesLeft = 0;
342        mFinalStatus = ERROR_END_OF_STREAM;
343
344        mCache->releasePage(page);
345    } else {
346        if (mFinalStatus != OK) {
347            ALOGI("retrying a previously failed read succeeded.");
348        }
349        mNumRetriesLeft = kMaxNumRetries;
350        mFinalStatus = OK;
351
352        page->mSize = n;
353        mCache->appendPage(page);
354    }
355}
356
357void NuCachedSource2::onFetch() {
358    ALOGV("onFetch");
359
360    if (mFinalStatus != OK && mNumRetriesLeft == 0) {
361        ALOGV("EOS reached, done prefetching for now");
362        mFetching = false;
363    }
364
365    bool keepAlive =
366        !mFetching
367            && mFinalStatus == OK
368            && mKeepAliveIntervalUs > 0
369            && ALooper::GetNowUs() >= mLastFetchTimeUs + mKeepAliveIntervalUs;
370
371    if (mFetching || keepAlive) {
372        if (keepAlive) {
373            ALOGI("Keep alive");
374        }
375
376        fetchInternal();
377
378        mLastFetchTimeUs = ALooper::GetNowUs();
379
380        if (mFetching && mCache->totalSize() >= mHighwaterThresholdBytes) {
381            ALOGI("Cache full, done prefetching for now");
382            mFetching = false;
383
384            if (mDisconnectAtHighwatermark
385                    && (mSource->flags() & DataSource::kIsHTTPBasedSource)) {
386                ALOGV("Disconnecting at high watermark");
387                static_cast<HTTPBase *>(mSource.get())->disconnect();
388                mFinalStatus = -EAGAIN;
389            }
390        }
391    } else {
392        Mutex::Autolock autoLock(mLock);
393        restartPrefetcherIfNecessary_l();
394    }
395
396    int64_t delayUs;
397    if (mFetching) {
398        if (mFinalStatus != OK && mNumRetriesLeft > 0) {
399            // We failed this time and will try again in 3 seconds.
400            delayUs = 3000000ll;
401        } else {
402            delayUs = 0;
403        }
404    } else {
405        delayUs = 100000ll;
406    }
407
408    (new AMessage(kWhatFetchMore, mReflector->id()))->post(delayUs);
409}
410
411void NuCachedSource2::onRead(const sp<AMessage> &msg) {
412    ALOGV("onRead");
413
414    int64_t offset;
415    CHECK(msg->findInt64("offset", &offset));
416
417    void *data;
418    CHECK(msg->findPointer("data", &data));
419
420    size_t size;
421    CHECK(msg->findSize("size", &size));
422
423    ssize_t result = readInternal(offset, data, size);
424
425    if (result == -EAGAIN) {
426        msg->post(50000);
427        return;
428    }
429
430    Mutex::Autolock autoLock(mLock);
431
432    CHECK(mAsyncResult == NULL);
433
434    mAsyncResult = new AMessage;
435    mAsyncResult->setInt32("result", result);
436
437    mCondition.signal();
438}
439
440void NuCachedSource2::restartPrefetcherIfNecessary_l(
441        bool ignoreLowWaterThreshold, bool force) {
442    static const size_t kGrayArea = 1024 * 1024;
443
444    if (mFetching || (mFinalStatus != OK && mNumRetriesLeft == 0)) {
445        return;
446    }
447
448    if (!ignoreLowWaterThreshold && !force
449            && mCacheOffset + mCache->totalSize() - mLastAccessPos
450                >= mLowwaterThresholdBytes) {
451        return;
452    }
453
454    size_t maxBytes = mLastAccessPos - mCacheOffset;
455
456    if (!force) {
457        if (maxBytes < kGrayArea) {
458            return;
459        }
460
461        maxBytes -= kGrayArea;
462    }
463
464    size_t actualBytes = mCache->releaseFromStart(maxBytes);
465    mCacheOffset += actualBytes;
466
467    ALOGI("restarting prefetcher, totalSize = %d", mCache->totalSize());
468    mFetching = true;
469}
470
471ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) {
472    Mutex::Autolock autoSerializer(mSerializer);
473
474    ALOGV("readAt offset %lld, size %d", offset, size);
475
476    Mutex::Autolock autoLock(mLock);
477
478    // If the request can be completely satisfied from the cache, do so.
479
480    if (offset >= mCacheOffset
481            && offset + size <= mCacheOffset + mCache->totalSize()) {
482        size_t delta = offset - mCacheOffset;
483        mCache->copy(delta, data, size);
484
485        mLastAccessPos = offset + size;
486
487        return size;
488    }
489
490    sp<AMessage> msg = new AMessage(kWhatRead, mReflector->id());
491    msg->setInt64("offset", offset);
492    msg->setPointer("data", data);
493    msg->setSize("size", size);
494
495    CHECK(mAsyncResult == NULL);
496    msg->post();
497
498    while (mAsyncResult == NULL) {
499        mCondition.wait(mLock);
500    }
501
502    int32_t result;
503    CHECK(mAsyncResult->findInt32("result", &result));
504
505    mAsyncResult.clear();
506
507    if (result > 0) {
508        mLastAccessPos = offset + result;
509    }
510
511    return (ssize_t)result;
512}
513
514size_t NuCachedSource2::cachedSize() {
515    Mutex::Autolock autoLock(mLock);
516    return mCacheOffset + mCache->totalSize();
517}
518
519size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) const {
520    Mutex::Autolock autoLock(mLock);
521    return approxDataRemaining_l(finalStatus);
522}
523
524size_t NuCachedSource2::approxDataRemaining_l(status_t *finalStatus) const {
525    *finalStatus = mFinalStatus;
526
527    if (mFinalStatus != OK && mNumRetriesLeft > 0) {
528        // Pretend that everything is fine until we're out of retries.
529        *finalStatus = OK;
530    }
531
532    off64_t lastBytePosCached = mCacheOffset + mCache->totalSize();
533    if (mLastAccessPos < lastBytePosCached) {
534        return lastBytePosCached - mLastAccessPos;
535    }
536    return 0;
537}
538
539ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) {
540    CHECK_LE(size, (size_t)mHighwaterThresholdBytes);
541
542    ALOGV("readInternal offset %lld size %d", offset, size);
543
544    Mutex::Autolock autoLock(mLock);
545
546    if (!mFetching) {
547        mLastAccessPos = offset;
548        restartPrefetcherIfNecessary_l(
549                false, // ignoreLowWaterThreshold
550                true); // force
551    }
552
553    if (offset < mCacheOffset
554            || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) {
555        static const off64_t kPadding = 256 * 1024;
556
557        // In the presence of multiple decoded streams, once of them will
558        // trigger this seek request, the other one will request data "nearby"
559        // soon, adjust the seek position so that that subsequent request
560        // does not trigger another seek.
561        off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0;
562
563        seekInternal_l(seekOffset);
564    }
565
566    size_t delta = offset - mCacheOffset;
567
568    if (mFinalStatus != OK && mNumRetriesLeft == 0) {
569        if (delta >= mCache->totalSize()) {
570            return mFinalStatus;
571        }
572
573        size_t avail = mCache->totalSize() - delta;
574
575        if (avail > size) {
576            avail = size;
577        }
578
579        mCache->copy(delta, data, avail);
580
581        return avail;
582    }
583
584    if (offset + size <= mCacheOffset + mCache->totalSize()) {
585        mCache->copy(delta, data, size);
586
587        return size;
588    }
589
590    ALOGV("deferring read");
591
592    return -EAGAIN;
593}
594
595status_t NuCachedSource2::seekInternal_l(off64_t offset) {
596    mLastAccessPos = offset;
597
598    if (offset >= mCacheOffset
599            && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) {
600        return OK;
601    }
602
603    ALOGI("new range: offset= %lld", offset);
604
605    mCacheOffset = offset;
606
607    size_t totalSize = mCache->totalSize();
608    CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize);
609
610    mNumRetriesLeft = kMaxNumRetries;
611    mFetching = true;
612
613    return OK;
614}
615
616void NuCachedSource2::resumeFetchingIfNecessary() {
617    Mutex::Autolock autoLock(mLock);
618
619    restartPrefetcherIfNecessary_l(true /* ignore low water threshold */);
620}
621
622sp<DecryptHandle> NuCachedSource2::DrmInitialization(const char* mime) {
623    return mSource->DrmInitialization(mime);
624}
625
626void NuCachedSource2::getDrmInfo(sp<DecryptHandle> &handle, DrmManagerClient **client) {
627    mSource->getDrmInfo(handle, client);
628}
629
630String8 NuCachedSource2::getUri() {
631    return mSource->getUri();
632}
633
634String8 NuCachedSource2::getMIMEType() const {
635    return mSource->getMIMEType();
636}
637
638void NuCachedSource2::updateCacheParamsFromSystemProperty() {
639    char value[PROPERTY_VALUE_MAX];
640    if (!property_get("media.stagefright.cache-params", value, NULL)) {
641        return;
642    }
643
644    updateCacheParamsFromString(value);
645}
646
647void NuCachedSource2::updateCacheParamsFromString(const char *s) {
648    ssize_t lowwaterMarkKb, highwaterMarkKb;
649    int keepAliveSecs;
650
651    if (sscanf(s, "%ld/%ld/%d",
652               &lowwaterMarkKb, &highwaterMarkKb, &keepAliveSecs) != 3) {
653        ALOGE("Failed to parse cache parameters from '%s'.", s);
654        return;
655    }
656
657    if (lowwaterMarkKb >= 0) {
658        mLowwaterThresholdBytes = lowwaterMarkKb * 1024;
659    } else {
660        mLowwaterThresholdBytes = kDefaultLowWaterThreshold;
661    }
662
663    if (highwaterMarkKb >= 0) {
664        mHighwaterThresholdBytes = highwaterMarkKb * 1024;
665    } else {
666        mHighwaterThresholdBytes = kDefaultHighWaterThreshold;
667    }
668
669    if (mLowwaterThresholdBytes >= mHighwaterThresholdBytes) {
670        ALOGE("Illegal low/highwater marks specified, reverting to defaults.");
671
672        mLowwaterThresholdBytes = kDefaultLowWaterThreshold;
673        mHighwaterThresholdBytes = kDefaultHighWaterThreshold;
674    }
675
676    if (keepAliveSecs >= 0) {
677        mKeepAliveIntervalUs = keepAliveSecs * 1000000ll;
678    } else {
679        mKeepAliveIntervalUs = kDefaultKeepAliveIntervalUs;
680    }
681
682    ALOGV("lowwater = %d bytes, highwater = %d bytes, keepalive = %lld us",
683         mLowwaterThresholdBytes,
684         mHighwaterThresholdBytes,
685         mKeepAliveIntervalUs);
686}
687
688// static
689void NuCachedSource2::RemoveCacheSpecificHeaders(
690        KeyedVector<String8, String8> *headers,
691        String8 *cacheConfig,
692        bool *disconnectAtHighwatermark) {
693    *cacheConfig = String8();
694    *disconnectAtHighwatermark = false;
695
696    if (headers == NULL) {
697        return;
698    }
699
700    ssize_t index;
701    if ((index = headers->indexOfKey(String8("x-cache-config"))) >= 0) {
702        *cacheConfig = headers->valueAt(index);
703
704        headers->removeItemsAt(index);
705
706        ALOGV("Using special cache config '%s'", cacheConfig->string());
707    }
708
709    if ((index = headers->indexOfKey(
710                    String8("x-disconnect-at-highwatermark"))) >= 0) {
711        *disconnectAtHighwatermark = true;
712        headers->removeItemsAt(index);
713
714        ALOGV("Client requested disconnection at highwater mark");
715    }
716}
717
718}  // namespace android
719