LiveSession.cpp revision 5abbd3dcbb0bb32a3d4b90dddbcf90458967eb6f
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22#include "HTTPDownloader.h"
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "mpeg2ts/AnotherPacketSource.h"
27
28#include <cutils/properties.h>
29#include <media/IMediaHTTPService.h>
30#include <media/stagefright/foundation/ABuffer.h>
31#include <media/stagefright/foundation/ADebug.h>
32#include <media/stagefright/foundation/AMessage.h>
33#include <media/stagefright/foundation/AUtils.h>
34#include <media/stagefright/MediaDefs.h>
35#include <media/stagefright/MetaData.h>
36#include <media/stagefright/Utils.h>
37
38#include <utils/Mutex.h>
39
40#include <ctype.h>
41#include <inttypes.h>
42
43namespace android {
44
45// static
46// Bandwidth Switch Mark Defaults
47const int64_t LiveSession::kUpSwitchMarkUs = 15000000ll;
48const int64_t LiveSession::kDownSwitchMarkUs = 20000000ll;
49const int64_t LiveSession::kUpSwitchMarginUs = 5000000ll;
50const int64_t LiveSession::kResumeThresholdUs = 100000ll;
51
52// Buffer Prepare/Ready/Underflow Marks
53const int64_t LiveSession::kReadyMarkUs = 5000000ll;
54const int64_t LiveSession::kPrepareMarkUs = 1500000ll;
55const int64_t LiveSession::kUnderflowMarkUs = 1000000ll;
56
57struct LiveSession::BandwidthEstimator : public RefBase {
58    BandwidthEstimator();
59
60    void addBandwidthMeasurement(size_t numBytes, int64_t delayUs);
61    bool estimateBandwidth(int32_t *bandwidth, bool *isStable = NULL);
62
63private:
64    // Bandwidth estimation parameters
65    static const int32_t kMinBandwidthHistoryItems = 20;
66    static const int64_t kMinBandwidthHistoryWindowUs = 5000000ll; // 5 sec
67    static const int64_t kMaxBandwidthHistoryWindowUs = 30000000ll; // 30 sec
68
69    struct BandwidthEntry {
70        int64_t mDelayUs;
71        size_t mNumBytes;
72    };
73
74    Mutex mLock;
75    List<BandwidthEntry> mBandwidthHistory;
76    List<int32_t> mPrevEstimates;
77    bool mHasNewSample;
78    bool mIsStable;
79    int64_t mTotalTransferTimeUs;
80    size_t mTotalTransferBytes;
81
82    DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator);
83};
84
85LiveSession::BandwidthEstimator::BandwidthEstimator() :
86    mHasNewSample(false),
87    mIsStable(true),
88    mTotalTransferTimeUs(0),
89    mTotalTransferBytes(0) {
90}
91
92void LiveSession::BandwidthEstimator::addBandwidthMeasurement(
93        size_t numBytes, int64_t delayUs) {
94    AutoMutex autoLock(mLock);
95
96    BandwidthEntry entry;
97    entry.mDelayUs = delayUs;
98    entry.mNumBytes = numBytes;
99    mTotalTransferTimeUs += delayUs;
100    mTotalTransferBytes += numBytes;
101    mBandwidthHistory.push_back(entry);
102    mHasNewSample = true;
103
104    // Remove no more than 10% of total transfer time at a time
105    // to avoid sudden jump on bandwidth estimation. There might
106    // be long blocking reads that takes up signification time,
107    // we have to keep a longer window in that case.
108    int64_t bandwidthHistoryWindowUs = mTotalTransferTimeUs * 9 / 10;
109    if (bandwidthHistoryWindowUs < kMinBandwidthHistoryWindowUs) {
110        bandwidthHistoryWindowUs = kMinBandwidthHistoryWindowUs;
111    } else if (bandwidthHistoryWindowUs > kMaxBandwidthHistoryWindowUs) {
112        bandwidthHistoryWindowUs = kMaxBandwidthHistoryWindowUs;
113    }
114    // trim old samples, keeping at least kMaxBandwidthHistoryItems samples,
115    // and total transfer time at least kMaxBandwidthHistoryWindowUs.
116    while (mBandwidthHistory.size() > kMinBandwidthHistoryItems) {
117        List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
118        if (mTotalTransferTimeUs - it->mDelayUs < bandwidthHistoryWindowUs) {
119            break;
120        }
121        mTotalTransferTimeUs -= it->mDelayUs;
122        mTotalTransferBytes -= it->mNumBytes;
123        mBandwidthHistory.erase(mBandwidthHistory.begin());
124    }
125}
126
127bool LiveSession::BandwidthEstimator::estimateBandwidth(
128        int32_t *bandwidthBps, bool *isStable) {
129    AutoMutex autoLock(mLock);
130
131    if (mBandwidthHistory.size() < 2) {
132        return false;
133    }
134
135    if (!mHasNewSample) {
136        *bandwidthBps = *(--mPrevEstimates.end());
137        if (isStable) {
138            *isStable = mIsStable;
139        }
140        return true;
141    }
142
143    *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs);
144    mPrevEstimates.push_back(*bandwidthBps);
145    while (mPrevEstimates.size() > 3) {
146        mPrevEstimates.erase(mPrevEstimates.begin());
147    }
148    mHasNewSample = false;
149
150    int32_t minEstimate = -1, maxEstimate = -1;
151    List<int32_t>::iterator it;
152    for (it = mPrevEstimates.begin(); it != mPrevEstimates.end(); it++) {
153        int32_t estimate = *it;
154        if (minEstimate < 0 || minEstimate > estimate) {
155            minEstimate = estimate;
156        }
157        if (maxEstimate < 0 || maxEstimate < estimate) {
158            maxEstimate = estimate;
159        }
160    }
161    mIsStable = (maxEstimate <= minEstimate * 4 / 3);
162    if (isStable) {
163       *isStable = mIsStable;
164    }
165#if 0
166    {
167        char dumpStr[1024] = {0};
168        size_t itemIdx = 0;
169        size_t histSize = mBandwidthHistory.size();
170        sprintf(dumpStr, "estimate bps=%d stable=%d history (n=%d): {",
171            *bandwidthBps, mIsStable, histSize);
172        List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
173        for (; it != mBandwidthHistory.end(); ++it) {
174            if (itemIdx > 50) {
175                sprintf(dumpStr + strlen(dumpStr),
176                        "...(%zd more items)... }", histSize - itemIdx);
177                break;
178            }
179            sprintf(dumpStr + strlen(dumpStr), "%dk/%.3fs%s",
180                it->mNumBytes / 1024,
181                (double)it->mDelayUs * 1.0e-6,
182                (it == (--mBandwidthHistory.end())) ? "}" : ", ");
183            itemIdx++;
184        }
185        ALOGE(dumpStr);
186    }
187#endif
188    return true;
189}
190
191//static
192const char *LiveSession::getKeyForStream(StreamType type) {
193    switch (type) {
194        case STREAMTYPE_VIDEO:
195            return "timeUsVideo";
196        case STREAMTYPE_AUDIO:
197            return "timeUsAudio";
198        case STREAMTYPE_SUBTITLES:
199            return "timeUsSubtitle";
200        case STREAMTYPE_METADATA:
201            return "timeUsMetadata"; // unused
202        default:
203            TRESPASS();
204    }
205    return NULL;
206}
207
208//static
209const char *LiveSession::getNameForStream(StreamType type) {
210    switch (type) {
211        case STREAMTYPE_VIDEO:
212            return "video";
213        case STREAMTYPE_AUDIO:
214            return "audio";
215        case STREAMTYPE_SUBTITLES:
216            return "subs";
217        case STREAMTYPE_METADATA:
218            return "metadata";
219        default:
220            break;
221    }
222    return "unknown";
223}
224
225//static
226ATSParser::SourceType LiveSession::getSourceTypeForStream(StreamType type) {
227    switch (type) {
228        case STREAMTYPE_VIDEO:
229            return ATSParser::VIDEO;
230        case STREAMTYPE_AUDIO:
231            return ATSParser::AUDIO;
232        case STREAMTYPE_METADATA:
233            return ATSParser::META;
234        case STREAMTYPE_SUBTITLES:
235        default:
236            TRESPASS();
237    }
238    return ATSParser::NUM_SOURCE_TYPES; // should not reach here
239}
240
241LiveSession::LiveSession(
242        const sp<AMessage> &notify, uint32_t flags,
243        const sp<IMediaHTTPService> &httpService)
244    : mNotify(notify),
245      mFlags(flags),
246      mHTTPService(httpService),
247      mBuffering(false),
248      mInPreparationPhase(true),
249      mPollBufferingGeneration(0),
250      mPrevBufferPercentage(-1),
251      mCurBandwidthIndex(-1),
252      mOrigBandwidthIndex(-1),
253      mLastBandwidthBps(-1ll),
254      mBandwidthEstimator(new BandwidthEstimator()),
255      mMaxWidth(720),
256      mMaxHeight(480),
257      mStreamMask(0),
258      mNewStreamMask(0),
259      mSwapMask(0),
260      mSwitchGeneration(0),
261      mSubtitleGeneration(0),
262      mLastDequeuedTimeUs(0ll),
263      mRealTimeBaseUs(0ll),
264      mReconfigurationInProgress(false),
265      mSwitchInProgress(false),
266      mUpSwitchMark(kUpSwitchMarkUs),
267      mDownSwitchMark(kDownSwitchMarkUs),
268      mUpSwitchMargin(kUpSwitchMarginUs),
269      mFirstTimeUsValid(false),
270      mFirstTimeUs(0),
271      mLastSeekTimeUs(0),
272      mHasMetadata(false) {
273    mStreams[kAudioIndex] = StreamItem("audio");
274    mStreams[kVideoIndex] = StreamItem("video");
275    mStreams[kSubtitleIndex] = StreamItem("subtitles");
276
277    for (size_t i = 0; i < kNumSources; ++i) {
278        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
279        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
280    }
281}
282
283LiveSession::~LiveSession() {
284    if (mFetcherLooper != NULL) {
285        mFetcherLooper->stop();
286    }
287}
288
289int64_t LiveSession::calculateMediaTimeUs(
290        int64_t firstTimeUs, int64_t timeUs, int32_t discontinuitySeq) {
291    if (timeUs >= firstTimeUs) {
292        timeUs -= firstTimeUs;
293    } else {
294        timeUs = 0;
295    }
296    timeUs += mLastSeekTimeUs;
297    if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
298        timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
299    }
300    return timeUs;
301}
302
303status_t LiveSession::dequeueAccessUnit(
304        StreamType stream, sp<ABuffer> *accessUnit) {
305    status_t finalResult = OK;
306    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
307
308    ssize_t streamIdx = typeToIndex(stream);
309    if (streamIdx < 0) {
310        return BAD_VALUE;
311    }
312    const char *streamStr = getNameForStream(stream);
313    // Do not let client pull data if we don't have data packets yet.
314    // We might only have a format discontinuity queued without data.
315    // When NuPlayerDecoder dequeues the format discontinuity, it will
316    // immediately try to getFormat. If we return NULL, NuPlayerDecoder
317    // thinks it can do seamless change, so will not shutdown decoder.
318    // When the actual format arrives, it can't handle it and get stuck.
319    if (!packetSource->hasDataBufferAvailable(&finalResult)) {
320        ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)",
321                streamStr, finalResult);
322
323        if (finalResult == OK) {
324            return -EAGAIN;
325        } else {
326            return finalResult;
327        }
328    }
329
330    // Let the client dequeue as long as we have buffers available
331    // Do not make pause/resume decisions here.
332
333    status_t err = packetSource->dequeueAccessUnit(accessUnit);
334
335    if (err == INFO_DISCONTINUITY) {
336        // adaptive streaming, discontinuities in the playlist
337        int32_t type;
338        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
339
340        sp<AMessage> extra;
341        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
342            extra.clear();
343        }
344
345        ALOGI("[%s] read discontinuity of type %d, extra = %s",
346              streamStr,
347              type,
348              extra == NULL ? "NULL" : extra->debugString().c_str());
349    } else if (err == OK) {
350
351        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
352            int64_t timeUs, originalTimeUs;
353            int32_t discontinuitySeq = 0;
354            StreamItem& strm = mStreams[streamIdx];
355            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
356            originalTimeUs = timeUs;
357            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
358            if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) {
359                int64_t offsetTimeUs;
360                if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
361                    offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq);
362                } else {
363                    offsetTimeUs = 0;
364                }
365
366                if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0
367                        && strm.mLastDequeuedTimeUs >= 0) {
368                    int64_t firstTimeUs;
369                    firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
370                    offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
371                    offsetTimeUs += strm.mLastSampleDurationUs;
372                } else {
373                    offsetTimeUs += strm.mLastSampleDurationUs;
374                }
375
376                mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs);
377                strm.mCurDiscontinuitySeq = discontinuitySeq;
378            }
379
380            int32_t discard = 0;
381            int64_t firstTimeUs;
382            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
383                int64_t durUs; // approximate sample duration
384                if (timeUs > strm.mLastDequeuedTimeUs) {
385                    durUs = timeUs - strm.mLastDequeuedTimeUs;
386                } else {
387                    durUs = strm.mLastDequeuedTimeUs - timeUs;
388                }
389                strm.mLastSampleDurationUs = durUs;
390                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
391            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
392                firstTimeUs = timeUs;
393            } else {
394                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
395                firstTimeUs = timeUs;
396            }
397
398            strm.mLastDequeuedTimeUs = timeUs;
399            timeUs = calculateMediaTimeUs(firstTimeUs, timeUs, discontinuitySeq);
400
401            ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us",
402                    streamStr, (long long)timeUs, (long long)originalTimeUs);
403            (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
404            mLastDequeuedTimeUs = timeUs;
405            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
406        } else if (stream == STREAMTYPE_SUBTITLES) {
407            int32_t subtitleGeneration;
408            if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
409                    && subtitleGeneration != mSubtitleGeneration) {
410               return -EAGAIN;
411            };
412            (*accessUnit)->meta()->setInt32(
413                    "trackIndex", mPlaylist->getSelectedIndex());
414            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
415        } else if (stream == STREAMTYPE_METADATA) {
416            HLSTime mdTime((*accessUnit)->meta());
417            if (mDiscontinuityAbsStartTimesUs.indexOfKey(mdTime.mSeq) < 0) {
418                packetSource->requeueAccessUnit((*accessUnit));
419                return -EAGAIN;
420            } else {
421                int64_t firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(mdTime.mSeq);
422                int64_t timeUs = calculateMediaTimeUs(firstTimeUs, mdTime.mTimeUs, mdTime.mSeq);
423                (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
424                (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
425            }
426        }
427    } else {
428        ALOGI("[%s] encountered error %d", streamStr, err);
429    }
430
431    return err;
432}
433
434status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
435    if (!(mStreamMask & stream)) {
436        return UNKNOWN_ERROR;
437    }
438
439    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
440
441    sp<MetaData> meta = packetSource->getFormat();
442
443    if (meta == NULL) {
444        return -EAGAIN;
445    }
446
447    if (stream == STREAMTYPE_AUDIO) {
448        // set AAC input buffer size to 32K bytes (256kbps x 1sec)
449        meta->setInt32(kKeyMaxInputSize, 32 * 1024);
450    } else if (stream == STREAMTYPE_VIDEO) {
451        meta->setInt32(kKeyMaxWidth, mMaxWidth);
452        meta->setInt32(kKeyMaxHeight, mMaxHeight);
453    }
454
455    return convertMetaDataToMessage(meta, format);
456}
457
458sp<HTTPDownloader> LiveSession::getHTTPDownloader() {
459    return new HTTPDownloader(mHTTPService, mExtraHeaders);
460}
461
462void LiveSession::connectAsync(
463        const char *url, const KeyedVector<String8, String8> *headers) {
464    sp<AMessage> msg = new AMessage(kWhatConnect, this);
465    msg->setString("url", url);
466
467    if (headers != NULL) {
468        msg->setPointer(
469                "headers",
470                new KeyedVector<String8, String8>(*headers));
471    }
472
473    msg->post();
474}
475
476status_t LiveSession::disconnect() {
477    sp<AMessage> msg = new AMessage(kWhatDisconnect, this);
478
479    sp<AMessage> response;
480    status_t err = msg->postAndAwaitResponse(&response);
481
482    return err;
483}
484
485status_t LiveSession::seekTo(int64_t timeUs) {
486    sp<AMessage> msg = new AMessage(kWhatSeek, this);
487    msg->setInt64("timeUs", timeUs);
488
489    sp<AMessage> response;
490    status_t err = msg->postAndAwaitResponse(&response);
491
492    return err;
493}
494
495bool LiveSession::checkSwitchProgress(
496        sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) {
497    AString newUri;
498    CHECK(stopParams->findString("uri", &newUri));
499
500    *needResumeUntil = false;
501    sp<AMessage> firstNewMeta[kMaxStreams];
502    for (size_t i = 0; i < kMaxStreams; ++i) {
503        StreamType stream = indexToType(i);
504        if (!(mSwapMask & mNewStreamMask & stream)
505            || (mStreams[i].mNewUri != newUri)) {
506            continue;
507        }
508        if (stream == STREAMTYPE_SUBTITLES) {
509            continue;
510        }
511        sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i);
512
513        // First, get latest dequeued meta, which is where the decoder is at.
514        // (when upswitching, we take the meta after a certain delay, so that
515        // the decoder is left with some cushion)
516        sp<AMessage> lastDequeueMeta, lastEnqueueMeta;
517        if (delayUs > 0) {
518            lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs);
519            if (lastDequeueMeta == NULL) {
520                // this means we don't have enough cushion, try again later
521                ALOGV("[%s] up switching failed due to insufficient buffer",
522                        getNameForStream(stream));
523                return false;
524            }
525        } else {
526            // It's okay for lastDequeueMeta to be NULL here, it means the
527            // decoder hasn't even started dequeueing
528            lastDequeueMeta = source->getLatestDequeuedMeta();
529        }
530        // Then, trim off packets at beginning of mPacketSources2 that's before
531        // the latest dequeued time. These samples are definitely too late.
532        firstNewMeta[i] = mPacketSources2.editValueAt(i)
533                            ->trimBuffersBeforeMeta(lastDequeueMeta);
534
535        // Now firstNewMeta[i] is the first sample after the trim.
536        // If it's NULL, we failed because dequeue already past all samples
537        // in mPacketSource2, we have to try again.
538        if (firstNewMeta[i] == NULL) {
539            HLSTime dequeueTime(lastDequeueMeta);
540            ALOGV("[%s] dequeue time (%d, %lld) past start time",
541                    getNameForStream(stream),
542                    dequeueTime.mSeq, (long long) dequeueTime.mTimeUs);
543            return false;
544        }
545
546        // Otherwise, we check if mPacketSources2 overlaps with what old fetcher
547        // already fetched, and see if we need to resumeUntil
548        lastEnqueueMeta = source->getLatestEnqueuedMeta();
549        // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity
550        // boundary, no need to resume as the content will look different anyways
551        if (lastEnqueueMeta != NULL) {
552            HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]);
553
554            // no need to resume old fetcher if new fetcher started in different
555            // discontinuity sequence, as the content will look different.
556            *needResumeUntil |= (startTime.mSeq == lastTime.mSeq
557                    && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs);
558
559            // update the stopTime for resumeUntil
560            stopParams->setInt32("discontinuitySeq", startTime.mSeq);
561            stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs);
562        }
563    }
564
565    // if we're here, it means dequeue progress hasn't passed some samples in
566    // mPacketSource2, we can trim off the excess in mPacketSource.
567    // (old fetcher might still need to resumeUntil the start time of new fetcher)
568    for (size_t i = 0; i < kMaxStreams; ++i) {
569        StreamType stream = indexToType(i);
570        if (!(mSwapMask & mNewStreamMask & stream)
571            || (newUri != mStreams[i].mNewUri)
572            || stream == STREAMTYPE_SUBTITLES) {
573            continue;
574        }
575        mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]);
576    }
577
578    // no resumeUntil if already underflow
579    *needResumeUntil &= !mBuffering;
580
581    return true;
582}
583
584void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
585    switch (msg->what()) {
586        case kWhatConnect:
587        {
588            onConnect(msg);
589            break;
590        }
591
592        case kWhatDisconnect:
593        {
594            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
595
596            if (mReconfigurationInProgress) {
597                break;
598            }
599
600            finishDisconnect();
601            break;
602        }
603
604        case kWhatSeek:
605        {
606            if (mReconfigurationInProgress) {
607                msg->post(50000);
608                break;
609            }
610
611            CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
612            mSeekReply = new AMessage;
613
614            onSeek(msg);
615            break;
616        }
617
618        case kWhatFetcherNotify:
619        {
620            int32_t what;
621            CHECK(msg->findInt32("what", &what));
622
623            switch (what) {
624                case PlaylistFetcher::kWhatStarted:
625                    break;
626                case PlaylistFetcher::kWhatPaused:
627                case PlaylistFetcher::kWhatStopped:
628                {
629                    AString uri;
630                    CHECK(msg->findString("uri", &uri));
631                    ssize_t index = mFetcherInfos.indexOfKey(uri);
632                    if (index < 0) {
633                        // ignore msgs from fetchers that's already gone
634                        break;
635                    }
636
637                    ALOGV("fetcher-%d %s",
638                            mFetcherInfos[index].mFetcher->getFetcherID(),
639                            what == PlaylistFetcher::kWhatPaused ?
640                                    "paused" : "stopped");
641
642                    if (what == PlaylistFetcher::kWhatStopped) {
643                        mFetcherLooper->unregisterHandler(
644                                mFetcherInfos[index].mFetcher->id());
645                        mFetcherInfos.removeItemsAt(index);
646                    } else if (what == PlaylistFetcher::kWhatPaused) {
647                        int32_t seekMode;
648                        CHECK(msg->findInt32("seekMode", &seekMode));
649                        for (size_t i = 0; i < kMaxStreams; ++i) {
650                            if (mStreams[i].mUri == uri) {
651                                mStreams[i].mSeekMode = (SeekMode) seekMode;
652                            }
653                        }
654                    }
655
656                    if (mContinuation != NULL) {
657                        CHECK_GT(mContinuationCounter, 0);
658                        if (--mContinuationCounter == 0) {
659                            mContinuation->post();
660                        }
661                        ALOGV("%zu fetcher(s) left", mContinuationCounter);
662                    }
663                    break;
664                }
665
666                case PlaylistFetcher::kWhatDurationUpdate:
667                {
668                    AString uri;
669                    CHECK(msg->findString("uri", &uri));
670
671                    int64_t durationUs;
672                    CHECK(msg->findInt64("durationUs", &durationUs));
673
674                    ssize_t index = mFetcherInfos.indexOfKey(uri);
675                    if (index >= 0) {
676                        FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
677                        info->mDurationUs = durationUs;
678                    }
679                    break;
680                }
681
682                case PlaylistFetcher::kWhatTargetDurationUpdate:
683                {
684                    int64_t targetDurationUs;
685                    CHECK(msg->findInt64("targetDurationUs", &targetDurationUs));
686                    mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4);
687                    mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4);
688                    mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs);
689                    break;
690                }
691
692                case PlaylistFetcher::kWhatError:
693                {
694                    status_t err;
695                    CHECK(msg->findInt32("err", &err));
696
697                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
698
699                    // handle EOS on subtitle tracks independently
700                    AString uri;
701                    if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
702                        ssize_t i = mFetcherInfos.indexOfKey(uri);
703                        if (i >= 0) {
704                            const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
705                            if (fetcher != NULL) {
706                                uint32_t type = fetcher->getStreamTypeMask();
707                                if (type == STREAMTYPE_SUBTITLES) {
708                                    mPacketSources.valueFor(
709                                            STREAMTYPE_SUBTITLES)->signalEOS(err);;
710                                    break;
711                                }
712                            }
713                        }
714                    }
715
716                    if (mInPreparationPhase) {
717                        postPrepared(err);
718                    }
719
720                    cancelBandwidthSwitch();
721
722                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
723
724                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
725
726                    mPacketSources.valueFor(
727                            STREAMTYPE_SUBTITLES)->signalEOS(err);
728
729                    postError(err);
730                    break;
731                }
732
733                case PlaylistFetcher::kWhatStopReached:
734                {
735                    ALOGV("kWhatStopReached");
736
737                    AString oldUri;
738                    CHECK(msg->findString("uri", &oldUri));
739
740                    ssize_t index = mFetcherInfos.indexOfKey(oldUri);
741                    if (index < 0) {
742                        break;
743                    }
744
745                    tryToFinishBandwidthSwitch(oldUri);
746                    break;
747                }
748
749                case PlaylistFetcher::kWhatStartedAt:
750                {
751                    int32_t switchGeneration;
752                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
753
754                    ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d",
755                            switchGeneration, mSwitchGeneration);
756
757                    if (switchGeneration != mSwitchGeneration) {
758                        break;
759                    }
760
761                    AString uri;
762                    CHECK(msg->findString("uri", &uri));
763
764                    // mark new fetcher mToBeResumed
765                    ssize_t index = mFetcherInfos.indexOfKey(uri);
766                    if (index >= 0) {
767                        mFetcherInfos.editValueAt(index).mToBeResumed = true;
768                    }
769
770                    // temporarily disable packet sources to be swapped to prevent
771                    // NuPlayerDecoder from dequeuing while we check progress
772                    for (size_t i = 0; i < mPacketSources.size(); ++i) {
773                        if ((mSwapMask & mPacketSources.keyAt(i))
774                                && uri == mStreams[i].mNewUri) {
775                            mPacketSources.editValueAt(i)->enable(false);
776                        }
777                    }
778                    bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex);
779                    // If switching up, require a cushion bigger than kUnderflowMark
780                    // to avoid buffering immediately after the switch.
781                    // (If we don't have that cushion we'd rather cancel and try again.)
782                    int64_t delayUs = switchUp ? (kUnderflowMarkUs + 1000000ll) : 0;
783                    bool needResumeUntil = false;
784                    sp<AMessage> stopParams = msg;
785                    if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) {
786                        // playback time hasn't passed startAt time
787                        if (!needResumeUntil) {
788                            ALOGV("finish switch");
789                            for (size_t i = 0; i < kMaxStreams; ++i) {
790                                if ((mSwapMask & indexToType(i))
791                                        && uri == mStreams[i].mNewUri) {
792                                    // have to make a copy of mStreams[i].mUri because
793                                    // tryToFinishBandwidthSwitch is modifying mStreams[]
794                                    AString oldURI = mStreams[i].mUri;
795                                    tryToFinishBandwidthSwitch(oldURI);
796                                    break;
797                                }
798                            }
799                        } else {
800                            // startAt time is after last enqueue time
801                            // Resume fetcher for the original variant; the resumed fetcher should
802                            // continue until the timestamps found in msg, which is stored by the
803                            // new fetcher to indicate where the new variant has started buffering.
804                            ALOGV("finish switch with resumeUntilAsync");
805                            for (size_t i = 0; i < mFetcherInfos.size(); i++) {
806                                const FetcherInfo &info = mFetcherInfos.valueAt(i);
807                                if (info.mToBeRemoved) {
808                                    info.mFetcher->resumeUntilAsync(stopParams);
809                                }
810                            }
811                        }
812                    } else {
813                        // playback time passed startAt time
814                        if (switchUp) {
815                            // if switching up, cancel and retry if condition satisfies again
816                            ALOGV("cancel up switch because we're too late");
817                            cancelBandwidthSwitch(true /* resume */);
818                        } else {
819                            ALOGV("retry down switch at next sample");
820                            resumeFetcher(uri, mSwapMask, -1, true /* newUri */);
821                        }
822                    }
823                    // re-enable all packet sources
824                    for (size_t i = 0; i < mPacketSources.size(); ++i) {
825                        mPacketSources.editValueAt(i)->enable(true);
826                    }
827
828                    break;
829                }
830
831                case PlaylistFetcher::kWhatPlaylistFetched:
832                {
833                    onMasterPlaylistFetched(msg);
834                    break;
835                }
836
837                case PlaylistFetcher::kWhatMetadataDetected:
838                {
839                    if (!mHasMetadata) {
840                        mHasMetadata = true;
841                        sp<AMessage> notify = mNotify->dup();
842                        notify->setInt32("what", kWhatMetadataDetected);
843                        notify->post();
844                    }
845                    break;
846                }
847
848                default:
849                    TRESPASS();
850            }
851
852            break;
853        }
854
855        case kWhatChangeConfiguration:
856        {
857            onChangeConfiguration(msg);
858            break;
859        }
860
861        case kWhatChangeConfiguration2:
862        {
863            onChangeConfiguration2(msg);
864            break;
865        }
866
867        case kWhatChangeConfiguration3:
868        {
869            onChangeConfiguration3(msg);
870            break;
871        }
872
873        case kWhatPollBuffering:
874        {
875            int32_t generation;
876            CHECK(msg->findInt32("generation", &generation));
877            if (generation == mPollBufferingGeneration) {
878                onPollBuffering();
879            }
880            break;
881        }
882
883        default:
884            TRESPASS();
885            break;
886    }
887}
888
889// static
890int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
891    if (a->mBandwidth < b->mBandwidth) {
892        return -1;
893    } else if (a->mBandwidth == b->mBandwidth) {
894        return 0;
895    }
896
897    return 1;
898}
899
900// static
901LiveSession::StreamType LiveSession::indexToType(int idx) {
902    CHECK(idx >= 0 && idx < kNumSources);
903    return (StreamType)(1 << idx);
904}
905
906// static
907ssize_t LiveSession::typeToIndex(int32_t type) {
908    switch (type) {
909        case STREAMTYPE_AUDIO:
910            return 0;
911        case STREAMTYPE_VIDEO:
912            return 1;
913        case STREAMTYPE_SUBTITLES:
914            return 2;
915        case STREAMTYPE_METADATA:
916            return 3;
917        default:
918            return -1;
919    };
920    return -1;
921}
922
923void LiveSession::onConnect(const sp<AMessage> &msg) {
924    CHECK(msg->findString("url", &mMasterURL));
925
926    // TODO currently we don't know if we are coming here from incognito mode
927    ALOGI("onConnect %s", uriDebugString(mMasterURL).c_str());
928
929    KeyedVector<String8, String8> *headers = NULL;
930    if (!msg->findPointer("headers", (void **)&headers)) {
931        mExtraHeaders.clear();
932    } else {
933        mExtraHeaders = *headers;
934
935        delete headers;
936        headers = NULL;
937    }
938
939    // create looper for fetchers
940    if (mFetcherLooper == NULL) {
941        mFetcherLooper = new ALooper();
942
943        mFetcherLooper->setName("Fetcher");
944        mFetcherLooper->start(false, false);
945    }
946
947    // create fetcher to fetch the master playlist
948    addFetcher(mMasterURL.c_str())->fetchPlaylistAsync();
949}
950
951void LiveSession::onMasterPlaylistFetched(const sp<AMessage> &msg) {
952    AString uri;
953    CHECK(msg->findString("uri", &uri));
954    ssize_t index = mFetcherInfos.indexOfKey(uri);
955    if (index < 0) {
956        ALOGW("fetcher for master playlist is gone.");
957        return;
958    }
959
960    // no longer useful, remove
961    mFetcherLooper->unregisterHandler(mFetcherInfos[index].mFetcher->id());
962    mFetcherInfos.removeItemsAt(index);
963
964    CHECK(msg->findObject("playlist", (sp<RefBase> *)&mPlaylist));
965    if (mPlaylist == NULL) {
966        ALOGE("unable to fetch master playlist %s.",
967                uriDebugString(mMasterURL).c_str());
968
969        postPrepared(ERROR_IO);
970        return;
971    }
972    // We trust the content provider to make a reasonable choice of preferred
973    // initial bandwidth by listing it first in the variant playlist.
974    // At startup we really don't have a good estimate on the available
975    // network bandwidth since we haven't tranferred any data yet. Once
976    // we have we can make a better informed choice.
977    size_t initialBandwidth = 0;
978    size_t initialBandwidthIndex = 0;
979
980    int32_t maxWidth = 0;
981    int32_t maxHeight = 0;
982
983    if (mPlaylist->isVariantPlaylist()) {
984        Vector<BandwidthItem> itemsWithVideo;
985        for (size_t i = 0; i < mPlaylist->size(); ++i) {
986            BandwidthItem item;
987
988            item.mPlaylistIndex = i;
989
990            sp<AMessage> meta;
991            AString uri;
992            mPlaylist->itemAt(i, &uri, &meta);
993
994            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
995
996            int32_t width, height;
997            if (meta->findInt32("width", &width)) {
998                maxWidth = max(maxWidth, width);
999            }
1000            if (meta->findInt32("height", &height)) {
1001                maxHeight = max(maxHeight, height);
1002            }
1003
1004            mBandwidthItems.push(item);
1005            if (mPlaylist->hasType(i, "video")) {
1006                itemsWithVideo.push(item);
1007            }
1008        }
1009        // remove the audio-only variants if we have at least one with video
1010        if (!itemsWithVideo.empty()
1011                && itemsWithVideo.size() < mBandwidthItems.size()) {
1012            mBandwidthItems.clear();
1013            for (size_t i = 0; i < itemsWithVideo.size(); ++i) {
1014                mBandwidthItems.push(itemsWithVideo[i]);
1015            }
1016        }
1017
1018        CHECK_GT(mBandwidthItems.size(), 0u);
1019        initialBandwidth = mBandwidthItems[0].mBandwidth;
1020
1021        mBandwidthItems.sort(SortByBandwidth);
1022
1023        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
1024            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
1025                initialBandwidthIndex = i;
1026                break;
1027            }
1028        }
1029    } else {
1030        // dummy item.
1031        BandwidthItem item;
1032        item.mPlaylistIndex = 0;
1033        item.mBandwidth = 0;
1034        mBandwidthItems.push(item);
1035    }
1036
1037    mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth;
1038    mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight;
1039
1040    mPlaylist->pickRandomMediaItems();
1041    changeConfiguration(
1042            0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
1043}
1044
1045void LiveSession::finishDisconnect() {
1046    ALOGV("finishDisconnect");
1047
1048    // No reconfiguration is currently pending, make sure none will trigger
1049    // during disconnection either.
1050    cancelBandwidthSwitch();
1051
1052    // cancel buffer polling
1053    cancelPollBuffering();
1054
1055    // TRICKY: don't wait for all fetcher to be stopped when disconnecting
1056    //
1057    // Some fetchers might be stuck in connect/getSize at this point. These
1058    // operations will eventually timeout (as we have a timeout set in
1059    // MediaHTTPConnection), but we don't want to block the main UI thread
1060    // until then. Here we just need to make sure we clear all references
1061    // to the fetchers, so that when they finally exit from the blocking
1062    // operation, they can be destructed.
1063    //
1064    // There is one very tricky point though. For this scheme to work, the
1065    // fecther must hold a reference to LiveSession, so that LiveSession is
1066    // destroyed after fetcher. Otherwise LiveSession would get stuck in its
1067    // own destructor when it waits for mFetcherLooper to stop, which still
1068    // blocks main UI thread.
1069    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1070        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1071        mFetcherLooper->unregisterHandler(
1072                mFetcherInfos.valueAt(i).mFetcher->id());
1073    }
1074    mFetcherInfos.clear();
1075
1076    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
1077    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
1078
1079    mPacketSources.valueFor(
1080            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
1081
1082    sp<AMessage> response = new AMessage;
1083    response->setInt32("err", OK);
1084
1085    response->postReply(mDisconnectReplyID);
1086    mDisconnectReplyID.clear();
1087}
1088
1089sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
1090    ssize_t index = mFetcherInfos.indexOfKey(uri);
1091
1092    if (index >= 0) {
1093        return NULL;
1094    }
1095
1096    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this);
1097    notify->setString("uri", uri);
1098    notify->setInt32("switchGeneration", mSwitchGeneration);
1099
1100    FetcherInfo info;
1101    info.mFetcher = new PlaylistFetcher(
1102            notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration);
1103    info.mDurationUs = -1ll;
1104    info.mToBeRemoved = false;
1105    info.mToBeResumed = false;
1106    mFetcherLooper->registerHandler(info.mFetcher);
1107
1108    mFetcherInfos.add(uri, info);
1109
1110    return info.mFetcher;
1111}
1112
1113#if 0
1114static double uniformRand() {
1115    return (double)rand() / RAND_MAX;
1116}
1117#endif
1118
1119bool LiveSession::UriIsSameAsIndex(const AString &uri, int32_t i, bool newUri) {
1120    ALOGI("[timed_id3] i %d UriIsSameAsIndex newUri %s, %s", i,
1121            newUri ? "true" : "false",
1122            newUri ? mStreams[i].mNewUri.c_str() : mStreams[i].mUri.c_str());
1123    return i >= 0
1124            && ((!newUri && uri == mStreams[i].mUri)
1125            || (newUri && uri == mStreams[i].mNewUri));
1126}
1127
1128sp<AnotherPacketSource> LiveSession::getPacketSourceForStreamIndex(
1129        size_t trackIndex, bool newUri) {
1130    StreamType type = indexToType(trackIndex);
1131    sp<AnotherPacketSource> source = NULL;
1132    if (newUri) {
1133        source = mPacketSources2.valueFor(type);
1134        source->clear();
1135    } else {
1136        source = mPacketSources.valueFor(type);
1137    };
1138    return source;
1139}
1140
1141sp<AnotherPacketSource> LiveSession::getMetadataSource(
1142        sp<AnotherPacketSource> sources[kNumSources], uint32_t streamMask, bool newUri) {
1143    // todo: One case where the following strategy can fail is when audio and video
1144    // are in separate playlists, both are transport streams, and the metadata
1145    // is actually contained in the audio stream.
1146    ALOGV("[timed_id3] getMetadataSourceForUri streamMask %x newUri %s",
1147            streamMask, newUri ? "true" : "false");
1148
1149    if ((sources[kVideoIndex] != NULL) // video fetcher; or ...
1150            || (!(streamMask & STREAMTYPE_VIDEO) && sources[kAudioIndex] != NULL)) {
1151            // ... audio fetcher for audio only variant
1152        return getPacketSourceForStreamIndex(kMetaDataIndex, newUri);
1153    }
1154
1155    return NULL;
1156}
1157
1158bool LiveSession::resumeFetcher(
1159        const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) {
1160    ssize_t index = mFetcherInfos.indexOfKey(uri);
1161    if (index < 0) {
1162        ALOGE("did not find fetcher for uri: %s", uri.c_str());
1163        return false;
1164    }
1165
1166    bool resume = false;
1167    sp<AnotherPacketSource> sources[kNumSources];
1168    for (size_t i = 0; i < kMaxStreams; ++i) {
1169        if ((streamMask & indexToType(i)) && UriIsSameAsIndex(uri, i, newUri)) {
1170            resume = true;
1171            sources[i] = getPacketSourceForStreamIndex(i, newUri);
1172        }
1173    }
1174
1175    if (resume) {
1176        sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher;
1177        SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition;
1178
1179        ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d",
1180                fetcher->getFetcherID(), (long long)timeUs, seekMode);
1181
1182        fetcher->startAsync(
1183                sources[kAudioIndex],
1184                sources[kVideoIndex],
1185                sources[kSubtitleIndex],
1186                getMetadataSource(sources, streamMask, newUri),
1187                timeUs, -1, -1, seekMode);
1188    }
1189
1190    return resume;
1191}
1192
1193float LiveSession::getAbortThreshold(
1194        ssize_t currentBWIndex, ssize_t targetBWIndex) const {
1195    float abortThreshold = -1.0f;
1196    if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) {
1197        /*
1198           If we're switching down, we need to decide whether to
1199
1200           1) finish last segment of high-bandwidth variant, or
1201           2) abort last segment of high-bandwidth variant, and fetch an
1202              overlapping portion from low-bandwidth variant.
1203
1204           Here we try to maximize the amount of buffer left when the
1205           switch point is met. Given the following parameters:
1206
1207           B: our current buffering level in seconds
1208           T: target duration in seconds
1209           X: sample duration in seconds remain to fetch in last segment
1210           bw0: bandwidth of old variant (as specified in playlist)
1211           bw1: bandwidth of new variant (as specified in playlist)
1212           bw: measured bandwidth available
1213
1214           If we choose 1), when switch happens at the end of current
1215           segment, our buffering will be
1216                  B + X - X * bw0 / bw
1217
1218           If we choose 2), when switch happens where we aborted current
1219           segment, our buffering will be
1220                  B - (T - X) * bw1 / bw
1221
1222           We should only choose 1) if
1223                  X/T < bw1 / (bw1 + bw0 - bw)
1224        */
1225
1226        // Taking the measured current bandwidth at 50% face value only,
1227        // as our bandwidth estimation is a lagging indicator. Being
1228        // conservative on this, we prefer switching to lower bandwidth
1229        // unless we're really confident finishing up the last segment
1230        // of higher bandwidth will be fast.
1231        CHECK(mLastBandwidthBps >= 0);
1232        abortThreshold =
1233                (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1234             / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1235              + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth
1236              - (float)mLastBandwidthBps * 0.5f);
1237        if (abortThreshold < 0.0f) {
1238            abortThreshold = -1.0f; // do not abort
1239        }
1240        ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f",
1241                mBandwidthItems.itemAt(currentBWIndex).mBandwidth,
1242                mBandwidthItems.itemAt(targetBWIndex).mBandwidth,
1243                mLastBandwidthBps,
1244                abortThreshold);
1245    }
1246    return abortThreshold;
1247}
1248
1249void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) {
1250    mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs);
1251}
1252
1253size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) {
1254    if (mBandwidthItems.size() < 2) {
1255        // shouldn't be here if we only have 1 bandwidth, check
1256        // logic to get rid of redundant bandwidth polling
1257        ALOGW("getBandwidthIndex() called for single bandwidth playlist!");
1258        return 0;
1259    }
1260
1261#if 1
1262    char value[PROPERTY_VALUE_MAX];
1263    ssize_t index = -1;
1264    if (property_get("media.httplive.bw-index", value, NULL)) {
1265        char *end;
1266        index = strtol(value, &end, 10);
1267        CHECK(end > value && *end == '\0');
1268
1269        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
1270            index = mBandwidthItems.size() - 1;
1271        }
1272    }
1273
1274    if (index < 0) {
1275        char value[PROPERTY_VALUE_MAX];
1276        if (property_get("media.httplive.max-bw", value, NULL)) {
1277            char *end;
1278            long maxBw = strtoul(value, &end, 10);
1279            if (end > value && *end == '\0') {
1280                if (maxBw > 0 && bandwidthBps > maxBw) {
1281                    ALOGV("bandwidth capped to %ld bps", maxBw);
1282                    bandwidthBps = maxBw;
1283                }
1284            }
1285        }
1286
1287        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
1288
1289        index = mBandwidthItems.size() - 1;
1290        while (index > 0) {
1291            // be conservative (70%) to avoid overestimating and immediately
1292            // switching down again.
1293            size_t adjustedBandwidthBps = bandwidthBps * 7 / 10;
1294            if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) {
1295                break;
1296            }
1297            --index;
1298        }
1299    }
1300#elif 0
1301    // Change bandwidth at random()
1302    size_t index = uniformRand() * mBandwidthItems.size();
1303#elif 0
1304    // There's a 50% chance to stay on the current bandwidth and
1305    // a 50% chance to switch to the next higher bandwidth (wrapping around
1306    // to lowest)
1307    const size_t kMinIndex = 0;
1308
1309    static ssize_t mCurBandwidthIndex = -1;
1310
1311    size_t index;
1312    if (mCurBandwidthIndex < 0) {
1313        index = kMinIndex;
1314    } else if (uniformRand() < 0.5) {
1315        index = (size_t)mCurBandwidthIndex;
1316    } else {
1317        index = mCurBandwidthIndex + 1;
1318        if (index == mBandwidthItems.size()) {
1319            index = kMinIndex;
1320        }
1321    }
1322    mCurBandwidthIndex = index;
1323#elif 0
1324    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1325
1326    size_t index = mBandwidthItems.size() - 1;
1327    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1328        --index;
1329    }
1330#elif 1
1331    char value[PROPERTY_VALUE_MAX];
1332    size_t index;
1333    if (property_get("media.httplive.bw-index", value, NULL)) {
1334        char *end;
1335        index = strtoul(value, &end, 10);
1336        CHECK(end > value && *end == '\0');
1337
1338        if (index >= mBandwidthItems.size()) {
1339            index = mBandwidthItems.size() - 1;
1340        }
1341    } else {
1342        index = 0;
1343    }
1344#else
1345    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
1346#endif
1347
1348    CHECK_GE(index, 0);
1349
1350    return index;
1351}
1352
1353HLSTime LiveSession::latestMediaSegmentStartTime() const {
1354    HLSTime audioTime(mPacketSources.valueFor(
1355                    STREAMTYPE_AUDIO)->getLatestDequeuedMeta());
1356
1357    HLSTime videoTime(mPacketSources.valueFor(
1358                    STREAMTYPE_VIDEO)->getLatestDequeuedMeta());
1359
1360    return audioTime < videoTime ? videoTime : audioTime;
1361}
1362
1363void LiveSession::onSeek(const sp<AMessage> &msg) {
1364    int64_t timeUs;
1365    CHECK(msg->findInt64("timeUs", &timeUs));
1366    changeConfiguration(timeUs);
1367}
1368
1369status_t LiveSession::getDuration(int64_t *durationUs) const {
1370    int64_t maxDurationUs = -1ll;
1371    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1372        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1373
1374        if (fetcherDurationUs > maxDurationUs) {
1375            maxDurationUs = fetcherDurationUs;
1376        }
1377    }
1378
1379    *durationUs = maxDurationUs;
1380
1381    return OK;
1382}
1383
1384bool LiveSession::isSeekable() const {
1385    int64_t durationUs;
1386    return getDuration(&durationUs) == OK && durationUs >= 0;
1387}
1388
1389bool LiveSession::hasDynamicDuration() const {
1390    return false;
1391}
1392
1393size_t LiveSession::getTrackCount() const {
1394    if (mPlaylist == NULL) {
1395        return 0;
1396    } else {
1397        return mPlaylist->getTrackCount() + (mHasMetadata ? 1 : 0);
1398    }
1399}
1400
1401sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1402    if (mPlaylist == NULL) {
1403        return NULL;
1404    } else {
1405        if (trackIndex == mPlaylist->getTrackCount() && mHasMetadata) {
1406            sp<AMessage> format = new AMessage();
1407            format->setInt32("type", MEDIA_TRACK_TYPE_METADATA);
1408            format->setString("language", "und");
1409            format->setString("mime", MEDIA_MIMETYPE_DATA_METADATA);
1410            return format;
1411        }
1412        return mPlaylist->getTrackInfo(trackIndex);
1413    }
1414}
1415
1416status_t LiveSession::selectTrack(size_t index, bool select) {
1417    if (mPlaylist == NULL) {
1418        return INVALID_OPERATION;
1419    }
1420
1421    ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++",
1422            index, select, mSubtitleGeneration);
1423
1424    ++mSubtitleGeneration;
1425    status_t err = mPlaylist->selectTrack(index, select);
1426    if (err == OK) {
1427        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this);
1428        msg->setInt32("pickTrack", select);
1429        msg->post();
1430    }
1431    return err;
1432}
1433
1434ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
1435    if (mPlaylist == NULL) {
1436        return -1;
1437    } else {
1438        return mPlaylist->getSelectedTrack(type);
1439    }
1440}
1441
1442void LiveSession::changeConfiguration(
1443        int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) {
1444    ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d",
1445          (long long)timeUs, bandwidthIndex, pickTrack);
1446
1447    cancelBandwidthSwitch();
1448
1449    CHECK(!mReconfigurationInProgress);
1450    mReconfigurationInProgress = true;
1451    if (bandwidthIndex >= 0) {
1452        mOrigBandwidthIndex = mCurBandwidthIndex;
1453        mCurBandwidthIndex = bandwidthIndex;
1454        if (mOrigBandwidthIndex != mCurBandwidthIndex) {
1455            ALOGI("#### Starting Bandwidth Switch: %zd => %zd",
1456                    mOrigBandwidthIndex, mCurBandwidthIndex);
1457        }
1458    }
1459    CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size());
1460    const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex);
1461
1462    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1463    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1464
1465    AString URIs[kMaxStreams];
1466    for (size_t i = 0; i < kMaxStreams; ++i) {
1467        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1468            streamMask |= indexToType(i);
1469        }
1470    }
1471
1472    // Step 1, stop and discard fetchers that are no longer needed.
1473    // Pause those that we'll reuse.
1474    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1475        // skip fetchers that are marked mToBeRemoved,
1476        // these are done and can't be reused
1477        if (mFetcherInfos[i].mToBeRemoved) {
1478            continue;
1479        }
1480
1481        const AString &uri = mFetcherInfos.keyAt(i);
1482        sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher;
1483
1484        bool discardFetcher = true, delayRemoval = false;
1485        for (size_t j = 0; j < kMaxStreams; ++j) {
1486            StreamType type = indexToType(j);
1487            if ((streamMask & type) && uri == URIs[j]) {
1488                resumeMask |= type;
1489                streamMask &= ~type;
1490                discardFetcher = false;
1491            }
1492        }
1493        // Delay fetcher removal if not picking tracks, AND old fetcher
1494        // has stream mask that overlaps new variant. (Okay to discard
1495        // old fetcher now, if completely no overlap.)
1496        if (discardFetcher && timeUs < 0ll && !pickTrack
1497                && (fetcher->getStreamTypeMask() & streamMask)) {
1498            discardFetcher = false;
1499            delayRemoval = true;
1500        }
1501
1502        if (discardFetcher) {
1503            ALOGV("discarding fetcher-%d", fetcher->getFetcherID());
1504            fetcher->stopAsync();
1505        } else {
1506            float threshold = -1.0f; // always finish fetching by default
1507            bool disconnect = false;
1508            if (timeUs >= 0ll) {
1509                // seeking, no need to finish fetching
1510                threshold = 0.0f;
1511                disconnect = true;
1512            } else if (delayRemoval) {
1513                // adapting, abort if remaining of current segment is over threshold
1514                threshold = getAbortThreshold(
1515                        mOrigBandwidthIndex, mCurBandwidthIndex);
1516            }
1517
1518            ALOGV("pausing fetcher-%d, threshold=%.2f",
1519                    fetcher->getFetcherID(), threshold);
1520            fetcher->pauseAsync(threshold, disconnect);
1521        }
1522    }
1523
1524    sp<AMessage> msg;
1525    if (timeUs < 0ll) {
1526        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1527        msg = new AMessage(kWhatChangeConfiguration3, this);
1528    } else {
1529        msg = new AMessage(kWhatChangeConfiguration2, this);
1530    }
1531    msg->setInt32("streamMask", streamMask);
1532    msg->setInt32("resumeMask", resumeMask);
1533    msg->setInt32("pickTrack", pickTrack);
1534    msg->setInt64("timeUs", timeUs);
1535    for (size_t i = 0; i < kMaxStreams; ++i) {
1536        if ((streamMask | resumeMask) & indexToType(i)) {
1537            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1538        }
1539    }
1540
1541    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1542    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1543    // fetchers have completed their asynchronous operation, we'll post
1544    // mContinuation, which then is handled below in onChangeConfiguration2.
1545    mContinuationCounter = mFetcherInfos.size();
1546    mContinuation = msg;
1547
1548    if (mContinuationCounter == 0) {
1549        msg->post();
1550    }
1551}
1552
1553void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1554    ALOGV("onChangeConfiguration");
1555
1556    if (!mReconfigurationInProgress) {
1557        int32_t pickTrack = 0;
1558        msg->findInt32("pickTrack", &pickTrack);
1559        changeConfiguration(-1ll /* timeUs */, -1, pickTrack);
1560    } else {
1561        msg->post(1000000ll); // retry in 1 sec
1562    }
1563}
1564
1565void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1566    ALOGV("onChangeConfiguration2");
1567
1568    mContinuation.clear();
1569
1570    // All fetchers are either suspended or have been removed now.
1571
1572    // If we're seeking, clear all packet sources before we report
1573    // seek complete, to prevent decoder from pulling stale data.
1574    int64_t timeUs;
1575    CHECK(msg->findInt64("timeUs", &timeUs));
1576
1577    if (timeUs >= 0) {
1578        mLastSeekTimeUs = timeUs;
1579        mLastDequeuedTimeUs = timeUs;
1580
1581        for (size_t i = 0; i < mPacketSources.size(); i++) {
1582            mPacketSources.editValueAt(i)->clear();
1583        }
1584
1585        for (size_t i = 0; i < kMaxStreams; ++i) {
1586            mStreams[i].reset();
1587        }
1588
1589        mDiscontinuityOffsetTimesUs.clear();
1590        mDiscontinuityAbsStartTimesUs.clear();
1591
1592        if (mSeekReplyID != NULL) {
1593            CHECK(mSeekReply != NULL);
1594            mSeekReply->setInt32("err", OK);
1595            mSeekReply->postReply(mSeekReplyID);
1596            mSeekReplyID.clear();
1597            mSeekReply.clear();
1598        }
1599
1600        // restart buffer polling after seek becauese previous
1601        // buffering position is no longer valid.
1602        restartPollBuffering();
1603    }
1604
1605    uint32_t streamMask, resumeMask;
1606    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1607    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1608
1609    streamMask |= resumeMask;
1610
1611    AString URIs[kMaxStreams];
1612    for (size_t i = 0; i < kMaxStreams; ++i) {
1613        if (streamMask & indexToType(i)) {
1614            const AString &uriKey = mStreams[i].uriKey();
1615            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1616            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1617        }
1618    }
1619
1620    uint32_t changedMask = 0;
1621    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1622        // stream URI could change even if onChangeConfiguration2 is only
1623        // used for seek. Seek could happen during a bw switch, in this
1624        // case bw switch will be cancelled, but the seekTo position will
1625        // fetch from the new URI.
1626        if ((mStreamMask & streamMask & indexToType(i))
1627                && !mStreams[i].mUri.empty()
1628                && !(URIs[i] == mStreams[i].mUri)) {
1629            ALOGV("stream %zu changed: oldURI %s, newURI %s", i,
1630                    mStreams[i].mUri.c_str(), URIs[i].c_str());
1631            sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i));
1632            if (source->getLatestDequeuedMeta() != NULL) {
1633                source->queueDiscontinuity(
1634                        ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1635            }
1636        }
1637        // Determine which decoders to shutdown on the player side,
1638        // a decoder has to be shutdown if its streamtype was active
1639        // before but now longer isn't.
1640        if ((mStreamMask & ~streamMask & indexToType(i))) {
1641            changedMask |= indexToType(i);
1642        }
1643    }
1644
1645    if (changedMask == 0) {
1646        // If nothing changed as far as the audio/video decoders
1647        // are concerned we can proceed.
1648        onChangeConfiguration3(msg);
1649        return;
1650    }
1651
1652    // Something changed, inform the player which will shutdown the
1653    // corresponding decoders and will post the reply once that's done.
1654    // Handling the reply will continue executing below in
1655    // onChangeConfiguration3.
1656    sp<AMessage> notify = mNotify->dup();
1657    notify->setInt32("what", kWhatStreamsChanged);
1658    notify->setInt32("changedMask", changedMask);
1659
1660    msg->setWhat(kWhatChangeConfiguration3);
1661    msg->setTarget(this);
1662
1663    notify->setMessage("reply", msg);
1664    notify->post();
1665}
1666
1667void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1668    mContinuation.clear();
1669    // All remaining fetchers are still suspended, the player has shutdown
1670    // any decoders that needed it.
1671
1672    uint32_t streamMask, resumeMask;
1673    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1674    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1675
1676    mNewStreamMask = streamMask | resumeMask;
1677
1678    int64_t timeUs;
1679    int32_t pickTrack;
1680    bool switching = false;
1681    CHECK(msg->findInt64("timeUs", &timeUs));
1682    CHECK(msg->findInt32("pickTrack", &pickTrack));
1683
1684    if (timeUs < 0ll) {
1685        if (!pickTrack) {
1686            // mSwapMask contains streams that are in both old and new variant,
1687            // (in mNewStreamMask & mStreamMask) but with different URIs
1688            // (not in resumeMask).
1689            // For example, old variant has video and audio in two separate
1690            // URIs, and new variant has only audio with unchanged URI. mSwapMask
1691            // should be 0 as there is nothing to swap. We only need to stop video,
1692            // and resume audio.
1693            mSwapMask =  mNewStreamMask & mStreamMask & ~resumeMask;
1694            switching = (mSwapMask != 0);
1695        }
1696        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
1697    } else {
1698        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1699    }
1700
1701    ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, "
1702            "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x",
1703            (long long)timeUs, switching, pickTrack,
1704            mStreamMask, mNewStreamMask, mSwapMask);
1705
1706    for (size_t i = 0; i < kMaxStreams; ++i) {
1707        if (streamMask & indexToType(i)) {
1708            if (switching) {
1709                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
1710            } else {
1711                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1712            }
1713        }
1714    }
1715
1716    // Of all existing fetchers:
1717    // * Resume fetchers that are still needed and assign them original packet sources.
1718    // * Mark otherwise unneeded fetchers for removal.
1719    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1720    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1721        const AString &uri = mFetcherInfos.keyAt(i);
1722        if (!resumeFetcher(uri, resumeMask, timeUs)) {
1723            ALOGV("marking fetcher-%d to be removed",
1724                    mFetcherInfos[i].mFetcher->getFetcherID());
1725
1726            mFetcherInfos.editValueAt(i).mToBeRemoved = true;
1727        }
1728    }
1729
1730    // streamMask now only contains the types that need a new fetcher created.
1731    if (streamMask != 0) {
1732        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1733    }
1734
1735    // Find out when the original fetchers have buffered up to and start the new fetchers
1736    // at a later timestamp.
1737    for (size_t i = 0; i < kMaxStreams; i++) {
1738        if (!(indexToType(i) & streamMask)) {
1739            continue;
1740        }
1741
1742        AString uri;
1743        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
1744
1745        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1746        CHECK(fetcher != NULL);
1747
1748        HLSTime startTime;
1749        SeekMode seekMode = kSeekModeExactPosition;
1750        sp<AnotherPacketSource> sources[kNumSources];
1751
1752        if (i == kSubtitleIndex || (!pickTrack && !switching)) {
1753            startTime = latestMediaSegmentStartTime();
1754        }
1755
1756        // TRICKY: looping from i as earlier streams are already removed from streamMask
1757        for (size_t j = i; j < kMaxStreams; ++j) {
1758            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
1759            if ((streamMask & indexToType(j)) && uri == streamUri) {
1760                sources[j] = mPacketSources.valueFor(indexToType(j));
1761
1762                if (timeUs >= 0) {
1763                    startTime.mTimeUs = timeUs;
1764                } else {
1765                    int32_t type;
1766                    sp<AMessage> meta;
1767                    if (!switching) {
1768                        // selecting, or adapting but no swap required
1769                        meta = sources[j]->getLatestDequeuedMeta();
1770                    } else {
1771                        // adapting and swap required
1772                        meta = sources[j]->getLatestEnqueuedMeta();
1773                        if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) {
1774                            // switching up
1775                            meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin);
1776                        }
1777                    }
1778
1779                    if ((j == kAudioIndex || j == kVideoIndex)
1780                            && meta != NULL && !meta->findInt32("discontinuity", &type)) {
1781                        HLSTime tmpTime(meta);
1782                        if (startTime < tmpTime) {
1783                            startTime = tmpTime;
1784                        }
1785                    }
1786
1787                    if (!switching) {
1788                        // selecting, or adapting but no swap required
1789                        sources[j]->clear();
1790                        if (j == kSubtitleIndex) {
1791                            break;
1792                        }
1793
1794                        ALOGV("stream[%zu]: queue format change", j);
1795                        sources[j]->queueDiscontinuity(
1796                                ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
1797                    } else {
1798                        // switching, queue discontinuities after resume
1799                        sources[j] = mPacketSources2.valueFor(indexToType(j));
1800                        sources[j]->clear();
1801                        // the new fetcher might be providing streams that used to be
1802                        // provided by two different fetchers,  if one of the fetcher
1803                        // paused in the middle while the other somehow paused in next
1804                        // seg, we have to start from next seg.
1805                        if (seekMode < mStreams[j].mSeekMode) {
1806                            seekMode = mStreams[j].mSeekMode;
1807                        }
1808                    }
1809                }
1810
1811                streamMask &= ~indexToType(j);
1812            }
1813        }
1814
1815        ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld "
1816                "segmentStartTimeUs %lld seekMode %d",
1817                fetcher->getFetcherID(),
1818                (long long)startTime.mTimeUs,
1819                (long long)mLastSeekTimeUs,
1820                (long long)startTime.getSegmentTimeUs(),
1821                seekMode);
1822
1823        // Set the target segment start time to the middle point of the
1824        // segment where the last sample was.
1825        // This gives a better guess if segments of the two variants are not
1826        // perfectly aligned. (If the corresponding segment in new variant
1827        // starts slightly later than that in the old variant, we still want
1828        // to pick that segment, not the one before)
1829        fetcher->startAsync(
1830                sources[kAudioIndex],
1831                sources[kVideoIndex],
1832                sources[kSubtitleIndex],
1833                getMetadataSource(sources, mNewStreamMask, switching),
1834                startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs,
1835                startTime.getSegmentTimeUs(),
1836                startTime.mSeq,
1837                seekMode);
1838    }
1839
1840    // All fetchers have now been started, the configuration change
1841    // has completed.
1842
1843    mReconfigurationInProgress = false;
1844    if (switching) {
1845        mSwitchInProgress = true;
1846    } else {
1847        mStreamMask = mNewStreamMask;
1848        if (mOrigBandwidthIndex != mCurBandwidthIndex) {
1849            ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd",
1850                    mOrigBandwidthIndex, mCurBandwidthIndex);
1851            mOrigBandwidthIndex = mCurBandwidthIndex;
1852        }
1853    }
1854
1855    ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x",
1856            mSwitchInProgress, mStreamMask);
1857
1858    if (mDisconnectReplyID != NULL) {
1859        finishDisconnect();
1860    }
1861}
1862
1863void LiveSession::swapPacketSource(StreamType stream) {
1864    ALOGV("[%s] swapPacketSource", getNameForStream(stream));
1865
1866    // transfer packets from source2 to source
1867    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
1868    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
1869
1870    // queue discontinuity in mPacketSource
1871    aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false);
1872
1873    // queue packets in mPacketSource2 to mPacketSource
1874    status_t finalResult = OK;
1875    sp<ABuffer> accessUnit;
1876    while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK &&
1877          OK == aps2->dequeueAccessUnit(&accessUnit)) {
1878        aps->queueAccessUnit(accessUnit);
1879    }
1880    aps2->clear();
1881}
1882
1883void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) {
1884    if (!mSwitchInProgress) {
1885        return;
1886    }
1887
1888    ssize_t index = mFetcherInfos.indexOfKey(oldUri);
1889    if (index < 0 || !mFetcherInfos[index].mToBeRemoved) {
1890        return;
1891    }
1892
1893    // Swap packet source of streams provided by old variant
1894    for (size_t idx = 0; idx < kMaxStreams; idx++) {
1895        StreamType stream = indexToType(idx);
1896        if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) {
1897            swapPacketSource(stream);
1898
1899            if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
1900                ALOGW("swapping stream type %d %s to empty stream",
1901                        stream, mStreams[idx].mUri.c_str());
1902            }
1903            mStreams[idx].mUri = mStreams[idx].mNewUri;
1904            mStreams[idx].mNewUri.clear();
1905
1906            mSwapMask &= ~stream;
1907        }
1908    }
1909
1910    mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */);
1911
1912    ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask);
1913    if (mSwapMask != 0) {
1914        return;
1915    }
1916
1917    // Check if new variant contains extra streams.
1918    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1919    while (extraStreams) {
1920        StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1));
1921        extraStreams &= ~stream;
1922
1923        swapPacketSource(stream);
1924
1925        ssize_t idx = typeToIndex(stream);
1926        CHECK(idx >= 0);
1927        if (mStreams[idx].mNewUri.empty()) {
1928            ALOGW("swapping extra stream type %d %s to empty stream",
1929                    stream, mStreams[idx].mUri.c_str());
1930        }
1931        mStreams[idx].mUri = mStreams[idx].mNewUri;
1932        mStreams[idx].mNewUri.clear();
1933    }
1934
1935    // Restart new fetcher (it was paused after the first 47k block)
1936    // and let it fetch into mPacketSources (not mPacketSources2)
1937    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1938        FetcherInfo &info = mFetcherInfos.editValueAt(i);
1939        if (info.mToBeResumed) {
1940            resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask);
1941            info.mToBeResumed = false;
1942        }
1943    }
1944
1945    ALOGI("#### Finished Bandwidth Switch: %zd => %zd",
1946            mOrigBandwidthIndex, mCurBandwidthIndex);
1947
1948    mStreamMask = mNewStreamMask;
1949    mSwitchInProgress = false;
1950    mOrigBandwidthIndex = mCurBandwidthIndex;
1951
1952    restartPollBuffering();
1953}
1954
1955void LiveSession::schedulePollBuffering() {
1956    sp<AMessage> msg = new AMessage(kWhatPollBuffering, this);
1957    msg->setInt32("generation", mPollBufferingGeneration);
1958    msg->post(1000000ll);
1959}
1960
1961void LiveSession::cancelPollBuffering() {
1962    ++mPollBufferingGeneration;
1963    mPrevBufferPercentage = -1;
1964}
1965
1966void LiveSession::restartPollBuffering() {
1967    cancelPollBuffering();
1968    onPollBuffering();
1969}
1970
1971void LiveSession::onPollBuffering() {
1972    ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
1973            "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x",
1974        mSwitchInProgress, mReconfigurationInProgress,
1975        mInPreparationPhase, mCurBandwidthIndex, mStreamMask);
1976
1977    bool underflow, ready, down, up;
1978    if (checkBuffering(underflow, ready, down, up)) {
1979        if (mInPreparationPhase) {
1980            // Allow down switch even if we're still preparing.
1981            //
1982            // Some streams have a high bandwidth index as default,
1983            // when bandwidth is low, it takes a long time to buffer
1984            // to ready mark, then it immediately pauses after start
1985            // as we have to do a down switch. It's better experience
1986            // to restart from a lower index, if we detect low bw.
1987            if (!switchBandwidthIfNeeded(false /* up */, down) && ready) {
1988                postPrepared(OK);
1989            }
1990        }
1991
1992        if (!mInPreparationPhase) {
1993            if (ready) {
1994                stopBufferingIfNecessary();
1995            } else if (underflow) {
1996                startBufferingIfNecessary();
1997            }
1998            switchBandwidthIfNeeded(up, down);
1999        }
2000    }
2001
2002    schedulePollBuffering();
2003}
2004
2005void LiveSession::cancelBandwidthSwitch(bool resume) {
2006    ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd",
2007            mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex);
2008    if (!mSwitchInProgress) {
2009        return;
2010    }
2011
2012    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2013        FetcherInfo& info = mFetcherInfos.editValueAt(i);
2014        if (info.mToBeRemoved) {
2015            info.mToBeRemoved = false;
2016            if (resume) {
2017                resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask);
2018            }
2019        }
2020    }
2021
2022    for (size_t i = 0; i < kMaxStreams; ++i) {
2023        AString newUri = mStreams[i].mNewUri;
2024        if (!newUri.empty()) {
2025            // clear all mNewUri matching this newUri
2026            for (size_t j = i; j < kMaxStreams; ++j) {
2027                if (mStreams[j].mNewUri == newUri) {
2028                    mStreams[j].mNewUri.clear();
2029                }
2030            }
2031            ALOGV("stopping newUri = %s", newUri.c_str());
2032            ssize_t index = mFetcherInfos.indexOfKey(newUri);
2033            if (index < 0) {
2034                ALOGE("did not find fetcher for newUri: %s", newUri.c_str());
2035                continue;
2036            }
2037            FetcherInfo &info = mFetcherInfos.editValueAt(index);
2038            info.mToBeRemoved = true;
2039            info.mFetcher->stopAsync();
2040        }
2041    }
2042
2043    ALOGI("#### Canceled Bandwidth Switch: %zd => %zd",
2044            mOrigBandwidthIndex, mCurBandwidthIndex);
2045
2046    mSwitchGeneration++;
2047    mSwitchInProgress = false;
2048    mCurBandwidthIndex = mOrigBandwidthIndex;
2049    mSwapMask = 0;
2050}
2051
2052bool LiveSession::checkBuffering(
2053        bool &underflow, bool &ready, bool &down, bool &up) {
2054    underflow = ready = down = up = false;
2055
2056    if (mReconfigurationInProgress) {
2057        ALOGV("Switch/Reconfig in progress, defer buffer polling");
2058        return false;
2059    }
2060
2061    size_t activeCount, underflowCount, readyCount, downCount, upCount;
2062    activeCount = underflowCount = readyCount = downCount = upCount =0;
2063    int32_t minBufferPercent = -1;
2064    int64_t durationUs;
2065    if (getDuration(&durationUs) != OK) {
2066        durationUs = -1;
2067    }
2068    for (size_t i = 0; i < mPacketSources.size(); ++i) {
2069        // we don't check subtitles for buffering level
2070        if (!(mStreamMask & mPacketSources.keyAt(i)
2071                & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) {
2072            continue;
2073        }
2074        // ignore streams that never had any packet queued.
2075        // (it's possible that the variant only has audio or video)
2076        sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
2077        if (meta == NULL) {
2078            continue;
2079        }
2080
2081        status_t finalResult;
2082        int64_t bufferedDurationUs =
2083                mPacketSources[i]->getBufferedDurationUs(&finalResult);
2084        ALOGV("[%s] buffered %lld us",
2085                getNameForStream(mPacketSources.keyAt(i)),
2086                (long long)bufferedDurationUs);
2087        if (durationUs >= 0) {
2088            int32_t percent;
2089            if (mPacketSources[i]->isFinished(0 /* duration */)) {
2090                percent = 100;
2091            } else {
2092                percent = (int32_t)(100.0 *
2093                        (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs);
2094            }
2095            if (minBufferPercent < 0 || percent < minBufferPercent) {
2096                minBufferPercent = percent;
2097            }
2098        }
2099
2100        ++activeCount;
2101        int64_t readyMark = mInPreparationPhase ? kPrepareMarkUs : kReadyMarkUs;
2102        if (bufferedDurationUs > readyMark
2103                || mPacketSources[i]->isFinished(0)) {
2104            ++readyCount;
2105        }
2106        if (!mPacketSources[i]->isFinished(0)) {
2107            if (bufferedDurationUs < kUnderflowMarkUs) {
2108                ++underflowCount;
2109            }
2110            if (bufferedDurationUs > mUpSwitchMark) {
2111                ++upCount;
2112            }
2113            if (bufferedDurationUs < mDownSwitchMark) {
2114                ++downCount;
2115            }
2116        }
2117    }
2118
2119    if (minBufferPercent >= 0) {
2120        notifyBufferingUpdate(minBufferPercent);
2121    }
2122
2123    if (activeCount > 0) {
2124        up        = (upCount == activeCount);
2125        down      = (downCount > 0);
2126        ready     = (readyCount == activeCount);
2127        underflow = (underflowCount > 0);
2128        return true;
2129    }
2130
2131    return false;
2132}
2133
2134void LiveSession::startBufferingIfNecessary() {
2135    ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2136            mInPreparationPhase, mBuffering);
2137    if (!mBuffering) {
2138        mBuffering = true;
2139
2140        sp<AMessage> notify = mNotify->dup();
2141        notify->setInt32("what", kWhatBufferingStart);
2142        notify->post();
2143    }
2144}
2145
2146void LiveSession::stopBufferingIfNecessary() {
2147    ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2148            mInPreparationPhase, mBuffering);
2149
2150    if (mBuffering) {
2151        mBuffering = false;
2152
2153        sp<AMessage> notify = mNotify->dup();
2154        notify->setInt32("what", kWhatBufferingEnd);
2155        notify->post();
2156    }
2157}
2158
2159void LiveSession::notifyBufferingUpdate(int32_t percentage) {
2160    if (percentage < mPrevBufferPercentage) {
2161        percentage = mPrevBufferPercentage;
2162    } else if (percentage > 100) {
2163        percentage = 100;
2164    }
2165
2166    mPrevBufferPercentage = percentage;
2167
2168    ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage);
2169
2170    sp<AMessage> notify = mNotify->dup();
2171    notify->setInt32("what", kWhatBufferingUpdate);
2172    notify->setInt32("percentage", percentage);
2173    notify->post();
2174}
2175
2176/*
2177 * returns true if a bandwidth switch is actually needed (and started),
2178 * returns false otherwise
2179 */
2180bool LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
2181    // no need to check bandwidth if we only have 1 bandwidth settings
2182    if (mSwitchInProgress || mBandwidthItems.size() < 2) {
2183        return false;
2184    }
2185
2186    int32_t bandwidthBps;
2187    bool isStable;
2188    if (mBandwidthEstimator->estimateBandwidth(&bandwidthBps, &isStable)) {
2189        ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
2190        mLastBandwidthBps = bandwidthBps;
2191    } else {
2192        ALOGV("no bandwidth estimate.");
2193        return false;
2194    }
2195
2196    int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth;
2197    // canSwithDown and canSwitchUp can't both be true.
2198    // we only want to switch up when measured bw is 120% higher than current variant,
2199    // and we only want to switch down when measured bw is below current variant.
2200    bool canSwitchDown = bufferLow
2201            && (bandwidthBps < (int32_t)curBandwidth);
2202    bool canSwitchUp = bufferHigh
2203            && (bandwidthBps > (int32_t)curBandwidth * 12 / 10);
2204
2205    if (canSwitchDown || canSwitchUp) {
2206        // bandwidth estimating has some delay, if we have to downswitch when
2207        // it hasn't stabilized, be very conservative on bandwidth.
2208        if (!isStable && canSwitchDown) {
2209            bandwidthBps /= 2;
2210        }
2211
2212        ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps);
2213
2214        // it's possible that we're checking for canSwitchUp case, but the returned
2215        // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70%
2216        // of measured bw. In that case we don't want to do anything, since we have
2217        // both enough buffer and enough bw.
2218        if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex)
2219         || (canSwitchDown && bandwidthIndex < mCurBandwidthIndex)) {
2220            // if not yet prepared, just restart again with new bw index.
2221            // this is faster and playback experience is cleaner.
2222            changeConfiguration(
2223                    mInPreparationPhase ? 0 : -1ll, bandwidthIndex);
2224            return true;
2225        }
2226    }
2227    return false;
2228}
2229
2230void LiveSession::postError(status_t err) {
2231    // if we reached EOS, notify buffering of 100%
2232    if (err == ERROR_END_OF_STREAM) {
2233        notifyBufferingUpdate(100);
2234    }
2235    // we'll stop buffer polling now, before that notify
2236    // stop buffering to stop the spinning icon
2237    stopBufferingIfNecessary();
2238    cancelPollBuffering();
2239
2240    sp<AMessage> notify = mNotify->dup();
2241    notify->setInt32("what", kWhatError);
2242    notify->setInt32("err", err);
2243    notify->post();
2244}
2245
2246void LiveSession::postPrepared(status_t err) {
2247    CHECK(mInPreparationPhase);
2248
2249    sp<AMessage> notify = mNotify->dup();
2250    if (err == OK || err == ERROR_END_OF_STREAM) {
2251        notify->setInt32("what", kWhatPrepared);
2252    } else {
2253        cancelPollBuffering();
2254
2255        notify->setInt32("what", kWhatPreparationFailed);
2256        notify->setInt32("err", err);
2257    }
2258
2259    notify->post();
2260
2261    mInPreparationPhase = false;
2262}
2263
2264
2265}  // namespace android
2266
2267