LiveSession.cpp revision a0d0ba51ad60a68117a0ee78e37ab78715b8a069
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/IMediaHTTPConnection.h>
31#include <media/IMediaHTTPService.h>
32#include <media/stagefright/foundation/hexdump.h>
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/foundation/AUtils.h>
37#include <media/stagefright/DataSource.h>
38#include <media/stagefright/FileSource.h>
39#include <media/stagefright/MediaErrors.h>
40#include <media/stagefright/MediaHTTP.h>
41#include <media/stagefright/MetaData.h>
42#include <media/stagefright/Utils.h>
43
44#include <utils/Mutex.h>
45
46#include <ctype.h>
47#include <inttypes.h>
48#include <openssl/aes.h>
49#include <openssl/md5.h>
50
51namespace android {
52
53// static
54// Bandwidth Switch Mark Defaults
55const int64_t LiveSession::kUpSwitchMarkUs = 15000000ll;
56const int64_t LiveSession::kDownSwitchMarkUs = 20000000ll;
57const int64_t LiveSession::kUpSwitchMarginUs = 5000000ll;
58const int64_t LiveSession::kResumeThresholdUs = 100000ll;
59
60// Buffer Prepare/Ready/Underflow Marks
61const int64_t LiveSession::kReadyMarkUs = 5000000ll;
62const int64_t LiveSession::kPrepareMarkUs = 1500000ll;
63const int64_t LiveSession::kUnderflowMarkUs = 1000000ll;
64
65struct LiveSession::BandwidthEstimator : public RefBase {
66    BandwidthEstimator();
67
68    void addBandwidthMeasurement(size_t numBytes, int64_t delayUs);
69    bool estimateBandwidth(int32_t *bandwidth);
70
71private:
72    // Bandwidth estimation parameters
73    static const int32_t kMaxBandwidthHistoryItems = 20;
74    static const int64_t kMaxBandwidthHistoryWindowUs = 5000000ll; // 5 sec
75
76    struct BandwidthEntry {
77        int64_t mDelayUs;
78        size_t mNumBytes;
79    };
80
81    Mutex mLock;
82    List<BandwidthEntry> mBandwidthHistory;
83    int64_t mTotalTransferTimeUs;
84    size_t mTotalTransferBytes;
85
86    DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator);
87};
88
89LiveSession::BandwidthEstimator::BandwidthEstimator() :
90    mTotalTransferTimeUs(0),
91    mTotalTransferBytes(0) {
92}
93
94void LiveSession::BandwidthEstimator::addBandwidthMeasurement(
95        size_t numBytes, int64_t delayUs) {
96    AutoMutex autoLock(mLock);
97
98    BandwidthEntry entry;
99    entry.mDelayUs = delayUs;
100    entry.mNumBytes = numBytes;
101    mTotalTransferTimeUs += delayUs;
102    mTotalTransferBytes += numBytes;
103    mBandwidthHistory.push_back(entry);
104
105    // trim old samples, keeping at least kMaxBandwidthHistoryItems samples,
106    // and total transfer time at least kMaxBandwidthHistoryWindowUs.
107    while (mBandwidthHistory.size() > kMaxBandwidthHistoryItems) {
108        List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
109        if (mTotalTransferTimeUs - it->mDelayUs < kMaxBandwidthHistoryWindowUs) {
110            break;
111        }
112        mTotalTransferTimeUs -= it->mDelayUs;
113        mTotalTransferBytes -= it->mNumBytes;
114        mBandwidthHistory.erase(mBandwidthHistory.begin());
115    }
116}
117
118bool LiveSession::BandwidthEstimator::estimateBandwidth(int32_t *bandwidthBps) {
119    AutoMutex autoLock(mLock);
120
121    if (mBandwidthHistory.size() < 2) {
122        return false;
123    }
124
125    *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs);
126    return true;
127}
128
129//static
130const char *LiveSession::getKeyForStream(StreamType type) {
131    switch (type) {
132        case STREAMTYPE_VIDEO:
133            return "timeUsVideo";
134        case STREAMTYPE_AUDIO:
135            return "timeUsAudio";
136        case STREAMTYPE_SUBTITLES:
137            return "timeUsSubtitle";
138        default:
139            TRESPASS();
140    }
141    return NULL;
142}
143
144//static
145const char *LiveSession::getNameForStream(StreamType type) {
146    switch (type) {
147        case STREAMTYPE_VIDEO:
148            return "video";
149        case STREAMTYPE_AUDIO:
150            return "audio";
151        case STREAMTYPE_SUBTITLES:
152            return "subs";
153        default:
154            break;
155    }
156    return "unknown";
157}
158
159LiveSession::LiveSession(
160        const sp<AMessage> &notify, uint32_t flags,
161        const sp<IMediaHTTPService> &httpService)
162    : mNotify(notify),
163      mFlags(flags),
164      mHTTPService(httpService),
165      mBuffering(false),
166      mInPreparationPhase(true),
167      mPollBufferingGeneration(0),
168      mPrevBufferPercentage(-1),
169      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
170      mCurBandwidthIndex(-1),
171      mOrigBandwidthIndex(-1),
172      mLastBandwidthBps(-1ll),
173      mBandwidthEstimator(new BandwidthEstimator()),
174      mMaxWidth(720),
175      mMaxHeight(480),
176      mStreamMask(0),
177      mNewStreamMask(0),
178      mSwapMask(0),
179      mSwitchGeneration(0),
180      mSubtitleGeneration(0),
181      mLastDequeuedTimeUs(0ll),
182      mRealTimeBaseUs(0ll),
183      mReconfigurationInProgress(false),
184      mSwitchInProgress(false),
185      mUpSwitchMark(kUpSwitchMarkUs),
186      mDownSwitchMark(kDownSwitchMarkUs),
187      mUpSwitchMargin(kUpSwitchMarginUs),
188      mFirstTimeUsValid(false),
189      mFirstTimeUs(0),
190      mLastSeekTimeUs(0) {
191    mStreams[kAudioIndex] = StreamItem("audio");
192    mStreams[kVideoIndex] = StreamItem("video");
193    mStreams[kSubtitleIndex] = StreamItem("subtitles");
194
195    for (size_t i = 0; i < kMaxStreams; ++i) {
196        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
197        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
198    }
199}
200
201LiveSession::~LiveSession() {
202    if (mFetcherLooper != NULL) {
203        mFetcherLooper->stop();
204    }
205}
206
207status_t LiveSession::dequeueAccessUnit(
208        StreamType stream, sp<ABuffer> *accessUnit) {
209    status_t finalResult = OK;
210    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
211
212    ssize_t streamIdx = typeToIndex(stream);
213    if (streamIdx < 0) {
214        return INVALID_VALUE;
215    }
216    const char *streamStr = getNameForStream(stream);
217    // Do not let client pull data if we don't have data packets yet.
218    // We might only have a format discontinuity queued without data.
219    // When NuPlayerDecoder dequeues the format discontinuity, it will
220    // immediately try to getFormat. If we return NULL, NuPlayerDecoder
221    // thinks it can do seamless change, so will not shutdown decoder.
222    // When the actual format arrives, it can't handle it and get stuck.
223    if (!packetSource->hasDataBufferAvailable(&finalResult)) {
224        ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)",
225                streamStr, finalResult);
226
227        if (finalResult == OK) {
228            return -EAGAIN;
229        } else {
230            return finalResult;
231        }
232    }
233
234    // Let the client dequeue as long as we have buffers available
235    // Do not make pause/resume decisions here.
236
237    status_t err = packetSource->dequeueAccessUnit(accessUnit);
238
239    StreamItem& strm = mStreams[streamIdx];
240    if (err == INFO_DISCONTINUITY) {
241        // adaptive streaming, discontinuities in the playlist
242        int32_t type;
243        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
244
245        sp<AMessage> extra;
246        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
247            extra.clear();
248        }
249
250        ALOGI("[%s] read discontinuity of type %d, extra = %s",
251              streamStr,
252              type,
253              extra == NULL ? "NULL" : extra->debugString().c_str());
254    } else if (err == OK) {
255
256        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
257            int64_t timeUs, originalTimeUs;
258            int32_t discontinuitySeq = 0;
259            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
260            originalTimeUs = timeUs;
261            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
262            if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) {
263                int64_t offsetTimeUs;
264                if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
265                    offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq);
266                } else {
267                    offsetTimeUs = 0;
268                }
269
270                if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
271                    int64_t firstTimeUs;
272                    firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
273                    offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
274                    offsetTimeUs += strm.mLastSampleDurationUs;
275                } else {
276                    offsetTimeUs += strm.mLastSampleDurationUs;
277                }
278
279                mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs);
280                strm.mCurDiscontinuitySeq = discontinuitySeq;
281            }
282
283            int32_t discard = 0;
284            int64_t firstTimeUs;
285            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
286                int64_t durUs; // approximate sample duration
287                if (timeUs > strm.mLastDequeuedTimeUs) {
288                    durUs = timeUs - strm.mLastDequeuedTimeUs;
289                } else {
290                    durUs = strm.mLastDequeuedTimeUs - timeUs;
291                }
292                strm.mLastSampleDurationUs = durUs;
293                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
294            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
295                firstTimeUs = timeUs;
296            } else {
297                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
298                firstTimeUs = timeUs;
299            }
300
301            strm.mLastDequeuedTimeUs = timeUs;
302            if (timeUs >= firstTimeUs) {
303                timeUs -= firstTimeUs;
304            } else {
305                timeUs = 0;
306            }
307            timeUs += mLastSeekTimeUs;
308            if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
309                timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
310            }
311
312            ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us",
313                    streamStr, (long long)timeUs, (long long)originalTimeUs);
314            (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
315            mLastDequeuedTimeUs = timeUs;
316            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
317        } else if (stream == STREAMTYPE_SUBTITLES) {
318            int32_t subtitleGeneration;
319            if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
320                    && subtitleGeneration != mSubtitleGeneration) {
321               return -EAGAIN;
322            };
323            (*accessUnit)->meta()->setInt32(
324                    "trackIndex", mPlaylist->getSelectedIndex());
325            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
326        }
327    } else {
328        ALOGI("[%s] encountered error %d", streamStr, err);
329    }
330
331    return err;
332}
333
334status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
335    if (!(mStreamMask & stream)) {
336        return UNKNOWN_ERROR;
337    }
338
339    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
340
341    sp<MetaData> meta = packetSource->getFormat();
342
343    if (meta == NULL) {
344        return -EAGAIN;
345    }
346
347    if (stream == STREAMTYPE_AUDIO) {
348        // set AAC input buffer size to 32K bytes (256kbps x 1sec)
349        meta->setInt32(kKeyMaxInputSize, 32 * 1024);
350    } else if (stream == STREAMTYPE_VIDEO) {
351        meta->setInt32(kKeyMaxWidth, mMaxWidth);
352        meta->setInt32(kKeyMaxHeight, mMaxHeight);
353    }
354
355    return convertMetaDataToMessage(meta, format);
356}
357
358sp<HTTPBase> LiveSession::getHTTPDataSource() {
359    return new MediaHTTP(mHTTPService->makeHTTPConnection());
360}
361
362void LiveSession::connectAsync(
363        const char *url, const KeyedVector<String8, String8> *headers) {
364    sp<AMessage> msg = new AMessage(kWhatConnect, this);
365    msg->setString("url", url);
366
367    if (headers != NULL) {
368        msg->setPointer(
369                "headers",
370                new KeyedVector<String8, String8>(*headers));
371    }
372
373    msg->post();
374}
375
376status_t LiveSession::disconnect() {
377    sp<AMessage> msg = new AMessage(kWhatDisconnect, this);
378
379    sp<AMessage> response;
380    status_t err = msg->postAndAwaitResponse(&response);
381
382    return err;
383}
384
385status_t LiveSession::seekTo(int64_t timeUs) {
386    sp<AMessage> msg = new AMessage(kWhatSeek, this);
387    msg->setInt64("timeUs", timeUs);
388
389    sp<AMessage> response;
390    status_t err = msg->postAndAwaitResponse(&response);
391
392    return err;
393}
394
395bool LiveSession::checkSwitchProgress(
396        sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) {
397    AString newUri;
398    CHECK(stopParams->findString("uri", &newUri));
399
400    *needResumeUntil = false;
401    sp<AMessage> firstNewMeta[kMaxStreams];
402    for (size_t i = 0; i < kMaxStreams; ++i) {
403        StreamType stream = indexToType(i);
404        if (!(mSwapMask & mNewStreamMask & stream)
405            || (mStreams[i].mNewUri != newUri)) {
406            continue;
407        }
408        if (stream == STREAMTYPE_SUBTITLES) {
409            continue;
410        }
411        sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i);
412
413        // First, get latest dequeued meta, which is where the decoder is at.
414        // (when upswitching, we take the meta after a certain delay, so that
415        // the decoder is left with some cushion)
416        sp<AMessage> lastDequeueMeta, lastEnqueueMeta;
417        if (delayUs > 0) {
418            lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs);
419            if (lastDequeueMeta == NULL) {
420                // this means we don't have enough cushion, try again later
421                ALOGV("[%s] up switching failed due to insufficient buffer",
422                        getNameForStream(stream));
423                return false;
424            }
425        } else {
426            // It's okay for lastDequeueMeta to be NULL here, it means the
427            // decoder hasn't even started dequeueing
428            lastDequeueMeta = source->getLatestDequeuedMeta();
429        }
430        // Then, trim off packets at beginning of mPacketSources2 that's before
431        // the latest dequeued time. These samples are definitely too late.
432        firstNewMeta[i] = mPacketSources2.editValueAt(i)
433                            ->trimBuffersBeforeMeta(lastDequeueMeta);
434
435        // Now firstNewMeta[i] is the first sample after the trim.
436        // If it's NULL, we failed because dequeue already past all samples
437        // in mPacketSource2, we have to try again.
438        if (firstNewMeta[i] == NULL) {
439            HLSTime dequeueTime(lastDequeueMeta);
440            ALOGV("[%s] dequeue time (%d, %lld) past start time",
441                    getNameForStream(stream),
442                    dequeueTime.mSeq, (long long) dequeueTime.mTimeUs);
443            return false;
444        }
445
446        // Otherwise, we check if mPacketSources2 overlaps with what old fetcher
447        // already fetched, and see if we need to resumeUntil
448        lastEnqueueMeta = source->getLatestEnqueuedMeta();
449        // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity
450        // boundary, no need to resume as the content will look different anyways
451        if (lastEnqueueMeta != NULL) {
452            HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]);
453
454            // no need to resume old fetcher if new fetcher started in different
455            // discontinuity sequence, as the content will look different.
456            *needResumeUntil |= (startTime.mSeq == lastTime.mSeq
457                    && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs);
458
459            // update the stopTime for resumeUntil
460            stopParams->setInt32("discontinuitySeq", startTime.mSeq);
461            stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs);
462        }
463    }
464
465    // if we're here, it means dequeue progress hasn't passed some samples in
466    // mPacketSource2, we can trim off the excess in mPacketSource.
467    // (old fetcher might still need to resumeUntil the start time of new fetcher)
468    for (size_t i = 0; i < kMaxStreams; ++i) {
469        StreamType stream = indexToType(i);
470        if (!(mSwapMask & mNewStreamMask & stream)
471            || (newUri != mStreams[i].mNewUri)
472            || stream == STREAMTYPE_SUBTITLES) {
473            continue;
474        }
475        mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]);
476    }
477
478    // no resumeUntil if already underflow
479    *needResumeUntil &= !mBuffering;
480
481    return true;
482}
483
484void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
485    switch (msg->what()) {
486        case kWhatConnect:
487        {
488            onConnect(msg);
489            break;
490        }
491
492        case kWhatDisconnect:
493        {
494            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
495
496            if (mReconfigurationInProgress) {
497                break;
498            }
499
500            finishDisconnect();
501            break;
502        }
503
504        case kWhatSeek:
505        {
506            if (mReconfigurationInProgress) {
507                msg->post(50000);
508                break;
509            }
510
511            CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
512            mSeekReply = new AMessage;
513
514            onSeek(msg);
515            break;
516        }
517
518        case kWhatFetcherNotify:
519        {
520            int32_t what;
521            CHECK(msg->findInt32("what", &what));
522
523            switch (what) {
524                case PlaylistFetcher::kWhatStarted:
525                    break;
526                case PlaylistFetcher::kWhatPaused:
527                case PlaylistFetcher::kWhatStopped:
528                {
529                    AString uri;
530                    CHECK(msg->findString("uri", &uri));
531                    ssize_t index = mFetcherInfos.indexOfKey(uri);
532                    if (index < 0) {
533                        // ignore msgs from fetchers that's already gone
534                        break;
535                    }
536
537                    ALOGV("fetcher-%d %s",
538                            mFetcherInfos[index].mFetcher->getFetcherID(),
539                            what == PlaylistFetcher::kWhatPaused ?
540                                    "paused" : "stopped");
541
542                    if (what == PlaylistFetcher::kWhatStopped) {
543                        mFetcherLooper->unregisterHandler(
544                                mFetcherInfos[index].mFetcher->id());
545                        mFetcherInfos.removeItemsAt(index);
546                    } else if (what == PlaylistFetcher::kWhatPaused) {
547                        int32_t seekMode;
548                        CHECK(msg->findInt32("seekMode", &seekMode));
549                        for (size_t i = 0; i < kMaxStreams; ++i) {
550                            if (mStreams[i].mUri == uri) {
551                                mStreams[i].mSeekMode = (SeekMode) seekMode;
552                            }
553                        }
554                    }
555
556                    if (mContinuation != NULL) {
557                        CHECK_GT(mContinuationCounter, 0);
558                        if (--mContinuationCounter == 0) {
559                            mContinuation->post();
560                        }
561                        ALOGV("%zu fetcher(s) left", mContinuationCounter);
562                    }
563                    break;
564                }
565
566                case PlaylistFetcher::kWhatDurationUpdate:
567                {
568                    AString uri;
569                    CHECK(msg->findString("uri", &uri));
570
571                    int64_t durationUs;
572                    CHECK(msg->findInt64("durationUs", &durationUs));
573
574                    ssize_t index = mFetcherInfos.indexOfKey(uri);
575                    if (index >= 0) {
576                        FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
577                        info->mDurationUs = durationUs;
578                    }
579                    break;
580                }
581
582                case PlaylistFetcher::kWhatTargetDurationUpdate:
583                {
584                    int64_t targetDurationUs;
585                    CHECK(msg->findInt64("targetDurationUs", &targetDurationUs));
586                    mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4);
587                    mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4);
588                    mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs);
589                    break;
590                }
591
592                case PlaylistFetcher::kWhatError:
593                {
594                    status_t err;
595                    CHECK(msg->findInt32("err", &err));
596
597                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
598
599                    // handle EOS on subtitle tracks independently
600                    AString uri;
601                    if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
602                        ssize_t i = mFetcherInfos.indexOfKey(uri);
603                        if (i >= 0) {
604                            const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
605                            if (fetcher != NULL) {
606                                uint32_t type = fetcher->getStreamTypeMask();
607                                if (type == STREAMTYPE_SUBTITLES) {
608                                    mPacketSources.valueFor(
609                                            STREAMTYPE_SUBTITLES)->signalEOS(err);;
610                                    break;
611                                }
612                            }
613                        }
614                    }
615
616                    if (mInPreparationPhase) {
617                        postPrepared(err);
618                    }
619
620                    cancelBandwidthSwitch();
621
622                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
623
624                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
625
626                    mPacketSources.valueFor(
627                            STREAMTYPE_SUBTITLES)->signalEOS(err);
628
629                    postError(err);
630                    break;
631                }
632
633                case PlaylistFetcher::kWhatStopReached:
634                {
635                    ALOGV("kWhatStopReached");
636
637                    AString oldUri;
638                    CHECK(msg->findString("uri", &oldUri));
639
640                    ssize_t index = mFetcherInfos.indexOfKey(oldUri);
641                    if (index < 0) {
642                        break;
643                    }
644
645                    tryToFinishBandwidthSwitch(oldUri);
646                    break;
647                }
648
649                case PlaylistFetcher::kWhatStartedAt:
650                {
651                    int32_t switchGeneration;
652                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
653
654                    ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d",
655                            switchGeneration, mSwitchGeneration);
656
657                    if (switchGeneration != mSwitchGeneration) {
658                        break;
659                    }
660
661                    AString uri;
662                    CHECK(msg->findString("uri", &uri));
663
664                    // mark new fetcher mToBeResumed
665                    ssize_t index = mFetcherInfos.indexOfKey(uri);
666                    if (index >= 0) {
667                        mFetcherInfos.editValueAt(index).mToBeResumed = true;
668                    }
669
670                    // temporarily disable packet sources to be swapped to prevent
671                    // NuPlayerDecoder from dequeuing while we check progress
672                    for (size_t i = 0; i < mPacketSources.size(); ++i) {
673                        if ((mSwapMask & mPacketSources.keyAt(i))
674                                && uri == mStreams[i].mNewUri) {
675                            mPacketSources.editValueAt(i)->enable(false);
676                        }
677                    }
678                    bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex);
679                    // If switching up, require a cushion bigger than kUnderflowMark
680                    // to avoid buffering immediately after the switch.
681                    // (If we don't have that cushion we'd rather cancel and try again.)
682                    int64_t delayUs = switchUp ? (kUnderflowMarkUs + 1000000ll) : 0;
683                    bool needResumeUntil = false;
684                    sp<AMessage> stopParams = msg;
685                    if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) {
686                        // playback time hasn't passed startAt time
687                        if (!needResumeUntil) {
688                            ALOGV("finish switch");
689                            for (size_t i = 0; i < kMaxStreams; ++i) {
690                                if ((mSwapMask & indexToType(i))
691                                        && uri == mStreams[i].mNewUri) {
692                                    // have to make a copy of mStreams[i].mUri because
693                                    // tryToFinishBandwidthSwitch is modifying mStreams[]
694                                    AString oldURI = mStreams[i].mUri;
695                                    tryToFinishBandwidthSwitch(oldURI);
696                                    break;
697                                }
698                            }
699                        } else {
700                            // startAt time is after last enqueue time
701                            // Resume fetcher for the original variant; the resumed fetcher should
702                            // continue until the timestamps found in msg, which is stored by the
703                            // new fetcher to indicate where the new variant has started buffering.
704                            ALOGV("finish switch with resumeUntilAsync");
705                            for (size_t i = 0; i < mFetcherInfos.size(); i++) {
706                                const FetcherInfo &info = mFetcherInfos.valueAt(i);
707                                if (info.mToBeRemoved) {
708                                    info.mFetcher->resumeUntilAsync(stopParams);
709                                }
710                            }
711                        }
712                    } else {
713                        // playback time passed startAt time
714                        if (switchUp) {
715                            // if switching up, cancel and retry if condition satisfies again
716                            ALOGV("cancel up switch because we're too late");
717                            cancelBandwidthSwitch(true /* resume */);
718                        } else {
719                            ALOGV("retry down switch at next sample");
720                            resumeFetcher(uri, mSwapMask, -1, true /* newUri */);
721                        }
722                    }
723                    // re-enable all packet sources
724                    for (size_t i = 0; i < mPacketSources.size(); ++i) {
725                        mPacketSources.editValueAt(i)->enable(true);
726                    }
727
728                    break;
729                }
730
731                default:
732                    TRESPASS();
733            }
734
735            break;
736        }
737
738        case kWhatChangeConfiguration:
739        {
740            onChangeConfiguration(msg);
741            break;
742        }
743
744        case kWhatChangeConfiguration2:
745        {
746            onChangeConfiguration2(msg);
747            break;
748        }
749
750        case kWhatChangeConfiguration3:
751        {
752            onChangeConfiguration3(msg);
753            break;
754        }
755
756        case kWhatFinishDisconnect2:
757        {
758            onFinishDisconnect2();
759            break;
760        }
761
762        case kWhatPollBuffering:
763        {
764            int32_t generation;
765            CHECK(msg->findInt32("generation", &generation));
766            if (generation == mPollBufferingGeneration) {
767                onPollBuffering();
768            }
769            break;
770        }
771
772        default:
773            TRESPASS();
774            break;
775    }
776}
777
778// static
779int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
780    if (a->mBandwidth < b->mBandwidth) {
781        return -1;
782    } else if (a->mBandwidth == b->mBandwidth) {
783        return 0;
784    }
785
786    return 1;
787}
788
789// static
790LiveSession::StreamType LiveSession::indexToType(int idx) {
791    CHECK(idx >= 0 && idx < kMaxStreams);
792    return (StreamType)(1 << idx);
793}
794
795// static
796ssize_t LiveSession::typeToIndex(int32_t type) {
797    switch (type) {
798        case STREAMTYPE_AUDIO:
799            return 0;
800        case STREAMTYPE_VIDEO:
801            return 1;
802        case STREAMTYPE_SUBTITLES:
803            return 2;
804        default:
805            return -1;
806    };
807    return -1;
808}
809
810void LiveSession::onConnect(const sp<AMessage> &msg) {
811    AString url;
812    CHECK(msg->findString("url", &url));
813
814    KeyedVector<String8, String8> *headers = NULL;
815    if (!msg->findPointer("headers", (void **)&headers)) {
816        mExtraHeaders.clear();
817    } else {
818        mExtraHeaders = *headers;
819
820        delete headers;
821        headers = NULL;
822    }
823
824    // TODO currently we don't know if we are coming here from incognito mode
825    ALOGI("onConnect %s", uriDebugString(url).c_str());
826
827    mMasterURL = url;
828
829    bool dummy;
830    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
831
832    if (mPlaylist == NULL) {
833        ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str());
834
835        postPrepared(ERROR_IO);
836        return;
837    }
838
839    // create looper for fetchers
840    if (mFetcherLooper == NULL) {
841        mFetcherLooper = new ALooper();
842
843        mFetcherLooper->setName("Fetcher");
844        mFetcherLooper->start(false, false);
845    }
846
847    // We trust the content provider to make a reasonable choice of preferred
848    // initial bandwidth by listing it first in the variant playlist.
849    // At startup we really don't have a good estimate on the available
850    // network bandwidth since we haven't tranferred any data yet. Once
851    // we have we can make a better informed choice.
852    size_t initialBandwidth = 0;
853    size_t initialBandwidthIndex = 0;
854
855    int32_t maxWidth = 0;
856    int32_t maxHeight = 0;
857
858    if (mPlaylist->isVariantPlaylist()) {
859        Vector<BandwidthItem> itemsWithVideo;
860        for (size_t i = 0; i < mPlaylist->size(); ++i) {
861            BandwidthItem item;
862
863            item.mPlaylistIndex = i;
864
865            sp<AMessage> meta;
866            AString uri;
867            mPlaylist->itemAt(i, &uri, &meta);
868
869            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
870
871            int32_t width, height;
872            if (meta->findInt32("width", &width)) {
873                maxWidth = max(maxWidth, width);
874            }
875            if (meta->findInt32("height", &height)) {
876                maxHeight = max(maxHeight, height);
877            }
878
879            mBandwidthItems.push(item);
880            if (mPlaylist->hasType(i, "video")) {
881                itemsWithVideo.push(item);
882            }
883        }
884        // remove the audio-only variants if we have at least one with video
885        if (!itemsWithVideo.empty()
886                && itemsWithVideo.size() < mBandwidthItems.size()) {
887            mBandwidthItems.clear();
888            for (size_t i = 0; i < itemsWithVideo.size(); ++i) {
889                mBandwidthItems.push(itemsWithVideo[i]);
890            }
891        }
892
893        CHECK_GT(mBandwidthItems.size(), 0u);
894        initialBandwidth = mBandwidthItems[0].mBandwidth;
895
896        mBandwidthItems.sort(SortByBandwidth);
897
898        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
899            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
900                initialBandwidthIndex = i;
901                break;
902            }
903        }
904    } else {
905        // dummy item.
906        BandwidthItem item;
907        item.mPlaylistIndex = 0;
908        item.mBandwidth = 0;
909        mBandwidthItems.push(item);
910    }
911
912    mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth;
913    mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight;
914
915    mPlaylist->pickRandomMediaItems();
916    changeConfiguration(
917            0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
918}
919
920void LiveSession::finishDisconnect() {
921    ALOGV("finishDisconnect");
922
923    // No reconfiguration is currently pending, make sure none will trigger
924    // during disconnection either.
925    cancelBandwidthSwitch();
926
927    // cancel buffer polling
928    cancelPollBuffering();
929
930    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
931        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
932    }
933
934    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, this);
935
936    mContinuationCounter = mFetcherInfos.size();
937    mContinuation = msg;
938
939    if (mContinuationCounter == 0) {
940        msg->post();
941    }
942}
943
944void LiveSession::onFinishDisconnect2() {
945    mContinuation.clear();
946
947    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
948    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
949
950    mPacketSources.valueFor(
951            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
952
953    sp<AMessage> response = new AMessage;
954    response->setInt32("err", OK);
955
956    response->postReply(mDisconnectReplyID);
957    mDisconnectReplyID.clear();
958}
959
960sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
961    ssize_t index = mFetcherInfos.indexOfKey(uri);
962
963    if (index >= 0) {
964        return NULL;
965    }
966
967    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this);
968    notify->setString("uri", uri);
969    notify->setInt32("switchGeneration", mSwitchGeneration);
970
971    FetcherInfo info;
972    info.mFetcher = new PlaylistFetcher(
973            notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration);
974    info.mDurationUs = -1ll;
975    info.mToBeRemoved = false;
976    info.mToBeResumed = false;
977    mFetcherLooper->registerHandler(info.mFetcher);
978
979    mFetcherInfos.add(uri, info);
980
981    return info.mFetcher;
982}
983
984/*
985 * Illustration of parameters:
986 *
987 * 0      `range_offset`
988 * +------------+-------------------------------------------------------+--+--+
989 * |            |                                 | next block to fetch |  |  |
990 * |            | `source` handle => `out` buffer |                     |  |  |
991 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
992 * |            |<----------- `range_length` / buffer capacity ----------->|  |
993 * |<------------------------------ file_size ------------------------------->|
994 *
995 * Special parameter values:
996 * - range_length == -1 means entire file
997 * - block_size == 0 means entire range
998 *
999 */
1000ssize_t LiveSession::fetchFile(
1001        const char *url, sp<ABuffer> *out,
1002        int64_t range_offset, int64_t range_length,
1003        uint32_t block_size, /* download block size */
1004        sp<DataSource> *source, /* to return and reuse source */
1005        String8 *actualUrl,
1006        bool forceConnectHTTP /* force connect HTTP when resuing source */) {
1007    off64_t size;
1008    sp<DataSource> temp_source;
1009    if (source == NULL) {
1010        source = &temp_source;
1011    }
1012
1013    if (*source == NULL || forceConnectHTTP) {
1014        if (!strncasecmp(url, "file://", 7)) {
1015            *source = new FileSource(url + 7);
1016        } else if (strncasecmp(url, "http://", 7)
1017                && strncasecmp(url, "https://", 8)) {
1018            return ERROR_UNSUPPORTED;
1019        } else {
1020            KeyedVector<String8, String8> headers = mExtraHeaders;
1021            if (range_offset > 0 || range_length >= 0) {
1022                headers.add(
1023                        String8("Range"),
1024                        String8(
1025                            AStringPrintf(
1026                                "bytes=%lld-%s",
1027                                range_offset,
1028                                range_length < 0
1029                                    ? "" : AStringPrintf("%lld",
1030                                            range_offset + range_length - 1).c_str()).c_str()));
1031            }
1032
1033            HTTPBase* httpDataSource =
1034                    (*source == NULL) ? mHTTPDataSource.get() : (HTTPBase*)source->get();
1035            status_t err = httpDataSource->connect(url, &headers);
1036
1037            if (err != OK) {
1038                return err;
1039            }
1040
1041            if (*source == NULL) {
1042                *source = mHTTPDataSource;
1043            }
1044        }
1045    }
1046
1047    status_t getSizeErr = (*source)->getSize(&size);
1048    if (getSizeErr != OK) {
1049        size = 65536;
1050    }
1051
1052    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
1053    if (*out == NULL) {
1054        buffer->setRange(0, 0);
1055    }
1056
1057    ssize_t bytesRead = 0;
1058    // adjust range_length if only reading partial block
1059    if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
1060        range_length = buffer->size() + block_size;
1061    }
1062    for (;;) {
1063        // Only resize when we don't know the size.
1064        size_t bufferRemaining = buffer->capacity() - buffer->size();
1065        if (bufferRemaining == 0 && getSizeErr != OK) {
1066            size_t bufferIncrement = buffer->size() / 2;
1067            if (bufferIncrement < 32768) {
1068                bufferIncrement = 32768;
1069            }
1070            bufferRemaining = bufferIncrement;
1071
1072            ALOGV("increasing download buffer to %zu bytes",
1073                 buffer->size() + bufferRemaining);
1074
1075            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
1076            memcpy(copy->data(), buffer->data(), buffer->size());
1077            copy->setRange(0, buffer->size());
1078
1079            buffer = copy;
1080        }
1081
1082        size_t maxBytesToRead = bufferRemaining;
1083        if (range_length >= 0) {
1084            int64_t bytesLeftInRange = range_length - buffer->size();
1085            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
1086                maxBytesToRead = bytesLeftInRange;
1087
1088                if (bytesLeftInRange == 0) {
1089                    break;
1090                }
1091            }
1092        }
1093
1094        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
1095        // to help us break out of the loop.
1096        ssize_t n = (*source)->readAt(
1097                buffer->size(), buffer->data() + buffer->size(),
1098                maxBytesToRead);
1099
1100        if (n < 0) {
1101            return n;
1102        }
1103
1104        if (n == 0) {
1105            break;
1106        }
1107
1108        buffer->setRange(0, buffer->size() + (size_t)n);
1109        bytesRead += n;
1110    }
1111
1112    *out = buffer;
1113    if (actualUrl != NULL) {
1114        *actualUrl = (*source)->getUri();
1115        if (actualUrl->isEmpty()) {
1116            *actualUrl = url;
1117        }
1118    }
1119
1120    return bytesRead;
1121}
1122
1123sp<M3UParser> LiveSession::fetchPlaylist(
1124        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
1125    ALOGV("fetchPlaylist '%s'", url);
1126
1127    *unchanged = false;
1128
1129    sp<ABuffer> buffer;
1130    String8 actualUrl;
1131    ssize_t  err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
1132
1133    // close off the connection after use
1134    mHTTPDataSource->disconnect();
1135
1136    if (err <= 0) {
1137        return NULL;
1138    }
1139
1140    // MD5 functionality is not available on the simulator, treat all
1141    // playlists as changed.
1142
1143#if defined(HAVE_ANDROID_OS)
1144    uint8_t hash[16];
1145
1146    MD5_CTX m;
1147    MD5_Init(&m);
1148    MD5_Update(&m, buffer->data(), buffer->size());
1149
1150    MD5_Final(hash, &m);
1151
1152    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
1153        // playlist unchanged
1154        *unchanged = true;
1155
1156        return NULL;
1157    }
1158
1159    if (curPlaylistHash != NULL) {
1160        memcpy(curPlaylistHash, hash, sizeof(hash));
1161    }
1162#endif
1163
1164    sp<M3UParser> playlist =
1165        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
1166
1167    if (playlist->initCheck() != OK) {
1168        ALOGE("failed to parse .m3u8 playlist");
1169
1170        return NULL;
1171    }
1172
1173    return playlist;
1174}
1175
1176#if 0
// Returns rand() scaled by RAND_MAX, i.e. a value in [0, 1].
static double uniformRand() {
    const double sample = rand();
    return sample / RAND_MAX;
}
1180#endif
1181
1182bool LiveSession::resumeFetcher(
1183        const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) {
1184    ssize_t index = mFetcherInfos.indexOfKey(uri);
1185    if (index < 0) {
1186        ALOGE("did not find fetcher for uri: %s", uri.c_str());
1187        return false;
1188    }
1189
1190    bool resume = false;
1191    sp<AnotherPacketSource> sources[kMaxStreams];
1192    for (size_t i = 0; i < kMaxStreams; ++i) {
1193        if ((streamMask & indexToType(i))
1194            && ((!newUri && uri == mStreams[i].mUri)
1195            || (newUri && uri == mStreams[i].mNewUri))) {
1196            resume = true;
1197            if (newUri) {
1198                sources[i] = mPacketSources2.valueFor(indexToType(i));
1199                sources[i]->clear();
1200            } else {
1201                sources[i] = mPacketSources.valueFor(indexToType(i));
1202            }
1203        }
1204    }
1205
1206    if (resume) {
1207        sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher;
1208        SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition;
1209
1210        ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d",
1211                fetcher->getFetcherID(), (long long)timeUs, seekMode);
1212
1213        fetcher->startAsync(
1214                sources[kAudioIndex],
1215                sources[kVideoIndex],
1216                sources[kSubtitleIndex],
1217                timeUs, -1, -1, seekMode);
1218    }
1219
1220    return resume;
1221}
1222
1223float LiveSession::getAbortThreshold(
1224        ssize_t currentBWIndex, ssize_t targetBWIndex) const {
1225    float abortThreshold = -1.0f;
1226    if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) {
1227        /*
1228           If we're switching down, we need to decide whether to
1229
1230           1) finish last segment of high-bandwidth variant, or
1231           2) abort last segment of high-bandwidth variant, and fetch an
1232              overlapping portion from low-bandwidth variant.
1233
1234           Here we try to maximize the amount of buffer left when the
1235           switch point is met. Given the following parameters:
1236
1237           B: our current buffering level in seconds
1238           T: target duration in seconds
1239           X: sample duration in seconds remain to fetch in last segment
1240           bw0: bandwidth of old variant (as specified in playlist)
1241           bw1: bandwidth of new variant (as specified in playlist)
1242           bw: measured bandwidth available
1243
1244           If we choose 1), when switch happens at the end of current
1245           segment, our buffering will be
1246                  B + X - X * bw0 / bw
1247
1248           If we choose 2), when switch happens where we aborted current
1249           segment, our buffering will be
1250                  B - (T - X) * bw1 / bw
1251
1252           We should only choose 1) if
1253                  X/T < bw1 / (bw1 + bw0 - bw)
1254        */
1255
1256        // Taking the measured current bandwidth at 50% face value only,
1257        // as our bandwidth estimation is a lagging indicator. Being
1258        // conservative on this, we prefer switching to lower bandwidth
1259        // unless we're really confident finishing up the last segment
1260        // of higher bandwidth will be fast.
1261        CHECK(mLastBandwidthBps >= 0);
1262        abortThreshold =
1263                (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1264             / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1265              + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth
1266              - (float)mLastBandwidthBps * 0.5f);
1267        if (abortThreshold < 0.0f) {
1268            abortThreshold = -1.0f; // do not abort
1269        }
1270        ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f",
1271                mBandwidthItems.itemAt(currentBWIndex).mBandwidth,
1272                mBandwidthItems.itemAt(targetBWIndex).mBandwidth,
1273                mLastBandwidthBps,
1274                abortThreshold);
1275    }
1276    return abortThreshold;
1277}
1278
// Forwards one transfer sample (|numBytes| and the associated |delayUs|)
// to mBandwidthEstimator.
void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) {
    mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs);
}
1282
1283size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) {
1284    if (mBandwidthItems.size() < 2) {
1285        // shouldn't be here if we only have 1 bandwidth, check
1286        // logic to get rid of redundant bandwidth polling
1287        ALOGW("getBandwidthIndex() called for single bandwidth playlist!");
1288        return 0;
1289    }
1290
1291#if 1
1292    char value[PROPERTY_VALUE_MAX];
1293    ssize_t index = -1;
1294    if (property_get("media.httplive.bw-index", value, NULL)) {
1295        char *end;
1296        index = strtol(value, &end, 10);
1297        CHECK(end > value && *end == '\0');
1298
1299        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
1300            index = mBandwidthItems.size() - 1;
1301        }
1302    }
1303
1304    if (index < 0) {
1305        char value[PROPERTY_VALUE_MAX];
1306        if (property_get("media.httplive.max-bw", value, NULL)) {
1307            char *end;
1308            long maxBw = strtoul(value, &end, 10);
1309            if (end > value && *end == '\0') {
1310                if (maxBw > 0 && bandwidthBps > maxBw) {
1311                    ALOGV("bandwidth capped to %ld bps", maxBw);
1312                    bandwidthBps = maxBw;
1313                }
1314            }
1315        }
1316
1317        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
1318
1319        index = mBandwidthItems.size() - 1;
1320        while (index > 0) {
1321            // be conservative (70%) to avoid overestimating and immediately
1322            // switching down again.
1323            size_t adjustedBandwidthBps = bandwidthBps * 7 / 10;
1324            if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) {
1325                break;
1326            }
1327            --index;
1328        }
1329    }
1330#elif 0
1331    // Change bandwidth at random()
1332    size_t index = uniformRand() * mBandwidthItems.size();
1333#elif 0
1334    // There's a 50% chance to stay on the current bandwidth and
1335    // a 50% chance to switch to the next higher bandwidth (wrapping around
1336    // to lowest)
1337    const size_t kMinIndex = 0;
1338
1339    static ssize_t mCurBandwidthIndex = -1;
1340
1341    size_t index;
1342    if (mCurBandwidthIndex < 0) {
1343        index = kMinIndex;
1344    } else if (uniformRand() < 0.5) {
1345        index = (size_t)mCurBandwidthIndex;
1346    } else {
1347        index = mCurBandwidthIndex + 1;
1348        if (index == mBandwidthItems.size()) {
1349            index = kMinIndex;
1350        }
1351    }
1352    mCurBandwidthIndex = index;
1353#elif 0
1354    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1355
1356    size_t index = mBandwidthItems.size() - 1;
1357    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1358        --index;
1359    }
1360#elif 1
1361    char value[PROPERTY_VALUE_MAX];
1362    size_t index;
1363    if (property_get("media.httplive.bw-index", value, NULL)) {
1364        char *end;
1365        index = strtoul(value, &end, 10);
1366        CHECK(end > value && *end == '\0');
1367
1368        if (index >= mBandwidthItems.size()) {
1369            index = mBandwidthItems.size() - 1;
1370        }
1371    } else {
1372        index = 0;
1373    }
1374#else
1375    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
1376#endif
1377
1378    CHECK_GE(index, 0);
1379
1380    return index;
1381}
1382
1383HLSTime LiveSession::latestMediaSegmentStartTime() const {
1384    HLSTime audioTime(mPacketSources.valueFor(
1385                    STREAMTYPE_AUDIO)->getLatestDequeuedMeta());
1386
1387    HLSTime videoTime(mPacketSources.valueFor(
1388                    STREAMTYPE_VIDEO)->getLatestDequeuedMeta());
1389
1390    return audioTime < videoTime ? videoTime : audioTime;
1391}
1392
// Handles a seek request: |msg| must carry the target position as the int64
// "timeUs", which is passed on to changeConfiguration().
void LiveSession::onSeek(const sp<AMessage> &msg) {
    int64_t timeUs;
    CHECK(msg->findInt64("timeUs", &timeUs));
    changeConfiguration(timeUs);
}
1398
1399status_t LiveSession::getDuration(int64_t *durationUs) const {
1400    int64_t maxDurationUs = -1ll;
1401    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1402        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1403
1404        if (fetcherDurationUs > maxDurationUs) {
1405            maxDurationUs = fetcherDurationUs;
1406        }
1407    }
1408
1409    *durationUs = maxDurationUs;
1410
1411    return OK;
1412}
1413
1414bool LiveSession::isSeekable() const {
1415    int64_t durationUs;
1416    return getDuration(&durationUs) == OK && durationUs >= 0;
1417}
1418
// Always reports false.
bool LiveSession::hasDynamicDuration() const {
    return false;
}
1422
1423size_t LiveSession::getTrackCount() const {
1424    if (mPlaylist == NULL) {
1425        return 0;
1426    } else {
1427        return mPlaylist->getTrackCount();
1428    }
1429}
1430
1431sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1432    if (mPlaylist == NULL) {
1433        return NULL;
1434    } else {
1435        return mPlaylist->getTrackInfo(trackIndex);
1436    }
1437}
1438
1439status_t LiveSession::selectTrack(size_t index, bool select) {
1440    if (mPlaylist == NULL) {
1441        return INVALID_OPERATION;
1442    }
1443
1444    ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++",
1445            index, select, mSubtitleGeneration);
1446
1447    ++mSubtitleGeneration;
1448    status_t err = mPlaylist->selectTrack(index, select);
1449    if (err == OK) {
1450        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this);
1451        msg->setInt32("pickTrack", select);
1452        msg->post();
1453    }
1454    return err;
1455}
1456
1457ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
1458    if (mPlaylist == NULL) {
1459        return -1;
1460    } else {
1461        return mPlaylist->getSelectedTrack(type);
1462    }
1463}
1464
1465void LiveSession::changeConfiguration(
1466        int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) {
1467    ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d",
1468          (long long)timeUs, bandwidthIndex, pickTrack);
1469
1470    cancelBandwidthSwitch();
1471
1472    CHECK(!mReconfigurationInProgress);
1473    mReconfigurationInProgress = true;
1474    if (bandwidthIndex >= 0) {
1475        mOrigBandwidthIndex = mCurBandwidthIndex;
1476        mCurBandwidthIndex = bandwidthIndex;
1477        if (mOrigBandwidthIndex != mCurBandwidthIndex) {
1478            ALOGI("#### Starting Bandwidth Switch: %zd => %zd",
1479                    mOrigBandwidthIndex, mCurBandwidthIndex);
1480        }
1481    }
1482    CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size());
1483    const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex);
1484
1485    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1486    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1487
1488    AString URIs[kMaxStreams];
1489    for (size_t i = 0; i < kMaxStreams; ++i) {
1490        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1491            streamMask |= indexToType(i);
1492        }
1493    }
1494
1495    // Step 1, stop and discard fetchers that are no longer needed.
1496    // Pause those that we'll reuse.
1497    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1498        // skip fetchers that are marked mToBeRemoved,
1499        // these are done and can't be reused
1500        if (mFetcherInfos[i].mToBeRemoved) {
1501            continue;
1502        }
1503
1504        const AString &uri = mFetcherInfos.keyAt(i);
1505        sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher;
1506
1507        bool discardFetcher = true, delayRemoval = false;
1508        for (size_t j = 0; j < kMaxStreams; ++j) {
1509            StreamType type = indexToType(j);
1510            if ((streamMask & type) && uri == URIs[j]) {
1511                resumeMask |= type;
1512                streamMask &= ~type;
1513                discardFetcher = false;
1514            }
1515        }
1516        // Delay fetcher removal if not picking tracks, AND old fetcher
1517        // has stream mask that overlaps new variant. (Okay to discard
1518        // old fetcher now, if completely no overlap.)
1519        if (discardFetcher && timeUs < 0ll && !pickTrack
1520                && (fetcher->getStreamTypeMask() & streamMask)) {
1521            discardFetcher = false;
1522            delayRemoval = true;
1523        }
1524
1525        if (discardFetcher) {
1526            ALOGV("discarding fetcher-%d", fetcher->getFetcherID());
1527            fetcher->stopAsync();
1528        } else {
1529            float threshold = -1.0f; // always finish fetching by default
1530            if (timeUs >= 0ll) {
1531                // seeking, no need to finish fetching
1532                threshold = 0.0f;
1533            } else if (delayRemoval) {
1534                // adapting, abort if remaining of current segment is over threshold
1535                threshold = getAbortThreshold(
1536                        mOrigBandwidthIndex, mCurBandwidthIndex);
1537            }
1538
1539            ALOGV("pausing fetcher-%d, threshold=%.2f",
1540                    fetcher->getFetcherID(), threshold);
1541            fetcher->pauseAsync(threshold);
1542        }
1543    }
1544
1545    sp<AMessage> msg;
1546    if (timeUs < 0ll) {
1547        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1548        msg = new AMessage(kWhatChangeConfiguration3, this);
1549    } else {
1550        msg = new AMessage(kWhatChangeConfiguration2, this);
1551    }
1552    msg->setInt32("streamMask", streamMask);
1553    msg->setInt32("resumeMask", resumeMask);
1554    msg->setInt32("pickTrack", pickTrack);
1555    msg->setInt64("timeUs", timeUs);
1556    for (size_t i = 0; i < kMaxStreams; ++i) {
1557        if ((streamMask | resumeMask) & indexToType(i)) {
1558            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1559        }
1560    }
1561
1562    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1563    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1564    // fetchers have completed their asynchronous operation, we'll post
1565    // mContinuation, which then is handled below in onChangeConfiguration2.
1566    mContinuationCounter = mFetcherInfos.size();
1567    mContinuation = msg;
1568
1569    if (mContinuationCounter == 0) {
1570        msg->post();
1571    }
1572}
1573
1574void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1575    ALOGV("onChangeConfiguration");
1576
1577    if (!mReconfigurationInProgress) {
1578        int32_t pickTrack = 0;
1579        msg->findInt32("pickTrack", &pickTrack);
1580        changeConfiguration(-1ll /* timeUs */, -1, pickTrack);
1581    } else {
1582        msg->post(1000000ll); // retry in 1 sec
1583    }
1584}
1585
1586void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1587    ALOGV("onChangeConfiguration2");
1588
1589    mContinuation.clear();
1590
1591    // All fetchers are either suspended or have been removed now.
1592
1593    // If we're seeking, clear all packet sources before we report
1594    // seek complete, to prevent decoder from pulling stale data.
1595    int64_t timeUs;
1596    CHECK(msg->findInt64("timeUs", &timeUs));
1597
1598    if (timeUs >= 0) {
1599        mLastSeekTimeUs = timeUs;
1600        mLastDequeuedTimeUs = timeUs;
1601
1602        for (size_t i = 0; i < mPacketSources.size(); i++) {
1603            mPacketSources.editValueAt(i)->clear();
1604        }
1605
1606        for (size_t i = 0; i < kMaxStreams; ++i) {
1607            mStreams[i].mCurDiscontinuitySeq = 0;
1608        }
1609
1610        mDiscontinuityOffsetTimesUs.clear();
1611        mDiscontinuityAbsStartTimesUs.clear();
1612
1613        if (mSeekReplyID != NULL) {
1614            CHECK(mSeekReply != NULL);
1615            mSeekReply->setInt32("err", OK);
1616            mSeekReply->postReply(mSeekReplyID);
1617            mSeekReplyID.clear();
1618            mSeekReply.clear();
1619        }
1620
1621        // restart buffer polling after seek becauese previous
1622        // buffering position is no longer valid.
1623        restartPollBuffering();
1624    }
1625
1626    uint32_t streamMask, resumeMask;
1627    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1628    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1629
1630    streamMask |= resumeMask;
1631
1632    AString URIs[kMaxStreams];
1633    for (size_t i = 0; i < kMaxStreams; ++i) {
1634        if (streamMask & indexToType(i)) {
1635            const AString &uriKey = mStreams[i].uriKey();
1636            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1637            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1638        }
1639    }
1640
1641    uint32_t changedMask = 0;
1642    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1643        // stream URI could change even if onChangeConfiguration2 is only
1644        // used for seek. Seek could happen during a bw switch, in this
1645        // case bw switch will be cancelled, but the seekTo position will
1646        // fetch from the new URI.
1647        if ((mStreamMask & streamMask & indexToType(i))
1648                && !mStreams[i].mUri.empty()
1649                && !(URIs[i] == mStreams[i].mUri)) {
1650            ALOGV("stream %zu changed: oldURI %s, newURI %s", i,
1651                    mStreams[i].mUri.c_str(), URIs[i].c_str());
1652            sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i));
1653            if (source->getLatestDequeuedMeta() != NULL) {
1654                source->queueDiscontinuity(
1655                        ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1656            }
1657        }
1658        // Determine which decoders to shutdown on the player side,
1659        // a decoder has to be shutdown if its streamtype was active
1660        // before but now longer isn't.
1661        if ((mStreamMask & ~streamMask & indexToType(i))) {
1662            changedMask |= indexToType(i);
1663        }
1664    }
1665
1666    if (changedMask == 0) {
1667        // If nothing changed as far as the audio/video decoders
1668        // are concerned we can proceed.
1669        onChangeConfiguration3(msg);
1670        return;
1671    }
1672
1673    // Something changed, inform the player which will shutdown the
1674    // corresponding decoders and will post the reply once that's done.
1675    // Handling the reply will continue executing below in
1676    // onChangeConfiguration3.
1677    sp<AMessage> notify = mNotify->dup();
1678    notify->setInt32("what", kWhatStreamsChanged);
1679    notify->setInt32("changedMask", changedMask);
1680
1681    msg->setWhat(kWhatChangeConfiguration3);
1682    msg->setTarget(this);
1683
1684    notify->setMessage("reply", msg);
1685    notify->post();
1686}
1687
1688void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1689    mContinuation.clear();
1690    // All remaining fetchers are still suspended, the player has shutdown
1691    // any decoders that needed it.
1692
1693    uint32_t streamMask, resumeMask;
1694    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1695    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1696
1697    mNewStreamMask = streamMask | resumeMask;
1698
1699    int64_t timeUs;
1700    int32_t pickTrack;
1701    bool switching = false;
1702    CHECK(msg->findInt64("timeUs", &timeUs));
1703    CHECK(msg->findInt32("pickTrack", &pickTrack));
1704
1705    if (timeUs < 0ll) {
1706        if (!pickTrack) {
1707            // mSwapMask contains streams that are in both old and new variant,
1708            // (in mNewStreamMask & mStreamMask) but with different URIs
1709            // (not in resumeMask).
1710            // For example, old variant has video and audio in two separate
1711            // URIs, and new variant has only audio with unchanged URI. mSwapMask
1712            // should be 0 as there is nothing to swap. We only need to stop video,
1713            // and resume audio.
1714            mSwapMask =  mNewStreamMask & mStreamMask & ~resumeMask;
1715            switching = (mSwapMask != 0);
1716        }
1717        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
1718    } else {
1719        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1720    }
1721
1722    ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, "
1723            "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x",
1724            (long long)timeUs, switching, pickTrack,
1725            mStreamMask, mNewStreamMask, mSwapMask);
1726
1727    for (size_t i = 0; i < kMaxStreams; ++i) {
1728        if (streamMask & indexToType(i)) {
1729            if (switching) {
1730                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
1731            } else {
1732                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1733            }
1734        }
1735    }
1736
1737    // Of all existing fetchers:
1738    // * Resume fetchers that are still needed and assign them original packet sources.
1739    // * Mark otherwise unneeded fetchers for removal.
1740    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1741    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1742        const AString &uri = mFetcherInfos.keyAt(i);
1743        if (!resumeFetcher(uri, resumeMask, timeUs)) {
1744            ALOGV("marking fetcher-%d to be removed",
1745                    mFetcherInfos[i].mFetcher->getFetcherID());
1746
1747            mFetcherInfos.editValueAt(i).mToBeRemoved = true;
1748        }
1749    }
1750
1751    // streamMask now only contains the types that need a new fetcher created.
1752    if (streamMask != 0) {
1753        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1754    }
1755
1756    // Find out when the original fetchers have buffered up to and start the new fetchers
1757    // at a later timestamp.
1758    for (size_t i = 0; i < kMaxStreams; i++) {
1759        if (!(indexToType(i) & streamMask)) {
1760            continue;
1761        }
1762
1763        AString uri;
1764        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
1765
1766        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1767        CHECK(fetcher != NULL);
1768
1769        HLSTime startTime;
1770        SeekMode seekMode = kSeekModeExactPosition;
1771        sp<AnotherPacketSource> sources[kMaxStreams];
1772
1773        if (i == kSubtitleIndex || (!pickTrack && !switching)) {
1774            startTime = latestMediaSegmentStartTime();
1775        }
1776
1777        // TRICKY: looping from i as earlier streams are already removed from streamMask
1778        for (size_t j = i; j < kMaxStreams; ++j) {
1779            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
1780            if ((streamMask & indexToType(j)) && uri == streamUri) {
1781                sources[j] = mPacketSources.valueFor(indexToType(j));
1782
1783                if (timeUs >= 0) {
1784                    startTime.mTimeUs = timeUs;
1785                } else {
1786                    int32_t type;
1787                    sp<AMessage> meta;
1788                    if (!switching) {
1789                        // selecting, or adapting but no swap required
1790                        meta = sources[j]->getLatestDequeuedMeta();
1791                    } else {
1792                        // adapting and swap required
1793                        meta = sources[j]->getLatestEnqueuedMeta();
1794                        if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) {
1795                            // switching up
1796                            meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin);
1797                        }
1798                    }
1799
1800                    if (j != kSubtitleIndex && meta != NULL
1801                            && !meta->findInt32("discontinuity", &type)) {
1802                        HLSTime tmpTime(meta);
1803                        if (startTime < tmpTime) {
1804                            startTime = tmpTime;
1805                        }
1806                    }
1807
1808                    if (!switching) {
1809                        // selecting, or adapting but no swap required
1810                        sources[j]->clear();
1811                        if (j == kSubtitleIndex) {
1812                            break;
1813                        }
1814
1815                        ALOGV("stream[%zu]: queue format change", j);
1816                        sources[j]->queueDiscontinuity(
1817                                ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
1818                    } else {
1819                        // switching, queue discontinuities after resume
1820                        sources[j] = mPacketSources2.valueFor(indexToType(j));
1821                        sources[j]->clear();
1822                        // the new fetcher might be providing streams that used to be
1823                        // provided by two different fetchers,  if one of the fetcher
1824                        // paused in the middle while the other somehow paused in next
1825                        // seg, we have to start from next seg.
1826                        if (seekMode < mStreams[j].mSeekMode) {
1827                            seekMode = mStreams[j].mSeekMode;
1828                        }
1829                    }
1830                }
1831
1832                streamMask &= ~indexToType(j);
1833            }
1834        }
1835
1836        ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld "
1837                "segmentStartTimeUs %lld seekMode %d",
1838                fetcher->getFetcherID(),
1839                (long long)startTime.mTimeUs,
1840                (long long)mLastSeekTimeUs,
1841                (long long)startTime.getSegmentTimeUs(true /* midpoint */),
1842                seekMode);
1843
1844        // Set the target segment start time to the middle point of the
1845        // segment where the last sample was.
1846        // This gives a better guess if segments of the two variants are not
1847        // perfectly aligned. (If the corresponding segment in new variant
1848        // starts slightly later than that in the old variant, we still want
1849        // to pick that segment, not the one before)
1850        fetcher->startAsync(
1851                sources[kAudioIndex],
1852                sources[kVideoIndex],
1853                sources[kSubtitleIndex],
1854                startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs,
1855                startTime.getSegmentTimeUs(true /* midpoint */),
1856                startTime.mSeq,
1857                seekMode);
1858    }
1859
1860    // All fetchers have now been started, the configuration change
1861    // has completed.
1862
1863    mReconfigurationInProgress = false;
1864    if (switching) {
1865        mSwitchInProgress = true;
1866    } else {
1867        mStreamMask = mNewStreamMask;
1868        if (mOrigBandwidthIndex != mCurBandwidthIndex) {
1869            ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd",
1870                    mOrigBandwidthIndex, mCurBandwidthIndex);
1871            mOrigBandwidthIndex = mCurBandwidthIndex;
1872        }
1873    }
1874
1875    ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x",
1876            mSwitchInProgress, mStreamMask);
1877
1878    if (mDisconnectReplyID != NULL) {
1879        finishDisconnect();
1880    }
1881}
1882
1883void LiveSession::swapPacketSource(StreamType stream) {
1884    ALOGV("[%s] swapPacketSource", getNameForStream(stream));
1885
1886    // transfer packets from source2 to source
1887    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
1888    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
1889
1890    // queue discontinuity in mPacketSource
1891    aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false);
1892
1893    // queue packets in mPacketSource2 to mPacketSource
1894    status_t finalResult = OK;
1895    sp<ABuffer> accessUnit;
1896    while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK &&
1897          OK == aps2->dequeueAccessUnit(&accessUnit)) {
1898        aps->queueAccessUnit(accessUnit);
1899    }
1900    aps2->clear();
1901}
1902
1903void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) {
1904    if (!mSwitchInProgress) {
1905        return;
1906    }
1907
1908    ssize_t index = mFetcherInfos.indexOfKey(oldUri);
1909    if (index < 0 || !mFetcherInfos[index].mToBeRemoved) {
1910        return;
1911    }
1912
1913    // Swap packet source of streams provided by old variant
1914    for (size_t idx = 0; idx < kMaxStreams; idx++) {
1915        StreamType stream = indexToType(idx);
1916        if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) {
1917            swapPacketSource(stream);
1918
1919            if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
1920                ALOGW("swapping stream type %d %s to empty stream",
1921                        stream, mStreams[idx].mUri.c_str());
1922            }
1923            mStreams[idx].mUri = mStreams[idx].mNewUri;
1924            mStreams[idx].mNewUri.clear();
1925
1926            mSwapMask &= ~stream;
1927        }
1928    }
1929
1930    mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */);
1931
1932    ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask);
1933    if (mSwapMask != 0) {
1934        return;
1935    }
1936
1937    // Check if new variant contains extra streams.
1938    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1939    while (extraStreams) {
1940        StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1));
1941        extraStreams &= ~stream;
1942
1943        swapPacketSource(stream);
1944
1945        ssize_t idx = typeToIndex(stream);
1946        CHECK(idx >= 0);
1947        if (mStreams[idx].mNewUri.empty()) {
1948            ALOGW("swapping extra stream type %d %s to empty stream",
1949                    stream, mStreams[idx].mUri.c_str());
1950        }
1951        mStreams[idx].mUri = mStreams[idx].mNewUri;
1952        mStreams[idx].mNewUri.clear();
1953    }
1954
1955    // Restart new fetcher (it was paused after the first 47k block)
1956    // and let it fetch into mPacketSources (not mPacketSources2)
1957    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1958        FetcherInfo &info = mFetcherInfos.editValueAt(i);
1959        if (info.mToBeResumed) {
1960            resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask);
1961            info.mToBeResumed = false;
1962        }
1963    }
1964
1965    ALOGI("#### Finished Bandwidth Switch: %zd => %zd",
1966            mOrigBandwidthIndex, mCurBandwidthIndex);
1967
1968    mStreamMask = mNewStreamMask;
1969    mSwitchInProgress = false;
1970    mOrigBandwidthIndex = mCurBandwidthIndex;
1971
1972    restartPollBuffering();
1973}
1974
1975void LiveSession::schedulePollBuffering() {
1976    sp<AMessage> msg = new AMessage(kWhatPollBuffering, this);
1977    msg->setInt32("generation", mPollBufferingGeneration);
1978    msg->post(1000000ll);
1979}
1980
1981void LiveSession::cancelPollBuffering() {
1982    ++mPollBufferingGeneration;
1983    mPrevBufferPercentage = -1;
1984}
1985
1986void LiveSession::restartPollBuffering() {
1987    cancelPollBuffering();
1988    onPollBuffering();
1989}
1990
1991void LiveSession::onPollBuffering() {
1992    ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
1993            "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x",
1994        mSwitchInProgress, mReconfigurationInProgress,
1995        mInPreparationPhase, mCurBandwidthIndex, mStreamMask);
1996
1997    bool underflow, ready, down, up;
1998    if (checkBuffering(underflow, ready, down, up)) {
1999        if (mInPreparationPhase) {
2000            // Allow down switch even if we're still preparing.
2001            //
2002            // Some streams have a high bandwidth index as default,
2003            // when bandwidth is low, it takes a long time to buffer
2004            // to ready mark, then it immediately pauses after start
2005            // as we have to do a down switch. It's better experience
2006            // to restart from a lower index, if we detect low bw.
2007            if (!switchBandwidthIfNeeded(false /* up */, down) && ready) {
2008                postPrepared(OK);
2009            }
2010        }
2011
2012        if (!mInPreparationPhase) {
2013            if (ready) {
2014                stopBufferingIfNecessary();
2015            } else if (underflow) {
2016                startBufferingIfNecessary();
2017            }
2018            switchBandwidthIfNeeded(up, down);
2019        }
2020    }
2021
2022    schedulePollBuffering();
2023}
2024
2025void LiveSession::cancelBandwidthSwitch(bool resume) {
2026    ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd",
2027            mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex);
2028    if (!mSwitchInProgress) {
2029        return;
2030    }
2031
2032    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2033        FetcherInfo& info = mFetcherInfos.editValueAt(i);
2034        if (info.mToBeRemoved) {
2035            info.mToBeRemoved = false;
2036            if (resume) {
2037                resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask);
2038            }
2039        }
2040    }
2041
2042    for (size_t i = 0; i < kMaxStreams; ++i) {
2043        AString newUri = mStreams[i].mNewUri;
2044        if (!newUri.empty()) {
2045            // clear all mNewUri matching this newUri
2046            for (size_t j = i; j < kMaxStreams; ++j) {
2047                if (mStreams[j].mNewUri == newUri) {
2048                    mStreams[j].mNewUri.clear();
2049                }
2050            }
2051            ALOGV("stopping newUri = %s", newUri.c_str());
2052            ssize_t index = mFetcherInfos.indexOfKey(newUri);
2053            if (index < 0) {
2054                ALOGE("did not find fetcher for newUri: %s", newUri.c_str());
2055                continue;
2056            }
2057            FetcherInfo &info = mFetcherInfos.editValueAt(index);
2058            info.mToBeRemoved = true;
2059            info.mFetcher->stopAsync();
2060        }
2061    }
2062
2063    ALOGI("#### Canceled Bandwidth Switch: %zd => %zd",
2064            mOrigBandwidthIndex, mCurBandwidthIndex);
2065
2066    mSwitchGeneration++;
2067    mSwitchInProgress = false;
2068    mCurBandwidthIndex = mOrigBandwidthIndex;
2069    mSwapMask = 0;
2070}
2071
2072bool LiveSession::checkBuffering(
2073        bool &underflow, bool &ready, bool &down, bool &up) {
2074    underflow = ready = down = up = false;
2075
2076    if (mReconfigurationInProgress) {
2077        ALOGV("Switch/Reconfig in progress, defer buffer polling");
2078        return false;
2079    }
2080
2081    size_t activeCount, underflowCount, readyCount, downCount, upCount;
2082    activeCount = underflowCount = readyCount = downCount = upCount =0;
2083    int32_t minBufferPercent = -1;
2084    int64_t durationUs;
2085    if (getDuration(&durationUs) != OK) {
2086        durationUs = -1;
2087    }
2088    for (size_t i = 0; i < mPacketSources.size(); ++i) {
2089        // we don't check subtitles for buffering level
2090        if (!(mStreamMask & mPacketSources.keyAt(i)
2091                & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) {
2092            continue;
2093        }
2094        // ignore streams that never had any packet queued.
2095        // (it's possible that the variant only has audio or video)
2096        sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
2097        if (meta == NULL) {
2098            continue;
2099        }
2100
2101        int64_t bufferedDurationUs =
2102                mPacketSources[i]->getEstimatedDurationUs();
2103        ALOGV("[%s] buffered %lld us",
2104                getNameForStream(mPacketSources.keyAt(i)),
2105                (long long)bufferedDurationUs);
2106        if (durationUs >= 0) {
2107            int32_t percent;
2108            if (mPacketSources[i]->isFinished(0 /* duration */)) {
2109                percent = 100;
2110            } else {
2111                percent = (int32_t)(100.0 *
2112                        (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs);
2113            }
2114            if (minBufferPercent < 0 || percent < minBufferPercent) {
2115                minBufferPercent = percent;
2116            }
2117        }
2118
2119        ++activeCount;
2120        int64_t readyMark = mInPreparationPhase ? kPrepareMarkUs : kReadyMarkUs;
2121        if (bufferedDurationUs > readyMark
2122                || mPacketSources[i]->isFinished(0)) {
2123            ++readyCount;
2124        }
2125        if (!mPacketSources[i]->isFinished(0)) {
2126            if (bufferedDurationUs < kUnderflowMarkUs) {
2127                ++underflowCount;
2128            }
2129            if (bufferedDurationUs > mUpSwitchMark) {
2130                ++upCount;
2131            }
2132            if (bufferedDurationUs < mDownSwitchMark) {
2133                ++downCount;
2134            }
2135        }
2136    }
2137
2138    if (minBufferPercent >= 0) {
2139        notifyBufferingUpdate(minBufferPercent);
2140    }
2141
2142    if (activeCount > 0) {
2143        up        = (upCount == activeCount);
2144        down      = (downCount > 0);
2145        ready     = (readyCount == activeCount);
2146        underflow = (underflowCount > 0);
2147        return true;
2148    }
2149
2150    return false;
2151}
2152
2153void LiveSession::startBufferingIfNecessary() {
2154    ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2155            mInPreparationPhase, mBuffering);
2156    if (!mBuffering) {
2157        mBuffering = true;
2158
2159        sp<AMessage> notify = mNotify->dup();
2160        notify->setInt32("what", kWhatBufferingStart);
2161        notify->post();
2162    }
2163}
2164
2165void LiveSession::stopBufferingIfNecessary() {
2166    ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2167            mInPreparationPhase, mBuffering);
2168
2169    if (mBuffering) {
2170        mBuffering = false;
2171
2172        sp<AMessage> notify = mNotify->dup();
2173        notify->setInt32("what", kWhatBufferingEnd);
2174        notify->post();
2175    }
2176}
2177
2178void LiveSession::notifyBufferingUpdate(int32_t percentage) {
2179    if (percentage < mPrevBufferPercentage) {
2180        percentage = mPrevBufferPercentage;
2181    } else if (percentage > 100) {
2182        percentage = 100;
2183    }
2184
2185    mPrevBufferPercentage = percentage;
2186
2187    ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage);
2188
2189    sp<AMessage> notify = mNotify->dup();
2190    notify->setInt32("what", kWhatBufferingUpdate);
2191    notify->setInt32("percentage", percentage);
2192    notify->post();
2193}
2194
2195/*
2196 * returns true if a bandwidth switch is actually needed (and started),
2197 * returns false otherwise
2198 */
2199bool LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
2200    // no need to check bandwidth if we only have 1 bandwidth settings
2201    if (mSwitchInProgress || mBandwidthItems.size() < 2) {
2202        return false;
2203    }
2204
2205    int32_t bandwidthBps;
2206    if (mBandwidthEstimator->estimateBandwidth(&bandwidthBps)) {
2207        ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
2208        mLastBandwidthBps = bandwidthBps;
2209    } else {
2210        ALOGV("no bandwidth estimate.");
2211        return false;
2212    }
2213
2214    int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth;
2215    // canSwithDown and canSwitchUp can't both be true.
2216    // we only want to switch up when measured bw is 120% higher than current variant,
2217    // and we only want to switch down when measured bw is below current variant.
2218    bool canSwithDown = bufferLow
2219            && (bandwidthBps < (int32_t)curBandwidth);
2220    bool canSwitchUp = bufferHigh
2221            && (bandwidthBps > (int32_t)curBandwidth * 12 / 10);
2222
2223    if (canSwithDown || canSwitchUp) {
2224        ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps);
2225
2226        // it's possible that we're checking for canSwitchUp case, but the returned
2227        // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70%
2228        // of measured bw. In that case we don't want to do anything, since we have
2229        // both enough buffer and enough bw.
2230        if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex)
2231         || (canSwithDown && bandwidthIndex < mCurBandwidthIndex)) {
2232            // if not yet prepared, just restart again with new bw index.
2233            // this is faster and playback experience is cleaner.
2234            changeConfiguration(
2235                    mInPreparationPhase ? 0 : -1ll, bandwidthIndex);
2236            return true;
2237        }
2238    }
2239    return false;
2240}
2241
2242void LiveSession::postError(status_t err) {
2243    // if we reached EOS, notify buffering of 100%
2244    if (err == ERROR_END_OF_STREAM) {
2245        notifyBufferingUpdate(100);
2246    }
2247    // we'll stop buffer polling now, before that notify
2248    // stop buffering to stop the spinning icon
2249    stopBufferingIfNecessary();
2250    cancelPollBuffering();
2251
2252    sp<AMessage> notify = mNotify->dup();
2253    notify->setInt32("what", kWhatError);
2254    notify->setInt32("err", err);
2255    notify->post();
2256}
2257
2258void LiveSession::postPrepared(status_t err) {
2259    CHECK(mInPreparationPhase);
2260
2261    sp<AMessage> notify = mNotify->dup();
2262    if (err == OK || err == ERROR_END_OF_STREAM) {
2263        notify->setInt32("what", kWhatPrepared);
2264    } else {
2265        cancelPollBuffering();
2266
2267        notify->setInt32("what", kWhatPreparationFailed);
2268        notify->setInt32("err", err);
2269    }
2270
2271    notify->post();
2272
2273    mInPreparationPhase = false;
2274}
2275
2276
2277}  // namespace android
2278
2279