1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#define LOG_TAG "Prefetcher"
18//#define LOG_NDEBUG 0
19#include <utils/Log.h>
20
21#include "include/Prefetcher.h"
22
23#include <media/stagefright/MediaBuffer.h>
24#include <media/stagefright/MediaDebug.h>
25#include <media/stagefright/MediaErrors.h>
26#include <media/stagefright/MediaSource.h>
27#include <media/stagefright/MetaData.h>
28#include <utils/List.h>
29
30namespace android {
31
32struct PrefetchedSource : public MediaSource {
33    PrefetchedSource(
34            size_t index,
35            const sp<MediaSource> &source);
36
37    virtual status_t start(MetaData *params);
38    virtual status_t stop();
39
40    virtual status_t read(
41            MediaBuffer **buffer, const ReadOptions *options);
42
43    virtual sp<MetaData> getFormat();
44
45protected:
46    virtual ~PrefetchedSource();
47
48private:
49    friend struct Prefetcher;
50
51    Mutex mLock;
52    Condition mCondition;
53
54    sp<MediaSource> mSource;
55    size_t mIndex;
56    bool mStarted;
57    bool mReachedEOS;
58    status_t mFinalStatus;
59    int64_t mSeekTimeUs;
60    int64_t mCacheDurationUs;
61    size_t mCacheSizeBytes;
62    bool mPrefetcherStopped;
63    bool mCurrentlyPrefetching;
64
65    List<MediaBuffer *> mCachedBuffers;
66
67    // Returns true iff source is currently caching.
68    bool getCacheDurationUs(int64_t *durationUs, size_t *totalSize = NULL);
69
70    void updateCacheDuration_l();
71    void clearCache_l();
72
73    void cacheMore();
74    void onPrefetcherStopped();
75
76    PrefetchedSource(const PrefetchedSource &);
77    PrefetchedSource &operator=(const PrefetchedSource &);
78};
79
80Prefetcher::Prefetcher()
81    : mDone(false),
82      mThreadExited(false) {
83    startThread();
84}
85
86Prefetcher::~Prefetcher() {
87    stopThread();
88}
89
90sp<MediaSource> Prefetcher::addSource(const sp<MediaSource> &source) {
91    Mutex::Autolock autoLock(mLock);
92
93    sp<PrefetchedSource> psource =
94        new PrefetchedSource(mSources.size(), source);
95
96    mSources.add(psource);
97
98    return psource;
99}
100
101void Prefetcher::startThread() {
102    mThreadExited = false;
103    mDone = false;
104
105    int res = androidCreateThreadEtc(
106            ThreadWrapper, this, "Prefetcher",
107            ANDROID_PRIORITY_DEFAULT, 0, &mThread);
108
109    CHECK_EQ(res, 1);
110}
111
112void Prefetcher::stopThread() {
113    Mutex::Autolock autoLock(mLock);
114
115    while (!mThreadExited) {
116        mDone = true;
117        mCondition.signal();
118        mCondition.wait(mLock);
119    }
120}
121
122// static
123int Prefetcher::ThreadWrapper(void *me) {
124    static_cast<Prefetcher *>(me)->threadFunc();
125
126    return 0;
127}
128
129// Cache at most 1 min for each source.
130static int64_t kMaxCacheDurationUs = 60 * 1000000ll;
131
132// At the same time cache at most 5MB per source.
133static size_t kMaxCacheSizeBytes = 5 * 1024 * 1024;
134
135// If the amount of cached data drops below this,
136// fill the cache up to the max duration again.
137static int64_t kLowWaterDurationUs = 5000000ll;
138
139void Prefetcher::threadFunc() {
140    bool fillingCache = false;
141
142    for (;;) {
143        sp<PrefetchedSource> minSource;
144        int64_t minCacheDurationUs = -1;
145
146        {
147            Mutex::Autolock autoLock(mLock);
148            if (mDone) {
149                break;
150            }
151
152            mCondition.waitRelative(
153                    mLock, fillingCache ? 1ll : 1000000000ll);
154
155
156            ssize_t minIndex = -1;
157            for (size_t i = 0; i < mSources.size(); ++i) {
158                sp<PrefetchedSource> source = mSources[i].promote();
159
160                if (source == NULL) {
161                    continue;
162                }
163
164                int64_t cacheDurationUs;
165                size_t cacheSizeBytes;
166                if (!source->getCacheDurationUs(&cacheDurationUs, &cacheSizeBytes)) {
167                    continue;
168                }
169
170                if (cacheSizeBytes > kMaxCacheSizeBytes) {
171                    LOGI("max cache size reached");
172                    continue;
173                }
174
175                if (mSources.size() > 1 && cacheDurationUs >= kMaxCacheDurationUs) {
176                    LOGI("max duration reached, size = %d bytes", cacheSizeBytes);
177                    continue;
178                }
179
180                if (minIndex < 0 || cacheDurationUs < minCacheDurationUs) {
181                    minCacheDurationUs = cacheDurationUs;
182                    minIndex = i;
183                    minSource = source;
184                }
185            }
186
187            if (minIndex < 0) {
188                if (fillingCache) {
189                    LOGV("[%p] done filling the cache, above high water mark.",
190                         this);
191                    fillingCache = false;
192                }
193                continue;
194            }
195        }
196
197        if (!fillingCache && minCacheDurationUs < kLowWaterDurationUs) {
198            LOGI("[%p] cache below low water mark, filling cache.", this);
199            fillingCache = true;
200        }
201
202        if (fillingCache) {
203            // Make sure not to hold the lock while calling into the source.
204            // The lock guards the list of sources, not the individual sources
205            // themselves.
206            minSource->cacheMore();
207        }
208    }
209
210    Mutex::Autolock autoLock(mLock);
211    for (size_t i = 0; i < mSources.size(); ++i) {
212        sp<PrefetchedSource> source = mSources[i].promote();
213
214        if (source == NULL) {
215            continue;
216        }
217
218        source->onPrefetcherStopped();
219    }
220
221    mThreadExited = true;
222    mCondition.signal();
223}
224
225int64_t Prefetcher::getCachedDurationUs(bool *noMoreData) {
226    Mutex::Autolock autoLock(mLock);
227
228    int64_t minCacheDurationUs = -1;
229    ssize_t minIndex = -1;
230    bool anySourceActive = false;
231    for (size_t i = 0; i < mSources.size(); ++i) {
232        int64_t cacheDurationUs;
233        sp<PrefetchedSource> source = mSources[i].promote();
234        if (source == NULL) {
235            continue;
236        }
237
238        if (source->getCacheDurationUs(&cacheDurationUs)) {
239            anySourceActive = true;
240        }
241
242        if (minIndex < 0 || cacheDurationUs < minCacheDurationUs) {
243            minCacheDurationUs = cacheDurationUs;
244            minIndex = i;
245        }
246    }
247
248    if (noMoreData) {
249        *noMoreData = !anySourceActive;
250    }
251
252    return minCacheDurationUs < 0 ? 0 : minCacheDurationUs;
253}
254
255status_t Prefetcher::prepare(
256        bool (*continueFunc)(void *cookie), void *cookie) {
257    // Fill the cache.
258
259    int64_t duration;
260    bool noMoreData;
261    do {
262        usleep(100000);
263
264        if (continueFunc && !(*continueFunc)(cookie)) {
265            return -EINTR;
266        }
267
268        duration = getCachedDurationUs(&noMoreData);
269    } while (!noMoreData && duration < 2000000ll);
270
271    return OK;
272}
273
274////////////////////////////////////////////////////////////////////////////////
275
276PrefetchedSource::PrefetchedSource(
277        size_t index,
278        const sp<MediaSource> &source)
279    : mSource(source),
280      mIndex(index),
281      mStarted(false),
282      mReachedEOS(false),
283      mSeekTimeUs(0),
284      mCacheDurationUs(0),
285      mCacheSizeBytes(0),
286      mPrefetcherStopped(false),
287      mCurrentlyPrefetching(false) {
288}
289
290PrefetchedSource::~PrefetchedSource() {
291    if (mStarted) {
292        stop();
293    }
294}
295
296status_t PrefetchedSource::start(MetaData *params) {
297    CHECK(!mStarted);
298
299    Mutex::Autolock autoLock(mLock);
300
301    status_t err = mSource->start(params);
302
303    if (err != OK) {
304        return err;
305    }
306
307    mStarted = true;
308
309    return OK;
310}
311
312status_t PrefetchedSource::stop() {
313    CHECK(mStarted);
314
315    Mutex::Autolock autoLock(mLock);
316
317    while (mCurrentlyPrefetching) {
318        mCondition.wait(mLock);
319    }
320
321    clearCache_l();
322
323    status_t err = mSource->stop();
324
325    mStarted = false;
326
327    return err;
328}
329
330status_t PrefetchedSource::read(
331        MediaBuffer **out, const ReadOptions *options) {
332    *out = NULL;
333
334    Mutex::Autolock autoLock(mLock);
335
336    CHECK(mStarted);
337
338    int64_t seekTimeUs;
339    if (options && options->getSeekTo(&seekTimeUs)) {
340        CHECK(seekTimeUs >= 0);
341
342        clearCache_l();
343
344        mReachedEOS = false;
345        mSeekTimeUs = seekTimeUs;
346    }
347
348    while (!mPrefetcherStopped && !mReachedEOS && mCachedBuffers.empty()) {
349        mCondition.wait(mLock);
350    }
351
352    if (mCachedBuffers.empty()) {
353        return mReachedEOS ? mFinalStatus : ERROR_END_OF_STREAM;
354    }
355
356    *out = *mCachedBuffers.begin();
357    mCachedBuffers.erase(mCachedBuffers.begin());
358    updateCacheDuration_l();
359    mCacheSizeBytes -= (*out)->size();
360
361    return OK;
362}
363
364sp<MetaData> PrefetchedSource::getFormat() {
365    return mSource->getFormat();
366}
367
368bool PrefetchedSource::getCacheDurationUs(
369        int64_t *durationUs, size_t *totalSize) {
370    Mutex::Autolock autoLock(mLock);
371
372    *durationUs = mCacheDurationUs;
373    if (totalSize != NULL) {
374        *totalSize = mCacheSizeBytes;
375    }
376
377    if (!mStarted || mReachedEOS) {
378        return false;
379    }
380
381    return true;
382}
383
384void PrefetchedSource::cacheMore() {
385    MediaSource::ReadOptions options;
386
387    Mutex::Autolock autoLock(mLock);
388
389    if (!mStarted) {
390        return;
391    }
392
393    mCurrentlyPrefetching = true;
394
395    if (mSeekTimeUs >= 0) {
396        options.setSeekTo(mSeekTimeUs);
397        mSeekTimeUs = -1;
398    }
399
400    // Ensure our object does not go away while we're not holding
401    // the lock.
402    sp<PrefetchedSource> me = this;
403
404    mLock.unlock();
405    MediaBuffer *buffer;
406    status_t err = mSource->read(&buffer, &options);
407    mLock.lock();
408
409    if (err != OK) {
410        mCurrentlyPrefetching = false;
411        mReachedEOS = true;
412        mFinalStatus = err;
413        mCondition.signal();
414
415        return;
416    }
417
418    CHECK(buffer != NULL);
419
420    MediaBuffer *copy = new MediaBuffer(buffer->range_length());
421    memcpy(copy->data(),
422           (const uint8_t *)buffer->data() + buffer->range_offset(),
423           buffer->range_length());
424
425    sp<MetaData> from = buffer->meta_data();
426    sp<MetaData> to = copy->meta_data();
427
428    int64_t timeUs;
429    if (from->findInt64(kKeyTime, &timeUs)) {
430        to->setInt64(kKeyTime, timeUs);
431    }
432
433    buffer->release();
434    buffer = NULL;
435
436    mCachedBuffers.push_back(copy);
437    updateCacheDuration_l();
438    mCacheSizeBytes += copy->size();
439
440    mCurrentlyPrefetching = false;
441    mCondition.signal();
442}
443
444void PrefetchedSource::updateCacheDuration_l() {
445    if (mCachedBuffers.size() < 2) {
446        mCacheDurationUs = 0;
447    } else {
448        int64_t firstTimeUs, lastTimeUs;
449        CHECK((*mCachedBuffers.begin())->meta_data()->findInt64(
450                    kKeyTime, &firstTimeUs));
451        CHECK((*--mCachedBuffers.end())->meta_data()->findInt64(
452                    kKeyTime, &lastTimeUs));
453
454        mCacheDurationUs = lastTimeUs - firstTimeUs;
455    }
456}
457
458void PrefetchedSource::clearCache_l() {
459    List<MediaBuffer *>::iterator it = mCachedBuffers.begin();
460    while (it != mCachedBuffers.end()) {
461        (*it)->release();
462
463        it = mCachedBuffers.erase(it);
464    }
465
466    updateCacheDuration_l();
467    mCacheSizeBytes = 0;
468}
469
470void PrefetchedSource::onPrefetcherStopped() {
471    Mutex::Autolock autoLock(mLock);
472    mPrefetcherStopped = true;
473    mCondition.signal();
474}
475
476}  // namespace android
477