1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22#include "HTTPDownloader.h"
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "mpeg2ts/AnotherPacketSource.h"
27
28#include <cutils/properties.h>
29#include <media/IMediaHTTPService.h>
30#include <media/stagefright/foundation/ABuffer.h>
31#include <media/stagefright/foundation/ADebug.h>
32#include <media/stagefright/foundation/AMessage.h>
33#include <media/stagefright/foundation/AUtils.h>
34#include <media/stagefright/MediaDefs.h>
35#include <media/stagefright/MetaData.h>
36#include <media/stagefright/Utils.h>
37
38#include <utils/Mutex.h>
39
40#include <ctype.h>
41#include <inttypes.h>
42
43namespace android {
44
// static
// Bandwidth Switch Mark Defaults
// All values are in microseconds. The first three seed mUpSwitchMark,
// mDownSwitchMark and mUpSwitchMargin in the constructor and act as upper
// bounds when a target-duration update from a fetcher recomputes them.
const int64_t LiveSession::kUpSwitchMarkUs = 15000000ll; // 15 sec
const int64_t LiveSession::kDownSwitchMarkUs = 20000000ll; // 20 sec
const int64_t LiveSession::kUpSwitchMarginUs = 5000000ll; // 5 sec
// Minimum gap between the old fetcher's last enqueued time and the new
// fetcher's first sample before checkSwitchProgress() asks for resumeUntil.
const int64_t LiveSession::kResumeThresholdUs = 100000ll; // 0.1 sec
51
// Keeps a sliding history of transfer measurements (bytes + transfer time)
// and derives long-term and short-term bandwidth estimates from it.
// Both public methods take mLock, so they serialize against each other.
struct LiveSession::BandwidthEstimator : public RefBase {
    BandwidthEstimator();

    // Records one transfer of numBytes that took delayUs, then trims
    // samples that have fallen out of the history window.
    void addBandwidthMeasurement(size_t numBytes, int64_t delayUs);

    // Writes the long-term estimate (bits per second) to *bandwidth.
    // isStable and shortTermBps are optional outputs and may be NULL.
    // Returns false when fewer than two samples have been recorded.
    bool estimateBandwidth(
            int32_t *bandwidth,
            bool *isStable = NULL,
            int32_t *shortTermBps = NULL);

private:
    // Bandwidth estimation parameters
    // Number of most recent samples used for the short-term estimate.
    static const int32_t kShortTermBandwidthItems = 3;
    // Trimming never shrinks the history below this many samples.
    static const int32_t kMinBandwidthHistoryItems = 20;
    // Lower/upper clamp for the transfer-time window kept in the history.
    static const int64_t kMinBandwidthHistoryWindowUs = 5000000ll; // 5 sec
    static const int64_t kMaxBandwidthHistoryWindowUs = 30000000ll; // 30 sec
    // Samples at least this old (wall clock) are eligible for trimming.
    static const int64_t kMaxBandwidthHistoryAgeUs = 60000000ll; // 60 sec

    // One recorded transfer.
    struct BandwidthEntry {
        int64_t mTimestampUs;   // ALooper::GetNowUs() when the sample was added
        int64_t mDelayUs;       // time the transfer took
        size_t mNumBytes;       // bytes transferred
    };

    Mutex mLock;
    List<BandwidthEntry> mBandwidthHistory;     // oldest sample first
    List<int32_t> mPrevEstimates;               // most recent long-term estimates (at most 3)
    int32_t mShortTermEstimate;                 // cached short-term estimate
    bool mHasNewSample;                         // true if a sample arrived since the last estimate
    bool mIsStable;                             // cached stability verdict
    int64_t mTotalTransferTimeUs;               // sum of mDelayUs over mBandwidthHistory
    size_t mTotalTransferBytes;                 // sum of mNumBytes over mBandwidthHistory

    DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator);
};
86
// Starts with an empty history: no samples, zeroed running totals, and the
// stability flag initially set to true.
LiveSession::BandwidthEstimator::BandwidthEstimator() :
    mShortTermEstimate(0),
    mHasNewSample(false),
    mIsStable(true),
    mTotalTransferTimeUs(0),
    mTotalTransferBytes(0) {
}
94
95void LiveSession::BandwidthEstimator::addBandwidthMeasurement(
96        size_t numBytes, int64_t delayUs) {
97    AutoMutex autoLock(mLock);
98
99    int64_t nowUs = ALooper::GetNowUs();
100    BandwidthEntry entry;
101    entry.mTimestampUs = nowUs;
102    entry.mDelayUs = delayUs;
103    entry.mNumBytes = numBytes;
104    mTotalTransferTimeUs += delayUs;
105    mTotalTransferBytes += numBytes;
106    mBandwidthHistory.push_back(entry);
107    mHasNewSample = true;
108
109    // Remove no more than 10% of total transfer time at a time
110    // to avoid sudden jump on bandwidth estimation. There might
111    // be long blocking reads that takes up signification time,
112    // we have to keep a longer window in that case.
113    int64_t bandwidthHistoryWindowUs = mTotalTransferTimeUs * 9 / 10;
114    if (bandwidthHistoryWindowUs < kMinBandwidthHistoryWindowUs) {
115        bandwidthHistoryWindowUs = kMinBandwidthHistoryWindowUs;
116    } else if (bandwidthHistoryWindowUs > kMaxBandwidthHistoryWindowUs) {
117        bandwidthHistoryWindowUs = kMaxBandwidthHistoryWindowUs;
118    }
119    // trim old samples, keeping at least kMaxBandwidthHistoryItems samples,
120    // and total transfer time at least kMaxBandwidthHistoryWindowUs.
121    while (mBandwidthHistory.size() > kMinBandwidthHistoryItems) {
122        List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
123        // remove sample if either absolute age or total transfer time is
124        // over kMaxBandwidthHistoryWindowUs
125        if (nowUs - it->mTimestampUs < kMaxBandwidthHistoryAgeUs &&
126                mTotalTransferTimeUs - it->mDelayUs < bandwidthHistoryWindowUs) {
127            break;
128        }
129        mTotalTransferTimeUs -= it->mDelayUs;
130        mTotalTransferBytes -= it->mNumBytes;
131        mBandwidthHistory.erase(mBandwidthHistory.begin());
132    }
133}
134
135bool LiveSession::BandwidthEstimator::estimateBandwidth(
136        int32_t *bandwidthBps, bool *isStable, int32_t *shortTermBps) {
137    AutoMutex autoLock(mLock);
138
139    if (mBandwidthHistory.size() < 2) {
140        return false;
141    }
142
143    if (!mHasNewSample) {
144        *bandwidthBps = *(--mPrevEstimates.end());
145        if (isStable) {
146            *isStable = mIsStable;
147        }
148        if (shortTermBps) {
149            *shortTermBps = mShortTermEstimate;
150        }
151        return true;
152    }
153
154    *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs);
155    mPrevEstimates.push_back(*bandwidthBps);
156    while (mPrevEstimates.size() > 3) {
157        mPrevEstimates.erase(mPrevEstimates.begin());
158    }
159    mHasNewSample = false;
160
161    int64_t totalTimeUs = 0;
162    size_t totalBytes = 0;
163    if (mBandwidthHistory.size() >= kShortTermBandwidthItems) {
164        List<BandwidthEntry>::iterator it = --mBandwidthHistory.end();
165        for (size_t i = 0; i < kShortTermBandwidthItems; i++, it--) {
166            totalTimeUs += it->mDelayUs;
167            totalBytes += it->mNumBytes;
168        }
169    }
170    mShortTermEstimate = totalTimeUs > 0 ?
171            (totalBytes * 8E6 / totalTimeUs) : *bandwidthBps;
172    if (shortTermBps) {
173        *shortTermBps = mShortTermEstimate;
174    }
175
176    int64_t minEstimate = -1, maxEstimate = -1;
177    List<int32_t>::iterator it;
178    for (it = mPrevEstimates.begin(); it != mPrevEstimates.end(); it++) {
179        int32_t estimate = *it;
180        if (minEstimate < 0 || minEstimate > estimate) {
181            minEstimate = estimate;
182        }
183        if (maxEstimate < 0 || maxEstimate < estimate) {
184            maxEstimate = estimate;
185        }
186    }
187    // consider it stable if long-term average is not jumping a lot
188    // and short-term average is not much lower than long-term average
189    mIsStable = (maxEstimate <= minEstimate * 4 / 3)
190            && mShortTermEstimate > minEstimate * 7 / 10;
191    if (isStable) {
192        *isStable = mIsStable;
193    }
194
195#if 0
196    {
197        char dumpStr[1024] = {0};
198        size_t itemIdx = 0;
199        size_t histSize = mBandwidthHistory.size();
200        sprintf(dumpStr, "estimate bps=%d stable=%d history (n=%d): {",
201            *bandwidthBps, mIsStable, histSize);
202        List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
203        for (; it != mBandwidthHistory.end(); ++it) {
204            if (itemIdx > 50) {
205                sprintf(dumpStr + strlen(dumpStr),
206                        "...(%zd more items)... }", histSize - itemIdx);
207                break;
208            }
209            sprintf(dumpStr + strlen(dumpStr), "%dk/%.3fs%s",
210                it->mNumBytes / 1024,
211                (double)it->mDelayUs * 1.0e-6,
212                (it == (--mBandwidthHistory.end())) ? "}" : ", ");
213            itemIdx++;
214        }
215        ALOGE(dumpStr);
216    }
217#endif
218    return true;
219}
220
221//static
222const char *LiveSession::getKeyForStream(StreamType type) {
223    switch (type) {
224        case STREAMTYPE_VIDEO:
225            return "timeUsVideo";
226        case STREAMTYPE_AUDIO:
227            return "timeUsAudio";
228        case STREAMTYPE_SUBTITLES:
229            return "timeUsSubtitle";
230        case STREAMTYPE_METADATA:
231            return "timeUsMetadata"; // unused
232        default:
233            TRESPASS();
234    }
235    return NULL;
236}
237
238//static
239const char *LiveSession::getNameForStream(StreamType type) {
240    switch (type) {
241        case STREAMTYPE_VIDEO:
242            return "video";
243        case STREAMTYPE_AUDIO:
244            return "audio";
245        case STREAMTYPE_SUBTITLES:
246            return "subs";
247        case STREAMTYPE_METADATA:
248            return "metadata";
249        default:
250            break;
251    }
252    return "unknown";
253}
254
255//static
256ATSParser::SourceType LiveSession::getSourceTypeForStream(StreamType type) {
257    switch (type) {
258        case STREAMTYPE_VIDEO:
259            return ATSParser::VIDEO;
260        case STREAMTYPE_AUDIO:
261            return ATSParser::AUDIO;
262        case STREAMTYPE_METADATA:
263            return ATSParser::META;
264        case STREAMTYPE_SUBTITLES:
265        default:
266            TRESPASS();
267    }
268    return ATSParser::NUM_SOURCE_TYPES; // should not reach here
269}
270
// Constructs a session that reports to `notify`, using `httpService` for
// downloads. The session starts in the preparation phase with no bandwidth
// selected (indices at -1), the switch marks set to their static defaults,
// and a fresh BandwidthEstimator.
LiveSession::LiveSession(
        const sp<AMessage> &notify, uint32_t flags,
        const sp<IMediaHTTPService> &httpService)
    : mNotify(notify),
      mFlags(flags),
      mHTTPService(httpService),
      mBuffering(false),
      mInPreparationPhase(true),
      mPollBufferingGeneration(0),
      mPrevBufferPercentage(-1),
      mCurBandwidthIndex(-1),
      mOrigBandwidthIndex(-1),
      mLastBandwidthBps(-1ll),
      mLastBandwidthStable(false),
      mBandwidthEstimator(new BandwidthEstimator()),
      mMaxWidth(720),
      mMaxHeight(480),
      mStreamMask(0),
      mNewStreamMask(0),
      mSwapMask(0),
      mSwitchGeneration(0),
      mSubtitleGeneration(0),
      mLastDequeuedTimeUs(0ll),
      mRealTimeBaseUs(0ll),
      mReconfigurationInProgress(false),
      mSwitchInProgress(false),
      mUpSwitchMark(kUpSwitchMarkUs),
      mDownSwitchMark(kDownSwitchMarkUs),
      mUpSwitchMargin(kUpSwitchMarginUs),
      mFirstTimeUsValid(false),
      mFirstTimeUs(0),
      mLastSeekTimeUs(0),
      mHasMetadata(false) {
    mStreams[kAudioIndex] = StreamItem("audio");
    mStreams[kVideoIndex] = StreamItem("video");
    mStreams[kSubtitleIndex] = StreamItem("subtitles");

    // Create two packet sources per source index, keyed by stream type:
    // mPacketSources is what dequeueAccessUnit() reads from, while
    // mPacketSources2 is the second set consulted by checkSwitchProgress()
    // during a bandwidth switch.
    for (size_t i = 0; i < kNumSources; ++i) {
        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
    }
}
313
314LiveSession::~LiveSession() {
315    if (mFetcherLooper != NULL) {
316        mFetcherLooper->stop();
317    }
318}
319
320int64_t LiveSession::calculateMediaTimeUs(
321        int64_t firstTimeUs, int64_t timeUs, int32_t discontinuitySeq) {
322    if (timeUs >= firstTimeUs) {
323        timeUs -= firstTimeUs;
324    } else {
325        timeUs = 0;
326    }
327    timeUs += mLastSeekTimeUs;
328    if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
329        timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
330    }
331    return timeUs;
332}
333
334status_t LiveSession::dequeueAccessUnit(
335        StreamType stream, sp<ABuffer> *accessUnit) {
336    status_t finalResult = OK;
337    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
338
339    ssize_t streamIdx = typeToIndex(stream);
340    if (streamIdx < 0) {
341        return BAD_VALUE;
342    }
343    const char *streamStr = getNameForStream(stream);
344    // Do not let client pull data if we don't have data packets yet.
345    // We might only have a format discontinuity queued without data.
346    // When NuPlayerDecoder dequeues the format discontinuity, it will
347    // immediately try to getFormat. If we return NULL, NuPlayerDecoder
348    // thinks it can do seamless change, so will not shutdown decoder.
349    // When the actual format arrives, it can't handle it and get stuck.
350    if (!packetSource->hasDataBufferAvailable(&finalResult)) {
351        ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)",
352                streamStr, finalResult);
353
354        if (finalResult == OK) {
355            return -EAGAIN;
356        } else {
357            return finalResult;
358        }
359    }
360
361    // Let the client dequeue as long as we have buffers available
362    // Do not make pause/resume decisions here.
363
364    status_t err = packetSource->dequeueAccessUnit(accessUnit);
365
366    if (err == INFO_DISCONTINUITY) {
367        // adaptive streaming, discontinuities in the playlist
368        int32_t type;
369        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
370
371        sp<AMessage> extra;
372        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
373            extra.clear();
374        }
375
376        ALOGI("[%s] read discontinuity of type %d, extra = %s",
377              streamStr,
378              type,
379              extra == NULL ? "NULL" : extra->debugString().c_str());
380    } else if (err == OK) {
381
382        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
383            int64_t timeUs, originalTimeUs;
384            int32_t discontinuitySeq = 0;
385            StreamItem& strm = mStreams[streamIdx];
386            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
387            originalTimeUs = timeUs;
388            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
389            if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) {
390                int64_t offsetTimeUs;
391                if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
392                    offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq);
393                } else {
394                    offsetTimeUs = 0;
395                }
396
397                if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0
398                        && strm.mLastDequeuedTimeUs >= 0) {
399                    int64_t firstTimeUs;
400                    firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
401                    offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
402                    offsetTimeUs += strm.mLastSampleDurationUs;
403                } else {
404                    offsetTimeUs += strm.mLastSampleDurationUs;
405                }
406
407                mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs);
408                strm.mCurDiscontinuitySeq = discontinuitySeq;
409            }
410
411            int32_t discard = 0;
412            int64_t firstTimeUs;
413            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
414                int64_t durUs; // approximate sample duration
415                if (timeUs > strm.mLastDequeuedTimeUs) {
416                    durUs = timeUs - strm.mLastDequeuedTimeUs;
417                } else {
418                    durUs = strm.mLastDequeuedTimeUs - timeUs;
419                }
420                strm.mLastSampleDurationUs = durUs;
421                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
422            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
423                firstTimeUs = timeUs;
424            } else {
425                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
426                firstTimeUs = timeUs;
427            }
428
429            strm.mLastDequeuedTimeUs = timeUs;
430            timeUs = calculateMediaTimeUs(firstTimeUs, timeUs, discontinuitySeq);
431
432            ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us",
433                    streamStr, (long long)timeUs, (long long)originalTimeUs);
434            (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
435            mLastDequeuedTimeUs = timeUs;
436            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
437        } else if (stream == STREAMTYPE_SUBTITLES) {
438            int32_t subtitleGeneration;
439            if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
440                    && subtitleGeneration != mSubtitleGeneration) {
441               return -EAGAIN;
442            };
443            (*accessUnit)->meta()->setInt32(
444                    "trackIndex", mPlaylist->getSelectedIndex());
445            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
446        } else if (stream == STREAMTYPE_METADATA) {
447            HLSTime mdTime((*accessUnit)->meta());
448            if (mDiscontinuityAbsStartTimesUs.indexOfKey(mdTime.mSeq) < 0) {
449                packetSource->requeueAccessUnit((*accessUnit));
450                return -EAGAIN;
451            } else {
452                int64_t firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(mdTime.mSeq);
453                int64_t timeUs = calculateMediaTimeUs(firstTimeUs, mdTime.mTimeUs, mdTime.mSeq);
454                (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
455                (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
456            }
457        }
458    } else {
459        ALOGI("[%s] encountered error %d", streamStr, err);
460    }
461
462    return err;
463}
464
465status_t LiveSession::getStreamFormatMeta(StreamType stream, sp<MetaData> *meta) {
466    if (!(mStreamMask & stream)) {
467        return UNKNOWN_ERROR;
468    }
469
470    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
471
472    *meta = packetSource->getFormat();
473
474    if (*meta == NULL) {
475        return -EWOULDBLOCK;
476    }
477
478    if (stream == STREAMTYPE_AUDIO) {
479        // set AAC input buffer size to 32K bytes (256kbps x 1sec)
480        (*meta)->setInt32(kKeyMaxInputSize, 32 * 1024);
481    } else if (stream == STREAMTYPE_VIDEO) {
482        (*meta)->setInt32(kKeyMaxWidth, mMaxWidth);
483        (*meta)->setInt32(kKeyMaxHeight, mMaxHeight);
484    }
485
486    return OK;
487}
488
489sp<HTTPDownloader> LiveSession::getHTTPDownloader() {
490    return new HTTPDownloader(mHTTPService, mExtraHeaders);
491}
492
493void LiveSession::setBufferingSettings(
494        const BufferingSettings &buffering) {
495    sp<AMessage> msg = new AMessage(kWhatSetBufferingSettings, this);
496    writeToAMessage(msg, buffering);
497    msg->post();
498}
499
500void LiveSession::connectAsync(
501        const char *url, const KeyedVector<String8, String8> *headers) {
502    sp<AMessage> msg = new AMessage(kWhatConnect, this);
503    msg->setString("url", url);
504
505    if (headers != NULL) {
506        msg->setPointer(
507                "headers",
508                new KeyedVector<String8, String8>(*headers));
509    }
510
511    msg->post();
512}
513
514status_t LiveSession::disconnect() {
515    sp<AMessage> msg = new AMessage(kWhatDisconnect, this);
516
517    sp<AMessage> response;
518    status_t err = msg->postAndAwaitResponse(&response);
519
520    return err;
521}
522
523status_t LiveSession::seekTo(int64_t timeUs, MediaPlayerSeekMode mode) {
524    sp<AMessage> msg = new AMessage(kWhatSeek, this);
525    msg->setInt64("timeUs", timeUs);
526    msg->setInt32("mode", mode);
527
528    sp<AMessage> response;
529    status_t err = msg->postAndAwaitResponse(&response);
530
531    return err;
532}
533
// Checks whether a pending switch to the fetcher identified by the "uri"
// in stopParams can proceed, given how far the decoder has dequeued.
//
//   stopParams       [in/out] must contain "uri"; for every stream where
//                    the old source has a latest enqueued meta, it is
//                    updated with "discontinuitySeq" and the per-stream
//                    time key of the new fetcher's first kept sample.
//   delayUs          if > 0, the reference point is taken delayUs after the
//                    last dequeued sample (cushion used when switching up);
//                    otherwise the latest dequeued sample is used.
//   needResumeUntil  [out] set when the old fetcher should resume until the
//                    new fetcher's start time; always cleared while
//                    mBuffering is set.
//
// Returns false if the switch cannot proceed yet (not enough cushion, or
// dequeueing is already past everything in mPacketSources2). Returns true
// otherwise, after trimming both the new and the old packet sources.
// Only streams that are in both mSwapMask and mNewStreamMask and whose
// mNewUri matches are considered; subtitles are skipped.
bool LiveSession::checkSwitchProgress(
        sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) {
    AString newUri;
    CHECK(stopParams->findString("uri", &newUri));

    *needResumeUntil = false;
    // First sample remaining in mPacketSources2 after trimming, per stream.
    sp<AMessage> firstNewMeta[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        StreamType stream = indexToType(i);
        if (!(mSwapMask & mNewStreamMask & stream)
            || (mStreams[i].mNewUri != newUri)) {
            continue;
        }
        if (stream == STREAMTYPE_SUBTITLES) {
            continue;
        }
        sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i);

        // First, get latest dequeued meta, which is where the decoder is at.
        // (when upswitching, we take the meta after a certain delay, so that
        // the decoder is left with some cushion)
        sp<AMessage> lastDequeueMeta, lastEnqueueMeta;
        if (delayUs > 0) {
            lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs);
            if (lastDequeueMeta == NULL) {
                // this means we don't have enough cushion, try again later
                ALOGV("[%s] up switching failed due to insufficient buffer",
                        getNameForStream(stream));
                return false;
            }
        } else {
            // It's okay for lastDequeueMeta to be NULL here, it means the
            // decoder hasn't even started dequeueing
            lastDequeueMeta = source->getLatestDequeuedMeta();
        }
        // Then, trim off packets at the beginning of mPacketSources2 that
        // are before the latest dequeued time. These samples are definitely
        // too late.
        firstNewMeta[i] = mPacketSources2.editValueAt(i)
                            ->trimBuffersBeforeMeta(lastDequeueMeta);

        // Now firstNewMeta[i] is the first sample after the trim.
        // If it's NULL, we failed because dequeueing is already past all
        // samples in mPacketSources2; we have to try again.
        if (firstNewMeta[i] == NULL) {
            HLSTime dequeueTime(lastDequeueMeta);
            ALOGV("[%s] dequeue time (%d, %lld) past start time",
                    getNameForStream(stream),
                    dequeueTime.mSeq, (long long) dequeueTime.mTimeUs);
            return false;
        }

        // Otherwise, we check if mPacketSources2 overlaps with what the old
        // fetcher already fetched, and see if we need to resumeUntil
        lastEnqueueMeta = source->getLatestEnqueuedMeta();
        // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity
        // boundary, no need to resume as the content will look different anyways
        if (lastEnqueueMeta != NULL) {
            HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]);

            // no need to resume old fetcher if new fetcher started in different
            // discontinuity sequence, as the content will look different.
            // Within the same sequence, resume only when the gap between the
            // old fetcher's end and the new fetcher's start exceeds
            // kResumeThresholdUs.
            *needResumeUntil |= (startTime.mSeq == lastTime.mSeq
                    && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs);

            // update the stopTime for resumeUntil
            stopParams->setInt32("discontinuitySeq", startTime.mSeq);
            stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs);
        }
    }

    // if we're here, it means dequeue progress hasn't passed some samples in
    // mPacketSources2, so we can trim off the excess in mPacketSources.
    // (old fetcher might still need to resumeUntil the start time of new fetcher)
    for (size_t i = 0; i < kMaxStreams; ++i) {
        StreamType stream = indexToType(i);
        if (!(mSwapMask & mNewStreamMask & stream)
            || (newUri != mStreams[i].mNewUri)
            || stream == STREAMTYPE_SUBTITLES) {
            continue;
        }
        mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]);
    }

    // no resumeUntil if already underflow
    *needResumeUntil &= !mBuffering;

    return true;
}
622
623void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
624    switch (msg->what()) {
625        case kWhatSetBufferingSettings:
626        {
627            readFromAMessage(msg, &mBufferingSettings);
628            break;
629        }
630
631        case kWhatConnect:
632        {
633            onConnect(msg);
634            break;
635        }
636
637        case kWhatDisconnect:
638        {
639            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
640
641            if (mReconfigurationInProgress) {
642                break;
643            }
644
645            finishDisconnect();
646            break;
647        }
648
649        case kWhatSeek:
650        {
651            if (mReconfigurationInProgress) {
652                msg->post(50000);
653                break;
654            }
655
656            CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
657            mSeekReply = new AMessage;
658
659            onSeek(msg);
660            break;
661        }
662
663        case kWhatFetcherNotify:
664        {
665            int32_t what;
666            CHECK(msg->findInt32("what", &what));
667
668            switch (what) {
669                case PlaylistFetcher::kWhatStarted:
670                    break;
671                case PlaylistFetcher::kWhatPaused:
672                case PlaylistFetcher::kWhatStopped:
673                {
674                    AString uri;
675                    CHECK(msg->findString("uri", &uri));
676                    ssize_t index = mFetcherInfos.indexOfKey(uri);
677                    if (index < 0) {
678                        // ignore msgs from fetchers that's already gone
679                        break;
680                    }
681
682                    ALOGV("fetcher-%d %s",
683                            mFetcherInfos[index].mFetcher->getFetcherID(),
684                            what == PlaylistFetcher::kWhatPaused ?
685                                    "paused" : "stopped");
686
687                    if (what == PlaylistFetcher::kWhatStopped) {
688                        mFetcherLooper->unregisterHandler(
689                                mFetcherInfos[index].mFetcher->id());
690                        mFetcherInfos.removeItemsAt(index);
691                    } else if (what == PlaylistFetcher::kWhatPaused) {
692                        int32_t seekMode;
693                        CHECK(msg->findInt32("seekMode", &seekMode));
694                        for (size_t i = 0; i < kMaxStreams; ++i) {
695                            if (mStreams[i].mUri == uri) {
696                                mStreams[i].mSeekMode = (SeekMode) seekMode;
697                            }
698                        }
699                    }
700
701                    if (mContinuation != NULL) {
702                        CHECK_GT(mContinuationCounter, 0);
703                        if (--mContinuationCounter == 0) {
704                            mContinuation->post();
705                        }
706                        ALOGV("%zu fetcher(s) left", mContinuationCounter);
707                    }
708                    break;
709                }
710
711                case PlaylistFetcher::kWhatDurationUpdate:
712                {
713                    AString uri;
714                    CHECK(msg->findString("uri", &uri));
715
716                    int64_t durationUs;
717                    CHECK(msg->findInt64("durationUs", &durationUs));
718
719                    ssize_t index = mFetcherInfos.indexOfKey(uri);
720                    if (index >= 0) {
721                        FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
722                        info->mDurationUs = durationUs;
723                    }
724                    break;
725                }
726
727                case PlaylistFetcher::kWhatTargetDurationUpdate:
728                {
729                    int64_t targetDurationUs;
730                    CHECK(msg->findInt64("targetDurationUs", &targetDurationUs));
731                    mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4);
732                    mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4);
733                    mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs);
734                    break;
735                }
736
737                case PlaylistFetcher::kWhatError:
738                {
739                    status_t err;
740                    CHECK(msg->findInt32("err", &err));
741
742                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
743
744                    // handle EOS on subtitle tracks independently
745                    AString uri;
746                    if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
747                        ssize_t i = mFetcherInfos.indexOfKey(uri);
748                        if (i >= 0) {
749                            const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
750                            if (fetcher != NULL) {
751                                uint32_t type = fetcher->getStreamTypeMask();
752                                if (type == STREAMTYPE_SUBTITLES) {
753                                    mPacketSources.valueFor(
754                                            STREAMTYPE_SUBTITLES)->signalEOS(err);;
755                                    break;
756                                }
757                            }
758                        }
759                    }
760
761                    // remember the failure index (as mCurBandwidthIndex will be restored
762                    // after cancelBandwidthSwitch()), and record last fail time
763                    size_t failureIndex = mCurBandwidthIndex;
764                    mBandwidthItems.editItemAt(
765                            failureIndex).mLastFailureUs = ALooper::GetNowUs();
766
767                    if (mSwitchInProgress) {
768                        // if error happened when we switch to a variant, try fallback
769                        // to other variant to save the session
770                        if (tryBandwidthFallback()) {
771                            break;
772                        }
773                    }
774
775                    if (mInPreparationPhase) {
776                        postPrepared(err);
777                    }
778
779                    cancelBandwidthSwitch();
780
781                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
782
783                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
784
785                    mPacketSources.valueFor(
786                            STREAMTYPE_SUBTITLES)->signalEOS(err);
787
788                    postError(err);
789                    break;
790                }
791
792                case PlaylistFetcher::kWhatStopReached:
793                {
794                    ALOGV("kWhatStopReached");
795
796                    AString oldUri;
797                    CHECK(msg->findString("uri", &oldUri));
798
799                    ssize_t index = mFetcherInfos.indexOfKey(oldUri);
800                    if (index < 0) {
801                        break;
802                    }
803
804                    tryToFinishBandwidthSwitch(oldUri);
805                    break;
806                }
807
808                case PlaylistFetcher::kWhatStartedAt:
809                {
810                    int32_t switchGeneration;
811                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
812
813                    ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d",
814                            switchGeneration, mSwitchGeneration);
815
816                    if (switchGeneration != mSwitchGeneration) {
817                        break;
818                    }
819
820                    AString uri;
821                    CHECK(msg->findString("uri", &uri));
822
823                    // mark new fetcher mToBeResumed
824                    ssize_t index = mFetcherInfos.indexOfKey(uri);
825                    if (index >= 0) {
826                        mFetcherInfos.editValueAt(index).mToBeResumed = true;
827                    }
828
829                    // temporarily disable packet sources to be swapped to prevent
830                    // NuPlayerDecoder from dequeuing while we check progress
831                    for (size_t i = 0; i < mPacketSources.size(); ++i) {
832                        if ((mSwapMask & mPacketSources.keyAt(i))
833                                && uri == mStreams[i].mNewUri) {
834                            mPacketSources.editValueAt(i)->enable(false);
835                        }
836                    }
837                    bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex);
838                    // If switching up, require a cushion bigger than kUnderflowMark
839                    // to avoid buffering immediately after the switch.
840                    // (If we don't have that cushion we'd rather cancel and try again.)
841                    int64_t delayUs =
842                        switchUp ?
843                            (mBufferingSettings.mRebufferingWatermarkLowMs * 1000ll + 1000000ll)
844                            : 0;
845                    bool needResumeUntil = false;
846                    sp<AMessage> stopParams = msg;
847                    if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) {
848                        // playback time hasn't passed startAt time
849                        if (!needResumeUntil) {
850                            ALOGV("finish switch");
851                            for (size_t i = 0; i < kMaxStreams; ++i) {
852                                if ((mSwapMask & indexToType(i))
853                                        && uri == mStreams[i].mNewUri) {
854                                    // have to make a copy of mStreams[i].mUri because
855                                    // tryToFinishBandwidthSwitch is modifying mStreams[]
856                                    AString oldURI = mStreams[i].mUri;
857                                    tryToFinishBandwidthSwitch(oldURI);
858                                    break;
859                                }
860                            }
861                        } else {
862                            // startAt time is after last enqueue time
863                            // Resume fetcher for the original variant; the resumed fetcher should
864                            // continue until the timestamps found in msg, which is stored by the
865                            // new fetcher to indicate where the new variant has started buffering.
866                            ALOGV("finish switch with resumeUntilAsync");
867                            for (size_t i = 0; i < mFetcherInfos.size(); i++) {
868                                const FetcherInfo &info = mFetcherInfos.valueAt(i);
869                                if (info.mToBeRemoved) {
870                                    info.mFetcher->resumeUntilAsync(stopParams);
871                                }
872                            }
873                        }
874                    } else {
875                        // playback time passed startAt time
876                        if (switchUp) {
877                            // if switching up, cancel and retry if condition satisfies again
878                            ALOGV("cancel up switch because we're too late");
879                            cancelBandwidthSwitch(true /* resume */);
880                        } else {
881                            ALOGV("retry down switch at next sample");
882                            resumeFetcher(uri, mSwapMask, -1, true /* newUri */);
883                        }
884                    }
885                    // re-enable all packet sources
886                    for (size_t i = 0; i < mPacketSources.size(); ++i) {
887                        mPacketSources.editValueAt(i)->enable(true);
888                    }
889
890                    break;
891                }
892
893                case PlaylistFetcher::kWhatPlaylistFetched:
894                {
895                    onMasterPlaylistFetched(msg);
896                    break;
897                }
898
899                case PlaylistFetcher::kWhatMetadataDetected:
900                {
901                    if (!mHasMetadata) {
902                        mHasMetadata = true;
903                        sp<AMessage> notify = mNotify->dup();
904                        notify->setInt32("what", kWhatMetadataDetected);
905                        notify->post();
906                    }
907                    break;
908                }
909
910                default:
911                    TRESPASS();
912            }
913
914            break;
915        }
916
917        case kWhatChangeConfiguration:
918        {
919            onChangeConfiguration(msg);
920            break;
921        }
922
923        case kWhatChangeConfiguration2:
924        {
925            onChangeConfiguration2(msg);
926            break;
927        }
928
929        case kWhatChangeConfiguration3:
930        {
931            onChangeConfiguration3(msg);
932            break;
933        }
934
935        case kWhatPollBuffering:
936        {
937            int32_t generation;
938            CHECK(msg->findInt32("generation", &generation));
939            if (generation == mPollBufferingGeneration) {
940                onPollBuffering();
941            }
942            break;
943        }
944
945        default:
946            TRESPASS();
947            break;
948    }
949}
950
951// static
952bool LiveSession::isBandwidthValid(const BandwidthItem &item) {
953    static const int64_t kBlacklistWindowUs = 300 * 1000000ll;
954    return item.mLastFailureUs < 0
955            || ALooper::GetNowUs() - item.mLastFailureUs > kBlacklistWindowUs;
956}
957
958// static
959int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
960    if (a->mBandwidth < b->mBandwidth) {
961        return -1;
962    } else if (a->mBandwidth == b->mBandwidth) {
963        return 0;
964    }
965
966    return 1;
967}
968
969// static
970LiveSession::StreamType LiveSession::indexToType(int idx) {
971    CHECK(idx >= 0 && idx < kNumSources);
972    return (StreamType)(1 << idx);
973}
974
975// static
976ssize_t LiveSession::typeToIndex(int32_t type) {
977    switch (type) {
978        case STREAMTYPE_AUDIO:
979            return 0;
980        case STREAMTYPE_VIDEO:
981            return 1;
982        case STREAMTYPE_SUBTITLES:
983            return 2;
984        case STREAMTYPE_METADATA:
985            return 3;
986        default:
987            return -1;
988    };
989    return -1;
990}
991
992void LiveSession::onConnect(const sp<AMessage> &msg) {
993    CHECK(msg->findString("url", &mMasterURL));
994
995    // TODO currently we don't know if we are coming here from incognito mode
996    ALOGI("onConnect %s", uriDebugString(mMasterURL).c_str());
997
998    KeyedVector<String8, String8> *headers = NULL;
999    if (!msg->findPointer("headers", (void **)&headers)) {
1000        mExtraHeaders.clear();
1001    } else {
1002        mExtraHeaders = *headers;
1003
1004        delete headers;
1005        headers = NULL;
1006    }
1007
1008    // create looper for fetchers
1009    if (mFetcherLooper == NULL) {
1010        mFetcherLooper = new ALooper();
1011
1012        mFetcherLooper->setName("Fetcher");
1013        mFetcherLooper->start(false, false);
1014    }
1015
1016    // create fetcher to fetch the master playlist
1017    addFetcher(mMasterURL.c_str())->fetchPlaylistAsync();
1018}
1019
1020void LiveSession::onMasterPlaylistFetched(const sp<AMessage> &msg) {
1021    AString uri;
1022    CHECK(msg->findString("uri", &uri));
1023    ssize_t index = mFetcherInfos.indexOfKey(uri);
1024    if (index < 0) {
1025        ALOGW("fetcher for master playlist is gone.");
1026        return;
1027    }
1028
1029    // no longer useful, remove
1030    mFetcherLooper->unregisterHandler(mFetcherInfos[index].mFetcher->id());
1031    mFetcherInfos.removeItemsAt(index);
1032
1033    CHECK(msg->findObject("playlist", (sp<RefBase> *)&mPlaylist));
1034    if (mPlaylist == NULL) {
1035        ALOGE("unable to fetch master playlist %s.",
1036                uriDebugString(mMasterURL).c_str());
1037
1038        postPrepared(ERROR_IO);
1039        return;
1040    }
1041    // We trust the content provider to make a reasonable choice of preferred
1042    // initial bandwidth by listing it first in the variant playlist.
1043    // At startup we really don't have a good estimate on the available
1044    // network bandwidth since we haven't tranferred any data yet. Once
1045    // we have we can make a better informed choice.
1046    size_t initialBandwidth = 0;
1047    size_t initialBandwidthIndex = 0;
1048
1049    int32_t maxWidth = 0;
1050    int32_t maxHeight = 0;
1051
1052    if (mPlaylist->isVariantPlaylist()) {
1053        Vector<BandwidthItem> itemsWithVideo;
1054        for (size_t i = 0; i < mPlaylist->size(); ++i) {
1055            BandwidthItem item;
1056
1057            item.mPlaylistIndex = i;
1058            item.mLastFailureUs = -1ll;
1059
1060            sp<AMessage> meta;
1061            AString uri;
1062            mPlaylist->itemAt(i, &uri, &meta);
1063
1064            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
1065
1066            int32_t width, height;
1067            if (meta->findInt32("width", &width)) {
1068                maxWidth = max(maxWidth, width);
1069            }
1070            if (meta->findInt32("height", &height)) {
1071                maxHeight = max(maxHeight, height);
1072            }
1073
1074            mBandwidthItems.push(item);
1075            if (mPlaylist->hasType(i, "video")) {
1076                itemsWithVideo.push(item);
1077            }
1078        }
1079        // remove the audio-only variants if we have at least one with video
1080        if (!itemsWithVideo.empty()
1081                && itemsWithVideo.size() < mBandwidthItems.size()) {
1082            mBandwidthItems.clear();
1083            for (size_t i = 0; i < itemsWithVideo.size(); ++i) {
1084                mBandwidthItems.push(itemsWithVideo[i]);
1085            }
1086        }
1087
1088        CHECK_GT(mBandwidthItems.size(), 0u);
1089        initialBandwidth = mBandwidthItems[0].mBandwidth;
1090
1091        mBandwidthItems.sort(SortByBandwidth);
1092
1093        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
1094            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
1095                initialBandwidthIndex = i;
1096                break;
1097            }
1098        }
1099    } else {
1100        // dummy item.
1101        BandwidthItem item;
1102        item.mPlaylistIndex = 0;
1103        item.mBandwidth = 0;
1104        mBandwidthItems.push(item);
1105    }
1106
1107    mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth;
1108    mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight;
1109
1110    mPlaylist->pickRandomMediaItems();
1111    changeConfiguration(
1112            0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
1113}
1114
1115void LiveSession::finishDisconnect() {
1116    ALOGV("finishDisconnect");
1117
1118    // No reconfiguration is currently pending, make sure none will trigger
1119    // during disconnection either.
1120    cancelBandwidthSwitch();
1121
1122    // cancel buffer polling
1123    cancelPollBuffering();
1124
1125    // TRICKY: don't wait for all fetcher to be stopped when disconnecting
1126    //
1127    // Some fetchers might be stuck in connect/getSize at this point. These
1128    // operations will eventually timeout (as we have a timeout set in
1129    // MediaHTTPConnection), but we don't want to block the main UI thread
1130    // until then. Here we just need to make sure we clear all references
1131    // to the fetchers, so that when they finally exit from the blocking
1132    // operation, they can be destructed.
1133    //
1134    // There is one very tricky point though. For this scheme to work, the
1135    // fecther must hold a reference to LiveSession, so that LiveSession is
1136    // destroyed after fetcher. Otherwise LiveSession would get stuck in its
1137    // own destructor when it waits for mFetcherLooper to stop, which still
1138    // blocks main UI thread.
1139    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1140        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1141        mFetcherLooper->unregisterHandler(
1142                mFetcherInfos.valueAt(i).mFetcher->id());
1143    }
1144    mFetcherInfos.clear();
1145
1146    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
1147    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
1148
1149    mPacketSources.valueFor(
1150            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
1151
1152    sp<AMessage> response = new AMessage;
1153    response->setInt32("err", OK);
1154
1155    response->postReply(mDisconnectReplyID);
1156    mDisconnectReplyID.clear();
1157}
1158
1159sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
1160    ssize_t index = mFetcherInfos.indexOfKey(uri);
1161
1162    if (index >= 0) {
1163        return NULL;
1164    }
1165
1166    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this);
1167    notify->setString("uri", uri);
1168    notify->setInt32("switchGeneration", mSwitchGeneration);
1169
1170    FetcherInfo info;
1171    info.mFetcher = new PlaylistFetcher(
1172            notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration);
1173    info.mDurationUs = -1ll;
1174    info.mToBeRemoved = false;
1175    info.mToBeResumed = false;
1176    mFetcherLooper->registerHandler(info.mFetcher);
1177
1178    mFetcherInfos.add(uri, info);
1179
1180    return info.mFetcher;
1181}
1182
1183#if 0
// Returns rand() scaled by RAND_MAX, i.e. a value in [0, 1].
// Compiled out (#if 0); only the disabled experimental branches of
// getBandwidthIndex() reference it.
static double uniformRand() {
    return (double)rand() / RAND_MAX;
}
1187#endif
1188
1189bool LiveSession::UriIsSameAsIndex(const AString &uri, int32_t i, bool newUri) {
1190    ALOGV("[timed_id3] i %d UriIsSameAsIndex newUri %s, %s", i,
1191            newUri ? "true" : "false",
1192            newUri ? mStreams[i].mNewUri.c_str() : mStreams[i].mUri.c_str());
1193    return i >= 0
1194            && ((!newUri && uri == mStreams[i].mUri)
1195            || (newUri && uri == mStreams[i].mNewUri));
1196}
1197
1198sp<AnotherPacketSource> LiveSession::getPacketSourceForStreamIndex(
1199        size_t trackIndex, bool newUri) {
1200    StreamType type = indexToType(trackIndex);
1201    sp<AnotherPacketSource> source = NULL;
1202    if (newUri) {
1203        source = mPacketSources2.valueFor(type);
1204        source->clear();
1205    } else {
1206        source = mPacketSources.valueFor(type);
1207    };
1208    return source;
1209}
1210
1211sp<AnotherPacketSource> LiveSession::getMetadataSource(
1212        sp<AnotherPacketSource> sources[kNumSources], uint32_t streamMask, bool newUri) {
1213    // todo: One case where the following strategy can fail is when audio and video
1214    // are in separate playlists, both are transport streams, and the metadata
1215    // is actually contained in the audio stream.
1216    ALOGV("[timed_id3] getMetadataSourceForUri streamMask %x newUri %s",
1217            streamMask, newUri ? "true" : "false");
1218
1219    if ((sources[kVideoIndex] != NULL) // video fetcher; or ...
1220            || (!(streamMask & STREAMTYPE_VIDEO) && sources[kAudioIndex] != NULL)) {
1221            // ... audio fetcher for audio only variant
1222        return getPacketSourceForStreamIndex(kMetaDataIndex, newUri);
1223    }
1224
1225    return NULL;
1226}
1227
1228bool LiveSession::resumeFetcher(
1229        const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) {
1230    ssize_t index = mFetcherInfos.indexOfKey(uri);
1231    if (index < 0) {
1232        ALOGE("did not find fetcher for uri: %s", uri.c_str());
1233        return false;
1234    }
1235
1236    bool resume = false;
1237    sp<AnotherPacketSource> sources[kNumSources];
1238    for (size_t i = 0; i < kMaxStreams; ++i) {
1239        if ((streamMask & indexToType(i)) && UriIsSameAsIndex(uri, i, newUri)) {
1240            resume = true;
1241            sources[i] = getPacketSourceForStreamIndex(i, newUri);
1242        }
1243    }
1244
1245    if (resume) {
1246        sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher;
1247        SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition;
1248
1249        ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d",
1250                fetcher->getFetcherID(), (long long)timeUs, seekMode);
1251
1252        fetcher->startAsync(
1253                sources[kAudioIndex],
1254                sources[kVideoIndex],
1255                sources[kSubtitleIndex],
1256                getMetadataSource(sources, streamMask, newUri),
1257                timeUs, -1, -1, seekMode);
1258    }
1259
1260    return resume;
1261}
1262
1263float LiveSession::getAbortThreshold(
1264        ssize_t currentBWIndex, ssize_t targetBWIndex) const {
1265    float abortThreshold = -1.0f;
1266    if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) {
1267        /*
1268           If we're switching down, we need to decide whether to
1269
1270           1) finish last segment of high-bandwidth variant, or
1271           2) abort last segment of high-bandwidth variant, and fetch an
1272              overlapping portion from low-bandwidth variant.
1273
1274           Here we try to maximize the amount of buffer left when the
1275           switch point is met. Given the following parameters:
1276
1277           B: our current buffering level in seconds
1278           T: target duration in seconds
1279           X: sample duration in seconds remain to fetch in last segment
1280           bw0: bandwidth of old variant (as specified in playlist)
1281           bw1: bandwidth of new variant (as specified in playlist)
1282           bw: measured bandwidth available
1283
1284           If we choose 1), when switch happens at the end of current
1285           segment, our buffering will be
1286                  B + X - X * bw0 / bw
1287
1288           If we choose 2), when switch happens where we aborted current
1289           segment, our buffering will be
1290                  B - (T - X) * bw1 / bw
1291
1292           We should only choose 1) if
1293                  X/T < bw1 / (bw1 + bw0 - bw)
1294        */
1295
1296        // abort old bandwidth immediately if bandwidth is fluctuating a lot.
1297        // our estimate could be far off, and fetching old bandwidth could
1298        // take too long.
1299        if (!mLastBandwidthStable) {
1300            return 0.0f;
1301        }
1302
1303        // Taking the measured current bandwidth at 50% face value only,
1304        // as our bandwidth estimation is a lagging indicator. Being
1305        // conservative on this, we prefer switching to lower bandwidth
1306        // unless we're really confident finishing up the last segment
1307        // of higher bandwidth will be fast.
1308        CHECK(mLastBandwidthBps >= 0);
1309        abortThreshold =
1310                (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1311             / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1312              + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth
1313              - (float)mLastBandwidthBps * 0.5f);
1314        if (abortThreshold < 0.0f) {
1315            abortThreshold = -1.0f; // do not abort
1316        }
1317        ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f",
1318                mBandwidthItems.itemAt(currentBWIndex).mBandwidth,
1319                mBandwidthItems.itemAt(targetBWIndex).mBandwidth,
1320                mLastBandwidthBps,
1321                abortThreshold);
1322    }
1323    return abortThreshold;
1324}
1325
// Forwards one transfer sample (|numBytes| over |delayUs|) to
// mBandwidthEstimator.
void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) {
    mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs);
}
1329
1330ssize_t LiveSession::getLowestValidBandwidthIndex() const {
1331    for (size_t index = 0; index < mBandwidthItems.size(); index++) {
1332        if (isBandwidthValid(mBandwidthItems[index])) {
1333            return index;
1334        }
1335    }
1336    // if playlists are all blacklisted, return 0 and hope it's alive
1337    return 0;
1338}
1339
1340size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) {
1341    if (mBandwidthItems.size() < 2) {
1342        // shouldn't be here if we only have 1 bandwidth, check
1343        // logic to get rid of redundant bandwidth polling
1344        ALOGW("getBandwidthIndex() called for single bandwidth playlist!");
1345        return 0;
1346    }
1347
1348#if 1
1349    char value[PROPERTY_VALUE_MAX];
1350    ssize_t index = -1;
1351    if (property_get("media.httplive.bw-index", value, NULL)) {
1352        char *end;
1353        index = strtol(value, &end, 10);
1354        CHECK(end > value && *end == '\0');
1355
1356        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
1357            index = mBandwidthItems.size() - 1;
1358        }
1359    }
1360
1361    if (index < 0) {
1362        char value[PROPERTY_VALUE_MAX];
1363        if (property_get("media.httplive.max-bw", value, NULL)) {
1364            char *end;
1365            long maxBw = strtoul(value, &end, 10);
1366            if (end > value && *end == '\0') {
1367                if (maxBw > 0 && bandwidthBps > maxBw) {
1368                    ALOGV("bandwidth capped to %ld bps", maxBw);
1369                    bandwidthBps = maxBw;
1370                }
1371            }
1372        }
1373
1374        // Pick the highest bandwidth stream that's not currently blacklisted
1375        // below or equal to estimated bandwidth.
1376
1377        index = mBandwidthItems.size() - 1;
1378        ssize_t lowestBandwidth = getLowestValidBandwidthIndex();
1379        while (index > lowestBandwidth) {
1380            // be conservative (70%) to avoid overestimating and immediately
1381            // switching down again.
1382            size_t adjustedBandwidthBps = bandwidthBps * 7 / 10;
1383            const BandwidthItem &item = mBandwidthItems[index];
1384            if (item.mBandwidth <= adjustedBandwidthBps
1385                    && isBandwidthValid(item)) {
1386                break;
1387            }
1388            --index;
1389        }
1390    }
1391#elif 0
1392    // Change bandwidth at random()
1393    size_t index = uniformRand() * mBandwidthItems.size();
1394#elif 0
1395    // There's a 50% chance to stay on the current bandwidth and
1396    // a 50% chance to switch to the next higher bandwidth (wrapping around
1397    // to lowest)
1398    const size_t kMinIndex = 0;
1399
1400    static ssize_t mCurBandwidthIndex = -1;
1401
1402    size_t index;
1403    if (mCurBandwidthIndex < 0) {
1404        index = kMinIndex;
1405    } else if (uniformRand() < 0.5) {
1406        index = (size_t)mCurBandwidthIndex;
1407    } else {
1408        index = mCurBandwidthIndex + 1;
1409        if (index == mBandwidthItems.size()) {
1410            index = kMinIndex;
1411        }
1412    }
1413    mCurBandwidthIndex = index;
1414#elif 0
1415    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1416
1417    size_t index = mBandwidthItems.size() - 1;
1418    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1419        --index;
1420    }
1421#elif 1
1422    char value[PROPERTY_VALUE_MAX];
1423    size_t index;
1424    if (property_get("media.httplive.bw-index", value, NULL)) {
1425        char *end;
1426        index = strtoul(value, &end, 10);
1427        CHECK(end > value && *end == '\0');
1428
1429        if (index >= mBandwidthItems.size()) {
1430            index = mBandwidthItems.size() - 1;
1431        }
1432    } else {
1433        index = 0;
1434    }
1435#else
1436    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
1437#endif
1438
1439    CHECK_GE(index, 0);
1440
1441    return index;
1442}
1443
1444HLSTime LiveSession::latestMediaSegmentStartTime() const {
1445    HLSTime audioTime(mPacketSources.valueFor(
1446                    STREAMTYPE_AUDIO)->getLatestDequeuedMeta());
1447
1448    HLSTime videoTime(mPacketSources.valueFor(
1449                    STREAMTYPE_VIDEO)->getLatestDequeuedMeta());
1450
1451    return audioTime < videoTime ? videoTime : audioTime;
1452}
1453
// Handles a seek request: reads the target position from |msg| and starts a
// reconfiguration at that time. Both "timeUs" and "mode" must be present in
// the message (CHECKed), but only "timeUs" is used for now.
void LiveSession::onSeek(const sp<AMessage> &msg) {
    int64_t timeUs;
    int32_t mode;
    CHECK(msg->findInt64("timeUs", &timeUs));
    CHECK(msg->findInt32("mode", &mode));
    // TODO: add "mode" to changeConfiguration.
    changeConfiguration(timeUs/* , (MediaPlayerSeekMode)mode */);
}
1462
1463status_t LiveSession::getDuration(int64_t *durationUs) const {
1464    int64_t maxDurationUs = -1ll;
1465    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1466        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1467
1468        if (fetcherDurationUs > maxDurationUs) {
1469            maxDurationUs = fetcherDurationUs;
1470        }
1471    }
1472
1473    *durationUs = maxDurationUs;
1474
1475    return OK;
1476}
1477
1478bool LiveSession::isSeekable() const {
1479    int64_t durationUs;
1480    return getDuration(&durationUs) == OK && durationUs >= 0;
1481}
1482
// Always reports false.
bool LiveSession::hasDynamicDuration() const {
    return false;
}
1486
1487size_t LiveSession::getTrackCount() const {
1488    if (mPlaylist == NULL) {
1489        return 0;
1490    } else {
1491        return mPlaylist->getTrackCount() + (mHasMetadata ? 1 : 0);
1492    }
1493}
1494
1495sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1496    if (mPlaylist == NULL) {
1497        return NULL;
1498    } else {
1499        if (trackIndex == mPlaylist->getTrackCount() && mHasMetadata) {
1500            sp<AMessage> format = new AMessage();
1501            format->setInt32("type", MEDIA_TRACK_TYPE_METADATA);
1502            format->setString("language", "und");
1503            format->setString("mime", MEDIA_MIMETYPE_DATA_TIMED_ID3);
1504            return format;
1505        }
1506        return mPlaylist->getTrackInfo(trackIndex);
1507    }
1508}
1509
1510status_t LiveSession::selectTrack(size_t index, bool select) {
1511    if (mPlaylist == NULL) {
1512        return INVALID_OPERATION;
1513    }
1514
1515    ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++",
1516            index, select, mSubtitleGeneration);
1517
1518    ++mSubtitleGeneration;
1519    status_t err = mPlaylist->selectTrack(index, select);
1520    if (err == OK) {
1521        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this);
1522        msg->setInt32("pickTrack", select);
1523        msg->post();
1524    }
1525    return err;
1526}
1527
1528ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
1529    if (mPlaylist == NULL) {
1530        return -1;
1531    } else {
1532        return mPlaylist->getSelectedTrack(type);
1533    }
1534}
1535
// Starts an asynchronous reconfiguration of the session.
//
// timeUs:         >= 0 is handled as a seek: reused fetchers are paused with
//                 disconnect set and the continuation is
//                 kWhatChangeConfiguration2. A negative value skips straight
//                 to kWhatChangeConfiguration3.
// bandwidthIndex: >= 0 makes that index the current variant (remembering the
//                 previous one in mOrigBandwidthIndex); a negative value keeps
//                 the current variant.
// pickTrack:      forwarded in the continuation message; when set, removal
//                 of a no-longer-needed fetcher is never delayed.
//
// Any bandwidth switch in flight is cancelled first. Must not be called
// while another reconfiguration is in progress (CHECKed).
void LiveSession::changeConfiguration(
        int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) {
    ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d",
          (long long)timeUs, bandwidthIndex, pickTrack);

    cancelBandwidthSwitch();

    CHECK(!mReconfigurationInProgress);
    mReconfigurationInProgress = true;
    if (bandwidthIndex >= 0) {
        mOrigBandwidthIndex = mCurBandwidthIndex;
        mCurBandwidthIndex = bandwidthIndex;
        if (mOrigBandwidthIndex != mCurBandwidthIndex) {
            ALOGI("#### Starting Bandwidth Switch: %zd => %zd",
                    mOrigBandwidthIndex, mCurBandwidthIndex);
        }
    }
    CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size());
    const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex);

    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher

    // Collect the URI the selected variant provides for each stream slot;
    // slots without a URI stay out of streamMask.
    AString URIs[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
            streamMask |= indexToType(i);
        }
    }

    // Step 1, stop and discard fetchers that are no longer needed.
    // Pause those that we'll reuse.
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        // skip fetchers that are marked mToBeRemoved,
        // these are done and can't be reused
        if (mFetcherInfos[i].mToBeRemoved) {
            continue;
        }

        const AString &uri = mFetcherInfos.keyAt(i);
        sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher;

        bool discardFetcher = true, delayRemoval = false;
        // A fetcher whose uri is still wanted keeps its streams: move them
        // from streamMask to resumeMask.
        for (size_t j = 0; j < kMaxStreams; ++j) {
            StreamType type = indexToType(j);
            if ((streamMask & type) && uri == URIs[j]) {
                resumeMask |= type;
                streamMask &= ~type;
                discardFetcher = false;
            }
        }
        // Delay fetcher removal if not picking tracks, AND old fetcher
        // has stream mask that overlaps new variant. (Okay to discard
        // old fetcher now, if completely no overlap.)
        if (discardFetcher && timeUs < 0ll && !pickTrack
                && (fetcher->getStreamTypeMask() & streamMask)) {
            discardFetcher = false;
            delayRemoval = true;
        }

        if (discardFetcher) {
            ALOGV("discarding fetcher-%d", fetcher->getFetcherID());
            fetcher->stopAsync();
        } else {
            float threshold = 0.0f; // default to pause after current block (47Kbytes)
            bool disconnect = false;
            if (timeUs >= 0ll) {
                // seeking, no need to finish fetching
                disconnect = true;
            } else if (delayRemoval) {
                // adapting, abort if remaining of current segment is over threshold
                threshold = getAbortThreshold(
                        mOrigBandwidthIndex, mCurBandwidthIndex);
            }

            ALOGV("pausing fetcher-%d, threshold=%.2f",
                    fetcher->getFetcherID(), threshold);
            fetcher->pauseAsync(threshold, disconnect);
        }
    }

    // Build the continuation message that carries the computed masks and
    // the per-stream URIs to the next stage.
    sp<AMessage> msg;
    if (timeUs < 0ll) {
        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
        msg = new AMessage(kWhatChangeConfiguration3, this);
    } else {
        msg = new AMessage(kWhatChangeConfiguration2, this);
    }
    msg->setInt32("streamMask", streamMask);
    msg->setInt32("resumeMask", resumeMask);
    msg->setInt32("pickTrack", pickTrack);
    msg->setInt64("timeUs", timeUs);
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if ((streamMask | resumeMask) & indexToType(i)) {
            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
        }
    }

    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
    // fetchers have completed their asynchronous operation, we'll post
    // mContinuation, which then is handled below in onChangeConfiguration2.
    // NOTE(review): the count covers every entry in mFetcherInfos, including
    // those skipped above as mToBeRemoved; presumably they still acknowledge
    // an earlier stop request — confirm.
    mContinuationCounter = mFetcherInfos.size();
    mContinuation = msg;

    // No fetchers to wait for: continue right away.
    if (mContinuationCounter == 0) {
        msg->post();
    }
}
1645
1646void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1647    ALOGV("onChangeConfiguration");
1648
1649    if (!mReconfigurationInProgress) {
1650        int32_t pickTrack = 0;
1651        msg->findInt32("pickTrack", &pickTrack);
1652        changeConfiguration(-1ll /* timeUs */, -1, pickTrack);
1653    } else {
1654        msg->post(1000000ll); // retry in 1 sec
1655    }
1656}
1657
// Second stage of a configuration change; |msg| is the continuation message
// built by the first stage and must carry "timeUs", "streamMask",
// "resumeMask" and one URI string per selected stream.
//
// When seeking (timeUs >= 0) the packet sources and per-stream state are
// reset and the pending seek request, if any, is answered. Afterwards the
// set of stream types that were active before but are not part of the new
// configuration is computed: if it is empty, onChangeConfiguration3() is
// invoked directly; otherwise the player is notified with
// kWhatStreamsChanged and |msg| is handed over as the reply, retargeted to
// kWhatChangeConfiguration3.
void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
    ALOGV("onChangeConfiguration2");

    mContinuation.clear();

    // All fetchers are either suspended or have been removed now.

    // If we're seeking, clear all packet sources before we report
    // seek complete, to prevent decoder from pulling stale data.
    int64_t timeUs;
    CHECK(msg->findInt64("timeUs", &timeUs));

    if (timeUs >= 0) {
        mLastSeekTimeUs = timeUs;
        mLastDequeuedTimeUs = timeUs;

        for (size_t i = 0; i < mPacketSources.size(); i++) {
            sp<AnotherPacketSource> packetSource = mPacketSources.editValueAt(i);
            // Remember the current format; it is re-applied after clear().
            sp<MetaData> format = packetSource->getFormat();
            packetSource->clear();
            // Set a tentative format here such that HTTPLiveSource will always have
            // a format available when NuPlayer queries. Without an available video
            // format when setting a surface NuPlayer might disable video decoding
            // altogether. The tentative format will be overwritten by the
            // authoritative (and possibly same) format once content from the new
            // position is dequeued.
            packetSource->setFormat(format);
        }

        for (size_t i = 0; i < kMaxStreams; ++i) {
            mStreams[i].reset();
        }

        mDiscontinuityOffsetTimesUs.clear();
        mDiscontinuityAbsStartTimesUs.clear();

        // Answer the pending seek request (if any) now that stale data is gone.
        if (mSeekReplyID != NULL) {
            CHECK(mSeekReply != NULL);
            mSeekReply->setInt32("err", OK);
            mSeekReply->postReply(mSeekReplyID);
            mSeekReplyID.clear();
            mSeekReply.clear();
        }

        // restart buffer polling after seek because the previous
        // buffering position is no longer valid.
        restartPollBuffering();
    }

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    // From here on streamMask covers every stream type of the new
    // configuration, whether served by a resumed or by a new fetcher.
    streamMask |= resumeMask;

    // Collect the new URI of every selected stream from the message.
    AString URIs[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            const AString &uriKey = mStreams[i].uriKey();
            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
        }
    }

    uint32_t changedMask = 0;
    // NOTE: the loop condition terminates at kSubtitleIndex (it does not
    // merely skip it), so only indices below kSubtitleIndex are examined.
    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
        // stream URI could change even if onChangeConfiguration2 is only
        // used for seek. Seek could happen during a bw switch, in this
        // case bw switch will be cancelled, but the seekTo position will
        // fetch from the new URI.
        if ((mStreamMask & streamMask & indexToType(i))
                && !mStreams[i].mUri.empty()
                && !(URIs[i] == mStreams[i].mUri)) {
            ALOGV("stream %zu changed: oldURI %s, newURI %s", i,
                    mStreams[i].mUri.c_str(), URIs[i].c_str());
            sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i));
            // Only queue a format change if something was dequeued from
            // this source already.
            if (source->getLatestDequeuedMeta() != NULL) {
                source->queueDiscontinuity(
                        ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
            }
        }
        // Determine which decoders to shutdown on the player side,
        // a decoder has to be shutdown if its streamtype was active
        // before but no longer is.
        if ((mStreamMask & ~streamMask & indexToType(i))) {
            changedMask |= indexToType(i);
        }
    }

    if (changedMask == 0) {
        // If nothing changed as far as the audio/video decoders
        // are concerned we can proceed.
        onChangeConfiguration3(msg);
        return;
    }

    // Something changed, inform the player which will shutdown the
    // corresponding decoders and will post the reply once that's done.
    // Handling the reply will continue executing below in
    // onChangeConfiguration3.
    sp<AMessage> notify = mNotify->dup();
    notify->setInt32("what", kWhatStreamsChanged);
    notify->setInt32("changedMask", changedMask);

    // Reuse the continuation message as the reply so that all of its
    // fields (masks, timeUs, URIs) reach onChangeConfiguration3 unchanged.
    msg->setWhat(kWhatChangeConfiguration3);
    msg->setTarget(this);

    notify->setMessage("reply", msg);
    notify->post();
}
1768
// Final stage of a configuration change. |msg| must carry "streamMask",
// "resumeMask", "timeUs", "pickTrack" and one URI string per stream type in
// "streamMask".
//
// Steps performed here:
//  1. Decide whether this is a "switching" change (not seeking, not picking
//     a track, and at least one stream present in both old and new
//     configuration needs a different URI) and update mSwapMask.
//  2. Record the new URI of each stream (in mNewUri while switching,
//     otherwise directly in mUri).
//  3. Resume existing fetchers that are still needed and mark the others
//     for removal.
//  4. Create and start one new fetcher per distinct URI still left in
//     streamMask.
//  5. Clear mReconfigurationInProgress, update the switch state and finish
//     a pending disconnect, if any.
void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
    mContinuation.clear();
    // All remaining fetchers are still suspended, the player has shutdown
    // any decoders that needed it.

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    mNewStreamMask = streamMask | resumeMask;

    int64_t timeUs;
    int32_t pickTrack;
    bool switching = false;
    CHECK(msg->findInt64("timeUs", &timeUs));
    CHECK(msg->findInt32("pickTrack", &pickTrack));

    if (timeUs < 0ll) {
        if (!pickTrack) {
            // mSwapMask contains streams that are in both old and new variant,
            // (in mNewStreamMask & mStreamMask) but with different URIs
            // (not in resumeMask).
            // For example, old variant has video and audio in two separate
            // URIs, and new variant has only audio with unchanged URI. mSwapMask
            // should be 0 as there is nothing to swap. We only need to stop video,
            // and resume audio.
            mSwapMask =  mNewStreamMask & mStreamMask & ~resumeMask;
            switching = (mSwapMask != 0);
        }
        // Not seeking: anchor the real-time base at the last dequeued position.
        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
    } else {
        // Seeking: anchor the real-time base at the seek target.
        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
    }

    ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, "
            "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x",
            (long long)timeUs, switching, pickTrack,
            mStreamMask, mNewStreamMask, mSwapMask);

    // Store the URI for every stream that needs a new fetcher. While
    // switching, the current mUri is kept and the new one goes to mNewUri.
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            if (switching) {
                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
            } else {
                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
            }
        }
    }

    // Of all existing fetchers:
    // * Resume fetchers that are still needed and assign them original packet sources.
    // * Mark otherwise unneeded fetchers for removal.
    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        const AString &uri = mFetcherInfos.keyAt(i);
        if (!resumeFetcher(uri, resumeMask, timeUs)) {
            ALOGV("marking fetcher-%d to be removed",
                    mFetcherInfos[i].mFetcher->getFetcherID());

            mFetcherInfos.editValueAt(i).mToBeRemoved = true;
        }
    }

    // streamMask now only contains the types that need a new fetcher created.
    if (streamMask != 0) {
        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
    }

    // Find out when the original fetchers have buffered up to and start the new fetchers
    // at a later timestamp.
    for (size_t i = 0; i < kMaxStreams; i++) {
        if (!(indexToType(i) & streamMask)) {
            continue;
        }

        AString uri;
        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;

        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
        CHECK(fetcher != NULL);

        HLSTime startTime;
        SeekMode seekMode = kSeekModeExactPosition;
        // Packet sources handed to the new fetcher; entries for stream types
        // it does not serve are left as NULL.
        sp<AnotherPacketSource> sources[kNumSources];

        if (i == kSubtitleIndex || (!pickTrack && !switching)) {
            startTime = latestMediaSegmentStartTime();
        }

        // TRICKY: looping from i as earlier streams are already removed from streamMask
        for (size_t j = i; j < kMaxStreams; ++j) {
            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
            // Every remaining stream sharing this URI is served by the same fetcher.
            if ((streamMask & indexToType(j)) && uri == streamUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));

                if (timeUs >= 0) {
                    startTime.mTimeUs = timeUs;
                } else {
                    int32_t type;
                    sp<AMessage> meta;
                    if (!switching) {
                        // selecting, or adapting but no swap required
                        meta = sources[j]->getLatestDequeuedMeta();
                    } else {
                        // adapting and swap required
                        meta = sources[j]->getLatestEnqueuedMeta();
                        if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) {
                            // switching up
                            meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin);
                        }
                    }

                    // Keep the latest start time found among the audio and
                    // video sources; metas carrying a "discontinuity" entry
                    // are ignored.
                    if ((j == kAudioIndex || j == kVideoIndex)
                            && meta != NULL && !meta->findInt32("discontinuity", &type)) {
                        HLSTime tmpTime(meta);
                        if (startTime < tmpTime) {
                            startTime = tmpTime;
                        }
                    }

                    if (!switching) {
                        // selecting, or adapting but no swap required
                        sources[j]->clear();
                        // NOTE: this break leaves the inner loop before the
                        // subtitle bit is removed from streamMask below.
                        if (j == kSubtitleIndex) {
                            break;
                        }

                        ALOGV("stream[%zu]: queue format change", j);
                        sources[j]->queueDiscontinuity(
                                ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
                    } else {
                        // switching, queue discontinuities after resume
                        sources[j] = mPacketSources2.valueFor(indexToType(j));
                        sources[j]->clear();
                        // the new fetcher might be providing streams that used to be
                        // provided by two different fetchers, if one of the fetchers
                        // paused in the middle while the other somehow paused in next
                        // seg, we have to start from next seg.
                        if (seekMode < mStreams[j].mSeekMode) {
                            seekMode = mStreams[j].mSeekMode;
                        }
                    }
                }

                streamMask &= ~indexToType(j);
            }
        }

        ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld "
                "segmentStartTimeUs %lld seekMode %d",
                fetcher->getFetcherID(),
                (long long)startTime.mTimeUs,
                (long long)mLastSeekTimeUs,
                (long long)startTime.getSegmentTimeUs(),
                seekMode);

        // Set the target segment start time to the middle point of the
        // segment where the last sample was.
        // This gives a better guess if segments of the two variants are not
        // perfectly aligned. (If the corresponding segment in new variant
        // starts slightly later than that in the old variant, we still want
        // to pick that segment, not the one before)
        // A negative startTime.mTimeUs means no start time was determined
        // above; the last seek position is used instead.
        fetcher->startAsync(
                sources[kAudioIndex],
                sources[kVideoIndex],
                sources[kSubtitleIndex],
                getMetadataSource(sources, mNewStreamMask, switching),
                startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs,
                startTime.getSegmentTimeUs(),
                startTime.mSeq,
                seekMode);
    }

    // All fetchers have now been started, the configuration change
    // has completed.

    mReconfigurationInProgress = false;
    if (switching) {
        // mStreamMask and mOrigBandwidthIndex are left untouched here;
        // tryToFinishBandwidthSwitch() updates them once the swap is done.
        mSwitchInProgress = true;
    } else {
        mStreamMask = mNewStreamMask;
        if (mOrigBandwidthIndex != mCurBandwidthIndex) {
            ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd",
                    mOrigBandwidthIndex, mCurBandwidthIndex);
            mOrigBandwidthIndex = mCurBandwidthIndex;
        }
    }

    ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x",
            mSwitchInProgress, mStreamMask);

    // A disconnect requested in the meantime can be completed now.
    if (mDisconnectReplyID != NULL) {
        finishDisconnect();
    }
}
1964
1965void LiveSession::swapPacketSource(StreamType stream) {
1966    ALOGV("[%s] swapPacketSource", getNameForStream(stream));
1967
1968    // transfer packets from source2 to source
1969    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
1970    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
1971
1972    // queue discontinuity in mPacketSource
1973    aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false);
1974
1975    // queue packets in mPacketSource2 to mPacketSource
1976    status_t finalResult = OK;
1977    sp<ABuffer> accessUnit;
1978    while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK &&
1979          OK == aps2->dequeueAccessUnit(&accessUnit)) {
1980        aps->queueAccessUnit(accessUnit);
1981    }
1982    aps2->clear();
1983}
1984
1985void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) {
1986    if (!mSwitchInProgress) {
1987        return;
1988    }
1989
1990    ssize_t index = mFetcherInfos.indexOfKey(oldUri);
1991    if (index < 0 || !mFetcherInfos[index].mToBeRemoved) {
1992        return;
1993    }
1994
1995    // Swap packet source of streams provided by old variant
1996    for (size_t idx = 0; idx < kMaxStreams; idx++) {
1997        StreamType stream = indexToType(idx);
1998        if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) {
1999            swapPacketSource(stream);
2000
2001            if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
2002                ALOGW("swapping stream type %d %s to empty stream",
2003                        stream, mStreams[idx].mUri.c_str());
2004            }
2005            mStreams[idx].mUri = mStreams[idx].mNewUri;
2006            mStreams[idx].mNewUri.clear();
2007
2008            mSwapMask &= ~stream;
2009        }
2010    }
2011
2012    mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */);
2013
2014    ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask);
2015    if (mSwapMask != 0) {
2016        return;
2017    }
2018
2019    // Check if new variant contains extra streams.
2020    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
2021    while (extraStreams) {
2022        StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1));
2023        extraStreams &= ~stream;
2024
2025        swapPacketSource(stream);
2026
2027        ssize_t idx = typeToIndex(stream);
2028        CHECK(idx >= 0);
2029        if (mStreams[idx].mNewUri.empty()) {
2030            ALOGW("swapping extra stream type %d %s to empty stream",
2031                    stream, mStreams[idx].mUri.c_str());
2032        }
2033        mStreams[idx].mUri = mStreams[idx].mNewUri;
2034        mStreams[idx].mNewUri.clear();
2035    }
2036
2037    // Restart new fetcher (it was paused after the first 47k block)
2038    // and let it fetch into mPacketSources (not mPacketSources2)
2039    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2040        FetcherInfo &info = mFetcherInfos.editValueAt(i);
2041        if (info.mToBeResumed) {
2042            resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask);
2043            info.mToBeResumed = false;
2044        }
2045    }
2046
2047    ALOGI("#### Finished Bandwidth Switch: %zd => %zd",
2048            mOrigBandwidthIndex, mCurBandwidthIndex);
2049
2050    mStreamMask = mNewStreamMask;
2051    mSwitchInProgress = false;
2052    mOrigBandwidthIndex = mCurBandwidthIndex;
2053
2054    restartPollBuffering();
2055}
2056
2057void LiveSession::schedulePollBuffering() {
2058    sp<AMessage> msg = new AMessage(kWhatPollBuffering, this);
2059    msg->setInt32("generation", mPollBufferingGeneration);
2060    msg->post(1000000ll);
2061}
2062
2063void LiveSession::cancelPollBuffering() {
2064    ++mPollBufferingGeneration;
2065    mPrevBufferPercentage = -1;
2066}
2067
// Cancels the current polling cycle and immediately runs a fresh poll.
// onPollBuffering() ends by scheduling the next poll, so this also
// re-establishes the periodic polling.
void LiveSession::restartPollBuffering() {
    cancelPollBuffering();
    onPollBuffering();
}
2072
2073void LiveSession::onPollBuffering() {
2074    ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
2075            "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x",
2076        mSwitchInProgress, mReconfigurationInProgress,
2077        mInPreparationPhase, mCurBandwidthIndex, mStreamMask);
2078
2079    bool underflow, ready, down, up;
2080    if (checkBuffering(underflow, ready, down, up)) {
2081        if (mInPreparationPhase) {
2082            // Allow down switch even if we're still preparing.
2083            //
2084            // Some streams have a high bandwidth index as default,
2085            // when bandwidth is low, it takes a long time to buffer
2086            // to ready mark, then it immediately pauses after start
2087            // as we have to do a down switch. It's better experience
2088            // to restart from a lower index, if we detect low bw.
2089            if (!switchBandwidthIfNeeded(false /* up */, down) && ready) {
2090                postPrepared(OK);
2091            }
2092        }
2093
2094        if (!mInPreparationPhase) {
2095            if (ready) {
2096                stopBufferingIfNecessary();
2097            } else if (underflow) {
2098                startBufferingIfNecessary();
2099            }
2100            switchBandwidthIfNeeded(up, down);
2101        }
2102    }
2103
2104    schedulePollBuffering();
2105}
2106
2107void LiveSession::cancelBandwidthSwitch(bool resume) {
2108    ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd",
2109            mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex);
2110    if (!mSwitchInProgress) {
2111        return;
2112    }
2113
2114    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2115        FetcherInfo& info = mFetcherInfos.editValueAt(i);
2116        if (info.mToBeRemoved) {
2117            info.mToBeRemoved = false;
2118            if (resume) {
2119                resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask);
2120            }
2121        }
2122    }
2123
2124    for (size_t i = 0; i < kMaxStreams; ++i) {
2125        AString newUri = mStreams[i].mNewUri;
2126        if (!newUri.empty()) {
2127            // clear all mNewUri matching this newUri
2128            for (size_t j = i; j < kMaxStreams; ++j) {
2129                if (mStreams[j].mNewUri == newUri) {
2130                    mStreams[j].mNewUri.clear();
2131                }
2132            }
2133            ALOGV("stopping newUri = %s", newUri.c_str());
2134            ssize_t index = mFetcherInfos.indexOfKey(newUri);
2135            if (index < 0) {
2136                ALOGE("did not find fetcher for newUri: %s", newUri.c_str());
2137                continue;
2138            }
2139            FetcherInfo &info = mFetcherInfos.editValueAt(index);
2140            info.mToBeRemoved = true;
2141            info.mFetcher->stopAsync();
2142        }
2143    }
2144
2145    ALOGI("#### Canceled Bandwidth Switch: %zd => %zd",
2146            mOrigBandwidthIndex, mCurBandwidthIndex);
2147
2148    mSwitchGeneration++;
2149    mSwitchInProgress = false;
2150    mCurBandwidthIndex = mOrigBandwidthIndex;
2151    mSwapMask = 0;
2152}
2153
2154bool LiveSession::checkBuffering(
2155        bool &underflow, bool &ready, bool &down, bool &up) {
2156    underflow = ready = down = up = false;
2157
2158    if (mReconfigurationInProgress) {
2159        ALOGV("Switch/Reconfig in progress, defer buffer polling");
2160        return false;
2161    }
2162
2163    size_t activeCount, underflowCount, readyCount, downCount, upCount;
2164    activeCount = underflowCount = readyCount = downCount = upCount =0;
2165    int32_t minBufferPercent = -1;
2166    int64_t durationUs;
2167    if (getDuration(&durationUs) != OK) {
2168        durationUs = -1;
2169    }
2170    for (size_t i = 0; i < mPacketSources.size(); ++i) {
2171        // we don't check subtitles for buffering level
2172        if (!(mStreamMask & mPacketSources.keyAt(i)
2173                & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) {
2174            continue;
2175        }
2176        // ignore streams that never had any packet queued.
2177        // (it's possible that the variant only has audio or video)
2178        sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
2179        if (meta == NULL) {
2180            continue;
2181        }
2182
2183        status_t finalResult;
2184        int64_t bufferedDurationUs =
2185                mPacketSources[i]->getBufferedDurationUs(&finalResult);
2186        ALOGV("[%s] buffered %lld us",
2187                getNameForStream(mPacketSources.keyAt(i)),
2188                (long long)bufferedDurationUs);
2189        if (durationUs >= 0) {
2190            int32_t percent;
2191            if (mPacketSources[i]->isFinished(0 /* duration */)) {
2192                percent = 100;
2193            } else {
2194                percent = (int32_t)(100.0 *
2195                        (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs);
2196            }
2197            if (minBufferPercent < 0 || percent < minBufferPercent) {
2198                minBufferPercent = percent;
2199            }
2200        }
2201
2202        ++activeCount;
2203        int64_t readyMarkUs =
2204            (mInPreparationPhase ?
2205                mBufferingSettings.mInitialWatermarkMs :
2206                mBufferingSettings.mRebufferingWatermarkHighMs) * 1000ll;
2207        if (bufferedDurationUs > readyMarkUs
2208                || mPacketSources[i]->isFinished(0)) {
2209            ++readyCount;
2210        }
2211        if (!mPacketSources[i]->isFinished(0)) {
2212            if (bufferedDurationUs < mBufferingSettings.mRebufferingWatermarkLowMs * 1000ll) {
2213                ++underflowCount;
2214            }
2215            if (bufferedDurationUs > mUpSwitchMark) {
2216                ++upCount;
2217            }
2218            if (bufferedDurationUs < mDownSwitchMark) {
2219                ++downCount;
2220            }
2221        }
2222    }
2223
2224    if (minBufferPercent >= 0) {
2225        notifyBufferingUpdate(minBufferPercent);
2226    }
2227
2228    if (activeCount > 0) {
2229        up        = (upCount == activeCount);
2230        down      = (downCount > 0);
2231        ready     = (readyCount == activeCount);
2232        underflow = (underflowCount > 0);
2233        return true;
2234    }
2235
2236    return false;
2237}
2238
2239void LiveSession::startBufferingIfNecessary() {
2240    ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2241            mInPreparationPhase, mBuffering);
2242    if (!mBuffering) {
2243        mBuffering = true;
2244
2245        sp<AMessage> notify = mNotify->dup();
2246        notify->setInt32("what", kWhatBufferingStart);
2247        notify->post();
2248    }
2249}
2250
2251void LiveSession::stopBufferingIfNecessary() {
2252    ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2253            mInPreparationPhase, mBuffering);
2254
2255    if (mBuffering) {
2256        mBuffering = false;
2257
2258        sp<AMessage> notify = mNotify->dup();
2259        notify->setInt32("what", kWhatBufferingEnd);
2260        notify->post();
2261    }
2262}
2263
2264void LiveSession::notifyBufferingUpdate(int32_t percentage) {
2265    if (percentage < mPrevBufferPercentage) {
2266        percentage = mPrevBufferPercentage;
2267    } else if (percentage > 100) {
2268        percentage = 100;
2269    }
2270
2271    mPrevBufferPercentage = percentage;
2272
2273    ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage);
2274
2275    sp<AMessage> notify = mNotify->dup();
2276    notify->setInt32("what", kWhatBufferingUpdate);
2277    notify->setInt32("percentage", percentage);
2278    notify->post();
2279}
2280
2281bool LiveSession::tryBandwidthFallback() {
2282    if (mInPreparationPhase || mReconfigurationInProgress) {
2283        // Don't try fallback during prepare or reconfig.
2284        // If error happens there, it's likely unrecoverable.
2285        return false;
2286    }
2287    if (mCurBandwidthIndex > mOrigBandwidthIndex) {
2288        // if we're switching up, simply cancel and resume old variant
2289        cancelBandwidthSwitch(true /* resume */);
2290        return true;
2291    } else {
2292        // if we're switching down, we're likely about to underflow (if
2293        // not already underflowing). try the lowest viable bandwidth if
2294        // not on that variant already.
2295        ssize_t lowestValid = getLowestValidBandwidthIndex();
2296        if (mCurBandwidthIndex > lowestValid) {
2297            cancelBandwidthSwitch();
2298            changeConfiguration(-1ll, lowestValid);
2299            return true;
2300        }
2301    }
2302    // return false if we couldn't find any fallback
2303    return false;
2304}
2305
2306/*
2307 * returns true if a bandwidth switch is actually needed (and started),
2308 * returns false otherwise
2309 */
2310bool LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
2311    // no need to check bandwidth if we only have 1 bandwidth settings
2312    if (mBandwidthItems.size() < 2) {
2313        return false;
2314    }
2315
2316    if (mSwitchInProgress) {
2317        if (mBuffering) {
2318            tryBandwidthFallback();
2319        }
2320        return false;
2321    }
2322
2323    int32_t bandwidthBps, shortTermBps;
2324    bool isStable;
2325    if (mBandwidthEstimator->estimateBandwidth(
2326            &bandwidthBps, &isStable, &shortTermBps)) {
2327        ALOGV("bandwidth estimated at %.2f kbps, "
2328                "stable %d, shortTermBps %.2f kbps",
2329                bandwidthBps / 1024.0f, isStable, shortTermBps / 1024.0f);
2330        mLastBandwidthBps = bandwidthBps;
2331        mLastBandwidthStable = isStable;
2332    } else {
2333        ALOGV("no bandwidth estimate.");
2334        return false;
2335    }
2336
2337    int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth;
2338    // canSwithDown and canSwitchUp can't both be true.
2339    // we only want to switch up when measured bw is 120% higher than current variant,
2340    // and we only want to switch down when measured bw is below current variant.
2341    bool canSwitchDown = bufferLow
2342            && (bandwidthBps < (int32_t)curBandwidth);
2343    bool canSwitchUp = bufferHigh
2344            && (bandwidthBps > (int32_t)curBandwidth * 12 / 10);
2345
2346    if (canSwitchDown || canSwitchUp) {
2347        // bandwidth estimating has some delay, if we have to downswitch when
2348        // it hasn't stabilized, use the short term to guess real bandwidth,
2349        // since it may be dropping too fast.
2350        // (note this doesn't apply to upswitch, always use longer average there)
2351        if (!isStable && canSwitchDown) {
2352            if (shortTermBps < bandwidthBps) {
2353                bandwidthBps = shortTermBps;
2354            }
2355        }
2356
2357        ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps);
2358
2359        // it's possible that we're checking for canSwitchUp case, but the returned
2360        // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70%
2361        // of measured bw. In that case we don't want to do anything, since we have
2362        // both enough buffer and enough bw.
2363        if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex)
2364         || (canSwitchDown && bandwidthIndex < mCurBandwidthIndex)) {
2365            // if not yet prepared, just restart again with new bw index.
2366            // this is faster and playback experience is cleaner.
2367            changeConfiguration(
2368                    mInPreparationPhase ? 0 : -1ll, bandwidthIndex);
2369            return true;
2370        }
2371    }
2372    return false;
2373}
2374
2375void LiveSession::postError(status_t err) {
2376    // if we reached EOS, notify buffering of 100%
2377    if (err == ERROR_END_OF_STREAM) {
2378        notifyBufferingUpdate(100);
2379    }
2380    // we'll stop buffer polling now, before that notify
2381    // stop buffering to stop the spinning icon
2382    stopBufferingIfNecessary();
2383    cancelPollBuffering();
2384
2385    sp<AMessage> notify = mNotify->dup();
2386    notify->setInt32("what", kWhatError);
2387    notify->setInt32("err", err);
2388    notify->post();
2389}
2390
2391void LiveSession::postPrepared(status_t err) {
2392    CHECK(mInPreparationPhase);
2393
2394    sp<AMessage> notify = mNotify->dup();
2395    if (err == OK || err == ERROR_END_OF_STREAM) {
2396        notify->setInt32("what", kWhatPrepared);
2397    } else {
2398        cancelPollBuffering();
2399
2400        notify->setInt32("what", kWhatPreparationFailed);
2401        notify->setInt32("err", err);
2402    }
2403
2404    notify->post();
2405
2406    mInPreparationPhase = false;
2407}
2408
2409
2410}  // namespace android
2411
2412