1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <inttypes.h>
18
19//#define LOG_NDEBUG 0
20#define LOG_TAG "NuCachedSource2"
21#include <utils/Log.h>
22
23#include "include/NuCachedSource2.h"
24#include "include/HTTPBase.h"
25
26#include <cutils/properties.h>
27#include <media/stagefright/foundation/ADebug.h>
28#include <media/stagefright/foundation/AMessage.h>
29#include <media/stagefright/MediaErrors.h>
30
31namespace android {
32
33struct PageCache {
34    explicit PageCache(size_t pageSize);
35    ~PageCache();
36
37    struct Page {
38        void *mData;
39        size_t mSize;
40    };
41
42    Page *acquirePage();
43    void releasePage(Page *page);
44
45    void appendPage(Page *page);
46    size_t releaseFromStart(size_t maxBytes);
47
48    size_t totalSize() const {
49        return mTotalSize;
50    }
51
52    void copy(size_t from, void *data, size_t size);
53
54private:
55    size_t mPageSize;
56    size_t mTotalSize;
57
58    List<Page *> mActivePages;
59    List<Page *> mFreePages;
60
61    void freePages(List<Page *> *list);
62
63    DISALLOW_EVIL_CONSTRUCTORS(PageCache);
64};
65
66PageCache::PageCache(size_t pageSize)
67    : mPageSize(pageSize),
68      mTotalSize(0) {
69}
70
71PageCache::~PageCache() {
72    freePages(&mActivePages);
73    freePages(&mFreePages);
74}
75
76void PageCache::freePages(List<Page *> *list) {
77    List<Page *>::iterator it = list->begin();
78    while (it != list->end()) {
79        Page *page = *it;
80
81        free(page->mData);
82        delete page;
83        page = NULL;
84
85        ++it;
86    }
87}
88
89PageCache::Page *PageCache::acquirePage() {
90    if (!mFreePages.empty()) {
91        List<Page *>::iterator it = mFreePages.begin();
92        Page *page = *it;
93        mFreePages.erase(it);
94
95        return page;
96    }
97
98    Page *page = new Page;
99    page->mData = malloc(mPageSize);
100    page->mSize = 0;
101
102    return page;
103}
104
105void PageCache::releasePage(Page *page) {
106    page->mSize = 0;
107    mFreePages.push_back(page);
108}
109
110void PageCache::appendPage(Page *page) {
111    mTotalSize += page->mSize;
112    mActivePages.push_back(page);
113}
114
115size_t PageCache::releaseFromStart(size_t maxBytes) {
116    size_t bytesReleased = 0;
117
118    while (maxBytes > 0 && !mActivePages.empty()) {
119        List<Page *>::iterator it = mActivePages.begin();
120
121        Page *page = *it;
122
123        if (maxBytes < page->mSize) {
124            break;
125        }
126
127        mActivePages.erase(it);
128
129        maxBytes -= page->mSize;
130        bytesReleased += page->mSize;
131
132        releasePage(page);
133    }
134
135    mTotalSize -= bytesReleased;
136    return bytesReleased;
137}
138
139void PageCache::copy(size_t from, void *data, size_t size) {
140    ALOGV("copy from %zu size %zu", from, size);
141
142    if (size == 0) {
143        return;
144    }
145
146    CHECK_LE(from + size, mTotalSize);
147
148    size_t offset = 0;
149    List<Page *>::iterator it = mActivePages.begin();
150    while (from >= offset + (*it)->mSize) {
151        offset += (*it)->mSize;
152        ++it;
153    }
154
155    size_t delta = from - offset;
156    size_t avail = (*it)->mSize - delta;
157
158    if (avail >= size) {
159        memcpy(data, (const uint8_t *)(*it)->mData + delta, size);
160        return;
161    }
162
163    memcpy(data, (const uint8_t *)(*it)->mData + delta, avail);
164    ++it;
165    data = (uint8_t *)data + avail;
166    size -= avail;
167
168    while (size > 0) {
169        size_t copy = (*it)->mSize;
170        if (copy > size) {
171            copy = size;
172        }
173        memcpy(data, (*it)->mData, copy);
174        data = (uint8_t *)data + copy;
175        size -= copy;
176        ++it;
177    }
178}
179
180////////////////////////////////////////////////////////////////////////////////
181
182NuCachedSource2::NuCachedSource2(
183        const sp<DataSource> &source,
184        const char *cacheConfig,
185        bool disconnectAtHighwatermark)
186    : mSource(source),
187      mReflector(new AHandlerReflector<NuCachedSource2>(this)),
188      mLooper(new ALooper),
189      mCache(new PageCache(kPageSize)),
190      mCacheOffset(0),
191      mFinalStatus(OK),
192      mLastAccessPos(0),
193      mFetching(true),
194      mDisconnecting(false),
195      mLastFetchTimeUs(-1),
196      mNumRetriesLeft(kMaxNumRetries),
197      mHighwaterThresholdBytes(kDefaultHighWaterThreshold),
198      mLowwaterThresholdBytes(kDefaultLowWaterThreshold),
199      mKeepAliveIntervalUs(kDefaultKeepAliveIntervalUs),
200      mDisconnectAtHighwatermark(disconnectAtHighwatermark) {
201    // We are NOT going to support disconnect-at-highwatermark indefinitely
202    // and we are not guaranteeing support for client-specified cache
203    // parameters. Both of these are temporary measures to solve a specific
204    // problem that will be solved in a better way going forward.
205
206    updateCacheParamsFromSystemProperty();
207
208    if (cacheConfig != NULL) {
209        updateCacheParamsFromString(cacheConfig);
210    }
211
212    if (mDisconnectAtHighwatermark) {
213        // Makes no sense to disconnect and do keep-alives...
214        mKeepAliveIntervalUs = 0;
215    }
216
217    mLooper->setName("NuCachedSource2");
218    mLooper->registerHandler(mReflector);
219
220    // Since it may not be obvious why our looper thread needs to be
221    // able to call into java since it doesn't appear to do so at all...
222    // IMediaHTTPConnection may be (and most likely is) implemented in JAVA
223    // and a local JAVA IBinder will call directly into JNI methods.
224    // So whenever we call DataSource::readAt it may end up in a call to
225    // IMediaHTTPConnection::readAt and therefore call back into JAVA.
226    mLooper->start(false /* runOnCallingThread */, true /* canCallJava */);
227
228    mName = String8::format("NuCachedSource2(%s)", mSource->toString().string());
229}
230
231NuCachedSource2::~NuCachedSource2() {
232    mLooper->stop();
233    mLooper->unregisterHandler(mReflector->id());
234
235    delete mCache;
236    mCache = NULL;
237}
238
239// static
240sp<NuCachedSource2> NuCachedSource2::Create(
241        const sp<DataSource> &source,
242        const char *cacheConfig,
243        bool disconnectAtHighwatermark) {
244    sp<NuCachedSource2> instance = new NuCachedSource2(
245            source, cacheConfig, disconnectAtHighwatermark);
246    Mutex::Autolock autoLock(instance->mLock);
247    (new AMessage(kWhatFetchMore, instance->mReflector))->post();
248    return instance;
249}
250
251status_t NuCachedSource2::getEstimatedBandwidthKbps(int32_t *kbps) {
252    if (mSource->flags() & kIsHTTPBasedSource) {
253        HTTPBase* source = static_cast<HTTPBase *>(mSource.get());
254        return source->getEstimatedBandwidthKbps(kbps);
255    }
256    return ERROR_UNSUPPORTED;
257}
258
259void NuCachedSource2::disconnect() {
260    if (mSource->flags() & kIsHTTPBasedSource) {
261        ALOGV("disconnecting HTTPBasedSource");
262
263        {
264            Mutex::Autolock autoLock(mLock);
265            // set mDisconnecting to true, if a fetch returns after
266            // this, the source will be marked as EOS.
267            mDisconnecting = true;
268
269            // explicitly signal mCondition so that the pending readAt()
270            // will immediately return
271            mCondition.signal();
272        }
273
274        // explicitly disconnect from the source, to allow any
275        // pending reads to return more promptly
276        static_cast<HTTPBase *>(mSource.get())->disconnect();
277    }
278}
279
280status_t NuCachedSource2::setCacheStatCollectFreq(int32_t freqMs) {
281    if (mSource->flags() & kIsHTTPBasedSource) {
282        HTTPBase *source = static_cast<HTTPBase *>(mSource.get());
283        return source->setBandwidthStatCollectFreq(freqMs);
284    }
285    return ERROR_UNSUPPORTED;
286}
287
288status_t NuCachedSource2::initCheck() const {
289    return mSource->initCheck();
290}
291
292status_t NuCachedSource2::getSize(off64_t *size) {
293    return mSource->getSize(size);
294}
295
296uint32_t NuCachedSource2::flags() {
297    // Remove HTTP related flags since NuCachedSource2 is not HTTP-based.
298    uint32_t flags = mSource->flags() & ~(kWantsPrefetching | kIsHTTPBasedSource);
299    return (flags | kIsCachingDataSource);
300}
301
302void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) {
303    switch (msg->what()) {
304        case kWhatFetchMore:
305        {
306            onFetch();
307            break;
308        }
309
310        case kWhatRead:
311        {
312            onRead(msg);
313            break;
314        }
315
316        default:
317            TRESPASS();
318    }
319}
320
321void NuCachedSource2::fetchInternal() {
322    ALOGV("fetchInternal");
323
324    bool reconnect = false;
325
326    {
327        Mutex::Autolock autoLock(mLock);
328        CHECK(mFinalStatus == OK || mNumRetriesLeft > 0);
329
330        if (mFinalStatus != OK) {
331            --mNumRetriesLeft;
332
333            reconnect = true;
334        }
335    }
336
337    if (reconnect) {
338        status_t err =
339            mSource->reconnectAtOffset(mCacheOffset + mCache->totalSize());
340
341        Mutex::Autolock autoLock(mLock);
342
343        if (mDisconnecting) {
344            mNumRetriesLeft = 0;
345            mFinalStatus = ERROR_END_OF_STREAM;
346            return;
347        } else if (err == ERROR_UNSUPPORTED || err == -EPIPE) {
348            // These are errors that are not likely to go away even if we
349            // retry, i.e. the server doesn't support range requests or similar.
350            mNumRetriesLeft = 0;
351            return;
352        } else if (err != OK) {
353            ALOGI("The attempt to reconnect failed, %d retries remaining",
354                 mNumRetriesLeft);
355
356            return;
357        }
358    }
359
360    PageCache::Page *page = mCache->acquirePage();
361
362    ssize_t n = mSource->readAt(
363            mCacheOffset + mCache->totalSize(), page->mData, kPageSize);
364
365    Mutex::Autolock autoLock(mLock);
366
367    if (n == 0 || mDisconnecting) {
368        ALOGI("caching reached eos.");
369
370        mNumRetriesLeft = 0;
371        mFinalStatus = ERROR_END_OF_STREAM;
372
373        mCache->releasePage(page);
374    } else if (n < 0) {
375        mFinalStatus = n;
376        if (n == ERROR_UNSUPPORTED || n == -EPIPE) {
377            // These are errors that are not likely to go away even if we
378            // retry, i.e. the server doesn't support range requests or similar.
379            mNumRetriesLeft = 0;
380        }
381
382        ALOGE("source returned error %zd, %d retries left", n, mNumRetriesLeft);
383        mCache->releasePage(page);
384    } else {
385        if (mFinalStatus != OK) {
386            ALOGI("retrying a previously failed read succeeded.");
387        }
388        mNumRetriesLeft = kMaxNumRetries;
389        mFinalStatus = OK;
390
391        page->mSize = n;
392        mCache->appendPage(page);
393    }
394}
395
396void NuCachedSource2::onFetch() {
397    ALOGV("onFetch");
398
399    if (mFinalStatus != OK && mNumRetriesLeft == 0) {
400        ALOGV("EOS reached, done prefetching for now");
401        mFetching = false;
402    }
403
404    bool keepAlive =
405        !mFetching
406            && mFinalStatus == OK
407            && mKeepAliveIntervalUs > 0
408            && ALooper::GetNowUs() >= mLastFetchTimeUs + mKeepAliveIntervalUs;
409
410    if (mFetching || keepAlive) {
411        if (keepAlive) {
412            ALOGI("Keep alive");
413        }
414
415        fetchInternal();
416
417        mLastFetchTimeUs = ALooper::GetNowUs();
418
419        if (mFetching && mCache->totalSize() >= mHighwaterThresholdBytes) {
420            ALOGI("Cache full, done prefetching for now");
421            mFetching = false;
422
423            if (mDisconnectAtHighwatermark
424                    && (mSource->flags() & DataSource::kIsHTTPBasedSource)) {
425                ALOGV("Disconnecting at high watermark");
426                static_cast<HTTPBase *>(mSource.get())->disconnect();
427                mFinalStatus = -EAGAIN;
428            }
429        }
430    } else {
431        Mutex::Autolock autoLock(mLock);
432        restartPrefetcherIfNecessary_l();
433    }
434
435    int64_t delayUs;
436    if (mFetching) {
437        if (mFinalStatus != OK && mNumRetriesLeft > 0) {
438            // We failed this time and will try again in 3 seconds.
439            delayUs = 3000000ll;
440        } else {
441            delayUs = 0;
442        }
443    } else {
444        delayUs = 100000ll;
445    }
446
447    (new AMessage(kWhatFetchMore, mReflector))->post(delayUs);
448}
449
450void NuCachedSource2::onRead(const sp<AMessage> &msg) {
451    ALOGV("onRead");
452
453    int64_t offset;
454    CHECK(msg->findInt64("offset", &offset));
455
456    void *data;
457    CHECK(msg->findPointer("data", &data));
458
459    size_t size;
460    CHECK(msg->findSize("size", &size));
461
462    ssize_t result = readInternal(offset, data, size);
463
464    if (result == -EAGAIN) {
465        msg->post(50000);
466        return;
467    }
468
469    Mutex::Autolock autoLock(mLock);
470    if (mDisconnecting) {
471        mCondition.signal();
472        return;
473    }
474
475    CHECK(mAsyncResult == NULL);
476
477    mAsyncResult = new AMessage;
478    mAsyncResult->setInt32("result", result);
479
480    mCondition.signal();
481}
482
483void NuCachedSource2::restartPrefetcherIfNecessary_l(
484        bool ignoreLowWaterThreshold, bool force) {
485    static const size_t kGrayArea = 1024 * 1024;
486
487    if (mFetching || (mFinalStatus != OK && mNumRetriesLeft == 0)) {
488        return;
489    }
490
491    if (!ignoreLowWaterThreshold && !force
492            && mCacheOffset + mCache->totalSize() - mLastAccessPos
493                >= mLowwaterThresholdBytes) {
494        return;
495    }
496
497    size_t maxBytes = mLastAccessPos - mCacheOffset;
498
499    if (!force) {
500        if (maxBytes < kGrayArea) {
501            return;
502        }
503
504        maxBytes -= kGrayArea;
505    }
506
507    size_t actualBytes = mCache->releaseFromStart(maxBytes);
508    mCacheOffset += actualBytes;
509
510    ALOGI("restarting prefetcher, totalSize = %zu", mCache->totalSize());
511    mFetching = true;
512}
513
514ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) {
515    Mutex::Autolock autoSerializer(mSerializer);
516
517    ALOGV("readAt offset %lld, size %zu", (long long)offset, size);
518
519    Mutex::Autolock autoLock(mLock);
520    if (mDisconnecting) {
521        return ERROR_END_OF_STREAM;
522    }
523
524    // If the request can be completely satisfied from the cache, do so.
525
526    if (offset >= mCacheOffset
527            && offset + size <= mCacheOffset + mCache->totalSize()) {
528        size_t delta = offset - mCacheOffset;
529        mCache->copy(delta, data, size);
530
531        mLastAccessPos = offset + size;
532
533        return size;
534    }
535
536    sp<AMessage> msg = new AMessage(kWhatRead, mReflector);
537    msg->setInt64("offset", offset);
538    msg->setPointer("data", data);
539    msg->setSize("size", size);
540
541    CHECK(mAsyncResult == NULL);
542    msg->post();
543
544    while (mAsyncResult == NULL && !mDisconnecting) {
545        mCondition.wait(mLock);
546    }
547
548    if (mDisconnecting) {
549        mAsyncResult.clear();
550        return ERROR_END_OF_STREAM;
551    }
552
553    int32_t result;
554    CHECK(mAsyncResult->findInt32("result", &result));
555
556    mAsyncResult.clear();
557
558    if (result > 0) {
559        mLastAccessPos = offset + result;
560    }
561
562    return (ssize_t)result;
563}
564
565size_t NuCachedSource2::cachedSize() {
566    Mutex::Autolock autoLock(mLock);
567    return mCacheOffset + mCache->totalSize();
568}
569
570size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) const {
571    Mutex::Autolock autoLock(mLock);
572    return approxDataRemaining_l(finalStatus);
573}
574
575size_t NuCachedSource2::approxDataRemaining_l(status_t *finalStatus) const {
576    *finalStatus = mFinalStatus;
577
578    if (mFinalStatus != OK && mNumRetriesLeft > 0) {
579        // Pretend that everything is fine until we're out of retries.
580        *finalStatus = OK;
581    }
582
583    off64_t lastBytePosCached = mCacheOffset + mCache->totalSize();
584    if (mLastAccessPos < lastBytePosCached) {
585        return lastBytePosCached - mLastAccessPos;
586    }
587    return 0;
588}
589
590ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) {
591    CHECK_LE(size, (size_t)mHighwaterThresholdBytes);
592
593    ALOGV("readInternal offset %lld size %zu", (long long)offset, size);
594
595    Mutex::Autolock autoLock(mLock);
596
597    // If we're disconnecting, return EOS and don't access *data pointer.
598    // data could be on the stack of the caller to NuCachedSource2::readAt(),
599    // which may have exited already.
600    if (mDisconnecting) {
601        return ERROR_END_OF_STREAM;
602    }
603
604    if (!mFetching) {
605        mLastAccessPos = offset;
606        restartPrefetcherIfNecessary_l(
607                false, // ignoreLowWaterThreshold
608                true); // force
609    }
610
611    if (offset < mCacheOffset
612            || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) {
613        static const off64_t kPadding = 256 * 1024;
614
615        // In the presence of multiple decoded streams, once of them will
616        // trigger this seek request, the other one will request data "nearby"
617        // soon, adjust the seek position so that that subsequent request
618        // does not trigger another seek.
619        off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0;
620
621        seekInternal_l(seekOffset);
622    }
623
624    size_t delta = offset - mCacheOffset;
625
626    if (mFinalStatus != OK && mNumRetriesLeft == 0) {
627        if (delta >= mCache->totalSize()) {
628            return mFinalStatus;
629        }
630
631        size_t avail = mCache->totalSize() - delta;
632
633        if (avail > size) {
634            avail = size;
635        }
636
637        mCache->copy(delta, data, avail);
638
639        return avail;
640    }
641
642    if (offset + size <= mCacheOffset + mCache->totalSize()) {
643        mCache->copy(delta, data, size);
644
645        return size;
646    }
647
648    ALOGV("deferring read");
649
650    return -EAGAIN;
651}
652
653status_t NuCachedSource2::seekInternal_l(off64_t offset) {
654    mLastAccessPos = offset;
655
656    if (offset >= mCacheOffset
657            && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) {
658        return OK;
659    }
660
661    ALOGI("new range: offset= %lld", (long long)offset);
662
663    mCacheOffset = offset;
664
665    size_t totalSize = mCache->totalSize();
666    CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize);
667
668    mNumRetriesLeft = kMaxNumRetries;
669    mFetching = true;
670
671    return OK;
672}
673
674void NuCachedSource2::resumeFetchingIfNecessary() {
675    Mutex::Autolock autoLock(mLock);
676
677    restartPrefetcherIfNecessary_l(true /* ignore low water threshold */);
678}
679
680sp<DecryptHandle> NuCachedSource2::DrmInitialization(const char* mime) {
681    return mSource->DrmInitialization(mime);
682}
683
684String8 NuCachedSource2::getUri() {
685    return mSource->getUri();
686}
687
688String8 NuCachedSource2::getMIMEType() const {
689    return mSource->getMIMEType();
690}
691
692void NuCachedSource2::updateCacheParamsFromSystemProperty() {
693    char value[PROPERTY_VALUE_MAX];
694    if (!property_get("media.stagefright.cache-params", value, NULL)) {
695        return;
696    }
697
698    updateCacheParamsFromString(value);
699}
700
701void NuCachedSource2::updateCacheParamsFromString(const char *s) {
702    ssize_t lowwaterMarkKb, highwaterMarkKb;
703    int keepAliveSecs;
704
705    if (sscanf(s, "%zd/%zd/%d",
706               &lowwaterMarkKb, &highwaterMarkKb, &keepAliveSecs) != 3) {
707        ALOGE("Failed to parse cache parameters from '%s'.", s);
708        return;
709    }
710
711    if (lowwaterMarkKb >= 0) {
712        mLowwaterThresholdBytes = lowwaterMarkKb * 1024;
713    } else {
714        mLowwaterThresholdBytes = kDefaultLowWaterThreshold;
715    }
716
717    if (highwaterMarkKb >= 0) {
718        mHighwaterThresholdBytes = highwaterMarkKb * 1024;
719    } else {
720        mHighwaterThresholdBytes = kDefaultHighWaterThreshold;
721    }
722
723    if (mLowwaterThresholdBytes >= mHighwaterThresholdBytes) {
724        ALOGE("Illegal low/highwater marks specified, reverting to defaults.");
725
726        mLowwaterThresholdBytes = kDefaultLowWaterThreshold;
727        mHighwaterThresholdBytes = kDefaultHighWaterThreshold;
728    }
729
730    if (keepAliveSecs >= 0) {
731        mKeepAliveIntervalUs = keepAliveSecs * 1000000ll;
732    } else {
733        mKeepAliveIntervalUs = kDefaultKeepAliveIntervalUs;
734    }
735
736    ALOGV("lowwater = %zu bytes, highwater = %zu bytes, keepalive = %lld us",
737         mLowwaterThresholdBytes,
738         mHighwaterThresholdBytes,
739         (long long)mKeepAliveIntervalUs);
740}
741
742// static
743void NuCachedSource2::RemoveCacheSpecificHeaders(
744        KeyedVector<String8, String8> *headers,
745        String8 *cacheConfig,
746        bool *disconnectAtHighwatermark) {
747    *cacheConfig = String8();
748    *disconnectAtHighwatermark = false;
749
750    if (headers == NULL) {
751        return;
752    }
753
754    ssize_t index;
755    if ((index = headers->indexOfKey(String8("x-cache-config"))) >= 0) {
756        *cacheConfig = headers->valueAt(index);
757
758        headers->removeItemsAt(index);
759
760        ALOGV("Using special cache config '%s'", cacheConfig->string());
761    }
762
763    if ((index = headers->indexOfKey(
764                    String8("x-disconnect-at-highwatermark"))) >= 0) {
765        *disconnectAtHighwatermark = true;
766        headers->removeItemsAt(index);
767
768        ALOGV("Client requested disconnection at highwater mark");
769    }
770}
771
772}  // namespace android
773