1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22#include "HTTPDownloader.h"
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "mpeg2ts/AnotherPacketSource.h"
27
28#include <cutils/properties.h>
29#include <media/MediaHTTPService.h>
30#include <media/stagefright/foundation/ABuffer.h>
31#include <media/stagefright/foundation/ADebug.h>
32#include <media/stagefright/foundation/AMessage.h>
33#include <media/stagefright/foundation/AUtils.h>
34#include <media/stagefright/MediaDefs.h>
35#include <media/stagefright/MetaData.h>
36#include <media/stagefright/Utils.h>
37
38#include <utils/Mutex.h>
39
40#include <ctype.h>
41#include <inttypes.h>
42
43namespace android {
44
45// static
46// Bandwidth Switch Mark Defaults
47const int64_t LiveSession::kUpSwitchMarkUs = 15000000ll;
48const int64_t LiveSession::kDownSwitchMarkUs = 20000000ll;
49const int64_t LiveSession::kUpSwitchMarginUs = 5000000ll;
50const int64_t LiveSession::kResumeThresholdUs = 100000ll;
51
52//TODO: redefine this mark to a fair value
53// default buffer underflow mark
54static const int kUnderflowMarkMs = 1000;  // 1 second
55
56struct LiveSession::BandwidthEstimator : public RefBase {
57    BandwidthEstimator();
58
59    void addBandwidthMeasurement(size_t numBytes, int64_t delayUs);
60    bool estimateBandwidth(
61            int32_t *bandwidth,
62            bool *isStable = NULL,
63            int32_t *shortTermBps = NULL);
64
65private:
66    // Bandwidth estimation parameters
67    static const int32_t kShortTermBandwidthItems = 3;
68    static const int32_t kMinBandwidthHistoryItems = 20;
69    static const int64_t kMinBandwidthHistoryWindowUs = 5000000ll; // 5 sec
70    static const int64_t kMaxBandwidthHistoryWindowUs = 30000000ll; // 30 sec
71    static const int64_t kMaxBandwidthHistoryAgeUs = 60000000ll; // 60 sec
72
73    struct BandwidthEntry {
74        int64_t mTimestampUs;
75        int64_t mDelayUs;
76        size_t mNumBytes;
77    };
78
79    Mutex mLock;
80    List<BandwidthEntry> mBandwidthHistory;
81    List<int32_t> mPrevEstimates;
82    int32_t mShortTermEstimate;
83    bool mHasNewSample;
84    bool mIsStable;
85    int64_t mTotalTransferTimeUs;
86    size_t mTotalTransferBytes;
87
88    DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator);
89};
90
91LiveSession::BandwidthEstimator::BandwidthEstimator() :
92    mShortTermEstimate(0),
93    mHasNewSample(false),
94    mIsStable(true),
95    mTotalTransferTimeUs(0),
96    mTotalTransferBytes(0) {
97}
98
99void LiveSession::BandwidthEstimator::addBandwidthMeasurement(
100        size_t numBytes, int64_t delayUs) {
101    AutoMutex autoLock(mLock);
102
103    int64_t nowUs = ALooper::GetNowUs();
104    BandwidthEntry entry;
105    entry.mTimestampUs = nowUs;
106    entry.mDelayUs = delayUs;
107    entry.mNumBytes = numBytes;
108    mTotalTransferTimeUs += delayUs;
109    mTotalTransferBytes += numBytes;
110    mBandwidthHistory.push_back(entry);
111    mHasNewSample = true;
112
113    // Remove no more than 10% of total transfer time at a time
114    // to avoid sudden jump on bandwidth estimation. There might
115    // be long blocking reads that takes up signification time,
116    // we have to keep a longer window in that case.
117    int64_t bandwidthHistoryWindowUs = mTotalTransferTimeUs * 9 / 10;
118    if (bandwidthHistoryWindowUs < kMinBandwidthHistoryWindowUs) {
119        bandwidthHistoryWindowUs = kMinBandwidthHistoryWindowUs;
120    } else if (bandwidthHistoryWindowUs > kMaxBandwidthHistoryWindowUs) {
121        bandwidthHistoryWindowUs = kMaxBandwidthHistoryWindowUs;
122    }
123    // trim old samples, keeping at least kMaxBandwidthHistoryItems samples,
124    // and total transfer time at least kMaxBandwidthHistoryWindowUs.
125    while (mBandwidthHistory.size() > kMinBandwidthHistoryItems) {
126        List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
127        // remove sample if either absolute age or total transfer time is
128        // over kMaxBandwidthHistoryWindowUs
129        if (nowUs - it->mTimestampUs < kMaxBandwidthHistoryAgeUs &&
130                mTotalTransferTimeUs - it->mDelayUs < bandwidthHistoryWindowUs) {
131            break;
132        }
133        mTotalTransferTimeUs -= it->mDelayUs;
134        mTotalTransferBytes -= it->mNumBytes;
135        mBandwidthHistory.erase(mBandwidthHistory.begin());
136    }
137}
138
139bool LiveSession::BandwidthEstimator::estimateBandwidth(
140        int32_t *bandwidthBps, bool *isStable, int32_t *shortTermBps) {
141    AutoMutex autoLock(mLock);
142
143    if (mBandwidthHistory.size() < 2) {
144        return false;
145    }
146
147    if (!mHasNewSample) {
148        *bandwidthBps = *(--mPrevEstimates.end());
149        if (isStable) {
150            *isStable = mIsStable;
151        }
152        if (shortTermBps) {
153            *shortTermBps = mShortTermEstimate;
154        }
155        return true;
156    }
157
158    *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs);
159    mPrevEstimates.push_back(*bandwidthBps);
160    while (mPrevEstimates.size() > 3) {
161        mPrevEstimates.erase(mPrevEstimates.begin());
162    }
163    mHasNewSample = false;
164
165    int64_t totalTimeUs = 0;
166    size_t totalBytes = 0;
167    if (mBandwidthHistory.size() >= kShortTermBandwidthItems) {
168        List<BandwidthEntry>::iterator it = --mBandwidthHistory.end();
169        for (size_t i = 0; i < kShortTermBandwidthItems; i++, it--) {
170            totalTimeUs += it->mDelayUs;
171            totalBytes += it->mNumBytes;
172        }
173    }
174    mShortTermEstimate = totalTimeUs > 0 ?
175            (totalBytes * 8E6 / totalTimeUs) : *bandwidthBps;
176    if (shortTermBps) {
177        *shortTermBps = mShortTermEstimate;
178    }
179
180    int64_t minEstimate = -1, maxEstimate = -1;
181    List<int32_t>::iterator it;
182    for (it = mPrevEstimates.begin(); it != mPrevEstimates.end(); it++) {
183        int32_t estimate = *it;
184        if (minEstimate < 0 || minEstimate > estimate) {
185            minEstimate = estimate;
186        }
187        if (maxEstimate < 0 || maxEstimate < estimate) {
188            maxEstimate = estimate;
189        }
190    }
191    // consider it stable if long-term average is not jumping a lot
192    // and short-term average is not much lower than long-term average
193    mIsStable = (maxEstimate <= minEstimate * 4 / 3)
194            && mShortTermEstimate > minEstimate * 7 / 10;
195    if (isStable) {
196        *isStable = mIsStable;
197    }
198
199#if 0
200    {
201        char dumpStr[1024] = {0};
202        size_t itemIdx = 0;
203        size_t histSize = mBandwidthHistory.size();
204        sprintf(dumpStr, "estimate bps=%d stable=%d history (n=%d): {",
205            *bandwidthBps, mIsStable, histSize);
206        List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
207        for (; it != mBandwidthHistory.end(); ++it) {
208            if (itemIdx > 50) {
209                sprintf(dumpStr + strlen(dumpStr),
210                        "...(%zd more items)... }", histSize - itemIdx);
211                break;
212            }
213            sprintf(dumpStr + strlen(dumpStr), "%dk/%.3fs%s",
214                it->mNumBytes / 1024,
215                (double)it->mDelayUs * 1.0e-6,
216                (it == (--mBandwidthHistory.end())) ? "}" : ", ");
217            itemIdx++;
218        }
219        ALOGE(dumpStr);
220    }
221#endif
222    return true;
223}
224
225//static
226const char *LiveSession::getKeyForStream(StreamType type) {
227    switch (type) {
228        case STREAMTYPE_VIDEO:
229            return "timeUsVideo";
230        case STREAMTYPE_AUDIO:
231            return "timeUsAudio";
232        case STREAMTYPE_SUBTITLES:
233            return "timeUsSubtitle";
234        case STREAMTYPE_METADATA:
235            return "timeUsMetadata"; // unused
236        default:
237            TRESPASS();
238    }
239    return NULL;
240}
241
242//static
243const char *LiveSession::getNameForStream(StreamType type) {
244    switch (type) {
245        case STREAMTYPE_VIDEO:
246            return "video";
247        case STREAMTYPE_AUDIO:
248            return "audio";
249        case STREAMTYPE_SUBTITLES:
250            return "subs";
251        case STREAMTYPE_METADATA:
252            return "metadata";
253        default:
254            break;
255    }
256    return "unknown";
257}
258
259//static
260ATSParser::SourceType LiveSession::getSourceTypeForStream(StreamType type) {
261    switch (type) {
262        case STREAMTYPE_VIDEO:
263            return ATSParser::VIDEO;
264        case STREAMTYPE_AUDIO:
265            return ATSParser::AUDIO;
266        case STREAMTYPE_METADATA:
267            return ATSParser::META;
268        case STREAMTYPE_SUBTITLES:
269        default:
270            TRESPASS();
271    }
272    return ATSParser::NUM_SOURCE_TYPES; // should not reach here
273}
274
275LiveSession::LiveSession(
276        const sp<AMessage> &notify, uint32_t flags,
277        const sp<MediaHTTPService> &httpService)
278    : mNotify(notify),
279      mFlags(flags),
280      mHTTPService(httpService),
281      mBuffering(false),
282      mInPreparationPhase(true),
283      mPollBufferingGeneration(0),
284      mPrevBufferPercentage(-1),
285      mCurBandwidthIndex(-1),
286      mOrigBandwidthIndex(-1),
287      mLastBandwidthBps(-1ll),
288      mLastBandwidthStable(false),
289      mBandwidthEstimator(new BandwidthEstimator()),
290      mMaxWidth(720),
291      mMaxHeight(480),
292      mStreamMask(0),
293      mNewStreamMask(0),
294      mSwapMask(0),
295      mSwitchGeneration(0),
296      mSubtitleGeneration(0),
297      mLastDequeuedTimeUs(0ll),
298      mRealTimeBaseUs(0ll),
299      mReconfigurationInProgress(false),
300      mSwitchInProgress(false),
301      mUpSwitchMark(kUpSwitchMarkUs),
302      mDownSwitchMark(kDownSwitchMarkUs),
303      mUpSwitchMargin(kUpSwitchMarginUs),
304      mFirstTimeUsValid(false),
305      mFirstTimeUs(0),
306      mLastSeekTimeUs(0),
307      mHasMetadata(false) {
308    mStreams[kAudioIndex] = StreamItem("audio");
309    mStreams[kVideoIndex] = StreamItem("video");
310    mStreams[kSubtitleIndex] = StreamItem("subtitles");
311
312    for (size_t i = 0; i < kNumSources; ++i) {
313        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
314        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
315    }
316}
317
318LiveSession::~LiveSession() {
319    if (mFetcherLooper != NULL) {
320        mFetcherLooper->stop();
321    }
322}
323
324int64_t LiveSession::calculateMediaTimeUs(
325        int64_t firstTimeUs, int64_t timeUs, int32_t discontinuitySeq) {
326    if (timeUs >= firstTimeUs) {
327        timeUs -= firstTimeUs;
328    } else {
329        timeUs = 0;
330    }
331    timeUs += mLastSeekTimeUs;
332    if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
333        timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
334    }
335    return timeUs;
336}
337
338status_t LiveSession::dequeueAccessUnit(
339        StreamType stream, sp<ABuffer> *accessUnit) {
340    status_t finalResult = OK;
341    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
342
343    ssize_t streamIdx = typeToIndex(stream);
344    if (streamIdx < 0) {
345        return BAD_VALUE;
346    }
347    const char *streamStr = getNameForStream(stream);
348    // Do not let client pull data if we don't have data packets yet.
349    // We might only have a format discontinuity queued without data.
350    // When NuPlayerDecoder dequeues the format discontinuity, it will
351    // immediately try to getFormat. If we return NULL, NuPlayerDecoder
352    // thinks it can do seamless change, so will not shutdown decoder.
353    // When the actual format arrives, it can't handle it and get stuck.
354    if (!packetSource->hasDataBufferAvailable(&finalResult)) {
355        ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)",
356                streamStr, finalResult);
357
358        if (finalResult == OK) {
359            return -EAGAIN;
360        } else {
361            return finalResult;
362        }
363    }
364
365    // Let the client dequeue as long as we have buffers available
366    // Do not make pause/resume decisions here.
367
368    status_t err = packetSource->dequeueAccessUnit(accessUnit);
369
370    if (err == INFO_DISCONTINUITY) {
371        // adaptive streaming, discontinuities in the playlist
372        int32_t type;
373        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
374
375        sp<AMessage> extra;
376        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
377            extra.clear();
378        }
379
380        ALOGI("[%s] read discontinuity of type %d, extra = %s",
381              streamStr,
382              type,
383              extra == NULL ? "NULL" : extra->debugString().c_str());
384    } else if (err == OK) {
385
386        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
387            int64_t timeUs, originalTimeUs;
388            int32_t discontinuitySeq = 0;
389            StreamItem& strm = mStreams[streamIdx];
390            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
391            originalTimeUs = timeUs;
392            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
393            if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) {
394                int64_t offsetTimeUs;
395                if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
396                    offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq);
397                } else {
398                    offsetTimeUs = 0;
399                }
400
401                if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0
402                        && strm.mLastDequeuedTimeUs >= 0) {
403                    int64_t firstTimeUs;
404                    firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
405                    offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
406                    offsetTimeUs += strm.mLastSampleDurationUs;
407                } else {
408                    offsetTimeUs += strm.mLastSampleDurationUs;
409                }
410
411                mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs);
412                strm.mCurDiscontinuitySeq = discontinuitySeq;
413            }
414
415            int32_t discard = 0;
416            int64_t firstTimeUs;
417            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
418                int64_t durUs; // approximate sample duration
419                if (timeUs > strm.mLastDequeuedTimeUs) {
420                    durUs = timeUs - strm.mLastDequeuedTimeUs;
421                } else {
422                    durUs = strm.mLastDequeuedTimeUs - timeUs;
423                }
424                strm.mLastSampleDurationUs = durUs;
425                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
426            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
427                firstTimeUs = timeUs;
428            } else {
429                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
430                firstTimeUs = timeUs;
431            }
432
433            strm.mLastDequeuedTimeUs = timeUs;
434            timeUs = calculateMediaTimeUs(firstTimeUs, timeUs, discontinuitySeq);
435
436            ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us",
437                    streamStr, (long long)timeUs, (long long)originalTimeUs);
438            (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
439            mLastDequeuedTimeUs = timeUs;
440            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
441        } else if (stream == STREAMTYPE_SUBTITLES) {
442            int32_t subtitleGeneration;
443            if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
444                    && subtitleGeneration != mSubtitleGeneration) {
445               return -EAGAIN;
446            };
447            (*accessUnit)->meta()->setInt32(
448                    "trackIndex", mPlaylist->getSelectedIndex());
449            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
450        } else if (stream == STREAMTYPE_METADATA) {
451            HLSTime mdTime((*accessUnit)->meta());
452            if (mDiscontinuityAbsStartTimesUs.indexOfKey(mdTime.mSeq) < 0) {
453                packetSource->requeueAccessUnit((*accessUnit));
454                return -EAGAIN;
455            } else {
456                int64_t firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(mdTime.mSeq);
457                int64_t timeUs = calculateMediaTimeUs(firstTimeUs, mdTime.mTimeUs, mdTime.mSeq);
458                (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
459                (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
460            }
461        }
462    } else {
463        ALOGI("[%s] encountered error %d", streamStr, err);
464    }
465
466    return err;
467}
468
469status_t LiveSession::getStreamFormatMeta(StreamType stream, sp<MetaData> *meta) {
470    if (!(mStreamMask & stream)) {
471        return UNKNOWN_ERROR;
472    }
473
474    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
475
476    *meta = packetSource->getFormat();
477
478    if (*meta == NULL) {
479        return -EWOULDBLOCK;
480    }
481
482    if (stream == STREAMTYPE_AUDIO) {
483        // set AAC input buffer size to 32K bytes (256kbps x 1sec)
484        (*meta)->setInt32(kKeyMaxInputSize, 32 * 1024);
485    } else if (stream == STREAMTYPE_VIDEO) {
486        (*meta)->setInt32(kKeyMaxWidth, mMaxWidth);
487        (*meta)->setInt32(kKeyMaxHeight, mMaxHeight);
488    }
489
490    return OK;
491}
492
493sp<HTTPDownloader> LiveSession::getHTTPDownloader() {
494    return new HTTPDownloader(mHTTPService, mExtraHeaders);
495}
496
497void LiveSession::setBufferingSettings(
498        const BufferingSettings &buffering) {
499    sp<AMessage> msg = new AMessage(kWhatSetBufferingSettings, this);
500    writeToAMessage(msg, buffering);
501    msg->post();
502}
503
504void LiveSession::connectAsync(
505        const char *url, const KeyedVector<String8, String8> *headers) {
506    sp<AMessage> msg = new AMessage(kWhatConnect, this);
507    msg->setString("url", url);
508
509    if (headers != NULL) {
510        msg->setPointer(
511                "headers",
512                new KeyedVector<String8, String8>(*headers));
513    }
514
515    msg->post();
516}
517
518status_t LiveSession::disconnect() {
519    sp<AMessage> msg = new AMessage(kWhatDisconnect, this);
520
521    sp<AMessage> response;
522    status_t err = msg->postAndAwaitResponse(&response);
523
524    return err;
525}
526
527status_t LiveSession::seekTo(int64_t timeUs, MediaPlayerSeekMode mode) {
528    sp<AMessage> msg = new AMessage(kWhatSeek, this);
529    msg->setInt64("timeUs", timeUs);
530    msg->setInt32("mode", mode);
531
532    sp<AMessage> response;
533    status_t err = msg->postAndAwaitResponse(&response);
534
535    return err;
536}
537
538bool LiveSession::checkSwitchProgress(
539        sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) {
540    AString newUri;
541    CHECK(stopParams->findString("uri", &newUri));
542
543    *needResumeUntil = false;
544    sp<AMessage> firstNewMeta[kMaxStreams];
545    for (size_t i = 0; i < kMaxStreams; ++i) {
546        StreamType stream = indexToType(i);
547        if (!(mSwapMask & mNewStreamMask & stream)
548            || (mStreams[i].mNewUri != newUri)) {
549            continue;
550        }
551        if (stream == STREAMTYPE_SUBTITLES) {
552            continue;
553        }
554        sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i);
555
556        // First, get latest dequeued meta, which is where the decoder is at.
557        // (when upswitching, we take the meta after a certain delay, so that
558        // the decoder is left with some cushion)
559        sp<AMessage> lastDequeueMeta, lastEnqueueMeta;
560        if (delayUs > 0) {
561            lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs);
562            if (lastDequeueMeta == NULL) {
563                // this means we don't have enough cushion, try again later
564                ALOGV("[%s] up switching failed due to insufficient buffer",
565                        getNameForStream(stream));
566                return false;
567            }
568        } else {
569            // It's okay for lastDequeueMeta to be NULL here, it means the
570            // decoder hasn't even started dequeueing
571            lastDequeueMeta = source->getLatestDequeuedMeta();
572        }
573        // Then, trim off packets at beginning of mPacketSources2 that's before
574        // the latest dequeued time. These samples are definitely too late.
575        firstNewMeta[i] = mPacketSources2.editValueAt(i)
576                            ->trimBuffersBeforeMeta(lastDequeueMeta);
577
578        // Now firstNewMeta[i] is the first sample after the trim.
579        // If it's NULL, we failed because dequeue already past all samples
580        // in mPacketSource2, we have to try again.
581        if (firstNewMeta[i] == NULL) {
582            HLSTime dequeueTime(lastDequeueMeta);
583            ALOGV("[%s] dequeue time (%d, %lld) past start time",
584                    getNameForStream(stream),
585                    dequeueTime.mSeq, (long long) dequeueTime.mTimeUs);
586            return false;
587        }
588
589        // Otherwise, we check if mPacketSources2 overlaps with what old fetcher
590        // already fetched, and see if we need to resumeUntil
591        lastEnqueueMeta = source->getLatestEnqueuedMeta();
592        // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity
593        // boundary, no need to resume as the content will look different anyways
594        if (lastEnqueueMeta != NULL) {
595            HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]);
596
597            // no need to resume old fetcher if new fetcher started in different
598            // discontinuity sequence, as the content will look different.
599            *needResumeUntil |= (startTime.mSeq == lastTime.mSeq
600                    && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs);
601
602            // update the stopTime for resumeUntil
603            stopParams->setInt32("discontinuitySeq", startTime.mSeq);
604            stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs);
605        }
606    }
607
608    // if we're here, it means dequeue progress hasn't passed some samples in
609    // mPacketSource2, we can trim off the excess in mPacketSource.
610    // (old fetcher might still need to resumeUntil the start time of new fetcher)
611    for (size_t i = 0; i < kMaxStreams; ++i) {
612        StreamType stream = indexToType(i);
613        if (!(mSwapMask & mNewStreamMask & stream)
614            || (newUri != mStreams[i].mNewUri)
615            || stream == STREAMTYPE_SUBTITLES) {
616            continue;
617        }
618        mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]);
619    }
620
621    // no resumeUntil if already underflow
622    *needResumeUntil &= !mBuffering;
623
624    return true;
625}
626
627void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
628    switch (msg->what()) {
629        case kWhatSetBufferingSettings:
630        {
631            readFromAMessage(msg, &mBufferingSettings);
632            break;
633        }
634
635        case kWhatConnect:
636        {
637            onConnect(msg);
638            break;
639        }
640
641        case kWhatDisconnect:
642        {
643            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
644
645            if (mReconfigurationInProgress) {
646                break;
647            }
648
649            finishDisconnect();
650            break;
651        }
652
653        case kWhatSeek:
654        {
655            if (mReconfigurationInProgress) {
656                msg->post(50000);
657                break;
658            }
659
660            CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
661            mSeekReply = new AMessage;
662
663            onSeek(msg);
664            break;
665        }
666
667        case kWhatFetcherNotify:
668        {
669            int32_t what;
670            CHECK(msg->findInt32("what", &what));
671
672            switch (what) {
673                case PlaylistFetcher::kWhatStarted:
674                    break;
675                case PlaylistFetcher::kWhatPaused:
676                case PlaylistFetcher::kWhatStopped:
677                {
678                    AString uri;
679                    CHECK(msg->findString("uri", &uri));
680                    ssize_t index = mFetcherInfos.indexOfKey(uri);
681                    if (index < 0) {
682                        // ignore msgs from fetchers that's already gone
683                        break;
684                    }
685
686                    ALOGV("fetcher-%d %s",
687                            mFetcherInfos[index].mFetcher->getFetcherID(),
688                            what == PlaylistFetcher::kWhatPaused ?
689                                    "paused" : "stopped");
690
691                    if (what == PlaylistFetcher::kWhatStopped) {
692                        mFetcherLooper->unregisterHandler(
693                                mFetcherInfos[index].mFetcher->id());
694                        mFetcherInfos.removeItemsAt(index);
695                    } else if (what == PlaylistFetcher::kWhatPaused) {
696                        int32_t seekMode;
697                        CHECK(msg->findInt32("seekMode", &seekMode));
698                        for (size_t i = 0; i < kMaxStreams; ++i) {
699                            if (mStreams[i].mUri == uri) {
700                                mStreams[i].mSeekMode = (SeekMode) seekMode;
701                            }
702                        }
703                    }
704
705                    if (mContinuation != NULL) {
706                        CHECK_GT(mContinuationCounter, 0u);
707                        if (--mContinuationCounter == 0) {
708                            mContinuation->post();
709                        }
710                        ALOGV("%zu fetcher(s) left", mContinuationCounter);
711                    }
712                    break;
713                }
714
715                case PlaylistFetcher::kWhatDurationUpdate:
716                {
717                    AString uri;
718                    CHECK(msg->findString("uri", &uri));
719
720                    int64_t durationUs;
721                    CHECK(msg->findInt64("durationUs", &durationUs));
722
723                    ssize_t index = mFetcherInfos.indexOfKey(uri);
724                    if (index >= 0) {
725                        FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
726                        info->mDurationUs = durationUs;
727                    }
728                    break;
729                }
730
731                case PlaylistFetcher::kWhatTargetDurationUpdate:
732                {
733                    int64_t targetDurationUs;
734                    CHECK(msg->findInt64("targetDurationUs", &targetDurationUs));
735                    mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4);
736                    mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4);
737                    mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs);
738                    break;
739                }
740
741                case PlaylistFetcher::kWhatError:
742                {
743                    status_t err;
744                    CHECK(msg->findInt32("err", &err));
745
746                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
747
748                    // handle EOS on subtitle tracks independently
749                    AString uri;
750                    if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
751                        ssize_t i = mFetcherInfos.indexOfKey(uri);
752                        if (i >= 0) {
753                            const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
754                            if (fetcher != NULL) {
755                                uint32_t type = fetcher->getStreamTypeMask();
756                                if (type == STREAMTYPE_SUBTITLES) {
757                                    mPacketSources.valueFor(
758                                            STREAMTYPE_SUBTITLES)->signalEOS(err);;
759                                    break;
760                                }
761                            }
762                        }
763                    }
764
765                    // remember the failure index (as mCurBandwidthIndex will be restored
766                    // after cancelBandwidthSwitch()), and record last fail time
767                    size_t failureIndex = mCurBandwidthIndex;
768                    mBandwidthItems.editItemAt(
769                            failureIndex).mLastFailureUs = ALooper::GetNowUs();
770
771                    if (mSwitchInProgress) {
772                        // if error happened when we switch to a variant, try fallback
773                        // to other variant to save the session
774                        if (tryBandwidthFallback()) {
775                            break;
776                        }
777                    }
778
779                    if (mInPreparationPhase) {
780                        postPrepared(err);
781                    }
782
783                    cancelBandwidthSwitch();
784
785                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
786
787                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
788
789                    mPacketSources.valueFor(
790                            STREAMTYPE_SUBTITLES)->signalEOS(err);
791
792                    postError(err);
793                    break;
794                }
795
796                case PlaylistFetcher::kWhatStopReached:
797                {
798                    ALOGV("kWhatStopReached");
799
800                    AString oldUri;
801                    CHECK(msg->findString("uri", &oldUri));
802
803                    ssize_t index = mFetcherInfos.indexOfKey(oldUri);
804                    if (index < 0) {
805                        break;
806                    }
807
808                    tryToFinishBandwidthSwitch(oldUri);
809                    break;
810                }
811
812                case PlaylistFetcher::kWhatStartedAt:
813                {
814                    int32_t switchGeneration;
815                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
816
817                    ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d",
818                            switchGeneration, mSwitchGeneration);
819
820                    if (switchGeneration != mSwitchGeneration) {
821                        break;
822                    }
823
824                    AString uri;
825                    CHECK(msg->findString("uri", &uri));
826
827                    // mark new fetcher mToBeResumed
828                    ssize_t index = mFetcherInfos.indexOfKey(uri);
829                    if (index >= 0) {
830                        mFetcherInfos.editValueAt(index).mToBeResumed = true;
831                    }
832
833                    // temporarily disable packet sources to be swapped to prevent
834                    // NuPlayerDecoder from dequeuing while we check progress
835                    for (size_t i = 0; i < mPacketSources.size(); ++i) {
836                        if ((mSwapMask & mPacketSources.keyAt(i))
837                                && uri == mStreams[i].mNewUri) {
838                            mPacketSources.editValueAt(i)->enable(false);
839                        }
840                    }
841                    bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex);
842                    // If switching up, require a cushion bigger than kUnderflowMark
843                    // to avoid buffering immediately after the switch.
844                    // (If we don't have that cushion we'd rather cancel and try again.)
845                    int64_t delayUs =
846                        switchUp ?
847                            (kUnderflowMarkMs * 1000ll + 1000000ll)
848                            : 0;
849                    bool needResumeUntil = false;
850                    sp<AMessage> stopParams = msg;
851                    if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) {
852                        // playback time hasn't passed startAt time
853                        if (!needResumeUntil) {
854                            ALOGV("finish switch");
855                            for (size_t i = 0; i < kMaxStreams; ++i) {
856                                if ((mSwapMask & indexToType(i))
857                                        && uri == mStreams[i].mNewUri) {
858                                    // have to make a copy of mStreams[i].mUri because
859                                    // tryToFinishBandwidthSwitch is modifying mStreams[]
860                                    AString oldURI = mStreams[i].mUri;
861                                    tryToFinishBandwidthSwitch(oldURI);
862                                    break;
863                                }
864                            }
865                        } else {
866                            // startAt time is after last enqueue time
867                            // Resume fetcher for the original variant; the resumed fetcher should
868                            // continue until the timestamps found in msg, which is stored by the
869                            // new fetcher to indicate where the new variant has started buffering.
870                            ALOGV("finish switch with resumeUntilAsync");
871                            for (size_t i = 0; i < mFetcherInfos.size(); i++) {
872                                const FetcherInfo &info = mFetcherInfos.valueAt(i);
873                                if (info.mToBeRemoved) {
874                                    info.mFetcher->resumeUntilAsync(stopParams);
875                                }
876                            }
877                        }
878                    } else {
879                        // playback time passed startAt time
880                        if (switchUp) {
881                            // if switching up, cancel and retry if condition satisfies again
882                            ALOGV("cancel up switch because we're too late");
883                            cancelBandwidthSwitch(true /* resume */);
884                        } else {
885                            ALOGV("retry down switch at next sample");
886                            resumeFetcher(uri, mSwapMask, -1, true /* newUri */);
887                        }
888                    }
889                    // re-enable all packet sources
890                    for (size_t i = 0; i < mPacketSources.size(); ++i) {
891                        mPacketSources.editValueAt(i)->enable(true);
892                    }
893
894                    break;
895                }
896
897                case PlaylistFetcher::kWhatPlaylistFetched:
898                {
899                    onMasterPlaylistFetched(msg);
900                    break;
901                }
902
903                case PlaylistFetcher::kWhatMetadataDetected:
904                {
905                    if (!mHasMetadata) {
906                        mHasMetadata = true;
907                        sp<AMessage> notify = mNotify->dup();
908                        notify->setInt32("what", kWhatMetadataDetected);
909                        notify->post();
910                    }
911                    break;
912                }
913
914                default:
915                    TRESPASS();
916            }
917
918            break;
919        }
920
921        case kWhatChangeConfiguration:
922        {
923            onChangeConfiguration(msg);
924            break;
925        }
926
927        case kWhatChangeConfiguration2:
928        {
929            onChangeConfiguration2(msg);
930            break;
931        }
932
933        case kWhatChangeConfiguration3:
934        {
935            onChangeConfiguration3(msg);
936            break;
937        }
938
939        case kWhatPollBuffering:
940        {
941            int32_t generation;
942            CHECK(msg->findInt32("generation", &generation));
943            if (generation == mPollBufferingGeneration) {
944                onPollBuffering();
945            }
946            break;
947        }
948
949        default:
950            TRESPASS();
951            break;
952    }
953}
954
955// static
956bool LiveSession::isBandwidthValid(const BandwidthItem &item) {
957    static const int64_t kBlacklistWindowUs = 300 * 1000000ll;
958    return item.mLastFailureUs < 0
959            || ALooper::GetNowUs() - item.mLastFailureUs > kBlacklistWindowUs;
960}
961
962// static
963int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
964    if (a->mBandwidth < b->mBandwidth) {
965        return -1;
966    } else if (a->mBandwidth == b->mBandwidth) {
967        return 0;
968    }
969
970    return 1;
971}
972
973// static
974LiveSession::StreamType LiveSession::indexToType(int idx) {
975    CHECK(idx >= 0 && idx < kNumSources);
976    return (StreamType)(1 << idx);
977}
978
979// static
980ssize_t LiveSession::typeToIndex(int32_t type) {
981    switch (type) {
982        case STREAMTYPE_AUDIO:
983            return 0;
984        case STREAMTYPE_VIDEO:
985            return 1;
986        case STREAMTYPE_SUBTITLES:
987            return 2;
988        case STREAMTYPE_METADATA:
989            return 3;
990        default:
991            return -1;
992    };
993    return -1;
994}
995
996void LiveSession::onConnect(const sp<AMessage> &msg) {
997    CHECK(msg->findString("url", &mMasterURL));
998
999    // TODO currently we don't know if we are coming here from incognito mode
1000    ALOGI("onConnect %s", uriDebugString(mMasterURL).c_str());
1001
1002    KeyedVector<String8, String8> *headers = NULL;
1003    if (!msg->findPointer("headers", (void **)&headers)) {
1004        mExtraHeaders.clear();
1005    } else {
1006        mExtraHeaders = *headers;
1007
1008        delete headers;
1009        headers = NULL;
1010    }
1011
1012    // create looper for fetchers
1013    if (mFetcherLooper == NULL) {
1014        mFetcherLooper = new ALooper();
1015
1016        mFetcherLooper->setName("Fetcher");
1017        mFetcherLooper->start(false, /* runOnCallingThread */
1018                              true  /* canCallJava */);
1019    }
1020
1021    // create fetcher to fetch the master playlist
1022    addFetcher(mMasterURL.c_str())->fetchPlaylistAsync();
1023}
1024
1025void LiveSession::onMasterPlaylistFetched(const sp<AMessage> &msg) {
1026    AString uri;
1027    CHECK(msg->findString("uri", &uri));
1028    ssize_t index = mFetcherInfos.indexOfKey(uri);
1029    if (index < 0) {
1030        ALOGW("fetcher for master playlist is gone.");
1031        return;
1032    }
1033
1034    // no longer useful, remove
1035    mFetcherLooper->unregisterHandler(mFetcherInfos[index].mFetcher->id());
1036    mFetcherInfos.removeItemsAt(index);
1037
1038    CHECK(msg->findObject("playlist", (sp<RefBase> *)&mPlaylist));
1039    if (mPlaylist == NULL) {
1040        ALOGE("unable to fetch master playlist %s.",
1041                uriDebugString(mMasterURL).c_str());
1042
1043        postPrepared(ERROR_IO);
1044        return;
1045    }
1046    // We trust the content provider to make a reasonable choice of preferred
1047    // initial bandwidth by listing it first in the variant playlist.
1048    // At startup we really don't have a good estimate on the available
1049    // network bandwidth since we haven't tranferred any data yet. Once
1050    // we have we can make a better informed choice.
1051    size_t initialBandwidth = 0;
1052    size_t initialBandwidthIndex = 0;
1053
1054    int32_t maxWidth = 0;
1055    int32_t maxHeight = 0;
1056
1057    if (mPlaylist->isVariantPlaylist()) {
1058        Vector<BandwidthItem> itemsWithVideo;
1059        for (size_t i = 0; i < mPlaylist->size(); ++i) {
1060            BandwidthItem item;
1061
1062            item.mPlaylistIndex = i;
1063            item.mLastFailureUs = -1ll;
1064
1065            sp<AMessage> meta;
1066            AString uri;
1067            mPlaylist->itemAt(i, &uri, &meta);
1068
1069            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
1070
1071            int32_t width, height;
1072            if (meta->findInt32("width", &width)) {
1073                maxWidth = max(maxWidth, width);
1074            }
1075            if (meta->findInt32("height", &height)) {
1076                maxHeight = max(maxHeight, height);
1077            }
1078
1079            mBandwidthItems.push(item);
1080            if (mPlaylist->hasType(i, "video")) {
1081                itemsWithVideo.push(item);
1082            }
1083        }
1084        // remove the audio-only variants if we have at least one with video
1085        if (!itemsWithVideo.empty()
1086                && itemsWithVideo.size() < mBandwidthItems.size()) {
1087            mBandwidthItems.clear();
1088            for (size_t i = 0; i < itemsWithVideo.size(); ++i) {
1089                mBandwidthItems.push(itemsWithVideo[i]);
1090            }
1091        }
1092
1093        CHECK_GT(mBandwidthItems.size(), 0u);
1094        initialBandwidth = mBandwidthItems[0].mBandwidth;
1095
1096        mBandwidthItems.sort(SortByBandwidth);
1097
1098        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
1099            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
1100                initialBandwidthIndex = i;
1101                break;
1102            }
1103        }
1104    } else {
1105        // dummy item.
1106        BandwidthItem item;
1107        item.mPlaylistIndex = 0;
1108        item.mBandwidth = 0;
1109        mBandwidthItems.push(item);
1110    }
1111
1112    mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth;
1113    mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight;
1114
1115    mPlaylist->pickRandomMediaItems();
1116    changeConfiguration(
1117            0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
1118}
1119
1120void LiveSession::finishDisconnect() {
1121    ALOGV("finishDisconnect");
1122
1123    // No reconfiguration is currently pending, make sure none will trigger
1124    // during disconnection either.
1125    cancelBandwidthSwitch();
1126
1127    // cancel buffer polling
1128    cancelPollBuffering();
1129
1130    // TRICKY: don't wait for all fetcher to be stopped when disconnecting
1131    //
1132    // Some fetchers might be stuck in connect/getSize at this point. These
1133    // operations will eventually timeout (as we have a timeout set in
1134    // MediaHTTPConnection), but we don't want to block the main UI thread
1135    // until then. Here we just need to make sure we clear all references
1136    // to the fetchers, so that when they finally exit from the blocking
1137    // operation, they can be destructed.
1138    //
1139    // There is one very tricky point though. For this scheme to work, the
1140    // fecther must hold a reference to LiveSession, so that LiveSession is
1141    // destroyed after fetcher. Otherwise LiveSession would get stuck in its
1142    // own destructor when it waits for mFetcherLooper to stop, which still
1143    // blocks main UI thread.
1144    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1145        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1146        mFetcherLooper->unregisterHandler(
1147                mFetcherInfos.valueAt(i).mFetcher->id());
1148    }
1149    mFetcherInfos.clear();
1150
1151    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
1152    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
1153
1154    mPacketSources.valueFor(
1155            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
1156
1157    sp<AMessage> response = new AMessage;
1158    response->setInt32("err", OK);
1159
1160    response->postReply(mDisconnectReplyID);
1161    mDisconnectReplyID.clear();
1162}
1163
1164sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
1165    ssize_t index = mFetcherInfos.indexOfKey(uri);
1166
1167    if (index >= 0) {
1168        return NULL;
1169    }
1170
1171    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this);
1172    notify->setString("uri", uri);
1173    notify->setInt32("switchGeneration", mSwitchGeneration);
1174
1175    FetcherInfo info;
1176    info.mFetcher = new PlaylistFetcher(
1177            notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration);
1178    info.mDurationUs = -1ll;
1179    info.mToBeRemoved = false;
1180    info.mToBeResumed = false;
1181    mFetcherLooper->registerHandler(info.mFetcher);
1182
1183    mFetcherInfos.add(uri, info);
1184
1185    return info.mFetcher;
1186}
1187
#if 0
// Returns rand() divided by RAND_MAX as a double. Compiled out; the callers
// visible in this file are the disabled experimental branches of
// LiveSession::getBandwidthIndex().
static double uniformRand() {
    return (double)rand() / RAND_MAX;
}
#endif
1193
1194bool LiveSession::UriIsSameAsIndex(const AString &uri, int32_t i, bool newUri) {
1195    ALOGV("[timed_id3] i %d UriIsSameAsIndex newUri %s, %s", i,
1196            newUri ? "true" : "false",
1197            newUri ? mStreams[i].mNewUri.c_str() : mStreams[i].mUri.c_str());
1198    return i >= 0
1199            && ((!newUri && uri == mStreams[i].mUri)
1200            || (newUri && uri == mStreams[i].mNewUri));
1201}
1202
1203sp<AnotherPacketSource> LiveSession::getPacketSourceForStreamIndex(
1204        size_t trackIndex, bool newUri) {
1205    StreamType type = indexToType(trackIndex);
1206    sp<AnotherPacketSource> source = NULL;
1207    if (newUri) {
1208        source = mPacketSources2.valueFor(type);
1209        source->clear();
1210    } else {
1211        source = mPacketSources.valueFor(type);
1212    };
1213    return source;
1214}
1215
1216sp<AnotherPacketSource> LiveSession::getMetadataSource(
1217        sp<AnotherPacketSource> sources[kNumSources], uint32_t streamMask, bool newUri) {
1218    // todo: One case where the following strategy can fail is when audio and video
1219    // are in separate playlists, both are transport streams, and the metadata
1220    // is actually contained in the audio stream.
1221    ALOGV("[timed_id3] getMetadataSourceForUri streamMask %x newUri %s",
1222            streamMask, newUri ? "true" : "false");
1223
1224    if ((sources[kVideoIndex] != NULL) // video fetcher; or ...
1225            || (!(streamMask & STREAMTYPE_VIDEO) && sources[kAudioIndex] != NULL)) {
1226            // ... audio fetcher for audio only variant
1227        return getPacketSourceForStreamIndex(kMetaDataIndex, newUri);
1228    }
1229
1230    return NULL;
1231}
1232
1233bool LiveSession::resumeFetcher(
1234        const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) {
1235    ssize_t index = mFetcherInfos.indexOfKey(uri);
1236    if (index < 0) {
1237        ALOGE("did not find fetcher for uri: %s", uri.c_str());
1238        return false;
1239    }
1240
1241    bool resume = false;
1242    sp<AnotherPacketSource> sources[kNumSources];
1243    for (size_t i = 0; i < kMaxStreams; ++i) {
1244        if ((streamMask & indexToType(i)) && UriIsSameAsIndex(uri, i, newUri)) {
1245            resume = true;
1246            sources[i] = getPacketSourceForStreamIndex(i, newUri);
1247        }
1248    }
1249
1250    if (resume) {
1251        sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher;
1252        SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition;
1253
1254        ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d",
1255                fetcher->getFetcherID(), (long long)timeUs, seekMode);
1256
1257        fetcher->startAsync(
1258                sources[kAudioIndex],
1259                sources[kVideoIndex],
1260                sources[kSubtitleIndex],
1261                getMetadataSource(sources, streamMask, newUri),
1262                timeUs, -1, -1, seekMode);
1263    }
1264
1265    return resume;
1266}
1267
1268float LiveSession::getAbortThreshold(
1269        ssize_t currentBWIndex, ssize_t targetBWIndex) const {
1270    float abortThreshold = -1.0f;
1271    if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) {
1272        /*
1273           If we're switching down, we need to decide whether to
1274
1275           1) finish last segment of high-bandwidth variant, or
1276           2) abort last segment of high-bandwidth variant, and fetch an
1277              overlapping portion from low-bandwidth variant.
1278
1279           Here we try to maximize the amount of buffer left when the
1280           switch point is met. Given the following parameters:
1281
1282           B: our current buffering level in seconds
1283           T: target duration in seconds
1284           X: sample duration in seconds remain to fetch in last segment
1285           bw0: bandwidth of old variant (as specified in playlist)
1286           bw1: bandwidth of new variant (as specified in playlist)
1287           bw: measured bandwidth available
1288
1289           If we choose 1), when switch happens at the end of current
1290           segment, our buffering will be
1291                  B + X - X * bw0 / bw
1292
1293           If we choose 2), when switch happens where we aborted current
1294           segment, our buffering will be
1295                  B - (T - X) * bw1 / bw
1296
1297           We should only choose 1) if
1298                  X/T < bw1 / (bw1 + bw0 - bw)
1299        */
1300
1301        // abort old bandwidth immediately if bandwidth is fluctuating a lot.
1302        // our estimate could be far off, and fetching old bandwidth could
1303        // take too long.
1304        if (!mLastBandwidthStable) {
1305            return 0.0f;
1306        }
1307
1308        // Taking the measured current bandwidth at 50% face value only,
1309        // as our bandwidth estimation is a lagging indicator. Being
1310        // conservative on this, we prefer switching to lower bandwidth
1311        // unless we're really confident finishing up the last segment
1312        // of higher bandwidth will be fast.
1313        CHECK(mLastBandwidthBps >= 0);
1314        abortThreshold =
1315                (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1316             / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1317              + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth
1318              - (float)mLastBandwidthBps * 0.5f);
1319        if (abortThreshold < 0.0f) {
1320            abortThreshold = -1.0f; // do not abort
1321        }
1322        ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f",
1323                mBandwidthItems.itemAt(currentBWIndex).mBandwidth,
1324                mBandwidthItems.itemAt(targetBWIndex).mBandwidth,
1325                mLastBandwidthBps,
1326                abortThreshold);
1327    }
1328    return abortThreshold;
1329}
1330
1331void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) {
1332    mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs);
1333}
1334
1335ssize_t LiveSession::getLowestValidBandwidthIndex() const {
1336    for (size_t index = 0; index < mBandwidthItems.size(); index++) {
1337        if (isBandwidthValid(mBandwidthItems[index])) {
1338            return index;
1339        }
1340    }
1341    // if playlists are all blacklisted, return 0 and hope it's alive
1342    return 0;
1343}
1344
1345size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) {
1346    if (mBandwidthItems.size() < 2) {
1347        // shouldn't be here if we only have 1 bandwidth, check
1348        // logic to get rid of redundant bandwidth polling
1349        ALOGW("getBandwidthIndex() called for single bandwidth playlist!");
1350        return 0;
1351    }
1352
1353#if 1
1354    char value[PROPERTY_VALUE_MAX];
1355    ssize_t index = -1;
1356    if (property_get("media.httplive.bw-index", value, NULL)) {
1357        char *end;
1358        index = strtol(value, &end, 10);
1359        CHECK(end > value && *end == '\0');
1360
1361        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
1362            index = mBandwidthItems.size() - 1;
1363        }
1364    }
1365
1366    if (index < 0) {
1367        char value[PROPERTY_VALUE_MAX];
1368        if (property_get("media.httplive.max-bw", value, NULL)) {
1369            char *end;
1370            long maxBw = strtoul(value, &end, 10);
1371            if (end > value && *end == '\0') {
1372                if (maxBw > 0 && bandwidthBps > maxBw) {
1373                    ALOGV("bandwidth capped to %ld bps", maxBw);
1374                    bandwidthBps = maxBw;
1375                }
1376            }
1377        }
1378
1379        // Pick the highest bandwidth stream that's not currently blacklisted
1380        // below or equal to estimated bandwidth.
1381
1382        index = mBandwidthItems.size() - 1;
1383        ssize_t lowestBandwidth = getLowestValidBandwidthIndex();
1384        while (index > lowestBandwidth) {
1385            // be conservative (70%) to avoid overestimating and immediately
1386            // switching down again.
1387            size_t adjustedBandwidthBps = bandwidthBps * 7 / 10;
1388            const BandwidthItem &item = mBandwidthItems[index];
1389            if (item.mBandwidth <= adjustedBandwidthBps
1390                    && isBandwidthValid(item)) {
1391                break;
1392            }
1393            --index;
1394        }
1395    }
1396#elif 0
1397    // Change bandwidth at random()
1398    size_t index = uniformRand() * mBandwidthItems.size();
1399#elif 0
1400    // There's a 50% chance to stay on the current bandwidth and
1401    // a 50% chance to switch to the next higher bandwidth (wrapping around
1402    // to lowest)
1403    const size_t kMinIndex = 0;
1404
1405    static ssize_t mCurBandwidthIndex = -1;
1406
1407    size_t index;
1408    if (mCurBandwidthIndex < 0) {
1409        index = kMinIndex;
1410    } else if (uniformRand() < 0.5) {
1411        index = (size_t)mCurBandwidthIndex;
1412    } else {
1413        index = mCurBandwidthIndex + 1;
1414        if (index == mBandwidthItems.size()) {
1415            index = kMinIndex;
1416        }
1417    }
1418    mCurBandwidthIndex = index;
1419#elif 0
1420    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1421
1422    size_t index = mBandwidthItems.size() - 1;
1423    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1424        --index;
1425    }
1426#elif 1
1427    char value[PROPERTY_VALUE_MAX];
1428    size_t index;
1429    if (property_get("media.httplive.bw-index", value, NULL)) {
1430        char *end;
1431        index = strtoul(value, &end, 10);
1432        CHECK(end > value && *end == '\0');
1433
1434        if (index >= mBandwidthItems.size()) {
1435            index = mBandwidthItems.size() - 1;
1436        }
1437    } else {
1438        index = 0;
1439    }
1440#else
1441    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
1442#endif
1443
1444    CHECK_GE(index, 0);
1445
1446    return index;
1447}
1448
1449HLSTime LiveSession::latestMediaSegmentStartTime() const {
1450    HLSTime audioTime(mPacketSources.valueFor(
1451                    STREAMTYPE_AUDIO)->getLatestDequeuedMeta());
1452
1453    HLSTime videoTime(mPacketSources.valueFor(
1454                    STREAMTYPE_VIDEO)->getLatestDequeuedMeta());
1455
1456    return audioTime < videoTime ? videoTime : audioTime;
1457}
1458
1459void LiveSession::onSeek(const sp<AMessage> &msg) {
1460    int64_t timeUs;
1461    int32_t mode;
1462    CHECK(msg->findInt64("timeUs", &timeUs));
1463    CHECK(msg->findInt32("mode", &mode));
1464    // TODO: add "mode" to changeConfiguration.
1465    changeConfiguration(timeUs/* , (MediaPlayerSeekMode)mode */);
1466}
1467
1468status_t LiveSession::getDuration(int64_t *durationUs) const {
1469    int64_t maxDurationUs = -1ll;
1470    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1471        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1472
1473        if (fetcherDurationUs > maxDurationUs) {
1474            maxDurationUs = fetcherDurationUs;
1475        }
1476    }
1477
1478    *durationUs = maxDurationUs;
1479
1480    return OK;
1481}
1482
1483bool LiveSession::isSeekable() const {
1484    int64_t durationUs;
1485    return getDuration(&durationUs) == OK && durationUs >= 0;
1486}
1487
1488bool LiveSession::hasDynamicDuration() const {
1489    return false;
1490}
1491
1492size_t LiveSession::getTrackCount() const {
1493    if (mPlaylist == NULL) {
1494        return 0;
1495    } else {
1496        return mPlaylist->getTrackCount() + (mHasMetadata ? 1 : 0);
1497    }
1498}
1499
1500sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1501    if (mPlaylist == NULL) {
1502        return NULL;
1503    } else {
1504        if (trackIndex == mPlaylist->getTrackCount() && mHasMetadata) {
1505            sp<AMessage> format = new AMessage();
1506            format->setInt32("type", MEDIA_TRACK_TYPE_METADATA);
1507            format->setString("language", "und");
1508            format->setString("mime", MEDIA_MIMETYPE_DATA_TIMED_ID3);
1509            return format;
1510        }
1511        return mPlaylist->getTrackInfo(trackIndex);
1512    }
1513}
1514
1515status_t LiveSession::selectTrack(size_t index, bool select) {
1516    if (mPlaylist == NULL) {
1517        return INVALID_OPERATION;
1518    }
1519
1520    ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++",
1521            index, select, mSubtitleGeneration);
1522
1523    ++mSubtitleGeneration;
1524    status_t err = mPlaylist->selectTrack(index, select);
1525    if (err == OK) {
1526        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this);
1527        msg->setInt32("pickTrack", select);
1528        msg->post();
1529    }
1530    return err;
1531}
1532
1533ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
1534    if (mPlaylist == NULL) {
1535        return -1;
1536    } else {
1537        return mPlaylist->getSelectedTrack(type);
1538    }
1539}
1540
// First stage of a reconfiguration (seek, bandwidth switch or track change).
//
// timeUs:         seek position; a negative value means "not seeking".
// bandwidthIndex: index into mBandwidthItems to switch to; a negative value
//                 keeps mCurBandwidthIndex unchanged.
// pickTrack:      forwarded in the continuation message; when set, old
//                 fetchers are never kept around for delayed removal.
//
// Cancels any bandwidth switch in progress, asserts that no other
// reconfiguration is running, then stops or pauses the existing fetchers and
// prepares the continuation message (kWhatChangeConfiguration2 when seeking,
// kWhatChangeConfiguration3 otherwise). The continuation is posted right away
// only if there are no fetchers to wait for.
void LiveSession::changeConfiguration(
        int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) {
    ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d",
          (long long)timeUs, bandwidthIndex, pickTrack);

    cancelBandwidthSwitch();

    CHECK(!mReconfigurationInProgress);
    mReconfigurationInProgress = true;
    if (bandwidthIndex >= 0) {
        // remember where we came from so the switch direction can be derived
        mOrigBandwidthIndex = mCurBandwidthIndex;
        mCurBandwidthIndex = bandwidthIndex;
        if (mOrigBandwidthIndex != mCurBandwidthIndex) {
            ALOGI("#### Starting Bandwidth Switch: %zd => %zd",
                    mOrigBandwidthIndex, mCurBandwidthIndex);
        }
    }
    CHECK_LT((size_t)mCurBandwidthIndex, mBandwidthItems.size());
    const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex);

    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher

    // Look up the uri of every stream type offered by the selected variant.
    AString URIs[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
            streamMask |= indexToType(i);
        }
    }

    // Step 1, stop and discard fetchers that are no longer needed.
    // Pause those that we'll reuse.
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        // skip fetchers that are marked mToBeRemoved,
        // these are done and can't be reused
        if (mFetcherInfos[i].mToBeRemoved) {
            continue;
        }

        const AString &uri = mFetcherInfos.keyAt(i);
        sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher;

        bool discardFetcher = true, delayRemoval = false;
        for (size_t j = 0; j < kMaxStreams; ++j) {
            StreamType type = indexToType(j);
            // A fetcher already serving this uri keeps the stream: move the
            // type from streamMask to resumeMask.
            if ((streamMask & type) && uri == URIs[j]) {
                resumeMask |= type;
                streamMask &= ~type;
                discardFetcher = false;
            }
        }
        // Delay fetcher removal if not picking tracks, AND old fetcher
        // has stream mask that overlaps new variant. (Okay to discard
        // old fetcher now, if completely no overlap.)
        if (discardFetcher && timeUs < 0ll && !pickTrack
                && (fetcher->getStreamTypeMask() & streamMask)) {
            discardFetcher = false;
            delayRemoval = true;
        }

        if (discardFetcher) {
            ALOGV("discarding fetcher-%d", fetcher->getFetcherID());
            fetcher->stopAsync();
        } else {
            float threshold = 0.0f; // default to pause after current block (47Kbytes)
            bool disconnect = false;
            if (timeUs >= 0ll) {
                // seeking, no need to finish fetching
                disconnect = true;
            } else if (delayRemoval) {
                // adapting, abort if remaining of current segment is over threshold
                threshold = getAbortThreshold(
                        mOrigBandwidthIndex, mCurBandwidthIndex);
            }

            ALOGV("pausing fetcher-%d, threshold=%.2f",
                    fetcher->getFetcherID(), threshold);
            fetcher->pauseAsync(threshold, disconnect);
        }
    }

    sp<AMessage> msg;
    if (timeUs < 0ll) {
        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
        msg = new AMessage(kWhatChangeConfiguration3, this);
    } else {
        msg = new AMessage(kWhatChangeConfiguration2, this);
    }
    msg->setInt32("streamMask", streamMask);
    msg->setInt32("resumeMask", resumeMask);
    msg->setInt32("pickTrack", pickTrack);
    msg->setInt64("timeUs", timeUs);
    // Attach the uri of every stream that will be fetched, whether by a new
    // or by a resumed fetcher.
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if ((streamMask | resumeMask) & indexToType(i)) {
            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
        }
    }

    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
    // fetchers have completed their asynchronous operation, we'll post
    // mContinuation, which then is handled below in onChangeConfiguration2.
    // NOTE(review): the counter includes fetchers skipped above because they
    // are marked mToBeRemoved; confirm those also acknowledge.
    mContinuationCounter = mFetcherInfos.size();
    mContinuation = msg;

    if (mContinuationCounter == 0) {
        msg->post();
    }
}
1650
1651void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1652    ALOGV("onChangeConfiguration");
1653
1654    if (!mReconfigurationInProgress) {
1655        int32_t pickTrack = 0;
1656        msg->findInt32("pickTrack", &pickTrack);
1657        changeConfiguration(-1ll /* timeUs */, -1, pickTrack);
1658    } else {
1659        msg->post(1000000ll); // retry in 1 sec
1660    }
1661}
1662
// Second stage of a configuration change. changeConfiguration() queues
// kWhatChangeConfiguration2 as its continuation when seeking (timeUs >= 0).
//
// Steps performed here:
//  1. For a seek (timeUs >= 0): record the seek position, clear every packet
//     source (re-applying its previous format as a tentative one), call
//     reset() on every stream entry, clear the discontinuity bookkeeping,
//     answer the pending seek request (if any) and restart buffer polling.
//  2. Read "streamMask" / "resumeMask" and the per-stream URIs from |msg|.
//  3. For the streams located before kSubtitleIndex: queue a format-change
//     discontinuity when the URI changed, and collect in changedMask the
//     stream types that were active but are no longer requested.
//  4. If changedMask is empty, continue straight into
//     onChangeConfiguration3(); otherwise post kWhatStreamsChanged through
//     mNotify, attaching |msg| (retargeted to kWhatChangeConfiguration3) as
//     the "reply" message.
//
// |msg| must carry "timeUs", "streamMask", "resumeMask" and one URI string
// for every stream type selected by (streamMask | resumeMask); a missing
// entry trips a CHECK.
void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
    ALOGV("onChangeConfiguration2");

    mContinuation.clear();

    // All fetchers are either suspended or have been removed now.

    // If we're seeking, clear all packet sources before we report
    // seek complete, to prevent decoder from pulling stale data.
    int64_t timeUs;
    CHECK(msg->findInt64("timeUs", &timeUs));

    if (timeUs >= 0) {
        mLastSeekTimeUs = timeUs;
        mLastDequeuedTimeUs = timeUs;

        for (size_t i = 0; i < mPacketSources.size(); i++) {
            sp<AnotherPacketSource> packetSource = mPacketSources.editValueAt(i);
            // Remember the format before clear() so it can be restored below.
            sp<MetaData> format = packetSource->getFormat();
            packetSource->clear();
            // Set a tentative format here such that HTTPLiveSource will always have
            // a format available when NuPlayer queries. Without an available video
            // format when setting a surface NuPlayer might disable video decoding
            // altogether. The tentative format will be overwritten by the
            // authoritative (and possibly same) format once content from the new
            // position is dequeued.
            packetSource->setFormat(format);
        }

        for (size_t i = 0; i < kMaxStreams; ++i) {
            mStreams[i].reset();
        }

        mDiscontinuityOffsetTimesUs.clear();
        mDiscontinuityAbsStartTimesUs.clear();

        // Complete the pending seek request, if there is one, with OK.
        if (mSeekReplyID != NULL) {
            CHECK(mSeekReply != NULL);
            mSeekReply->setInt32("err", OK);
            mSeekReply->postReply(mSeekReplyID);
            mSeekReplyID.clear();
            mSeekReply.clear();
        }

        // Restart buffer polling after a seek because the previous
        // buffering position is no longer valid.
        restartPollBuffering();
    }

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    // From here on streamMask covers every requested stream type, whether it
    // is served by a resumed fetcher or by a new one.
    streamMask |= resumeMask;

    // Pull the requested URI of every selected stream out of the message.
    AString URIs[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            const AString &uriKey = mStreams[i].uriKey();
            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
        }
    }

    uint32_t changedMask = 0;
    // Note: the loop condition stops iterating as soon as i reaches
    // kSubtitleIndex, so the subtitle stream (and any index after it) is
    // never examined here.
    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
        // stream URI could change even if onChangeConfiguration2 is only
        // used for seek. Seek could happen during a bw switch, in this
        // case bw switch will be cancelled, but the seekTo position will
        // fetch from the new URI.
        if ((mStreamMask & streamMask & indexToType(i))
                && !mStreams[i].mUri.empty()
                && !(URIs[i] == mStreams[i].mUri)) {
            ALOGV("stream %zu changed: oldURI %s, newURI %s", i,
                    mStreams[i].mUri.c_str(), URIs[i].c_str());
            sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i));
            // Only queue the discontinuity if something was already dequeued
            // from this source.
            if (source->getLatestDequeuedMeta() != NULL) {
                source->queueDiscontinuity(
                        ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
            }
        }
        // Determine which decoders to shut down on the player side:
        // a decoder has to be shut down if its stream type was active
        // before but no longer is.
        if ((mStreamMask & ~streamMask & indexToType(i))) {
            changedMask |= indexToType(i);
        }
    }

    if (changedMask == 0) {
        // If nothing changed as far as the audio/video decoders
        // are concerned we can proceed.
        onChangeConfiguration3(msg);
        return;
    }

    // Something changed, inform the player which will shutdown the
    // corresponding decoders and will post the reply once that's done.
    // Handling the reply will continue executing below in
    // onChangeConfiguration3.
    sp<AMessage> notify = mNotify->dup();
    notify->setInt32("what", kWhatStreamsChanged);
    notify->setInt32("changedMask", changedMask);

    // Reuse the incoming message (with all its parameters) as the reply,
    // redirected to the third stage.
    msg->setWhat(kWhatChangeConfiguration3);
    msg->setTarget(this);

    notify->setMessage("reply", msg);
    notify->post();
}
1773
// Final stage of a configuration change: resumes the existing fetchers that
// are still needed, marks the others for removal, and creates and starts a
// new fetcher for every URI that still needs one.
//
// |msg| must carry "streamMask", "resumeMask", "timeUs" and "pickTrack", plus
// a URI string for every stream type set in "streamMask"; a missing entry
// trips a CHECK.
//
// When not seeking (timeUs < 0) and not picking a track, stream types that
// are in both the old and the new mask but not in resumeMask are recorded in
// mSwapMask. If that mask is non-zero this is a "switching" reconfiguration:
// the new URIs go to mNewUri, the new fetchers are given the sources from
// mPacketSources2, and mSwitchInProgress is set on exit. Otherwise the new
// URIs go to mUri and mStreamMask is updated to mNewStreamMask immediately.
void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
    mContinuation.clear();
    // All remaining fetchers are still suspended, the player has shutdown
    // any decoders that needed it.

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    mNewStreamMask = streamMask | resumeMask;

    int64_t timeUs;
    int32_t pickTrack;
    bool switching = false;
    CHECK(msg->findInt64("timeUs", &timeUs));
    CHECK(msg->findInt32("pickTrack", &pickTrack));

    if (timeUs < 0ll) {
        if (!pickTrack) {
            // mSwapMask contains streams that are in both old and new variant,
            // (in mNewStreamMask & mStreamMask) but with different URIs
            // (not in resumeMask).
            // For example, old variant has video and audio in two separate
            // URIs, and new variant has only audio with unchanged URI. mSwapMask
            // should be 0 as there is nothing to swap. We only need to stop video,
            // and resume audio.
            mSwapMask =  mNewStreamMask & mStreamMask & ~resumeMask;
            switching = (mSwapMask != 0);
        }
        // Not seeking: anchor the real-time base to the last dequeued time.
        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
    } else {
        // Seeking: anchor the real-time base to the seek target.
        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
    }

    ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, "
            "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x",
            (long long)timeUs, switching, pickTrack,
            mStreamMask, mNewStreamMask, mSwapMask);

    // Store the requested URI of every stream that needs a new fetcher:
    // into mNewUri while switching (mUri stays the old one until the swap),
    // directly into mUri otherwise.
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            if (switching) {
                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
            } else {
                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
            }
        }
    }

    // Of all existing fetchers:
    // * Resume fetchers that are still needed and assign them original packet sources.
    // * Mark otherwise unneeded fetchers for removal.
    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        const AString &uri = mFetcherInfos.keyAt(i);
        if (!resumeFetcher(uri, resumeMask, timeUs)) {
            ALOGV("marking fetcher-%d to be removed",
                    mFetcherInfos[i].mFetcher->getFetcherID());

            mFetcherInfos.editValueAt(i).mToBeRemoved = true;
        }
    }

    // streamMask now only contains the types that need a new fetcher created.
    if (streamMask != 0) {
        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
    }

    // Find out when the original fetchers have buffered up to and start the new fetchers
    // at a later timestamp.
    for (size_t i = 0; i < kMaxStreams; i++) {
        if (!(indexToType(i) & streamMask)) {
            continue;
        }

        AString uri;
        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;

        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
        CHECK(fetcher != NULL);

        HLSTime startTime;
        SeekMode seekMode = kSeekModeExactPosition;
        sp<AnotherPacketSource> sources[kNumSources];

        // Subtitle fetchers, and fetchers created for a plain (non-switching,
        // non-track-pick) change, start out from latestMediaSegmentStartTime().
        if (i == kSubtitleIndex || (!pickTrack && !switching)) {
            startTime = latestMediaSegmentStartTime();
        }

        // TRICKY: looping from i as earlier streams are already removed from streamMask
        // Every remaining stream that shares this fetcher's URI is served by
        // the same fetcher and is removed from streamMask once handled.
        for (size_t j = i; j < kMaxStreams; ++j) {
            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
            if ((streamMask & indexToType(j)) && uri == streamUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));

                if (timeUs >= 0) {
                    // Seeking: start exactly at the seek target.
                    startTime.mTimeUs = timeUs;
                } else {
                    int32_t type;
                    sp<AMessage> meta;
                    if (!switching) {
                        // selecting, or adapting but no swap required
                        meta = sources[j]->getLatestDequeuedMeta();
                    } else {
                        // adapting and swap required
                        meta = sources[j]->getLatestEnqueuedMeta();
                        if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) {
                            // switching up
                            meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin);
                        }
                    }

                    // Use the latest audio/video position as start time,
                    // ignoring metas that are discontinuity markers.
                    if ((j == kAudioIndex || j == kVideoIndex)
                            && meta != NULL && !meta->findInt32("discontinuity", &type)) {
                        HLSTime tmpTime(meta);
                        if (startTime < tmpTime) {
                            startTime = tmpTime;
                        }
                    }

                    if (!switching) {
                        // selecting, or adapting but no swap required
                        sources[j]->clear();
                        // Note: this break leaves the inner loop before the
                        // subtitle bit is removed from streamMask below, and
                        // no discontinuity is queued for subtitles.
                        if (j == kSubtitleIndex) {
                            break;
                        }

                        ALOGV("stream[%zu]: queue format change", j);
                        sources[j]->queueDiscontinuity(
                                ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
                    } else {
                        // switching, queue discontinuities after resume
                        sources[j] = mPacketSources2.valueFor(indexToType(j));
                        sources[j]->clear();
                        // the new fetcher might be providing streams that used to be
                        // provided by two different fetchers,  if one of the fetcher
                        // paused in the middle while the other somehow paused in next
                        // seg, we have to start from next seg.
                        if (seekMode < mStreams[j].mSeekMode) {
                            seekMode = mStreams[j].mSeekMode;
                        }
                    }
                }

                streamMask &= ~indexToType(j);
            }
        }

        ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld "
                "segmentStartTimeUs %lld seekMode %d",
                fetcher->getFetcherID(),
                (long long)startTime.mTimeUs,
                (long long)mLastSeekTimeUs,
                (long long)startTime.getSegmentTimeUs(),
                seekMode);

        // Set the target segment start time to the middle point of the
        // segment where the last sample was.
        // This gives a better guess if segments of the two variants are not
        // perfectly aligned. (If the corresponding segment in new variant
        // starts slightly later than that in the old variant, we still want
        // to pick that segment, not the one before)
        // A negative startTime.mTimeUs (no usable position found above)
        // falls back to the last seek time.
        fetcher->startAsync(
                sources[kAudioIndex],
                sources[kVideoIndex],
                sources[kSubtitleIndex],
                getMetadataSource(sources, mNewStreamMask, switching),
                startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs,
                startTime.getSegmentTimeUs(),
                startTime.mSeq,
                seekMode);
    }

    // All fetchers have now been started, the configuration change
    // has completed.

    mReconfigurationInProgress = false;
    if (switching) {
        // mStreamMask and mOrigBandwidthIndex are left untouched in this
        // branch; tryToFinishBandwidthSwitch() updates them when the swap
        // completes.
        mSwitchInProgress = true;
    } else {
        mStreamMask = mNewStreamMask;
        if (mOrigBandwidthIndex != mCurBandwidthIndex) {
            ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd",
                    mOrigBandwidthIndex, mCurBandwidthIndex);
            mOrigBandwidthIndex = mCurBandwidthIndex;
        }
    }

    ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x",
            mSwitchInProgress, mStreamMask);

    // A disconnect request that arrived in the meantime is completed now.
    if (mDisconnectReplyID != NULL) {
        finishDisconnect();
    }
}
1969
1970void LiveSession::swapPacketSource(StreamType stream) {
1971    ALOGV("[%s] swapPacketSource", getNameForStream(stream));
1972
1973    // transfer packets from source2 to source
1974    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
1975    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
1976
1977    // queue discontinuity in mPacketSource
1978    aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false);
1979
1980    // queue packets in mPacketSource2 to mPacketSource
1981    status_t finalResult = OK;
1982    sp<ABuffer> accessUnit;
1983    while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK &&
1984          OK == aps2->dequeueAccessUnit(&accessUnit)) {
1985        aps->queueAccessUnit(accessUnit);
1986    }
1987    aps2->clear();
1988}
1989
1990void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) {
1991    if (!mSwitchInProgress) {
1992        return;
1993    }
1994
1995    ssize_t index = mFetcherInfos.indexOfKey(oldUri);
1996    if (index < 0 || !mFetcherInfos[index].mToBeRemoved) {
1997        return;
1998    }
1999
2000    // Swap packet source of streams provided by old variant
2001    for (size_t idx = 0; idx < kMaxStreams; idx++) {
2002        StreamType stream = indexToType(idx);
2003        if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) {
2004            swapPacketSource(stream);
2005
2006            if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
2007                ALOGW("swapping stream type %d %s to empty stream",
2008                        stream, mStreams[idx].mUri.c_str());
2009            }
2010            mStreams[idx].mUri = mStreams[idx].mNewUri;
2011            mStreams[idx].mNewUri.clear();
2012
2013            mSwapMask &= ~stream;
2014        }
2015    }
2016
2017    mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */);
2018
2019    ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask);
2020    if (mSwapMask != 0) {
2021        return;
2022    }
2023
2024    // Check if new variant contains extra streams.
2025    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
2026    while (extraStreams) {
2027        StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1));
2028        extraStreams &= ~stream;
2029
2030        swapPacketSource(stream);
2031
2032        ssize_t idx = typeToIndex(stream);
2033        CHECK(idx >= 0);
2034        if (mStreams[idx].mNewUri.empty()) {
2035            ALOGW("swapping extra stream type %d %s to empty stream",
2036                    stream, mStreams[idx].mUri.c_str());
2037        }
2038        mStreams[idx].mUri = mStreams[idx].mNewUri;
2039        mStreams[idx].mNewUri.clear();
2040    }
2041
2042    // Restart new fetcher (it was paused after the first 47k block)
2043    // and let it fetch into mPacketSources (not mPacketSources2)
2044    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2045        FetcherInfo &info = mFetcherInfos.editValueAt(i);
2046        if (info.mToBeResumed) {
2047            resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask);
2048            info.mToBeResumed = false;
2049        }
2050    }
2051
2052    ALOGI("#### Finished Bandwidth Switch: %zd => %zd",
2053            mOrigBandwidthIndex, mCurBandwidthIndex);
2054
2055    mStreamMask = mNewStreamMask;
2056    mSwitchInProgress = false;
2057    mOrigBandwidthIndex = mCurBandwidthIndex;
2058
2059    restartPollBuffering();
2060}
2061
2062void LiveSession::schedulePollBuffering() {
2063    sp<AMessage> msg = new AMessage(kWhatPollBuffering, this);
2064    msg->setInt32("generation", mPollBufferingGeneration);
2065    msg->post(1000000ll);
2066}
2067
2068void LiveSession::cancelPollBuffering() {
2069    ++mPollBufferingGeneration;
2070    mPrevBufferPercentage = -1;
2071}
2072
// Restarts buffer polling from scratch: cancelPollBuffering() advances the
// poll generation and resets the last reported percentage, then
// onPollBuffering() polls immediately and schedules the next poll.
void LiveSession::restartPollBuffering() {
    // Order matters: the generation must be advanced before the new poll
    // cycle schedules its own message.
    cancelPollBuffering();
    onPollBuffering();
}
2077
2078void LiveSession::onPollBuffering() {
2079    ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
2080            "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x",
2081        mSwitchInProgress, mReconfigurationInProgress,
2082        mInPreparationPhase, mCurBandwidthIndex, mStreamMask);
2083
2084    bool underflow, ready, down, up;
2085    if (checkBuffering(underflow, ready, down, up)) {
2086        if (mInPreparationPhase) {
2087            // Allow down switch even if we're still preparing.
2088            //
2089            // Some streams have a high bandwidth index as default,
2090            // when bandwidth is low, it takes a long time to buffer
2091            // to ready mark, then it immediately pauses after start
2092            // as we have to do a down switch. It's better experience
2093            // to restart from a lower index, if we detect low bw.
2094            if (!switchBandwidthIfNeeded(false /* up */, down) && ready) {
2095                postPrepared(OK);
2096            }
2097        }
2098
2099        if (!mInPreparationPhase) {
2100            if (ready) {
2101                stopBufferingIfNecessary();
2102            } else if (underflow) {
2103                startBufferingIfNecessary();
2104            }
2105            switchBandwidthIfNeeded(up, down);
2106        }
2107    }
2108
2109    schedulePollBuffering();
2110}
2111
2112void LiveSession::cancelBandwidthSwitch(bool resume) {
2113    ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd",
2114            mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex);
2115    if (!mSwitchInProgress) {
2116        return;
2117    }
2118
2119    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2120        FetcherInfo& info = mFetcherInfos.editValueAt(i);
2121        if (info.mToBeRemoved) {
2122            info.mToBeRemoved = false;
2123            if (resume) {
2124                resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask);
2125            }
2126        }
2127    }
2128
2129    for (size_t i = 0; i < kMaxStreams; ++i) {
2130        AString newUri = mStreams[i].mNewUri;
2131        if (!newUri.empty()) {
2132            // clear all mNewUri matching this newUri
2133            for (size_t j = i; j < kMaxStreams; ++j) {
2134                if (mStreams[j].mNewUri == newUri) {
2135                    mStreams[j].mNewUri.clear();
2136                }
2137            }
2138            ALOGV("stopping newUri = %s", newUri.c_str());
2139            ssize_t index = mFetcherInfos.indexOfKey(newUri);
2140            if (index < 0) {
2141                ALOGE("did not find fetcher for newUri: %s", newUri.c_str());
2142                continue;
2143            }
2144            FetcherInfo &info = mFetcherInfos.editValueAt(index);
2145            info.mToBeRemoved = true;
2146            info.mFetcher->stopAsync();
2147        }
2148    }
2149
2150    ALOGI("#### Canceled Bandwidth Switch: %zd => %zd",
2151            mOrigBandwidthIndex, mCurBandwidthIndex);
2152
2153    mSwitchGeneration++;
2154    mSwitchInProgress = false;
2155    mCurBandwidthIndex = mOrigBandwidthIndex;
2156    mSwapMask = 0;
2157}
2158
2159bool LiveSession::checkBuffering(
2160        bool &underflow, bool &ready, bool &down, bool &up) {
2161    underflow = ready = down = up = false;
2162
2163    if (mReconfigurationInProgress) {
2164        ALOGV("Switch/Reconfig in progress, defer buffer polling");
2165        return false;
2166    }
2167
2168    size_t activeCount, underflowCount, readyCount, downCount, upCount;
2169    activeCount = underflowCount = readyCount = downCount = upCount =0;
2170    int32_t minBufferPercent = -1;
2171    int64_t durationUs;
2172    if (getDuration(&durationUs) != OK) {
2173        durationUs = -1;
2174    }
2175    for (size_t i = 0; i < mPacketSources.size(); ++i) {
2176        // we don't check subtitles for buffering level
2177        if (!(mStreamMask & mPacketSources.keyAt(i)
2178                & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) {
2179            continue;
2180        }
2181        // ignore streams that never had any packet queued.
2182        // (it's possible that the variant only has audio or video)
2183        sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
2184        if (meta == NULL) {
2185            continue;
2186        }
2187
2188        status_t finalResult;
2189        int64_t bufferedDurationUs =
2190                mPacketSources[i]->getBufferedDurationUs(&finalResult);
2191        ALOGV("[%s] buffered %lld us",
2192                getNameForStream(mPacketSources.keyAt(i)),
2193                (long long)bufferedDurationUs);
2194        if (durationUs >= 0) {
2195            int32_t percent;
2196            if (mPacketSources[i]->isFinished(0 /* duration */)) {
2197                percent = 100;
2198            } else {
2199                percent = (int32_t)(100.0 *
2200                        (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs);
2201            }
2202            if (minBufferPercent < 0 || percent < minBufferPercent) {
2203                minBufferPercent = percent;
2204            }
2205        }
2206
2207        ++activeCount;
2208        int64_t readyMarkUs =
2209            (mInPreparationPhase ?
2210                mBufferingSettings.mInitialMarkMs :
2211                mBufferingSettings.mResumePlaybackMarkMs) * 1000ll;
2212        if (bufferedDurationUs > readyMarkUs
2213                || mPacketSources[i]->isFinished(0)) {
2214            ++readyCount;
2215        }
2216        if (!mPacketSources[i]->isFinished(0)) {
2217            if (bufferedDurationUs < kUnderflowMarkMs * 1000ll) {
2218                ++underflowCount;
2219            }
2220            if (bufferedDurationUs > mUpSwitchMark) {
2221                ++upCount;
2222            }
2223            if (bufferedDurationUs < mDownSwitchMark) {
2224                ++downCount;
2225            }
2226        }
2227    }
2228
2229    if (minBufferPercent >= 0) {
2230        notifyBufferingUpdate(minBufferPercent);
2231    }
2232
2233    if (activeCount > 0) {
2234        up        = (upCount == activeCount);
2235        down      = (downCount > 0);
2236        ready     = (readyCount == activeCount);
2237        underflow = (underflowCount > 0);
2238        return true;
2239    }
2240
2241    return false;
2242}
2243
2244void LiveSession::startBufferingIfNecessary() {
2245    ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2246            mInPreparationPhase, mBuffering);
2247    if (!mBuffering) {
2248        mBuffering = true;
2249
2250        sp<AMessage> notify = mNotify->dup();
2251        notify->setInt32("what", kWhatBufferingStart);
2252        notify->post();
2253    }
2254}
2255
2256void LiveSession::stopBufferingIfNecessary() {
2257    ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2258            mInPreparationPhase, mBuffering);
2259
2260    if (mBuffering) {
2261        mBuffering = false;
2262
2263        sp<AMessage> notify = mNotify->dup();
2264        notify->setInt32("what", kWhatBufferingEnd);
2265        notify->post();
2266    }
2267}
2268
2269void LiveSession::notifyBufferingUpdate(int32_t percentage) {
2270    if (percentage < mPrevBufferPercentage) {
2271        percentage = mPrevBufferPercentage;
2272    } else if (percentage > 100) {
2273        percentage = 100;
2274    }
2275
2276    mPrevBufferPercentage = percentage;
2277
2278    ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage);
2279
2280    sp<AMessage> notify = mNotify->dup();
2281    notify->setInt32("what", kWhatBufferingUpdate);
2282    notify->setInt32("percentage", percentage);
2283    notify->post();
2284}
2285
2286bool LiveSession::tryBandwidthFallback() {
2287    if (mInPreparationPhase || mReconfigurationInProgress) {
2288        // Don't try fallback during prepare or reconfig.
2289        // If error happens there, it's likely unrecoverable.
2290        return false;
2291    }
2292    if (mCurBandwidthIndex > mOrigBandwidthIndex) {
2293        // if we're switching up, simply cancel and resume old variant
2294        cancelBandwidthSwitch(true /* resume */);
2295        return true;
2296    } else {
2297        // if we're switching down, we're likely about to underflow (if
2298        // not already underflowing). try the lowest viable bandwidth if
2299        // not on that variant already.
2300        ssize_t lowestValid = getLowestValidBandwidthIndex();
2301        if (mCurBandwidthIndex > lowestValid) {
2302            cancelBandwidthSwitch();
2303            changeConfiguration(-1ll, lowestValid);
2304            return true;
2305        }
2306    }
2307    // return false if we couldn't find any fallback
2308    return false;
2309}
2310
2311/*
2312 * returns true if a bandwidth switch is actually needed (and started),
2313 * returns false otherwise
2314 */
2315bool LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
2316    // no need to check bandwidth if we only have 1 bandwidth settings
2317    if (mBandwidthItems.size() < 2) {
2318        return false;
2319    }
2320
2321    if (mSwitchInProgress) {
2322        if (mBuffering) {
2323            tryBandwidthFallback();
2324        }
2325        return false;
2326    }
2327
2328    int32_t bandwidthBps, shortTermBps;
2329    bool isStable;
2330    if (mBandwidthEstimator->estimateBandwidth(
2331            &bandwidthBps, &isStable, &shortTermBps)) {
2332        ALOGV("bandwidth estimated at %.2f kbps, "
2333                "stable %d, shortTermBps %.2f kbps",
2334                bandwidthBps / 1024.0f, isStable, shortTermBps / 1024.0f);
2335        mLastBandwidthBps = bandwidthBps;
2336        mLastBandwidthStable = isStable;
2337    } else {
2338        ALOGV("no bandwidth estimate.");
2339        return false;
2340    }
2341
2342    int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth;
2343    // canSwithDown and canSwitchUp can't both be true.
2344    // we only want to switch up when measured bw is 120% higher than current variant,
2345    // and we only want to switch down when measured bw is below current variant.
2346    bool canSwitchDown = bufferLow
2347            && (bandwidthBps < (int32_t)curBandwidth);
2348    bool canSwitchUp = bufferHigh
2349            && (bandwidthBps > (int32_t)curBandwidth * 12 / 10);
2350
2351    if (canSwitchDown || canSwitchUp) {
2352        // bandwidth estimating has some delay, if we have to downswitch when
2353        // it hasn't stabilized, use the short term to guess real bandwidth,
2354        // since it may be dropping too fast.
2355        // (note this doesn't apply to upswitch, always use longer average there)
2356        if (!isStable && canSwitchDown) {
2357            if (shortTermBps < bandwidthBps) {
2358                bandwidthBps = shortTermBps;
2359            }
2360        }
2361
2362        ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps);
2363
2364        // it's possible that we're checking for canSwitchUp case, but the returned
2365        // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70%
2366        // of measured bw. In that case we don't want to do anything, since we have
2367        // both enough buffer and enough bw.
2368        if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex)
2369         || (canSwitchDown && bandwidthIndex < mCurBandwidthIndex)) {
2370            // if not yet prepared, just restart again with new bw index.
2371            // this is faster and playback experience is cleaner.
2372            changeConfiguration(
2373                    mInPreparationPhase ? 0 : -1ll, bandwidthIndex);
2374            return true;
2375        }
2376    }
2377    return false;
2378}
2379
2380void LiveSession::postError(status_t err) {
2381    // if we reached EOS, notify buffering of 100%
2382    if (err == ERROR_END_OF_STREAM) {
2383        notifyBufferingUpdate(100);
2384    }
2385    // we'll stop buffer polling now, before that notify
2386    // stop buffering to stop the spinning icon
2387    stopBufferingIfNecessary();
2388    cancelPollBuffering();
2389
2390    sp<AMessage> notify = mNotify->dup();
2391    notify->setInt32("what", kWhatError);
2392    notify->setInt32("err", err);
2393    notify->post();
2394}
2395
2396void LiveSession::postPrepared(status_t err) {
2397    CHECK(mInPreparationPhase);
2398
2399    sp<AMessage> notify = mNotify->dup();
2400    if (err == OK || err == ERROR_END_OF_STREAM) {
2401        notify->setInt32("what", kWhatPrepared);
2402    } else {
2403        cancelPollBuffering();
2404
2405        notify->setInt32("what", kWhatPreparationFailed);
2406        notify->setInt32("err", err);
2407    }
2408
2409    notify->post();
2410
2411    mInPreparationPhase = false;
2412}
2413
2414
2415}  // namespace android
2416
2417