NuCachedSource2.cpp revision 48296b792a8d68358de74141fa80bd5bd84d0307
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <inttypes.h>
18
19//#define LOG_NDEBUG 0
20#define LOG_TAG "NuCachedSource2"
21#include <utils/Log.h>
22
23#include "include/NuCachedSource2.h"
24#include "include/HTTPBase.h"
25
26#include <cutils/properties.h>
27#include <media/stagefright/foundation/ADebug.h>
28#include <media/stagefright/foundation/AMessage.h>
29#include <media/stagefright/MediaErrors.h>
30
31namespace android {
32
33struct PageCache {
34    PageCache(size_t pageSize);
35    ~PageCache();
36
37    struct Page {
38        void *mData;
39        size_t mSize;
40    };
41
42    Page *acquirePage();
43    void releasePage(Page *page);
44
45    void appendPage(Page *page);
46    size_t releaseFromStart(size_t maxBytes);
47
48    size_t totalSize() const {
49        return mTotalSize;
50    }
51
52    void copy(size_t from, void *data, size_t size);
53
54private:
55    size_t mPageSize;
56    size_t mTotalSize;
57
58    List<Page *> mActivePages;
59    List<Page *> mFreePages;
60
61    void freePages(List<Page *> *list);
62
63    DISALLOW_EVIL_CONSTRUCTORS(PageCache);
64};
65
66PageCache::PageCache(size_t pageSize)
67    : mPageSize(pageSize),
68      mTotalSize(0) {
69}
70
71PageCache::~PageCache() {
72    freePages(&mActivePages);
73    freePages(&mFreePages);
74}
75
76void PageCache::freePages(List<Page *> *list) {
77    List<Page *>::iterator it = list->begin();
78    while (it != list->end()) {
79        Page *page = *it;
80
81        free(page->mData);
82        delete page;
83        page = NULL;
84
85        ++it;
86    }
87}
88
89PageCache::Page *PageCache::acquirePage() {
90    if (!mFreePages.empty()) {
91        List<Page *>::iterator it = mFreePages.begin();
92        Page *page = *it;
93        mFreePages.erase(it);
94
95        return page;
96    }
97
98    Page *page = new Page;
99    page->mData = malloc(mPageSize);
100    page->mSize = 0;
101
102    return page;
103}
104
105void PageCache::releasePage(Page *page) {
106    page->mSize = 0;
107    mFreePages.push_back(page);
108}
109
110void PageCache::appendPage(Page *page) {
111    mTotalSize += page->mSize;
112    mActivePages.push_back(page);
113}
114
115size_t PageCache::releaseFromStart(size_t maxBytes) {
116    size_t bytesReleased = 0;
117
118    while (maxBytes > 0 && !mActivePages.empty()) {
119        List<Page *>::iterator it = mActivePages.begin();
120
121        Page *page = *it;
122
123        if (maxBytes < page->mSize) {
124            break;
125        }
126
127        mActivePages.erase(it);
128
129        maxBytes -= page->mSize;
130        bytesReleased += page->mSize;
131
132        releasePage(page);
133    }
134
135    mTotalSize -= bytesReleased;
136    return bytesReleased;
137}
138
139void PageCache::copy(size_t from, void *data, size_t size) {
140    ALOGV("copy from %zu size %zu", from, size);
141
142    if (size == 0) {
143        return;
144    }
145
146    CHECK_LE(from + size, mTotalSize);
147
148    size_t offset = 0;
149    List<Page *>::iterator it = mActivePages.begin();
150    while (from >= offset + (*it)->mSize) {
151        offset += (*it)->mSize;
152        ++it;
153    }
154
155    size_t delta = from - offset;
156    size_t avail = (*it)->mSize - delta;
157
158    if (avail >= size) {
159        memcpy(data, (const uint8_t *)(*it)->mData + delta, size);
160        return;
161    }
162
163    memcpy(data, (const uint8_t *)(*it)->mData + delta, avail);
164    ++it;
165    data = (uint8_t *)data + avail;
166    size -= avail;
167
168    while (size > 0) {
169        size_t copy = (*it)->mSize;
170        if (copy > size) {
171            copy = size;
172        }
173        memcpy(data, (*it)->mData, copy);
174        data = (uint8_t *)data + copy;
175        size -= copy;
176        ++it;
177    }
178}
179
180////////////////////////////////////////////////////////////////////////////////
181
// Wraps |source| in a prefetching cache. The cache thresholds and the
// keep-alive interval start at their compiled-in defaults, are then
// overridden by the system property (if set) and finally by |cacheConfig|
// (if non-NULL). A dedicated looper thread is started and the first fetch
// is scheduled before the constructor returns.
NuCachedSource2::NuCachedSource2(
        const sp<DataSource> &source,
        const char *cacheConfig,
        bool disconnectAtHighwatermark)
    : mSource(source),
      mReflector(new AHandlerReflector<NuCachedSource2>(this)),
      mLooper(new ALooper),
      mCache(new PageCache(kPageSize)),
      mCacheOffset(0),
      mFinalStatus(OK),
      mLastAccessPos(0),
      mFetching(true),
      mDisconnecting(false),
      mLastFetchTimeUs(-1),
      mNumRetriesLeft(kMaxNumRetries),
      mHighwaterThresholdBytes(kDefaultHighWaterThreshold),
      mLowwaterThresholdBytes(kDefaultLowWaterThreshold),
      mKeepAliveIntervalUs(kDefaultKeepAliveIntervalUs),
      mDisconnectAtHighwatermark(disconnectAtHighwatermark) {
    // We are NOT going to support disconnect-at-highwatermark indefinitely
    // and we are not guaranteeing support for client-specified cache
    // parameters. Both of these are temporary measures to solve a specific
    // problem that will be solved in a better way going forward.

    // The system property is applied first so that a client-supplied
    // configuration string takes precedence over it.
    updateCacheParamsFromSystemProperty();

    if (cacheConfig != NULL) {
        updateCacheParamsFromString(cacheConfig);
    }

    if (mDisconnectAtHighwatermark) {
        // Makes no sense to disconnect and do keep-alives...
        mKeepAliveIntervalUs = 0;
    }

    mLooper->setName("NuCachedSource2");
    mLooper->registerHandler(mReflector);

    // Since it may not be obvious why our looper thread needs to be
    // able to call into java since it doesn't appear to do so at all...
    // IMediaHTTPConnection may be (and most likely is) implemented in JAVA
    // and a local JAVA IBinder will call directly into JNI methods.
    // So whenever we call DataSource::readAt it may end up in a call to
    // IMediaHTTPConnection::readAt and therefore call back into JAVA.
    mLooper->start(false /* runOnCallingThread */, true /* canCallJava */);

    // Kick off prefetching: the first kWhatFetchMore message is handled by
    // onFetch() on the looper thread.
    Mutex::Autolock autoLock(mLock);
    (new AMessage(kWhatFetchMore, mReflector->id()))->post();
}
231
// Stops the looper and unregisters the message handler before the page
// cache is destroyed.
NuCachedSource2::~NuCachedSource2() {
    mLooper->stop();
    mLooper->unregisterHandler(mReflector->id());

    // mCache is a raw owning pointer allocated in the constructor.
    delete mCache;
    mCache = NULL;
}
239
240status_t NuCachedSource2::getEstimatedBandwidthKbps(int32_t *kbps) {
241    if (mSource->flags() & kIsHTTPBasedSource) {
242        HTTPBase* source = static_cast<HTTPBase *>(mSource.get());
243        return source->getEstimatedBandwidthKbps(kbps);
244    }
245    return ERROR_UNSUPPORTED;
246}
247
248void NuCachedSource2::disconnect() {
249    if (mSource->flags() & kIsHTTPBasedSource) {
250        ALOGV("disconnecting HTTPBasedSource");
251
252        {
253            Mutex::Autolock autoLock(mLock);
254            // set mDisconnecting to true, if a fetch returns after
255            // this, the source will be marked as EOS.
256            mDisconnecting = true;
257        }
258
259        // explicitly disconnect from the source, to allow any
260        // pending reads to return more promptly
261        static_cast<HTTPBase *>(mSource.get())->disconnect();
262    }
263}
264
265status_t NuCachedSource2::setCacheStatCollectFreq(int32_t freqMs) {
266    if (mSource->flags() & kIsHTTPBasedSource) {
267        HTTPBase *source = static_cast<HTTPBase *>(mSource.get());
268        return source->setBandwidthStatCollectFreq(freqMs);
269    }
270    return ERROR_UNSUPPORTED;
271}
272
273status_t NuCachedSource2::initCheck() const {
274    return mSource->initCheck();
275}
276
277status_t NuCachedSource2::getSize(off64_t *size) {
278    return mSource->getSize(size);
279}
280
281uint32_t NuCachedSource2::flags() {
282    // Remove HTTP related flags since NuCachedSource2 is not HTTP-based.
283    uint32_t flags = mSource->flags() & ~(kWantsPrefetching | kIsHTTPBasedSource);
284    return (flags | kIsCachingDataSource);
285}
286
287void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) {
288    switch (msg->what()) {
289        case kWhatFetchMore:
290        {
291            onFetch();
292            break;
293        }
294
295        case kWhatRead:
296        {
297            onRead(msg);
298            break;
299        }
300
301        default:
302            TRESPASS();
303    }
304}
305
306void NuCachedSource2::fetchInternal() {
307    ALOGV("fetchInternal");
308
309    bool reconnect = false;
310
311    {
312        Mutex::Autolock autoLock(mLock);
313        CHECK(mFinalStatus == OK || mNumRetriesLeft > 0);
314
315        if (mFinalStatus != OK) {
316            --mNumRetriesLeft;
317
318            reconnect = true;
319        }
320    }
321
322    if (reconnect) {
323        status_t err =
324            mSource->reconnectAtOffset(mCacheOffset + mCache->totalSize());
325
326        Mutex::Autolock autoLock(mLock);
327
328        if (err == ERROR_UNSUPPORTED || err == -EPIPE) {
329            // These are errors that are not likely to go away even if we
330            // retry, i.e. the server doesn't support range requests or similar.
331            mNumRetriesLeft = 0;
332            return;
333        } else if (err != OK) {
334            ALOGI("The attempt to reconnect failed, %d retries remaining",
335                 mNumRetriesLeft);
336
337            return;
338        }
339    }
340
341    PageCache::Page *page = mCache->acquirePage();
342
343    ssize_t n = mSource->readAt(
344            mCacheOffset + mCache->totalSize(), page->mData, kPageSize);
345
346    Mutex::Autolock autoLock(mLock);
347
348    if (n == 0 || mDisconnecting) {
349        ALOGI("ERROR_END_OF_STREAM");
350
351        mNumRetriesLeft = 0;
352        mFinalStatus = ERROR_END_OF_STREAM;
353
354        mCache->releasePage(page);
355    } else if (n < 0) {
356        mFinalStatus = n;
357        if (n == ERROR_UNSUPPORTED || n == -EPIPE) {
358            // These are errors that are not likely to go away even if we
359            // retry, i.e. the server doesn't support range requests or similar.
360            mNumRetriesLeft = 0;
361        }
362
363        ALOGE("source returned error %zd, %d retries left", n, mNumRetriesLeft);
364        mCache->releasePage(page);
365    } else {
366        if (mFinalStatus != OK) {
367            ALOGI("retrying a previously failed read succeeded.");
368        }
369        mNumRetriesLeft = kMaxNumRetries;
370        mFinalStatus = OK;
371
372        page->mSize = n;
373        mCache->appendPage(page);
374    }
375}
376
// Handler for kWhatFetchMore. Performs at most one fetch, updates the
// fetching state and re-posts kWhatFetchMore to itself, so the prefetcher
// keeps running for as long as the looper does.
void NuCachedSource2::onFetch() {
    ALOGV("onFetch");

    // A final error with no retries left ends prefetching.
    if (mFinalStatus != OK && mNumRetriesLeft == 0) {
        ALOGV("EOS reached, done prefetching for now");
        mFetching = false;
    }

    // While idle and healthy, fetch once per keep-alive interval (when the
    // interval is non-zero).
    bool keepAlive =
        !mFetching
            && mFinalStatus == OK
            && mKeepAliveIntervalUs > 0
            && ALooper::GetNowUs() >= mLastFetchTimeUs + mKeepAliveIntervalUs;

    if (mFetching || keepAlive) {
        if (keepAlive) {
            ALOGI("Keep alive");
        }

        fetchInternal();

        mLastFetchTimeUs = ALooper::GetNowUs();

        // Stop prefetching once the cache holds at least the high water
        // mark's worth of data.
        if (mFetching && mCache->totalSize() >= mHighwaterThresholdBytes) {
            ALOGI("Cache full, done prefetching for now");
            mFetching = false;

            if (mDisconnectAtHighwatermark
                    && (mSource->flags() & DataSource::kIsHTTPBasedSource)) {
                ALOGV("Disconnecting at high watermark");
                static_cast<HTTPBase *>(mSource.get())->disconnect();
                // A non-OK status makes the next fetchInternal() reconnect
                // before reading.
                mFinalStatus = -EAGAIN;
            }
        }
    } else {
        // Idle: check whether enough data has been consumed to resume
        // prefetching.
        Mutex::Autolock autoLock(mLock);
        restartPrefetcherIfNecessary_l();
    }

    // Pick the delay until the next invocation: immediately while fetching,
    // 3 seconds after a failure that will be retried, 100ms while idle.
    int64_t delayUs;
    if (mFetching) {
        if (mFinalStatus != OK && mNumRetriesLeft > 0) {
            // We failed this time and will try again in 3 seconds.
            delayUs = 3000000ll;
        } else {
            delayUs = 0;
        }
    } else {
        delayUs = 100000ll;
    }

    (new AMessage(kWhatFetchMore, mReflector->id()))->post(delayUs);
}
430
431void NuCachedSource2::onRead(const sp<AMessage> &msg) {
432    ALOGV("onRead");
433
434    int64_t offset;
435    CHECK(msg->findInt64("offset", &offset));
436
437    void *data;
438    CHECK(msg->findPointer("data", &data));
439
440    size_t size;
441    CHECK(msg->findSize("size", &size));
442
443    ssize_t result = readInternal(offset, data, size);
444
445    if (result == -EAGAIN) {
446        msg->post(50000);
447        return;
448    }
449
450    Mutex::Autolock autoLock(mLock);
451
452    CHECK(mAsyncResult == NULL);
453
454    mAsyncResult = new AMessage;
455    mAsyncResult->setInt32("result", result);
456
457    mCondition.signal();
458}
459
460void NuCachedSource2::restartPrefetcherIfNecessary_l(
461        bool ignoreLowWaterThreshold, bool force) {
462    static const size_t kGrayArea = 1024 * 1024;
463
464    if (mFetching || (mFinalStatus != OK && mNumRetriesLeft == 0)) {
465        return;
466    }
467
468    if (!ignoreLowWaterThreshold && !force
469            && mCacheOffset + mCache->totalSize() - mLastAccessPos
470                >= mLowwaterThresholdBytes) {
471        return;
472    }
473
474    size_t maxBytes = mLastAccessPos - mCacheOffset;
475
476    if (!force) {
477        if (maxBytes < kGrayArea) {
478            return;
479        }
480
481        maxBytes -= kGrayArea;
482    }
483
484    size_t actualBytes = mCache->releaseFromStart(maxBytes);
485    mCacheOffset += actualBytes;
486
487    ALOGI("restarting prefetcher, totalSize = %zu", mCache->totalSize());
488    mFetching = true;
489}
490
491ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) {
492    Mutex::Autolock autoSerializer(mSerializer);
493
494    ALOGV("readAt offset %lld, size %zu", offset, size);
495
496    Mutex::Autolock autoLock(mLock);
497
498    // If the request can be completely satisfied from the cache, do so.
499
500    if (offset >= mCacheOffset
501            && offset + size <= mCacheOffset + mCache->totalSize()) {
502        size_t delta = offset - mCacheOffset;
503        mCache->copy(delta, data, size);
504
505        mLastAccessPos = offset + size;
506
507        return size;
508    }
509
510    sp<AMessage> msg = new AMessage(kWhatRead, mReflector->id());
511    msg->setInt64("offset", offset);
512    msg->setPointer("data", data);
513    msg->setSize("size", size);
514
515    CHECK(mAsyncResult == NULL);
516    msg->post();
517
518    while (mAsyncResult == NULL) {
519        mCondition.wait(mLock);
520    }
521
522    int32_t result;
523    CHECK(mAsyncResult->findInt32("result", &result));
524
525    mAsyncResult.clear();
526
527    if (result > 0) {
528        mLastAccessPos = offset + result;
529    }
530
531    return (ssize_t)result;
532}
533
534size_t NuCachedSource2::cachedSize() {
535    Mutex::Autolock autoLock(mLock);
536    return mCacheOffset + mCache->totalSize();
537}
538
539size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) const {
540    Mutex::Autolock autoLock(mLock);
541    return approxDataRemaining_l(finalStatus);
542}
543
544size_t NuCachedSource2::approxDataRemaining_l(status_t *finalStatus) const {
545    *finalStatus = mFinalStatus;
546
547    if (mFinalStatus != OK && mNumRetriesLeft > 0) {
548        // Pretend that everything is fine until we're out of retries.
549        *finalStatus = OK;
550    }
551
552    off64_t lastBytePosCached = mCacheOffset + mCache->totalSize();
553    if (mLastAccessPos < lastBytePosCached) {
554        return lastBytePosCached - mLastAccessPos;
555    }
556    return 0;
557}
558
559ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) {
560    CHECK_LE(size, (size_t)mHighwaterThresholdBytes);
561
562    ALOGV("readInternal offset %lld size %zu", offset, size);
563
564    Mutex::Autolock autoLock(mLock);
565
566    if (!mFetching) {
567        mLastAccessPos = offset;
568        restartPrefetcherIfNecessary_l(
569                false, // ignoreLowWaterThreshold
570                true); // force
571    }
572
573    if (offset < mCacheOffset
574            || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) {
575        static const off64_t kPadding = 256 * 1024;
576
577        // In the presence of multiple decoded streams, once of them will
578        // trigger this seek request, the other one will request data "nearby"
579        // soon, adjust the seek position so that that subsequent request
580        // does not trigger another seek.
581        off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0;
582
583        seekInternal_l(seekOffset);
584    }
585
586    size_t delta = offset - mCacheOffset;
587
588    if (mFinalStatus != OK && mNumRetriesLeft == 0) {
589        if (delta >= mCache->totalSize()) {
590            return mFinalStatus;
591        }
592
593        size_t avail = mCache->totalSize() - delta;
594
595        if (avail > size) {
596            avail = size;
597        }
598
599        mCache->copy(delta, data, avail);
600
601        return avail;
602    }
603
604    if (offset + size <= mCacheOffset + mCache->totalSize()) {
605        mCache->copy(delta, data, size);
606
607        return size;
608    }
609
610    ALOGV("deferring read");
611
612    return -EAGAIN;
613}
614
615status_t NuCachedSource2::seekInternal_l(off64_t offset) {
616    mLastAccessPos = offset;
617
618    if (offset >= mCacheOffset
619            && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) {
620        return OK;
621    }
622
623    ALOGI("new range: offset= %lld", offset);
624
625    mCacheOffset = offset;
626
627    size_t totalSize = mCache->totalSize();
628    CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize);
629
630    mNumRetriesLeft = kMaxNumRetries;
631    mFetching = true;
632
633    return OK;
634}
635
636void NuCachedSource2::resumeFetchingIfNecessary() {
637    Mutex::Autolock autoLock(mLock);
638
639    restartPrefetcherIfNecessary_l(true /* ignore low water threshold */);
640}
641
642sp<DecryptHandle> NuCachedSource2::DrmInitialization(const char* mime) {
643    return mSource->DrmInitialization(mime);
644}
645
646void NuCachedSource2::getDrmInfo(sp<DecryptHandle> &handle, DrmManagerClient **client) {
647    mSource->getDrmInfo(handle, client);
648}
649
650String8 NuCachedSource2::getUri() {
651    return mSource->getUri();
652}
653
654String8 NuCachedSource2::getMIMEType() const {
655    return mSource->getMIMEType();
656}
657
658void NuCachedSource2::updateCacheParamsFromSystemProperty() {
659    char value[PROPERTY_VALUE_MAX];
660    if (!property_get("media.stagefright.cache-params", value, NULL)) {
661        return;
662    }
663
664    updateCacheParamsFromString(value);
665}
666
667void NuCachedSource2::updateCacheParamsFromString(const char *s) {
668    ssize_t lowwaterMarkKb, highwaterMarkKb;
669    int keepAliveSecs;
670
671    if (sscanf(s, "%zd/%zd/%d",
672               &lowwaterMarkKb, &highwaterMarkKb, &keepAliveSecs) != 3) {
673        ALOGE("Failed to parse cache parameters from '%s'.", s);
674        return;
675    }
676
677    if (lowwaterMarkKb >= 0) {
678        mLowwaterThresholdBytes = lowwaterMarkKb * 1024;
679    } else {
680        mLowwaterThresholdBytes = kDefaultLowWaterThreshold;
681    }
682
683    if (highwaterMarkKb >= 0) {
684        mHighwaterThresholdBytes = highwaterMarkKb * 1024;
685    } else {
686        mHighwaterThresholdBytes = kDefaultHighWaterThreshold;
687    }
688
689    if (mLowwaterThresholdBytes >= mHighwaterThresholdBytes) {
690        ALOGE("Illegal low/highwater marks specified, reverting to defaults.");
691
692        mLowwaterThresholdBytes = kDefaultLowWaterThreshold;
693        mHighwaterThresholdBytes = kDefaultHighWaterThreshold;
694    }
695
696    if (keepAliveSecs >= 0) {
697        mKeepAliveIntervalUs = keepAliveSecs * 1000000ll;
698    } else {
699        mKeepAliveIntervalUs = kDefaultKeepAliveIntervalUs;
700    }
701
702    ALOGV("lowwater = %zu bytes, highwater = %zu bytes, keepalive = %" PRId64 " us",
703         mLowwaterThresholdBytes,
704         mHighwaterThresholdBytes,
705         mKeepAliveIntervalUs);
706}
707
708// static
709void NuCachedSource2::RemoveCacheSpecificHeaders(
710        KeyedVector<String8, String8> *headers,
711        String8 *cacheConfig,
712        bool *disconnectAtHighwatermark) {
713    *cacheConfig = String8();
714    *disconnectAtHighwatermark = false;
715
716    if (headers == NULL) {
717        return;
718    }
719
720    ssize_t index;
721    if ((index = headers->indexOfKey(String8("x-cache-config"))) >= 0) {
722        *cacheConfig = headers->valueAt(index);
723
724        headers->removeItemsAt(index);
725
726        ALOGV("Using special cache config '%s'", cacheConfig->string());
727    }
728
729    if ((index = headers->indexOfKey(
730                    String8("x-disconnect-at-highwatermark"))) >= 0) {
731        *disconnectAtHighwatermark = true;
732        headers->removeItemsAt(index);
733
734        ALOGV("Client requested disconnection at highwater mark");
735    }
736}
737
738}  // namespace android
739