1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22#include "HTTPDownloader.h"
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "mpeg2ts/AnotherPacketSource.h"
27
28#include <cutils/properties.h>
29#include <media/IMediaHTTPService.h>
30#include <media/stagefright/foundation/ABuffer.h>
31#include <media/stagefright/foundation/ADebug.h>
32#include <media/stagefright/foundation/AMessage.h>
33#include <media/stagefright/foundation/AUtils.h>
34#include <media/stagefright/MediaDefs.h>
35#include <media/stagefright/MetaData.h>
36#include <media/stagefright/Utils.h>
37
38#include <utils/Mutex.h>
39
40#include <ctype.h>
41#include <inttypes.h>
42
43namespace android {
44
45// static
46// Bandwidth Switch Mark Defaults
47const int64_t LiveSession::kUpSwitchMarkUs = 15000000ll;
48const int64_t LiveSession::kDownSwitchMarkUs = 20000000ll;
49const int64_t LiveSession::kUpSwitchMarginUs = 5000000ll;
50const int64_t LiveSession::kResumeThresholdUs = 100000ll;
51
52// Buffer Prepare/Ready/Underflow Marks
53const int64_t LiveSession::kReadyMarkUs = 5000000ll;
54const int64_t LiveSession::kPrepareMarkUs = 1500000ll;
55const int64_t LiveSession::kUnderflowMarkUs = 1000000ll;
56
57struct LiveSession::BandwidthEstimator : public RefBase {
58    BandwidthEstimator();
59
60    void addBandwidthMeasurement(size_t numBytes, int64_t delayUs);
61    bool estimateBandwidth(
62            int32_t *bandwidth,
63            bool *isStable = NULL,
64            int32_t *shortTermBps = NULL);
65
66private:
67    // Bandwidth estimation parameters
68    static const int32_t kShortTermBandwidthItems = 3;
69    static const int32_t kMinBandwidthHistoryItems = 20;
70    static const int64_t kMinBandwidthHistoryWindowUs = 5000000ll; // 5 sec
71    static const int64_t kMaxBandwidthHistoryWindowUs = 30000000ll; // 30 sec
72    static const int64_t kMaxBandwidthHistoryAgeUs = 60000000ll; // 60 sec
73
74    struct BandwidthEntry {
75        int64_t mTimestampUs;
76        int64_t mDelayUs;
77        size_t mNumBytes;
78    };
79
80    Mutex mLock;
81    List<BandwidthEntry> mBandwidthHistory;
82    List<int32_t> mPrevEstimates;
83    int32_t mShortTermEstimate;
84    bool mHasNewSample;
85    bool mIsStable;
86    int64_t mTotalTransferTimeUs;
87    size_t mTotalTransferBytes;
88
89    DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator);
90};
91
92LiveSession::BandwidthEstimator::BandwidthEstimator() :
93    mShortTermEstimate(0),
94    mHasNewSample(false),
95    mIsStable(true),
96    mTotalTransferTimeUs(0),
97    mTotalTransferBytes(0) {
98}
99
100void LiveSession::BandwidthEstimator::addBandwidthMeasurement(
101        size_t numBytes, int64_t delayUs) {
102    AutoMutex autoLock(mLock);
103
104    int64_t nowUs = ALooper::GetNowUs();
105    BandwidthEntry entry;
106    entry.mTimestampUs = nowUs;
107    entry.mDelayUs = delayUs;
108    entry.mNumBytes = numBytes;
109    mTotalTransferTimeUs += delayUs;
110    mTotalTransferBytes += numBytes;
111    mBandwidthHistory.push_back(entry);
112    mHasNewSample = true;
113
114    // Remove no more than 10% of total transfer time at a time
115    // to avoid sudden jump on bandwidth estimation. There might
116    // be long blocking reads that takes up signification time,
117    // we have to keep a longer window in that case.
118    int64_t bandwidthHistoryWindowUs = mTotalTransferTimeUs * 9 / 10;
119    if (bandwidthHistoryWindowUs < kMinBandwidthHistoryWindowUs) {
120        bandwidthHistoryWindowUs = kMinBandwidthHistoryWindowUs;
121    } else if (bandwidthHistoryWindowUs > kMaxBandwidthHistoryWindowUs) {
122        bandwidthHistoryWindowUs = kMaxBandwidthHistoryWindowUs;
123    }
124    // trim old samples, keeping at least kMaxBandwidthHistoryItems samples,
125    // and total transfer time at least kMaxBandwidthHistoryWindowUs.
126    while (mBandwidthHistory.size() > kMinBandwidthHistoryItems) {
127        List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
128        // remove sample if either absolute age or total transfer time is
129        // over kMaxBandwidthHistoryWindowUs
130        if (nowUs - it->mTimestampUs < kMaxBandwidthHistoryAgeUs &&
131                mTotalTransferTimeUs - it->mDelayUs < bandwidthHistoryWindowUs) {
132            break;
133        }
134        mTotalTransferTimeUs -= it->mDelayUs;
135        mTotalTransferBytes -= it->mNumBytes;
136        mBandwidthHistory.erase(mBandwidthHistory.begin());
137    }
138}
139
140bool LiveSession::BandwidthEstimator::estimateBandwidth(
141        int32_t *bandwidthBps, bool *isStable, int32_t *shortTermBps) {
142    AutoMutex autoLock(mLock);
143
144    if (mBandwidthHistory.size() < 2) {
145        return false;
146    }
147
148    if (!mHasNewSample) {
149        *bandwidthBps = *(--mPrevEstimates.end());
150        if (isStable) {
151            *isStable = mIsStable;
152        }
153        if (shortTermBps) {
154            *shortTermBps = mShortTermEstimate;
155        }
156        return true;
157    }
158
159    *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs);
160    mPrevEstimates.push_back(*bandwidthBps);
161    while (mPrevEstimates.size() > 3) {
162        mPrevEstimates.erase(mPrevEstimates.begin());
163    }
164    mHasNewSample = false;
165
166    int64_t totalTimeUs = 0;
167    size_t totalBytes = 0;
168    if (mBandwidthHistory.size() >= kShortTermBandwidthItems) {
169        List<BandwidthEntry>::iterator it = --mBandwidthHistory.end();
170        for (size_t i = 0; i < kShortTermBandwidthItems; i++, it--) {
171            totalTimeUs += it->mDelayUs;
172            totalBytes += it->mNumBytes;
173        }
174    }
175    mShortTermEstimate = totalTimeUs > 0 ?
176            (totalBytes * 8E6 / totalTimeUs) : *bandwidthBps;
177    if (shortTermBps) {
178        *shortTermBps = mShortTermEstimate;
179    }
180
181    int32_t minEstimate = -1, maxEstimate = -1;
182    List<int32_t>::iterator it;
183    for (it = mPrevEstimates.begin(); it != mPrevEstimates.end(); it++) {
184        int32_t estimate = *it;
185        if (minEstimate < 0 || minEstimate > estimate) {
186            minEstimate = estimate;
187        }
188        if (maxEstimate < 0 || maxEstimate < estimate) {
189            maxEstimate = estimate;
190        }
191    }
192    // consider it stable if long-term average is not jumping a lot
193    // and short-term average is not much lower than long-term average
194    mIsStable = (maxEstimate <= minEstimate * 4 / 3)
195            && mShortTermEstimate > minEstimate * 7 / 10;
196    if (isStable) {
197        *isStable = mIsStable;
198    }
199
200#if 0
201    {
202        char dumpStr[1024] = {0};
203        size_t itemIdx = 0;
204        size_t histSize = mBandwidthHistory.size();
205        sprintf(dumpStr, "estimate bps=%d stable=%d history (n=%d): {",
206            *bandwidthBps, mIsStable, histSize);
207        List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
208        for (; it != mBandwidthHistory.end(); ++it) {
209            if (itemIdx > 50) {
210                sprintf(dumpStr + strlen(dumpStr),
211                        "...(%zd more items)... }", histSize - itemIdx);
212                break;
213            }
214            sprintf(dumpStr + strlen(dumpStr), "%dk/%.3fs%s",
215                it->mNumBytes / 1024,
216                (double)it->mDelayUs * 1.0e-6,
217                (it == (--mBandwidthHistory.end())) ? "}" : ", ");
218            itemIdx++;
219        }
220        ALOGE(dumpStr);
221    }
222#endif
223    return true;
224}
225
226//static
227const char *LiveSession::getKeyForStream(StreamType type) {
228    switch (type) {
229        case STREAMTYPE_VIDEO:
230            return "timeUsVideo";
231        case STREAMTYPE_AUDIO:
232            return "timeUsAudio";
233        case STREAMTYPE_SUBTITLES:
234            return "timeUsSubtitle";
235        case STREAMTYPE_METADATA:
236            return "timeUsMetadata"; // unused
237        default:
238            TRESPASS();
239    }
240    return NULL;
241}
242
243//static
244const char *LiveSession::getNameForStream(StreamType type) {
245    switch (type) {
246        case STREAMTYPE_VIDEO:
247            return "video";
248        case STREAMTYPE_AUDIO:
249            return "audio";
250        case STREAMTYPE_SUBTITLES:
251            return "subs";
252        case STREAMTYPE_METADATA:
253            return "metadata";
254        default:
255            break;
256    }
257    return "unknown";
258}
259
260//static
261ATSParser::SourceType LiveSession::getSourceTypeForStream(StreamType type) {
262    switch (type) {
263        case STREAMTYPE_VIDEO:
264            return ATSParser::VIDEO;
265        case STREAMTYPE_AUDIO:
266            return ATSParser::AUDIO;
267        case STREAMTYPE_METADATA:
268            return ATSParser::META;
269        case STREAMTYPE_SUBTITLES:
270        default:
271            TRESPASS();
272    }
273    return ATSParser::NUM_SOURCE_TYPES; // should not reach here
274}
275
276LiveSession::LiveSession(
277        const sp<AMessage> &notify, uint32_t flags,
278        const sp<IMediaHTTPService> &httpService)
279    : mNotify(notify),
280      mFlags(flags),
281      mHTTPService(httpService),
282      mBuffering(false),
283      mInPreparationPhase(true),
284      mPollBufferingGeneration(0),
285      mPrevBufferPercentage(-1),
286      mCurBandwidthIndex(-1),
287      mOrigBandwidthIndex(-1),
288      mLastBandwidthBps(-1ll),
289      mLastBandwidthStable(false),
290      mBandwidthEstimator(new BandwidthEstimator()),
291      mMaxWidth(720),
292      mMaxHeight(480),
293      mStreamMask(0),
294      mNewStreamMask(0),
295      mSwapMask(0),
296      mSwitchGeneration(0),
297      mSubtitleGeneration(0),
298      mLastDequeuedTimeUs(0ll),
299      mRealTimeBaseUs(0ll),
300      mReconfigurationInProgress(false),
301      mSwitchInProgress(false),
302      mUpSwitchMark(kUpSwitchMarkUs),
303      mDownSwitchMark(kDownSwitchMarkUs),
304      mUpSwitchMargin(kUpSwitchMarginUs),
305      mFirstTimeUsValid(false),
306      mFirstTimeUs(0),
307      mLastSeekTimeUs(0),
308      mHasMetadata(false) {
309    mStreams[kAudioIndex] = StreamItem("audio");
310    mStreams[kVideoIndex] = StreamItem("video");
311    mStreams[kSubtitleIndex] = StreamItem("subtitles");
312
313    for (size_t i = 0; i < kNumSources; ++i) {
314        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
315        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
316    }
317}
318
319LiveSession::~LiveSession() {
320    if (mFetcherLooper != NULL) {
321        mFetcherLooper->stop();
322    }
323}
324
325int64_t LiveSession::calculateMediaTimeUs(
326        int64_t firstTimeUs, int64_t timeUs, int32_t discontinuitySeq) {
327    if (timeUs >= firstTimeUs) {
328        timeUs -= firstTimeUs;
329    } else {
330        timeUs = 0;
331    }
332    timeUs += mLastSeekTimeUs;
333    if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
334        timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
335    }
336    return timeUs;
337}
338
339status_t LiveSession::dequeueAccessUnit(
340        StreamType stream, sp<ABuffer> *accessUnit) {
341    status_t finalResult = OK;
342    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
343
344    ssize_t streamIdx = typeToIndex(stream);
345    if (streamIdx < 0) {
346        return BAD_VALUE;
347    }
348    const char *streamStr = getNameForStream(stream);
349    // Do not let client pull data if we don't have data packets yet.
350    // We might only have a format discontinuity queued without data.
351    // When NuPlayerDecoder dequeues the format discontinuity, it will
352    // immediately try to getFormat. If we return NULL, NuPlayerDecoder
353    // thinks it can do seamless change, so will not shutdown decoder.
354    // When the actual format arrives, it can't handle it and get stuck.
355    if (!packetSource->hasDataBufferAvailable(&finalResult)) {
356        ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)",
357                streamStr, finalResult);
358
359        if (finalResult == OK) {
360            return -EAGAIN;
361        } else {
362            return finalResult;
363        }
364    }
365
366    // Let the client dequeue as long as we have buffers available
367    // Do not make pause/resume decisions here.
368
369    status_t err = packetSource->dequeueAccessUnit(accessUnit);
370
371    if (err == INFO_DISCONTINUITY) {
372        // adaptive streaming, discontinuities in the playlist
373        int32_t type;
374        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
375
376        sp<AMessage> extra;
377        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
378            extra.clear();
379        }
380
381        ALOGI("[%s] read discontinuity of type %d, extra = %s",
382              streamStr,
383              type,
384              extra == NULL ? "NULL" : extra->debugString().c_str());
385    } else if (err == OK) {
386
387        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
388            int64_t timeUs, originalTimeUs;
389            int32_t discontinuitySeq = 0;
390            StreamItem& strm = mStreams[streamIdx];
391            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
392            originalTimeUs = timeUs;
393            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
394            if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) {
395                int64_t offsetTimeUs;
396                if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
397                    offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq);
398                } else {
399                    offsetTimeUs = 0;
400                }
401
402                if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0
403                        && strm.mLastDequeuedTimeUs >= 0) {
404                    int64_t firstTimeUs;
405                    firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
406                    offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
407                    offsetTimeUs += strm.mLastSampleDurationUs;
408                } else {
409                    offsetTimeUs += strm.mLastSampleDurationUs;
410                }
411
412                mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs);
413                strm.mCurDiscontinuitySeq = discontinuitySeq;
414            }
415
416            int32_t discard = 0;
417            int64_t firstTimeUs;
418            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
419                int64_t durUs; // approximate sample duration
420                if (timeUs > strm.mLastDequeuedTimeUs) {
421                    durUs = timeUs - strm.mLastDequeuedTimeUs;
422                } else {
423                    durUs = strm.mLastDequeuedTimeUs - timeUs;
424                }
425                strm.mLastSampleDurationUs = durUs;
426                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
427            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
428                firstTimeUs = timeUs;
429            } else {
430                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
431                firstTimeUs = timeUs;
432            }
433
434            strm.mLastDequeuedTimeUs = timeUs;
435            timeUs = calculateMediaTimeUs(firstTimeUs, timeUs, discontinuitySeq);
436
437            ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us",
438                    streamStr, (long long)timeUs, (long long)originalTimeUs);
439            (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
440            mLastDequeuedTimeUs = timeUs;
441            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
442        } else if (stream == STREAMTYPE_SUBTITLES) {
443            int32_t subtitleGeneration;
444            if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
445                    && subtitleGeneration != mSubtitleGeneration) {
446               return -EAGAIN;
447            };
448            (*accessUnit)->meta()->setInt32(
449                    "trackIndex", mPlaylist->getSelectedIndex());
450            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
451        } else if (stream == STREAMTYPE_METADATA) {
452            HLSTime mdTime((*accessUnit)->meta());
453            if (mDiscontinuityAbsStartTimesUs.indexOfKey(mdTime.mSeq) < 0) {
454                packetSource->requeueAccessUnit((*accessUnit));
455                return -EAGAIN;
456            } else {
457                int64_t firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(mdTime.mSeq);
458                int64_t timeUs = calculateMediaTimeUs(firstTimeUs, mdTime.mTimeUs, mdTime.mSeq);
459                (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
460                (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
461            }
462        }
463    } else {
464        ALOGI("[%s] encountered error %d", streamStr, err);
465    }
466
467    return err;
468}
469
470status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
471    if (!(mStreamMask & stream)) {
472        return UNKNOWN_ERROR;
473    }
474
475    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
476
477    sp<MetaData> meta = packetSource->getFormat();
478
479    if (meta == NULL) {
480        return -EWOULDBLOCK;
481    }
482
483    if (stream == STREAMTYPE_AUDIO) {
484        // set AAC input buffer size to 32K bytes (256kbps x 1sec)
485        meta->setInt32(kKeyMaxInputSize, 32 * 1024);
486    } else if (stream == STREAMTYPE_VIDEO) {
487        meta->setInt32(kKeyMaxWidth, mMaxWidth);
488        meta->setInt32(kKeyMaxHeight, mMaxHeight);
489    }
490
491    return convertMetaDataToMessage(meta, format);
492}
493
494sp<HTTPDownloader> LiveSession::getHTTPDownloader() {
495    return new HTTPDownloader(mHTTPService, mExtraHeaders);
496}
497
498void LiveSession::connectAsync(
499        const char *url, const KeyedVector<String8, String8> *headers) {
500    sp<AMessage> msg = new AMessage(kWhatConnect, this);
501    msg->setString("url", url);
502
503    if (headers != NULL) {
504        msg->setPointer(
505                "headers",
506                new KeyedVector<String8, String8>(*headers));
507    }
508
509    msg->post();
510}
511
512status_t LiveSession::disconnect() {
513    sp<AMessage> msg = new AMessage(kWhatDisconnect, this);
514
515    sp<AMessage> response;
516    status_t err = msg->postAndAwaitResponse(&response);
517
518    return err;
519}
520
521status_t LiveSession::seekTo(int64_t timeUs) {
522    sp<AMessage> msg = new AMessage(kWhatSeek, this);
523    msg->setInt64("timeUs", timeUs);
524
525    sp<AMessage> response;
526    status_t err = msg->postAndAwaitResponse(&response);
527
528    return err;
529}
530
531bool LiveSession::checkSwitchProgress(
532        sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) {
533    AString newUri;
534    CHECK(stopParams->findString("uri", &newUri));
535
536    *needResumeUntil = false;
537    sp<AMessage> firstNewMeta[kMaxStreams];
538    for (size_t i = 0; i < kMaxStreams; ++i) {
539        StreamType stream = indexToType(i);
540        if (!(mSwapMask & mNewStreamMask & stream)
541            || (mStreams[i].mNewUri != newUri)) {
542            continue;
543        }
544        if (stream == STREAMTYPE_SUBTITLES) {
545            continue;
546        }
547        sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i);
548
549        // First, get latest dequeued meta, which is where the decoder is at.
550        // (when upswitching, we take the meta after a certain delay, so that
551        // the decoder is left with some cushion)
552        sp<AMessage> lastDequeueMeta, lastEnqueueMeta;
553        if (delayUs > 0) {
554            lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs);
555            if (lastDequeueMeta == NULL) {
556                // this means we don't have enough cushion, try again later
557                ALOGV("[%s] up switching failed due to insufficient buffer",
558                        getNameForStream(stream));
559                return false;
560            }
561        } else {
562            // It's okay for lastDequeueMeta to be NULL here, it means the
563            // decoder hasn't even started dequeueing
564            lastDequeueMeta = source->getLatestDequeuedMeta();
565        }
566        // Then, trim off packets at beginning of mPacketSources2 that's before
567        // the latest dequeued time. These samples are definitely too late.
568        firstNewMeta[i] = mPacketSources2.editValueAt(i)
569                            ->trimBuffersBeforeMeta(lastDequeueMeta);
570
571        // Now firstNewMeta[i] is the first sample after the trim.
572        // If it's NULL, we failed because dequeue already past all samples
573        // in mPacketSource2, we have to try again.
574        if (firstNewMeta[i] == NULL) {
575            HLSTime dequeueTime(lastDequeueMeta);
576            ALOGV("[%s] dequeue time (%d, %lld) past start time",
577                    getNameForStream(stream),
578                    dequeueTime.mSeq, (long long) dequeueTime.mTimeUs);
579            return false;
580        }
581
582        // Otherwise, we check if mPacketSources2 overlaps with what old fetcher
583        // already fetched, and see if we need to resumeUntil
584        lastEnqueueMeta = source->getLatestEnqueuedMeta();
585        // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity
586        // boundary, no need to resume as the content will look different anyways
587        if (lastEnqueueMeta != NULL) {
588            HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]);
589
590            // no need to resume old fetcher if new fetcher started in different
591            // discontinuity sequence, as the content will look different.
592            *needResumeUntil |= (startTime.mSeq == lastTime.mSeq
593                    && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs);
594
595            // update the stopTime for resumeUntil
596            stopParams->setInt32("discontinuitySeq", startTime.mSeq);
597            stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs);
598        }
599    }
600
601    // if we're here, it means dequeue progress hasn't passed some samples in
602    // mPacketSource2, we can trim off the excess in mPacketSource.
603    // (old fetcher might still need to resumeUntil the start time of new fetcher)
604    for (size_t i = 0; i < kMaxStreams; ++i) {
605        StreamType stream = indexToType(i);
606        if (!(mSwapMask & mNewStreamMask & stream)
607            || (newUri != mStreams[i].mNewUri)
608            || stream == STREAMTYPE_SUBTITLES) {
609            continue;
610        }
611        mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]);
612    }
613
614    // no resumeUntil if already underflow
615    *needResumeUntil &= !mBuffering;
616
617    return true;
618}
619
620void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
621    switch (msg->what()) {
622        case kWhatConnect:
623        {
624            onConnect(msg);
625            break;
626        }
627
628        case kWhatDisconnect:
629        {
630            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
631
632            if (mReconfigurationInProgress) {
633                break;
634            }
635
636            finishDisconnect();
637            break;
638        }
639
640        case kWhatSeek:
641        {
642            if (mReconfigurationInProgress) {
643                msg->post(50000);
644                break;
645            }
646
647            CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
648            mSeekReply = new AMessage;
649
650            onSeek(msg);
651            break;
652        }
653
654        case kWhatFetcherNotify:
655        {
656            int32_t what;
657            CHECK(msg->findInt32("what", &what));
658
659            switch (what) {
660                case PlaylistFetcher::kWhatStarted:
661                    break;
662                case PlaylistFetcher::kWhatPaused:
663                case PlaylistFetcher::kWhatStopped:
664                {
665                    AString uri;
666                    CHECK(msg->findString("uri", &uri));
667                    ssize_t index = mFetcherInfos.indexOfKey(uri);
668                    if (index < 0) {
669                        // ignore msgs from fetchers that's already gone
670                        break;
671                    }
672
673                    ALOGV("fetcher-%d %s",
674                            mFetcherInfos[index].mFetcher->getFetcherID(),
675                            what == PlaylistFetcher::kWhatPaused ?
676                                    "paused" : "stopped");
677
678                    if (what == PlaylistFetcher::kWhatStopped) {
679                        mFetcherLooper->unregisterHandler(
680                                mFetcherInfos[index].mFetcher->id());
681                        mFetcherInfos.removeItemsAt(index);
682                    } else if (what == PlaylistFetcher::kWhatPaused) {
683                        int32_t seekMode;
684                        CHECK(msg->findInt32("seekMode", &seekMode));
685                        for (size_t i = 0; i < kMaxStreams; ++i) {
686                            if (mStreams[i].mUri == uri) {
687                                mStreams[i].mSeekMode = (SeekMode) seekMode;
688                            }
689                        }
690                    }
691
692                    if (mContinuation != NULL) {
693                        CHECK_GT(mContinuationCounter, 0);
694                        if (--mContinuationCounter == 0) {
695                            mContinuation->post();
696                        }
697                        ALOGV("%zu fetcher(s) left", mContinuationCounter);
698                    }
699                    break;
700                }
701
702                case PlaylistFetcher::kWhatDurationUpdate:
703                {
704                    AString uri;
705                    CHECK(msg->findString("uri", &uri));
706
707                    int64_t durationUs;
708                    CHECK(msg->findInt64("durationUs", &durationUs));
709
710                    ssize_t index = mFetcherInfos.indexOfKey(uri);
711                    if (index >= 0) {
712                        FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
713                        info->mDurationUs = durationUs;
714                    }
715                    break;
716                }
717
718                case PlaylistFetcher::kWhatTargetDurationUpdate:
719                {
720                    int64_t targetDurationUs;
721                    CHECK(msg->findInt64("targetDurationUs", &targetDurationUs));
722                    mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4);
723                    mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4);
724                    mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs);
725                    break;
726                }
727
728                case PlaylistFetcher::kWhatError:
729                {
730                    status_t err;
731                    CHECK(msg->findInt32("err", &err));
732
733                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
734
735                    // handle EOS on subtitle tracks independently
736                    AString uri;
737                    if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
738                        ssize_t i = mFetcherInfos.indexOfKey(uri);
739                        if (i >= 0) {
740                            const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
741                            if (fetcher != NULL) {
742                                uint32_t type = fetcher->getStreamTypeMask();
743                                if (type == STREAMTYPE_SUBTITLES) {
744                                    mPacketSources.valueFor(
745                                            STREAMTYPE_SUBTITLES)->signalEOS(err);;
746                                    break;
747                                }
748                            }
749                        }
750                    }
751
752                    // remember the failure index (as mCurBandwidthIndex will be restored
753                    // after cancelBandwidthSwitch()), and record last fail time
754                    size_t failureIndex = mCurBandwidthIndex;
755                    mBandwidthItems.editItemAt(
756                            failureIndex).mLastFailureUs = ALooper::GetNowUs();
757
758                    if (mSwitchInProgress) {
759                        // if error happened when we switch to a variant, try fallback
760                        // to other variant to save the session
761                        if (tryBandwidthFallback()) {
762                            break;
763                        }
764                    }
765
766                    if (mInPreparationPhase) {
767                        postPrepared(err);
768                    }
769
770                    cancelBandwidthSwitch();
771
772                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
773
774                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
775
776                    mPacketSources.valueFor(
777                            STREAMTYPE_SUBTITLES)->signalEOS(err);
778
779                    postError(err);
780                    break;
781                }
782
783                case PlaylistFetcher::kWhatStopReached:
784                {
785                    ALOGV("kWhatStopReached");
786
787                    AString oldUri;
788                    CHECK(msg->findString("uri", &oldUri));
789
790                    ssize_t index = mFetcherInfos.indexOfKey(oldUri);
791                    if (index < 0) {
792                        break;
793                    }
794
795                    tryToFinishBandwidthSwitch(oldUri);
796                    break;
797                }
798
799                case PlaylistFetcher::kWhatStartedAt:
800                {
801                    int32_t switchGeneration;
802                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
803
804                    ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d",
805                            switchGeneration, mSwitchGeneration);
806
807                    if (switchGeneration != mSwitchGeneration) {
808                        break;
809                    }
810
811                    AString uri;
812                    CHECK(msg->findString("uri", &uri));
813
814                    // mark new fetcher mToBeResumed
815                    ssize_t index = mFetcherInfos.indexOfKey(uri);
816                    if (index >= 0) {
817                        mFetcherInfos.editValueAt(index).mToBeResumed = true;
818                    }
819
820                    // temporarily disable packet sources to be swapped to prevent
821                    // NuPlayerDecoder from dequeuing while we check progress
822                    for (size_t i = 0; i < mPacketSources.size(); ++i) {
823                        if ((mSwapMask & mPacketSources.keyAt(i))
824                                && uri == mStreams[i].mNewUri) {
825                            mPacketSources.editValueAt(i)->enable(false);
826                        }
827                    }
828                    bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex);
829                    // If switching up, require a cushion bigger than kUnderflowMark
830                    // to avoid buffering immediately after the switch.
831                    // (If we don't have that cushion we'd rather cancel and try again.)
832                    int64_t delayUs = switchUp ? (kUnderflowMarkUs + 1000000ll) : 0;
833                    bool needResumeUntil = false;
834                    sp<AMessage> stopParams = msg;
835                    if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) {
836                        // playback time hasn't passed startAt time
837                        if (!needResumeUntil) {
838                            ALOGV("finish switch");
839                            for (size_t i = 0; i < kMaxStreams; ++i) {
840                                if ((mSwapMask & indexToType(i))
841                                        && uri == mStreams[i].mNewUri) {
842                                    // have to make a copy of mStreams[i].mUri because
843                                    // tryToFinishBandwidthSwitch is modifying mStreams[]
844                                    AString oldURI = mStreams[i].mUri;
845                                    tryToFinishBandwidthSwitch(oldURI);
846                                    break;
847                                }
848                            }
849                        } else {
850                            // startAt time is after last enqueue time
851                            // Resume fetcher for the original variant; the resumed fetcher should
852                            // continue until the timestamps found in msg, which is stored by the
853                            // new fetcher to indicate where the new variant has started buffering.
854                            ALOGV("finish switch with resumeUntilAsync");
855                            for (size_t i = 0; i < mFetcherInfos.size(); i++) {
856                                const FetcherInfo &info = mFetcherInfos.valueAt(i);
857                                if (info.mToBeRemoved) {
858                                    info.mFetcher->resumeUntilAsync(stopParams);
859                                }
860                            }
861                        }
862                    } else {
863                        // playback time passed startAt time
864                        if (switchUp) {
865                            // if switching up, cancel and retry if condition satisfies again
866                            ALOGV("cancel up switch because we're too late");
867                            cancelBandwidthSwitch(true /* resume */);
868                        } else {
869                            ALOGV("retry down switch at next sample");
870                            resumeFetcher(uri, mSwapMask, -1, true /* newUri */);
871                        }
872                    }
873                    // re-enable all packet sources
874                    for (size_t i = 0; i < mPacketSources.size(); ++i) {
875                        mPacketSources.editValueAt(i)->enable(true);
876                    }
877
878                    break;
879                }
880
881                case PlaylistFetcher::kWhatPlaylistFetched:
882                {
883                    onMasterPlaylistFetched(msg);
884                    break;
885                }
886
887                case PlaylistFetcher::kWhatMetadataDetected:
888                {
889                    if (!mHasMetadata) {
890                        mHasMetadata = true;
891                        sp<AMessage> notify = mNotify->dup();
892                        notify->setInt32("what", kWhatMetadataDetected);
893                        notify->post();
894                    }
895                    break;
896                }
897
898                default:
899                    TRESPASS();
900            }
901
902            break;
903        }
904
905        case kWhatChangeConfiguration:
906        {
907            onChangeConfiguration(msg);
908            break;
909        }
910
911        case kWhatChangeConfiguration2:
912        {
913            onChangeConfiguration2(msg);
914            break;
915        }
916
917        case kWhatChangeConfiguration3:
918        {
919            onChangeConfiguration3(msg);
920            break;
921        }
922
923        case kWhatPollBuffering:
924        {
925            int32_t generation;
926            CHECK(msg->findInt32("generation", &generation));
927            if (generation == mPollBufferingGeneration) {
928                onPollBuffering();
929            }
930            break;
931        }
932
933        default:
934            TRESPASS();
935            break;
936    }
937}
938
939// static
940bool LiveSession::isBandwidthValid(const BandwidthItem &item) {
941    static const int64_t kBlacklistWindowUs = 300 * 1000000ll;
942    return item.mLastFailureUs < 0
943            || ALooper::GetNowUs() - item.mLastFailureUs > kBlacklistWindowUs;
944}
945
946// static
947int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
948    if (a->mBandwidth < b->mBandwidth) {
949        return -1;
950    } else if (a->mBandwidth == b->mBandwidth) {
951        return 0;
952    }
953
954    return 1;
955}
956
957// static
958LiveSession::StreamType LiveSession::indexToType(int idx) {
959    CHECK(idx >= 0 && idx < kNumSources);
960    return (StreamType)(1 << idx);
961}
962
963// static
964ssize_t LiveSession::typeToIndex(int32_t type) {
965    switch (type) {
966        case STREAMTYPE_AUDIO:
967            return 0;
968        case STREAMTYPE_VIDEO:
969            return 1;
970        case STREAMTYPE_SUBTITLES:
971            return 2;
972        case STREAMTYPE_METADATA:
973            return 3;
974        default:
975            return -1;
976    };
977    return -1;
978}
979
980void LiveSession::onConnect(const sp<AMessage> &msg) {
981    CHECK(msg->findString("url", &mMasterURL));
982
983    // TODO currently we don't know if we are coming here from incognito mode
984    ALOGI("onConnect %s", uriDebugString(mMasterURL).c_str());
985
986    KeyedVector<String8, String8> *headers = NULL;
987    if (!msg->findPointer("headers", (void **)&headers)) {
988        mExtraHeaders.clear();
989    } else {
990        mExtraHeaders = *headers;
991
992        delete headers;
993        headers = NULL;
994    }
995
996    // create looper for fetchers
997    if (mFetcherLooper == NULL) {
998        mFetcherLooper = new ALooper();
999
1000        mFetcherLooper->setName("Fetcher");
1001        mFetcherLooper->start(false, false);
1002    }
1003
1004    // create fetcher to fetch the master playlist
1005    addFetcher(mMasterURL.c_str())->fetchPlaylistAsync();
1006}
1007
1008void LiveSession::onMasterPlaylistFetched(const sp<AMessage> &msg) {
1009    AString uri;
1010    CHECK(msg->findString("uri", &uri));
1011    ssize_t index = mFetcherInfos.indexOfKey(uri);
1012    if (index < 0) {
1013        ALOGW("fetcher for master playlist is gone.");
1014        return;
1015    }
1016
1017    // no longer useful, remove
1018    mFetcherLooper->unregisterHandler(mFetcherInfos[index].mFetcher->id());
1019    mFetcherInfos.removeItemsAt(index);
1020
1021    CHECK(msg->findObject("playlist", (sp<RefBase> *)&mPlaylist));
1022    if (mPlaylist == NULL) {
1023        ALOGE("unable to fetch master playlist %s.",
1024                uriDebugString(mMasterURL).c_str());
1025
1026        postPrepared(ERROR_IO);
1027        return;
1028    }
1029    // We trust the content provider to make a reasonable choice of preferred
1030    // initial bandwidth by listing it first in the variant playlist.
1031    // At startup we really don't have a good estimate on the available
1032    // network bandwidth since we haven't tranferred any data yet. Once
1033    // we have we can make a better informed choice.
1034    size_t initialBandwidth = 0;
1035    size_t initialBandwidthIndex = 0;
1036
1037    int32_t maxWidth = 0;
1038    int32_t maxHeight = 0;
1039
1040    if (mPlaylist->isVariantPlaylist()) {
1041        Vector<BandwidthItem> itemsWithVideo;
1042        for (size_t i = 0; i < mPlaylist->size(); ++i) {
1043            BandwidthItem item;
1044
1045            item.mPlaylistIndex = i;
1046            item.mLastFailureUs = -1ll;
1047
1048            sp<AMessage> meta;
1049            AString uri;
1050            mPlaylist->itemAt(i, &uri, &meta);
1051
1052            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
1053
1054            int32_t width, height;
1055            if (meta->findInt32("width", &width)) {
1056                maxWidth = max(maxWidth, width);
1057            }
1058            if (meta->findInt32("height", &height)) {
1059                maxHeight = max(maxHeight, height);
1060            }
1061
1062            mBandwidthItems.push(item);
1063            if (mPlaylist->hasType(i, "video")) {
1064                itemsWithVideo.push(item);
1065            }
1066        }
1067        // remove the audio-only variants if we have at least one with video
1068        if (!itemsWithVideo.empty()
1069                && itemsWithVideo.size() < mBandwidthItems.size()) {
1070            mBandwidthItems.clear();
1071            for (size_t i = 0; i < itemsWithVideo.size(); ++i) {
1072                mBandwidthItems.push(itemsWithVideo[i]);
1073            }
1074        }
1075
1076        CHECK_GT(mBandwidthItems.size(), 0u);
1077        initialBandwidth = mBandwidthItems[0].mBandwidth;
1078
1079        mBandwidthItems.sort(SortByBandwidth);
1080
1081        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
1082            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
1083                initialBandwidthIndex = i;
1084                break;
1085            }
1086        }
1087    } else {
1088        // dummy item.
1089        BandwidthItem item;
1090        item.mPlaylistIndex = 0;
1091        item.mBandwidth = 0;
1092        mBandwidthItems.push(item);
1093    }
1094
1095    mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth;
1096    mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight;
1097
1098    mPlaylist->pickRandomMediaItems();
1099    changeConfiguration(
1100            0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
1101}
1102
1103void LiveSession::finishDisconnect() {
1104    ALOGV("finishDisconnect");
1105
1106    // No reconfiguration is currently pending, make sure none will trigger
1107    // during disconnection either.
1108    cancelBandwidthSwitch();
1109
1110    // cancel buffer polling
1111    cancelPollBuffering();
1112
1113    // TRICKY: don't wait for all fetcher to be stopped when disconnecting
1114    //
1115    // Some fetchers might be stuck in connect/getSize at this point. These
1116    // operations will eventually timeout (as we have a timeout set in
1117    // MediaHTTPConnection), but we don't want to block the main UI thread
1118    // until then. Here we just need to make sure we clear all references
1119    // to the fetchers, so that when they finally exit from the blocking
1120    // operation, they can be destructed.
1121    //
1122    // There is one very tricky point though. For this scheme to work, the
1123    // fecther must hold a reference to LiveSession, so that LiveSession is
1124    // destroyed after fetcher. Otherwise LiveSession would get stuck in its
1125    // own destructor when it waits for mFetcherLooper to stop, which still
1126    // blocks main UI thread.
1127    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1128        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1129        mFetcherLooper->unregisterHandler(
1130                mFetcherInfos.valueAt(i).mFetcher->id());
1131    }
1132    mFetcherInfos.clear();
1133
1134    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
1135    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
1136
1137    mPacketSources.valueFor(
1138            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
1139
1140    sp<AMessage> response = new AMessage;
1141    response->setInt32("err", OK);
1142
1143    response->postReply(mDisconnectReplyID);
1144    mDisconnectReplyID.clear();
1145}
1146
1147sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
1148    ssize_t index = mFetcherInfos.indexOfKey(uri);
1149
1150    if (index >= 0) {
1151        return NULL;
1152    }
1153
1154    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this);
1155    notify->setString("uri", uri);
1156    notify->setInt32("switchGeneration", mSwitchGeneration);
1157
1158    FetcherInfo info;
1159    info.mFetcher = new PlaylistFetcher(
1160            notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration);
1161    info.mDurationUs = -1ll;
1162    info.mToBeRemoved = false;
1163    info.mToBeResumed = false;
1164    mFetcherLooper->registerHandler(info.mFetcher);
1165
1166    mFetcherInfos.add(uri, info);
1167
1168    return info.mFetcher;
1169}
1170
#if 0
// Compiled out. Returns rand() scaled by RAND_MAX, i.e. a value in [0, 1].
// Only referenced by the disabled experimental branches of
// LiveSession::getBandwidthIndex().
static double uniformRand() {
    return (double)rand() / RAND_MAX;
}
#endif
1176
1177bool LiveSession::UriIsSameAsIndex(const AString &uri, int32_t i, bool newUri) {
1178    ALOGI("[timed_id3] i %d UriIsSameAsIndex newUri %s, %s", i,
1179            newUri ? "true" : "false",
1180            newUri ? mStreams[i].mNewUri.c_str() : mStreams[i].mUri.c_str());
1181    return i >= 0
1182            && ((!newUri && uri == mStreams[i].mUri)
1183            || (newUri && uri == mStreams[i].mNewUri));
1184}
1185
1186sp<AnotherPacketSource> LiveSession::getPacketSourceForStreamIndex(
1187        size_t trackIndex, bool newUri) {
1188    StreamType type = indexToType(trackIndex);
1189    sp<AnotherPacketSource> source = NULL;
1190    if (newUri) {
1191        source = mPacketSources2.valueFor(type);
1192        source->clear();
1193    } else {
1194        source = mPacketSources.valueFor(type);
1195    };
1196    return source;
1197}
1198
1199sp<AnotherPacketSource> LiveSession::getMetadataSource(
1200        sp<AnotherPacketSource> sources[kNumSources], uint32_t streamMask, bool newUri) {
1201    // todo: One case where the following strategy can fail is when audio and video
1202    // are in separate playlists, both are transport streams, and the metadata
1203    // is actually contained in the audio stream.
1204    ALOGV("[timed_id3] getMetadataSourceForUri streamMask %x newUri %s",
1205            streamMask, newUri ? "true" : "false");
1206
1207    if ((sources[kVideoIndex] != NULL) // video fetcher; or ...
1208            || (!(streamMask & STREAMTYPE_VIDEO) && sources[kAudioIndex] != NULL)) {
1209            // ... audio fetcher for audio only variant
1210        return getPacketSourceForStreamIndex(kMetaDataIndex, newUri);
1211    }
1212
1213    return NULL;
1214}
1215
1216bool LiveSession::resumeFetcher(
1217        const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) {
1218    ssize_t index = mFetcherInfos.indexOfKey(uri);
1219    if (index < 0) {
1220        ALOGE("did not find fetcher for uri: %s", uri.c_str());
1221        return false;
1222    }
1223
1224    bool resume = false;
1225    sp<AnotherPacketSource> sources[kNumSources];
1226    for (size_t i = 0; i < kMaxStreams; ++i) {
1227        if ((streamMask & indexToType(i)) && UriIsSameAsIndex(uri, i, newUri)) {
1228            resume = true;
1229            sources[i] = getPacketSourceForStreamIndex(i, newUri);
1230        }
1231    }
1232
1233    if (resume) {
1234        sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher;
1235        SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition;
1236
1237        ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d",
1238                fetcher->getFetcherID(), (long long)timeUs, seekMode);
1239
1240        fetcher->startAsync(
1241                sources[kAudioIndex],
1242                sources[kVideoIndex],
1243                sources[kSubtitleIndex],
1244                getMetadataSource(sources, streamMask, newUri),
1245                timeUs, -1, -1, seekMode);
1246    }
1247
1248    return resume;
1249}
1250
1251float LiveSession::getAbortThreshold(
1252        ssize_t currentBWIndex, ssize_t targetBWIndex) const {
1253    float abortThreshold = -1.0f;
1254    if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) {
1255        /*
1256           If we're switching down, we need to decide whether to
1257
1258           1) finish last segment of high-bandwidth variant, or
1259           2) abort last segment of high-bandwidth variant, and fetch an
1260              overlapping portion from low-bandwidth variant.
1261
1262           Here we try to maximize the amount of buffer left when the
1263           switch point is met. Given the following parameters:
1264
1265           B: our current buffering level in seconds
1266           T: target duration in seconds
1267           X: sample duration in seconds remain to fetch in last segment
1268           bw0: bandwidth of old variant (as specified in playlist)
1269           bw1: bandwidth of new variant (as specified in playlist)
1270           bw: measured bandwidth available
1271
1272           If we choose 1), when switch happens at the end of current
1273           segment, our buffering will be
1274                  B + X - X * bw0 / bw
1275
1276           If we choose 2), when switch happens where we aborted current
1277           segment, our buffering will be
1278                  B - (T - X) * bw1 / bw
1279
1280           We should only choose 1) if
1281                  X/T < bw1 / (bw1 + bw0 - bw)
1282        */
1283
1284        // abort old bandwidth immediately if bandwidth is fluctuating a lot.
1285        // our estimate could be far off, and fetching old bandwidth could
1286        // take too long.
1287        if (!mLastBandwidthStable) {
1288            return 0.0f;
1289        }
1290
1291        // Taking the measured current bandwidth at 50% face value only,
1292        // as our bandwidth estimation is a lagging indicator. Being
1293        // conservative on this, we prefer switching to lower bandwidth
1294        // unless we're really confident finishing up the last segment
1295        // of higher bandwidth will be fast.
1296        CHECK(mLastBandwidthBps >= 0);
1297        abortThreshold =
1298                (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1299             / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1300              + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth
1301              - (float)mLastBandwidthBps * 0.5f);
1302        if (abortThreshold < 0.0f) {
1303            abortThreshold = -1.0f; // do not abort
1304        }
1305        ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f",
1306                mBandwidthItems.itemAt(currentBWIndex).mBandwidth,
1307                mBandwidthItems.itemAt(targetBWIndex).mBandwidth,
1308                mLastBandwidthBps,
1309                abortThreshold);
1310    }
1311    return abortThreshold;
1312}
1313
// Forwards one measurement (|numBytes| and |delayUs|) unchanged to
// mBandwidthEstimator.
void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) {
    mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs);
}
1317
1318ssize_t LiveSession::getLowestValidBandwidthIndex() const {
1319    for (size_t index = 0; index < mBandwidthItems.size(); index++) {
1320        if (isBandwidthValid(mBandwidthItems[index])) {
1321            return index;
1322        }
1323    }
1324    // if playlists are all blacklisted, return 0 and hope it's alive
1325    return 0;
1326}
1327
1328size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) {
1329    if (mBandwidthItems.size() < 2) {
1330        // shouldn't be here if we only have 1 bandwidth, check
1331        // logic to get rid of redundant bandwidth polling
1332        ALOGW("getBandwidthIndex() called for single bandwidth playlist!");
1333        return 0;
1334    }
1335
1336#if 1
1337    char value[PROPERTY_VALUE_MAX];
1338    ssize_t index = -1;
1339    if (property_get("media.httplive.bw-index", value, NULL)) {
1340        char *end;
1341        index = strtol(value, &end, 10);
1342        CHECK(end > value && *end == '\0');
1343
1344        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
1345            index = mBandwidthItems.size() - 1;
1346        }
1347    }
1348
1349    if (index < 0) {
1350        char value[PROPERTY_VALUE_MAX];
1351        if (property_get("media.httplive.max-bw", value, NULL)) {
1352            char *end;
1353            long maxBw = strtoul(value, &end, 10);
1354            if (end > value && *end == '\0') {
1355                if (maxBw > 0 && bandwidthBps > maxBw) {
1356                    ALOGV("bandwidth capped to %ld bps", maxBw);
1357                    bandwidthBps = maxBw;
1358                }
1359            }
1360        }
1361
1362        // Pick the highest bandwidth stream that's not currently blacklisted
1363        // below or equal to estimated bandwidth.
1364
1365        index = mBandwidthItems.size() - 1;
1366        ssize_t lowestBandwidth = getLowestValidBandwidthIndex();
1367        while (index > lowestBandwidth) {
1368            // be conservative (70%) to avoid overestimating and immediately
1369            // switching down again.
1370            size_t adjustedBandwidthBps = bandwidthBps * 7 / 10;
1371            const BandwidthItem &item = mBandwidthItems[index];
1372            if (item.mBandwidth <= adjustedBandwidthBps
1373                    && isBandwidthValid(item)) {
1374                break;
1375            }
1376            --index;
1377        }
1378    }
1379#elif 0
1380    // Change bandwidth at random()
1381    size_t index = uniformRand() * mBandwidthItems.size();
1382#elif 0
1383    // There's a 50% chance to stay on the current bandwidth and
1384    // a 50% chance to switch to the next higher bandwidth (wrapping around
1385    // to lowest)
1386    const size_t kMinIndex = 0;
1387
1388    static ssize_t mCurBandwidthIndex = -1;
1389
1390    size_t index;
1391    if (mCurBandwidthIndex < 0) {
1392        index = kMinIndex;
1393    } else if (uniformRand() < 0.5) {
1394        index = (size_t)mCurBandwidthIndex;
1395    } else {
1396        index = mCurBandwidthIndex + 1;
1397        if (index == mBandwidthItems.size()) {
1398            index = kMinIndex;
1399        }
1400    }
1401    mCurBandwidthIndex = index;
1402#elif 0
1403    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1404
1405    size_t index = mBandwidthItems.size() - 1;
1406    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1407        --index;
1408    }
1409#elif 1
1410    char value[PROPERTY_VALUE_MAX];
1411    size_t index;
1412    if (property_get("media.httplive.bw-index", value, NULL)) {
1413        char *end;
1414        index = strtoul(value, &end, 10);
1415        CHECK(end > value && *end == '\0');
1416
1417        if (index >= mBandwidthItems.size()) {
1418            index = mBandwidthItems.size() - 1;
1419        }
1420    } else {
1421        index = 0;
1422    }
1423#else
1424    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
1425#endif
1426
1427    CHECK_GE(index, 0);
1428
1429    return index;
1430}
1431
1432HLSTime LiveSession::latestMediaSegmentStartTime() const {
1433    HLSTime audioTime(mPacketSources.valueFor(
1434                    STREAMTYPE_AUDIO)->getLatestDequeuedMeta());
1435
1436    HLSTime videoTime(mPacketSources.valueFor(
1437                    STREAMTYPE_VIDEO)->getLatestDequeuedMeta());
1438
1439    return audioTime < videoTime ? videoTime : audioTime;
1440}
1441
// Handles a seek request: reads the mandatory "timeUs" field from |msg| and
// starts a reconfiguration at that position.
void LiveSession::onSeek(const sp<AMessage> &msg) {
    int64_t timeUs;
    CHECK(msg->findInt64("timeUs", &timeUs));
    changeConfiguration(timeUs);
}
1447
1448status_t LiveSession::getDuration(int64_t *durationUs) const {
1449    int64_t maxDurationUs = -1ll;
1450    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1451        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1452
1453        if (fetcherDurationUs > maxDurationUs) {
1454            maxDurationUs = fetcherDurationUs;
1455        }
1456    }
1457
1458    *durationUs = maxDurationUs;
1459
1460    return OK;
1461}
1462
1463bool LiveSession::isSeekable() const {
1464    int64_t durationUs;
1465    return getDuration(&durationUs) == OK && durationUs >= 0;
1466}
1467
// Always reports false.
bool LiveSession::hasDynamicDuration() const {
    return false;
}
1471
1472size_t LiveSession::getTrackCount() const {
1473    if (mPlaylist == NULL) {
1474        return 0;
1475    } else {
1476        return mPlaylist->getTrackCount() + (mHasMetadata ? 1 : 0);
1477    }
1478}
1479
1480sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1481    if (mPlaylist == NULL) {
1482        return NULL;
1483    } else {
1484        if (trackIndex == mPlaylist->getTrackCount() && mHasMetadata) {
1485            sp<AMessage> format = new AMessage();
1486            format->setInt32("type", MEDIA_TRACK_TYPE_METADATA);
1487            format->setString("language", "und");
1488            format->setString("mime", MEDIA_MIMETYPE_DATA_TIMED_ID3);
1489            return format;
1490        }
1491        return mPlaylist->getTrackInfo(trackIndex);
1492    }
1493}
1494
1495status_t LiveSession::selectTrack(size_t index, bool select) {
1496    if (mPlaylist == NULL) {
1497        return INVALID_OPERATION;
1498    }
1499
1500    ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++",
1501            index, select, mSubtitleGeneration);
1502
1503    ++mSubtitleGeneration;
1504    status_t err = mPlaylist->selectTrack(index, select);
1505    if (err == OK) {
1506        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this);
1507        msg->setInt32("pickTrack", select);
1508        msg->post();
1509    }
1510    return err;
1511}
1512
1513ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
1514    if (mPlaylist == NULL) {
1515        return -1;
1516    } else {
1517        return mPlaylist->getSelectedTrack(type);
1518    }
1519}
1520
// Starts a reconfiguration of the session (first of several asynchronous
// steps).
//
// timeUs:         seek position; a negative value means "not seeking".
// bandwidthIndex: index into mBandwidthItems to switch to; a negative value
//                 keeps mCurBandwidthIndex unchanged.
// pickTrack:      forwarded in the continuation message; also disables the
//                 delayed removal of old fetchers.
//
// Cancels any bandwidth switch in progress, stops or pauses the existing
// fetchers, and stores a continuation message (kWhatChangeConfiguration2 when
// seeking, kWhatChangeConfiguration3 otherwise) in mContinuation. The
// continuation is posted right away only when there are no fetchers at all.
// Must not be called while another reconfiguration is in progress (CHECKed).
void LiveSession::changeConfiguration(
        int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) {
    ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d",
          (long long)timeUs, bandwidthIndex, pickTrack);

    cancelBandwidthSwitch();

    CHECK(!mReconfigurationInProgress);
    mReconfigurationInProgress = true;
    if (bandwidthIndex >= 0) {
        // Remember where we came from so the switch direction can be
        // determined later.
        mOrigBandwidthIndex = mCurBandwidthIndex;
        mCurBandwidthIndex = bandwidthIndex;
        if (mOrigBandwidthIndex != mCurBandwidthIndex) {
            ALOGI("#### Starting Bandwidth Switch: %zd => %zd",
                    mOrigBandwidthIndex, mCurBandwidthIndex);
        }
    }
    CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size());
    const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex);

    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher

    // Look up the URI of every stream type offered by the selected playlist
    // item; initially all of them are assumed to need a new fetcher.
    AString URIs[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
            streamMask |= indexToType(i);
        }
    }

    // Step 1, stop and discard fetchers that are no longer needed.
    // Pause those that we'll reuse.
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        // skip fetchers that are marked mToBeRemoved,
        // these are done and can't be reused
        if (mFetcherInfos[i].mToBeRemoved) {
            continue;
        }

        const AString &uri = mFetcherInfos.keyAt(i);
        sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher;

        bool discardFetcher = true, delayRemoval = false;
        for (size_t j = 0; j < kMaxStreams; ++j) {
            StreamType type = indexToType(j);
            if ((streamMask & type) && uri == URIs[j]) {
                // This fetcher already serves the wanted URI: move the
                // stream from "needs new fetcher" to "resume existing".
                resumeMask |= type;
                streamMask &= ~type;
                discardFetcher = false;
            }
        }
        // Delay fetcher removal if not picking tracks, AND old fetcher
        // has stream mask that overlaps new variant. (Okay to discard
        // old fetcher now, if completely no overlap.)
        if (discardFetcher && timeUs < 0ll && !pickTrack
                && (fetcher->getStreamTypeMask() & streamMask)) {
            discardFetcher = false;
            delayRemoval = true;
        }

        if (discardFetcher) {
            ALOGV("discarding fetcher-%d", fetcher->getFetcherID());
            fetcher->stopAsync();
        } else {
            float threshold = 0.0f; // default to pause after current block (47Kbytes)
            bool disconnect = false;
            if (timeUs >= 0ll) {
                // seeking, no need to finish fetching
                disconnect = true;
            } else if (delayRemoval) {
                // adapting, abort if remaining of current segment is over threshold
                threshold = getAbortThreshold(
                        mOrigBandwidthIndex, mCurBandwidthIndex);
            }

            ALOGV("pausing fetcher-%d, threshold=%.2f",
                    fetcher->getFetcherID(), threshold);
            fetcher->pauseAsync(threshold, disconnect);
        }
    }

    // Build the continuation message carrying the masks, the seek time and
    // the URI of every stream that will be fetched or resumed.
    sp<AMessage> msg;
    if (timeUs < 0ll) {
        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
        msg = new AMessage(kWhatChangeConfiguration3, this);
    } else {
        msg = new AMessage(kWhatChangeConfiguration2, this);
    }
    msg->setInt32("streamMask", streamMask);
    msg->setInt32("resumeMask", resumeMask);
    msg->setInt32("pickTrack", pickTrack);
    msg->setInt64("timeUs", timeUs);
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if ((streamMask | resumeMask) & indexToType(i)) {
            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
        }
    }

    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
    // fetchers have completed their asynchronous operation, we'll post
    // mContinuation, which is then handled in onChangeConfiguration2 (when
    // seeking) or onChangeConfiguration3 (otherwise).
    // NOTE(review): the counter is the total number of fetchers, including
    // those skipped above as mToBeRemoved, which received neither stopAsync
    // nor pauseAsync here -- confirm they still send an acknowledgement.
    mContinuationCounter = mFetcherInfos.size();
    mContinuation = msg;

    if (mContinuationCounter == 0) {
        msg->post();
    }
}
1630
1631void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1632    ALOGV("onChangeConfiguration");
1633
1634    if (!mReconfigurationInProgress) {
1635        int32_t pickTrack = 0;
1636        msg->findInt32("pickTrack", &pickTrack);
1637        changeConfiguration(-1ll /* timeUs */, -1, pickTrack);
1638    } else {
1639        msg->post(1000000ll); // retry in 1 sec
1640    }
1641}
1642
// Second stage of a configuration change; |msg| is the continuation message
// built by changeConfiguration() and carries "timeUs", "streamMask",
// "resumeMask" and one URI string per selected stream.
//
// When |msg| holds a non-negative "timeUs" (a seek), all packet sources are
// flushed, per-stream state and discontinuity bookkeeping are reset, any
// pending seek reply is posted, and buffer polling is restarted.
//
// Afterwards the set of stream types that were active before but are not part
// of the new configuration is computed. If that set is empty the change
// proceeds straight to onChangeConfiguration3(); otherwise a
// kWhatStreamsChanged notification is posted with |msg| (retargeted to
// kWhatChangeConfiguration3) attached as its "reply".
void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
    ALOGV("onChangeConfiguration2");

    mContinuation.clear();

    // All fetchers are either suspended or have been removed now.

    // If we're seeking, clear all packet sources before we report
    // seek complete, to prevent decoder from pulling stale data.
    int64_t timeUs;
    CHECK(msg->findInt64("timeUs", &timeUs));

    if (timeUs >= 0) {
        mLastSeekTimeUs = timeUs;
        mLastDequeuedTimeUs = timeUs;

        for (size_t i = 0; i < mPacketSources.size(); i++) {
            sp<AnotherPacketSource> packetSource = mPacketSources.editValueAt(i);
            // Grab the format before clear() so it can be restored below.
            sp<MetaData> format = packetSource->getFormat();
            packetSource->clear();
            // Set a tentative format here such that HTTPLiveSource will always have
            // a format available when NuPlayer queries. Without an available video
            // format when setting a surface NuPlayer might disable video decoding
            // altogether. The tentative format will be overwritten by the
            // authoritative (and possibly same) format once content from the new
            // position is dequeued.
            packetSource->setFormat(format);
        }

        for (size_t i = 0; i < kMaxStreams; ++i) {
            mStreams[i].reset();
        }

        mDiscontinuityOffsetTimesUs.clear();
        mDiscontinuityAbsStartTimesUs.clear();

        // Complete the pending seek request, if there is one.
        if (mSeekReplyID != NULL) {
            CHECK(mSeekReply != NULL);
            mSeekReply->setInt32("err", OK);
            mSeekReply->postReply(mSeekReplyID);
            mSeekReplyID.clear();
            mSeekReply.clear();
        }

        // Restart buffer polling after seek because the previous
        // buffering position is no longer valid.
        restartPollBuffering();
    }

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    // From here on streamMask covers every stream type of the new
    // configuration, whether its fetcher is new or resumed.
    streamMask |= resumeMask;

    AString URIs[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            const AString &uriKey = mStreams[i].uriKey();
            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
        }
    }

    uint32_t changedMask = 0;
    // NOTE(review): "i != kSubtitleIndex" is part of the loop condition, so the
    // loop ends when i reaches kSubtitleIndex; any stream index after it is
    // not examined either. Confirm this is intended.
    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
        // stream URI could change even if onChangeConfiguration2 is only
        // used for seek. Seek could happen during a bw switch, in this
        // case bw switch will be cancelled, but the seekTo position will
        // fetch from the new URI.
        if ((mStreamMask & streamMask & indexToType(i))
                && !mStreams[i].mUri.empty()
                && !(URIs[i] == mStreams[i].mUri)) {
            ALOGV("stream %zu changed: oldURI %s, newURI %s", i,
                    mStreams[i].mUri.c_str(), URIs[i].c_str());
            sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i));
            // Only queue a format change if something was dequeued from this
            // source already.
            if (source->getLatestDequeuedMeta() != NULL) {
                source->queueDiscontinuity(
                        ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
            }
        }
        // Determine which decoders to shutdown on the player side,
        // a decoder has to be shutdown if its streamtype was active
        // before but no longer is.
        if ((mStreamMask & ~streamMask & indexToType(i))) {
            changedMask |= indexToType(i);
        }
    }

    if (changedMask == 0) {
        // If nothing changed as far as the audio/video decoders
        // are concerned we can proceed.
        onChangeConfiguration3(msg);
        return;
    }

    // Something changed, inform the player which will shutdown the
    // corresponding decoders and will post the reply once that's done.
    // Handling the reply will continue executing below in
    // onChangeConfiguration3.
    sp<AMessage> notify = mNotify->dup();
    notify->setInt32("what", kWhatStreamsChanged);
    notify->setInt32("changedMask", changedMask);

    // Reuse the incoming message as the reply so that all of its parameters
    // (masks, URIs, timeUs, pickTrack) travel along to the next stage.
    msg->setWhat(kWhatChangeConfiguration3);
    msg->setTarget(this);

    notify->setMessage("reply", msg);
    notify->post();
}
1753
// Final stage of a configuration change. |msg| carries "streamMask" (stream
// types that need a newly created fetcher), "resumeMask" (stream types whose
// existing fetcher is kept), "timeUs" (>= 0 when seeking), "pickTrack" and
// one URI string per stream in streamMask.
//
// The function
//  * decides whether this is a bandwidth switch that requires swapping packet
//    sources (only when not seeking and not picking a track),
//  * records the URIs (into mNewUri while switching, into mUri otherwise),
//  * resumes the fetchers that are still needed and marks the others for
//    removal,
//  * creates and starts one new fetcher per distinct URI in streamMask,
//  * and finally updates the reconfiguration/switch state flags.
void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
    mContinuation.clear();
    // All remaining fetchers are still suspended, the player has shutdown
    // any decoders that needed it.

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    mNewStreamMask = streamMask | resumeMask;

    int64_t timeUs;
    int32_t pickTrack;
    bool switching = false;
    CHECK(msg->findInt64("timeUs", &timeUs));
    CHECK(msg->findInt32("pickTrack", &pickTrack));

    if (timeUs < 0ll) {
        if (!pickTrack) {
            // mSwapMask contains streams that are in both old and new variant,
            // (in mNewStreamMask & mStreamMask) but with different URIs
            // (not in resumeMask).
            // For example, old variant has video and audio in two separate
            // URIs, and new variant has only audio with unchanged URI. mSwapMask
            // should be 0 as there is nothing to swap. We only need to stop video,
            // and resume audio.
            mSwapMask =  mNewStreamMask & mStreamMask & ~resumeMask;
            switching = (mSwapMask != 0);
        }
        // Not seeking: anchor the real-time base at the last dequeued position.
        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
    } else {
        // Seeking: anchor the real-time base at the seek target.
        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
    }

    ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, "
            "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x",
            (long long)timeUs, switching, pickTrack,
            mStreamMask, mNewStreamMask, mSwapMask);

    // While switching, the current URI stays in mUri and the target URI is
    // kept in mNewUri; otherwise the new URI takes effect immediately.
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            if (switching) {
                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
            } else {
                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
            }
        }
    }

    // Of all existing fetchers:
    // * Resume fetchers that are still needed and assign them original packet sources.
    // * Mark otherwise unneeded fetchers for removal.
    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        const AString &uri = mFetcherInfos.keyAt(i);
        if (!resumeFetcher(uri, resumeMask, timeUs)) {
            ALOGV("marking fetcher-%d to be removed",
                    mFetcherInfos[i].mFetcher->getFetcherID());

            mFetcherInfos.editValueAt(i).mToBeRemoved = true;
        }
    }

    // streamMask now only contains the types that need a new fetcher created.
    if (streamMask != 0) {
        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
    }

    // Find out when the original fetchers have buffered up to and start the new fetchers
    // at a later timestamp.
    for (size_t i = 0; i < kMaxStreams; i++) {
        if (!(indexToType(i) & streamMask)) {
            continue;
        }

        AString uri;
        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;

        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
        CHECK(fetcher != NULL);

        HLSTime startTime;
        SeekMode seekMode = kSeekModeExactPosition;
        // Packet sources handed to the new fetcher, indexed by stream index;
        // entries for streams it does not serve stay NULL.
        sp<AnotherPacketSource> sources[kNumSources];

        if (i == kSubtitleIndex || (!pickTrack && !switching)) {
            startTime = latestMediaSegmentStartTime();
        }

        // TRICKY: looping from i as earlier streams are already removed from streamMask
        for (size_t j = i; j < kMaxStreams; ++j) {
            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
            // Every stream that shares this URI is served by the same fetcher.
            if ((streamMask & indexToType(j)) && uri == streamUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));

                if (timeUs >= 0) {
                    startTime.mTimeUs = timeUs;
                } else {
                    int32_t type;
                    sp<AMessage> meta;
                    if (!switching) {
                        // selecting, or adapting but no swap required
                        meta = sources[j]->getLatestDequeuedMeta();
                    } else {
                        // adapting and swap required
                        meta = sources[j]->getLatestEnqueuedMeta();
                        if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) {
                            // switching up
                            meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin);
                        }
                    }

                    // Keep the latest audio/video position as the start time;
                    // metas that carry a "discontinuity" entry are ignored.
                    if ((j == kAudioIndex || j == kVideoIndex)
                            && meta != NULL && !meta->findInt32("discontinuity", &type)) {
                        HLSTime tmpTime(meta);
                        if (startTime < tmpTime) {
                            startTime = tmpTime;
                        }
                    }

                    if (!switching) {
                        // selecting, or adapting but no swap required
                        sources[j]->clear();
                        if (j == kSubtitleIndex) {
                            break;
                        }

                        ALOGV("stream[%zu]: queue format change", j);
                        sources[j]->queueDiscontinuity(
                                ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
                    } else {
                        // switching, queue discontinuities after resume
                        sources[j] = mPacketSources2.valueFor(indexToType(j));
                        sources[j]->clear();
                        // the new fetcher might be providing streams that used to be
                        // provided by two different fetchers, if one of the fetchers
                        // paused in the middle while the other somehow paused in next
                        // seg, we have to start from next seg.
                        if (seekMode < mStreams[j].mSeekMode) {
                            seekMode = mStreams[j].mSeekMode;
                        }
                    }
                }

                // This stream type is handled now; drop it from the pending mask.
                streamMask &= ~indexToType(j);
            }
        }

        ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld "
                "segmentStartTimeUs %lld seekMode %d",
                fetcher->getFetcherID(),
                (long long)startTime.mTimeUs,
                (long long)mLastSeekTimeUs,
                (long long)startTime.getSegmentTimeUs(),
                seekMode);

        // Set the target segment start time to the middle point of the
        // segment where the last sample was.
        // This gives a better guess if segments of the two variants are not
        // perfectly aligned. (If the corresponding segment in new variant
        // starts slightly later than that in the old variant, we still want
        // to pick that segment, not the one before)
        //
        // A negative startTime.mTimeUs falls back to the last seek position.
        fetcher->startAsync(
                sources[kAudioIndex],
                sources[kVideoIndex],
                sources[kSubtitleIndex],
                getMetadataSource(sources, mNewStreamMask, switching),
                startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs,
                startTime.getSegmentTimeUs(),
                startTime.mSeq,
                seekMode);
    }

    // All fetchers have now been started, the configuration change
    // has completed.

    mReconfigurationInProgress = false;
    if (switching) {
        // mStreamMask and mOrigBandwidthIndex are left untouched here; they
        // are updated in tryToFinishBandwidthSwitch() once the swap is done.
        mSwitchInProgress = true;
    } else {
        mStreamMask = mNewStreamMask;
        if (mOrigBandwidthIndex != mCurBandwidthIndex) {
            ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd",
                    mOrigBandwidthIndex, mCurBandwidthIndex);
            mOrigBandwidthIndex = mCurBandwidthIndex;
        }
    }

    ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x",
            mSwitchInProgress, mStreamMask);

    // A disconnect request is pending; complete it now.
    if (mDisconnectReplyID != NULL) {
        finishDisconnect();
    }
}
1949
1950void LiveSession::swapPacketSource(StreamType stream) {
1951    ALOGV("[%s] swapPacketSource", getNameForStream(stream));
1952
1953    // transfer packets from source2 to source
1954    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
1955    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
1956
1957    // queue discontinuity in mPacketSource
1958    aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false);
1959
1960    // queue packets in mPacketSource2 to mPacketSource
1961    status_t finalResult = OK;
1962    sp<ABuffer> accessUnit;
1963    while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK &&
1964          OK == aps2->dequeueAccessUnit(&accessUnit)) {
1965        aps->queueAccessUnit(accessUnit);
1966    }
1967    aps2->clear();
1968}
1969
// Advances (and possibly completes) an in-progress bandwidth switch for the
// old fetcher identified by |oldUri|.
//
// Does nothing unless a switch is in progress and the fetcher for |oldUri|
// exists and is marked mToBeRemoved. Otherwise the packet sources of all
// streams in mSwapMask that were served from |oldUri| are swapped over to the
// new variant and the old fetcher is stopped. Once mSwapMask is empty, streams
// that exist only in the new variant are swapped too, fetchers flagged
// mToBeResumed are resumed, the switch state is committed and buffer polling
// is restarted.
void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) {
    if (!mSwitchInProgress) {
        return;
    }

    ssize_t index = mFetcherInfos.indexOfKey(oldUri);
    if (index < 0 || !mFetcherInfos[index].mToBeRemoved) {
        return;
    }

    // Swap packet source of streams provided by old variant
    for (size_t idx = 0; idx < kMaxStreams; idx++) {
        StreamType stream = indexToType(idx);
        if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) {
            swapPacketSource(stream);

            if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
                ALOGW("swapping stream type %d %s to empty stream",
                        stream, mStreams[idx].mUri.c_str());
            }
            // The new URI becomes the current one for this stream.
            mStreams[idx].mUri = mStreams[idx].mNewUri;
            mStreams[idx].mNewUri.clear();

            mSwapMask &= ~stream;
        }
    }

    mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */);

    ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask);
    // Streams served by other old fetchers still have to be swapped; the
    // switch is finished by a later call.
    if (mSwapMask != 0) {
        return;
    }

    // Check if new variant contains extra streams.
    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
    while (extraStreams) {
        // Isolate the lowest set bit, i.e. handle one stream type per iteration.
        StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1));
        extraStreams &= ~stream;

        swapPacketSource(stream);

        ssize_t idx = typeToIndex(stream);
        CHECK(idx >= 0);
        if (mStreams[idx].mNewUri.empty()) {
            ALOGW("swapping extra stream type %d %s to empty stream",
                    stream, mStreams[idx].mUri.c_str());
        }
        mStreams[idx].mUri = mStreams[idx].mNewUri;
        mStreams[idx].mNewUri.clear();
    }

    // Restart new fetcher (it was paused after the first 47k block)
    // and let it fetch into mPacketSources (not mPacketSources2)
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        FetcherInfo &info = mFetcherInfos.editValueAt(i);
        if (info.mToBeResumed) {
            resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask);
            info.mToBeResumed = false;
        }
    }

    ALOGI("#### Finished Bandwidth Switch: %zd => %zd",
            mOrigBandwidthIndex, mCurBandwidthIndex);

    // Commit the new configuration.
    mStreamMask = mNewStreamMask;
    mSwitchInProgress = false;
    mOrigBandwidthIndex = mCurBandwidthIndex;

    restartPollBuffering();
}
2041
2042void LiveSession::schedulePollBuffering() {
2043    sp<AMessage> msg = new AMessage(kWhatPollBuffering, this);
2044    msg->setInt32("generation", mPollBufferingGeneration);
2045    msg->post(1000000ll);
2046}
2047
2048void LiveSession::cancelPollBuffering() {
2049    ++mPollBufferingGeneration;
2050    mPrevBufferPercentage = -1;
2051}
2052
// Cancels the current buffer polling cycle and immediately runs a fresh poll,
// which in turn schedules the next one.
void LiveSession::restartPollBuffering() {
    cancelPollBuffering();
    onPollBuffering();
}
2057
2058void LiveSession::onPollBuffering() {
2059    ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
2060            "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x",
2061        mSwitchInProgress, mReconfigurationInProgress,
2062        mInPreparationPhase, mCurBandwidthIndex, mStreamMask);
2063
2064    bool underflow, ready, down, up;
2065    if (checkBuffering(underflow, ready, down, up)) {
2066        if (mInPreparationPhase) {
2067            // Allow down switch even if we're still preparing.
2068            //
2069            // Some streams have a high bandwidth index as default,
2070            // when bandwidth is low, it takes a long time to buffer
2071            // to ready mark, then it immediately pauses after start
2072            // as we have to do a down switch. It's better experience
2073            // to restart from a lower index, if we detect low bw.
2074            if (!switchBandwidthIfNeeded(false /* up */, down) && ready) {
2075                postPrepared(OK);
2076            }
2077        }
2078
2079        if (!mInPreparationPhase) {
2080            if (ready) {
2081                stopBufferingIfNecessary();
2082            } else if (underflow) {
2083                startBufferingIfNecessary();
2084            }
2085            switchBandwidthIfNeeded(up, down);
2086        }
2087    }
2088
2089    schedulePollBuffering();
2090}
2091
2092void LiveSession::cancelBandwidthSwitch(bool resume) {
2093    ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd",
2094            mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex);
2095    if (!mSwitchInProgress) {
2096        return;
2097    }
2098
2099    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2100        FetcherInfo& info = mFetcherInfos.editValueAt(i);
2101        if (info.mToBeRemoved) {
2102            info.mToBeRemoved = false;
2103            if (resume) {
2104                resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask);
2105            }
2106        }
2107    }
2108
2109    for (size_t i = 0; i < kMaxStreams; ++i) {
2110        AString newUri = mStreams[i].mNewUri;
2111        if (!newUri.empty()) {
2112            // clear all mNewUri matching this newUri
2113            for (size_t j = i; j < kMaxStreams; ++j) {
2114                if (mStreams[j].mNewUri == newUri) {
2115                    mStreams[j].mNewUri.clear();
2116                }
2117            }
2118            ALOGV("stopping newUri = %s", newUri.c_str());
2119            ssize_t index = mFetcherInfos.indexOfKey(newUri);
2120            if (index < 0) {
2121                ALOGE("did not find fetcher for newUri: %s", newUri.c_str());
2122                continue;
2123            }
2124            FetcherInfo &info = mFetcherInfos.editValueAt(index);
2125            info.mToBeRemoved = true;
2126            info.mFetcher->stopAsync();
2127        }
2128    }
2129
2130    ALOGI("#### Canceled Bandwidth Switch: %zd => %zd",
2131            mOrigBandwidthIndex, mCurBandwidthIndex);
2132
2133    mSwitchGeneration++;
2134    mSwitchInProgress = false;
2135    mCurBandwidthIndex = mOrigBandwidthIndex;
2136    mSwapMask = 0;
2137}
2138
2139bool LiveSession::checkBuffering(
2140        bool &underflow, bool &ready, bool &down, bool &up) {
2141    underflow = ready = down = up = false;
2142
2143    if (mReconfigurationInProgress) {
2144        ALOGV("Switch/Reconfig in progress, defer buffer polling");
2145        return false;
2146    }
2147
2148    size_t activeCount, underflowCount, readyCount, downCount, upCount;
2149    activeCount = underflowCount = readyCount = downCount = upCount =0;
2150    int32_t minBufferPercent = -1;
2151    int64_t durationUs;
2152    if (getDuration(&durationUs) != OK) {
2153        durationUs = -1;
2154    }
2155    for (size_t i = 0; i < mPacketSources.size(); ++i) {
2156        // we don't check subtitles for buffering level
2157        if (!(mStreamMask & mPacketSources.keyAt(i)
2158                & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) {
2159            continue;
2160        }
2161        // ignore streams that never had any packet queued.
2162        // (it's possible that the variant only has audio or video)
2163        sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
2164        if (meta == NULL) {
2165            continue;
2166        }
2167
2168        status_t finalResult;
2169        int64_t bufferedDurationUs =
2170                mPacketSources[i]->getBufferedDurationUs(&finalResult);
2171        ALOGV("[%s] buffered %lld us",
2172                getNameForStream(mPacketSources.keyAt(i)),
2173                (long long)bufferedDurationUs);
2174        if (durationUs >= 0) {
2175            int32_t percent;
2176            if (mPacketSources[i]->isFinished(0 /* duration */)) {
2177                percent = 100;
2178            } else {
2179                percent = (int32_t)(100.0 *
2180                        (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs);
2181            }
2182            if (minBufferPercent < 0 || percent < minBufferPercent) {
2183                minBufferPercent = percent;
2184            }
2185        }
2186
2187        ++activeCount;
2188        int64_t readyMark = mInPreparationPhase ? kPrepareMarkUs : kReadyMarkUs;
2189        if (bufferedDurationUs > readyMark
2190                || mPacketSources[i]->isFinished(0)) {
2191            ++readyCount;
2192        }
2193        if (!mPacketSources[i]->isFinished(0)) {
2194            if (bufferedDurationUs < kUnderflowMarkUs) {
2195                ++underflowCount;
2196            }
2197            if (bufferedDurationUs > mUpSwitchMark) {
2198                ++upCount;
2199            }
2200            if (bufferedDurationUs < mDownSwitchMark) {
2201                ++downCount;
2202            }
2203        }
2204    }
2205
2206    if (minBufferPercent >= 0) {
2207        notifyBufferingUpdate(minBufferPercent);
2208    }
2209
2210    if (activeCount > 0) {
2211        up        = (upCount == activeCount);
2212        down      = (downCount > 0);
2213        ready     = (readyCount == activeCount);
2214        underflow = (underflowCount > 0);
2215        return true;
2216    }
2217
2218    return false;
2219}
2220
2221void LiveSession::startBufferingIfNecessary() {
2222    ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2223            mInPreparationPhase, mBuffering);
2224    if (!mBuffering) {
2225        mBuffering = true;
2226
2227        sp<AMessage> notify = mNotify->dup();
2228        notify->setInt32("what", kWhatBufferingStart);
2229        notify->post();
2230    }
2231}
2232
2233void LiveSession::stopBufferingIfNecessary() {
2234    ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2235            mInPreparationPhase, mBuffering);
2236
2237    if (mBuffering) {
2238        mBuffering = false;
2239
2240        sp<AMessage> notify = mNotify->dup();
2241        notify->setInt32("what", kWhatBufferingEnd);
2242        notify->post();
2243    }
2244}
2245
2246void LiveSession::notifyBufferingUpdate(int32_t percentage) {
2247    if (percentage < mPrevBufferPercentage) {
2248        percentage = mPrevBufferPercentage;
2249    } else if (percentage > 100) {
2250        percentage = 100;
2251    }
2252
2253    mPrevBufferPercentage = percentage;
2254
2255    ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage);
2256
2257    sp<AMessage> notify = mNotify->dup();
2258    notify->setInt32("what", kWhatBufferingUpdate);
2259    notify->setInt32("percentage", percentage);
2260    notify->post();
2261}
2262
2263bool LiveSession::tryBandwidthFallback() {
2264    if (mInPreparationPhase || mReconfigurationInProgress) {
2265        // Don't try fallback during prepare or reconfig.
2266        // If error happens there, it's likely unrecoverable.
2267        return false;
2268    }
2269    if (mCurBandwidthIndex > mOrigBandwidthIndex) {
2270        // if we're switching up, simply cancel and resume old variant
2271        cancelBandwidthSwitch(true /* resume */);
2272        return true;
2273    } else {
2274        // if we're switching down, we're likely about to underflow (if
2275        // not already underflowing). try the lowest viable bandwidth if
2276        // not on that variant already.
2277        ssize_t lowestValid = getLowestValidBandwidthIndex();
2278        if (mCurBandwidthIndex > lowestValid) {
2279            cancelBandwidthSwitch();
2280            changeConfiguration(-1ll, lowestValid);
2281            return true;
2282        }
2283    }
2284    // return false if we couldn't find any fallback
2285    return false;
2286}
2287
2288/*
2289 * returns true if a bandwidth switch is actually needed (and started),
2290 * returns false otherwise
2291 */
2292bool LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
2293    // no need to check bandwidth if we only have 1 bandwidth settings
2294    if (mBandwidthItems.size() < 2) {
2295        return false;
2296    }
2297
2298    if (mSwitchInProgress) {
2299        if (mBuffering) {
2300            tryBandwidthFallback();
2301        }
2302        return false;
2303    }
2304
2305    int32_t bandwidthBps, shortTermBps;
2306    bool isStable;
2307    if (mBandwidthEstimator->estimateBandwidth(
2308            &bandwidthBps, &isStable, &shortTermBps)) {
2309        ALOGV("bandwidth estimated at %.2f kbps, "
2310                "stable %d, shortTermBps %.2f kbps",
2311                bandwidthBps / 1024.0f, isStable, shortTermBps / 1024.0f);
2312        mLastBandwidthBps = bandwidthBps;
2313        mLastBandwidthStable = isStable;
2314    } else {
2315        ALOGV("no bandwidth estimate.");
2316        return false;
2317    }
2318
2319    int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth;
2320    // canSwithDown and canSwitchUp can't both be true.
2321    // we only want to switch up when measured bw is 120% higher than current variant,
2322    // and we only want to switch down when measured bw is below current variant.
2323    bool canSwitchDown = bufferLow
2324            && (bandwidthBps < (int32_t)curBandwidth);
2325    bool canSwitchUp = bufferHigh
2326            && (bandwidthBps > (int32_t)curBandwidth * 12 / 10);
2327
2328    if (canSwitchDown || canSwitchUp) {
2329        // bandwidth estimating has some delay, if we have to downswitch when
2330        // it hasn't stabilized, use the short term to guess real bandwidth,
2331        // since it may be dropping too fast.
2332        // (note this doesn't apply to upswitch, always use longer average there)
2333        if (!isStable && canSwitchDown) {
2334            if (shortTermBps < bandwidthBps) {
2335                bandwidthBps = shortTermBps;
2336            }
2337        }
2338
2339        ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps);
2340
2341        // it's possible that we're checking for canSwitchUp case, but the returned
2342        // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70%
2343        // of measured bw. In that case we don't want to do anything, since we have
2344        // both enough buffer and enough bw.
2345        if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex)
2346         || (canSwitchDown && bandwidthIndex < mCurBandwidthIndex)) {
2347            // if not yet prepared, just restart again with new bw index.
2348            // this is faster and playback experience is cleaner.
2349            changeConfiguration(
2350                    mInPreparationPhase ? 0 : -1ll, bandwidthIndex);
2351            return true;
2352        }
2353    }
2354    return false;
2355}
2356
2357void LiveSession::postError(status_t err) {
2358    // if we reached EOS, notify buffering of 100%
2359    if (err == ERROR_END_OF_STREAM) {
2360        notifyBufferingUpdate(100);
2361    }
2362    // we'll stop buffer polling now, before that notify
2363    // stop buffering to stop the spinning icon
2364    stopBufferingIfNecessary();
2365    cancelPollBuffering();
2366
2367    sp<AMessage> notify = mNotify->dup();
2368    notify->setInt32("what", kWhatError);
2369    notify->setInt32("err", err);
2370    notify->post();
2371}
2372
2373void LiveSession::postPrepared(status_t err) {
2374    CHECK(mInPreparationPhase);
2375
2376    sp<AMessage> notify = mNotify->dup();
2377    if (err == OK || err == ERROR_END_OF_STREAM) {
2378        notify->setInt32("what", kWhatPrepared);
2379    } else {
2380        cancelPollBuffering();
2381
2382        notify->setInt32("what", kWhatPreparationFailed);
2383        notify->setInt32("err", err);
2384    }
2385
2386    notify->post();
2387
2388    mInPreparationPhase = false;
2389}
2390
2391
2392}  // namespace android
2393
2394