LiveSession.cpp revision a1151185c7eb3b4c483f7067deba1775fd0a2510
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/IMediaHTTPConnection.h>
31#include <media/IMediaHTTPService.h>
32#include <media/stagefright/foundation/hexdump.h>
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/foundation/AUtils.h>
37#include <media/stagefright/DataSource.h>
38#include <media/stagefright/FileSource.h>
39#include <media/stagefright/MediaErrors.h>
40#include <media/stagefright/MediaHTTP.h>
41#include <media/stagefright/MetaData.h>
42#include <media/stagefright/Utils.h>
43
44#include <utils/Mutex.h>
45
46#include <ctype.h>
47#include <inttypes.h>
48#include <openssl/aes.h>
49#include <openssl/md5.h>
50
51namespace android {
52
53// static
54// Bandwidth Switch Mark Defaults
55const int64_t LiveSession::kUpSwitchMarkUs = 15000000ll;
56const int64_t LiveSession::kDownSwitchMarkUs = 20000000ll;
57const int64_t LiveSession::kUpSwitchMarginUs = 5000000ll;
58const int64_t LiveSession::kResumeThresholdUs = 100000ll;
59
60// Buffer Prepare/Ready/Underflow Marks
61const int64_t LiveSession::kReadyMarkUs = 5000000ll;
62const int64_t LiveSession::kPrepareMarkUs = 1500000ll;
63const int64_t LiveSession::kUnderflowMarkUs = 1000000ll;
64
65struct LiveSession::BandwidthEstimator : public RefBase {
66    BandwidthEstimator();
67
68    void addBandwidthMeasurement(size_t numBytes, int64_t delayUs);
69    bool estimateBandwidth(int32_t *bandwidth);
70
71private:
72    // Bandwidth estimation parameters
73    static const int32_t kMaxBandwidthHistoryItems = 20;
74    static const int64_t kMaxBandwidthHistoryWindowUs = 5000000ll; // 5 sec
75
76    struct BandwidthEntry {
77        int64_t mDelayUs;
78        size_t mNumBytes;
79    };
80
81    Mutex mLock;
82    List<BandwidthEntry> mBandwidthHistory;
83    int64_t mTotalTransferTimeUs;
84    size_t mTotalTransferBytes;
85
86    DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator);
87};
88
89LiveSession::BandwidthEstimator::BandwidthEstimator() :
90    mTotalTransferTimeUs(0),
91    mTotalTransferBytes(0) {
92}
93
94void LiveSession::BandwidthEstimator::addBandwidthMeasurement(
95        size_t numBytes, int64_t delayUs) {
96    AutoMutex autoLock(mLock);
97
98    BandwidthEntry entry;
99    entry.mDelayUs = delayUs;
100    entry.mNumBytes = numBytes;
101    mTotalTransferTimeUs += delayUs;
102    mTotalTransferBytes += numBytes;
103    mBandwidthHistory.push_back(entry);
104
105    // trim old samples, keeping at least kMaxBandwidthHistoryItems samples,
106    // and total transfer time at least kMaxBandwidthHistoryWindowUs.
107    while (mBandwidthHistory.size() > kMaxBandwidthHistoryItems) {
108        List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
109        if (mTotalTransferTimeUs - it->mDelayUs < kMaxBandwidthHistoryWindowUs) {
110            break;
111        }
112        mTotalTransferTimeUs -= it->mDelayUs;
113        mTotalTransferBytes -= it->mNumBytes;
114        mBandwidthHistory.erase(mBandwidthHistory.begin());
115    }
116}
117
118bool LiveSession::BandwidthEstimator::estimateBandwidth(int32_t *bandwidthBps) {
119    AutoMutex autoLock(mLock);
120
121    if (mBandwidthHistory.size() < 2) {
122        return false;
123    }
124
125    *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs);
126    return true;
127}
128
129//static
130const char *LiveSession::getKeyForStream(StreamType type) {
131    switch (type) {
132        case STREAMTYPE_VIDEO:
133            return "timeUsVideo";
134        case STREAMTYPE_AUDIO:
135            return "timeUsAudio";
136        case STREAMTYPE_SUBTITLES:
137            return "timeUsSubtitle";
138        default:
139            TRESPASS();
140    }
141    return NULL;
142}
143
144//static
145const char *LiveSession::getNameForStream(StreamType type) {
146    switch (type) {
147        case STREAMTYPE_VIDEO:
148            return "video";
149        case STREAMTYPE_AUDIO:
150            return "audio";
151        case STREAMTYPE_SUBTITLES:
152            return "subs";
153        default:
154            break;
155    }
156    return "unknown";
157}
158
159LiveSession::LiveSession(
160        const sp<AMessage> &notify, uint32_t flags,
161        const sp<IMediaHTTPService> &httpService)
162    : mNotify(notify),
163      mFlags(flags),
164      mHTTPService(httpService),
165      mBuffering(false),
166      mInPreparationPhase(true),
167      mPollBufferingGeneration(0),
168      mPrevBufferPercentage(-1),
169      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
170      mCurBandwidthIndex(-1),
171      mOrigBandwidthIndex(-1),
172      mLastBandwidthBps(-1ll),
173      mBandwidthEstimator(new BandwidthEstimator()),
174      mStreamMask(0),
175      mNewStreamMask(0),
176      mSwapMask(0),
177      mSwitchGeneration(0),
178      mSubtitleGeneration(0),
179      mLastDequeuedTimeUs(0ll),
180      mRealTimeBaseUs(0ll),
181      mReconfigurationInProgress(false),
182      mSwitchInProgress(false),
183      mUpSwitchMark(kUpSwitchMarkUs),
184      mDownSwitchMark(kDownSwitchMarkUs),
185      mUpSwitchMargin(kUpSwitchMarginUs),
186      mFirstTimeUsValid(false),
187      mFirstTimeUs(0),
188      mLastSeekTimeUs(0) {
189    mStreams[kAudioIndex] = StreamItem("audio");
190    mStreams[kVideoIndex] = StreamItem("video");
191    mStreams[kSubtitleIndex] = StreamItem("subtitles");
192
193    for (size_t i = 0; i < kMaxStreams; ++i) {
194        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
195        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
196    }
197}
198
199LiveSession::~LiveSession() {
200    if (mFetcherLooper != NULL) {
201        mFetcherLooper->stop();
202    }
203}
204
205status_t LiveSession::dequeueAccessUnit(
206        StreamType stream, sp<ABuffer> *accessUnit) {
207    status_t finalResult = OK;
208    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
209
210    ssize_t streamIdx = typeToIndex(stream);
211    if (streamIdx < 0) {
212        return INVALID_VALUE;
213    }
214    const char *streamStr = getNameForStream(stream);
215    // Do not let client pull data if we don't have data packets yet.
216    // We might only have a format discontinuity queued without data.
217    // When NuPlayerDecoder dequeues the format discontinuity, it will
218    // immediately try to getFormat. If we return NULL, NuPlayerDecoder
219    // thinks it can do seamless change, so will not shutdown decoder.
220    // When the actual format arrives, it can't handle it and get stuck.
221    if (!packetSource->hasDataBufferAvailable(&finalResult)) {
222        ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)",
223                streamStr, finalResult);
224
225        if (finalResult == OK) {
226            return -EAGAIN;
227        } else {
228            return finalResult;
229        }
230    }
231
232    // Let the client dequeue as long as we have buffers available
233    // Do not make pause/resume decisions here.
234
235    status_t err = packetSource->dequeueAccessUnit(accessUnit);
236
237    StreamItem& strm = mStreams[streamIdx];
238    if (err == INFO_DISCONTINUITY) {
239        // adaptive streaming, discontinuities in the playlist
240        int32_t type;
241        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
242
243        sp<AMessage> extra;
244        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
245            extra.clear();
246        }
247
248        ALOGI("[%s] read discontinuity of type %d, extra = %s",
249              streamStr,
250              type,
251              extra == NULL ? "NULL" : extra->debugString().c_str());
252    } else if (err == OK) {
253
254        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
255            int64_t timeUs, originalTimeUs;
256            int32_t discontinuitySeq = 0;
257            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
258            originalTimeUs = timeUs;
259            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
260            if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) {
261                int64_t offsetTimeUs;
262                if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
263                    offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq);
264                } else {
265                    offsetTimeUs = 0;
266                }
267
268                if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
269                    int64_t firstTimeUs;
270                    firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
271                    offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
272                    offsetTimeUs += strm.mLastSampleDurationUs;
273                } else {
274                    offsetTimeUs += strm.mLastSampleDurationUs;
275                }
276
277                mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs);
278                strm.mCurDiscontinuitySeq = discontinuitySeq;
279            }
280
281            int32_t discard = 0;
282            int64_t firstTimeUs;
283            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
284                int64_t durUs; // approximate sample duration
285                if (timeUs > strm.mLastDequeuedTimeUs) {
286                    durUs = timeUs - strm.mLastDequeuedTimeUs;
287                } else {
288                    durUs = strm.mLastDequeuedTimeUs - timeUs;
289                }
290                strm.mLastSampleDurationUs = durUs;
291                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
292            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
293                firstTimeUs = timeUs;
294            } else {
295                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
296                firstTimeUs = timeUs;
297            }
298
299            strm.mLastDequeuedTimeUs = timeUs;
300            if (timeUs >= firstTimeUs) {
301                timeUs -= firstTimeUs;
302            } else {
303                timeUs = 0;
304            }
305            timeUs += mLastSeekTimeUs;
306            if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
307                timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
308            }
309
310            ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us",
311                    streamStr, (long long)timeUs, (long long)originalTimeUs);
312            (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
313            mLastDequeuedTimeUs = timeUs;
314            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
315        } else if (stream == STREAMTYPE_SUBTITLES) {
316            int32_t subtitleGeneration;
317            if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
318                    && subtitleGeneration != mSubtitleGeneration) {
319               return -EAGAIN;
320            };
321            (*accessUnit)->meta()->setInt32(
322                    "trackIndex", mPlaylist->getSelectedIndex());
323            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
324        }
325    } else {
326        ALOGI("[%s] encountered error %d", streamStr, err);
327    }
328
329    return err;
330}
331
332status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
333    if (!(mStreamMask & stream)) {
334        return UNKNOWN_ERROR;
335    }
336
337    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
338
339    sp<MetaData> meta = packetSource->getFormat();
340
341    if (meta == NULL) {
342        return -EAGAIN;
343    }
344
345    if (stream == STREAMTYPE_AUDIO) {
346        // set AAC input buffer size to 32K bytes (256kbps x 1sec)
347        meta->setInt32(kKeyMaxInputSize, 32 * 1024);
348    }
349
350    return convertMetaDataToMessage(meta, format);
351}
352
353sp<HTTPBase> LiveSession::getHTTPDataSource() {
354    return new MediaHTTP(mHTTPService->makeHTTPConnection());
355}
356
357void LiveSession::connectAsync(
358        const char *url, const KeyedVector<String8, String8> *headers) {
359    sp<AMessage> msg = new AMessage(kWhatConnect, this);
360    msg->setString("url", url);
361
362    if (headers != NULL) {
363        msg->setPointer(
364                "headers",
365                new KeyedVector<String8, String8>(*headers));
366    }
367
368    msg->post();
369}
370
371status_t LiveSession::disconnect() {
372    sp<AMessage> msg = new AMessage(kWhatDisconnect, this);
373
374    sp<AMessage> response;
375    status_t err = msg->postAndAwaitResponse(&response);
376
377    return err;
378}
379
380status_t LiveSession::seekTo(int64_t timeUs) {
381    sp<AMessage> msg = new AMessage(kWhatSeek, this);
382    msg->setInt64("timeUs", timeUs);
383
384    sp<AMessage> response;
385    status_t err = msg->postAndAwaitResponse(&response);
386
387    return err;
388}
389
390bool LiveSession::checkSwitchProgress(
391        sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) {
392    AString newUri;
393    CHECK(stopParams->findString("uri", &newUri));
394
395    *needResumeUntil = false;
396    sp<AMessage> firstNewMeta[kMaxStreams];
397    for (size_t i = 0; i < kMaxStreams; ++i) {
398        StreamType stream = indexToType(i);
399        if (!(mSwapMask & mNewStreamMask & stream)
400            || (mStreams[i].mNewUri != newUri)) {
401            continue;
402        }
403        if (stream == STREAMTYPE_SUBTITLES) {
404            continue;
405        }
406        sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i);
407
408        // First, get latest dequeued meta, which is where the decoder is at.
409        // (when upswitching, we take the meta after a certain delay, so that
410        // the decoder is left with some cushion)
411        sp<AMessage> lastDequeueMeta, lastEnqueueMeta;
412        if (delayUs > 0) {
413            lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs);
414            if (lastDequeueMeta == NULL) {
415                // this means we don't have enough cushion, try again later
416                ALOGV("[%s] up switching failed due to insufficient buffer",
417                        getNameForStream(stream));
418                return false;
419            }
420        } else {
421            // It's okay for lastDequeueMeta to be NULL here, it means the
422            // decoder hasn't even started dequeueing
423            lastDequeueMeta = source->getLatestDequeuedMeta();
424        }
425        // Then, trim off packets at beginning of mPacketSources2 that's before
426        // the latest dequeued time. These samples are definitely too late.
427        firstNewMeta[i] = mPacketSources2.editValueAt(i)
428                            ->trimBuffersBeforeMeta(lastDequeueMeta);
429
430        // Now firstNewMeta[i] is the first sample after the trim.
431        // If it's NULL, we failed because dequeue already past all samples
432        // in mPacketSource2, we have to try again.
433        if (firstNewMeta[i] == NULL) {
434            HLSTime dequeueTime(lastDequeueMeta);
435            ALOGV("[%s] dequeue time (%d, %lld) past start time",
436                    getNameForStream(stream),
437                    dequeueTime.mSeq, (long long) dequeueTime.mTimeUs);
438            return false;
439        }
440
441        // Otherwise, we check if mPacketSources2 overlaps with what old fetcher
442        // already fetched, and see if we need to resumeUntil
443        lastEnqueueMeta = source->getLatestEnqueuedMeta();
444        // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity
445        // boundary, no need to resume as the content will look different anyways
446        if (lastEnqueueMeta != NULL) {
447            HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]);
448
449            // no need to resume old fetcher if new fetcher started in different
450            // discontinuity sequence, as the content will look different.
451            *needResumeUntil |= (startTime.mSeq == lastTime.mSeq
452                    && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs);
453
454            // update the stopTime for resumeUntil
455            stopParams->setInt32("discontinuitySeq", startTime.mSeq);
456            stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs);
457        }
458    }
459
460    // if we're here, it means dequeue progress hasn't passed some samples in
461    // mPacketSource2, we can trim off the excess in mPacketSource.
462    // (old fetcher might still need to resumeUntil the start time of new fetcher)
463    for (size_t i = 0; i < kMaxStreams; ++i) {
464        StreamType stream = indexToType(i);
465        if (!(mSwapMask & mNewStreamMask & stream)
466            || (newUri != mStreams[i].mNewUri)
467            || stream == STREAMTYPE_SUBTITLES) {
468            continue;
469        }
470        mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]);
471    }
472
473    // no resumeUntil if already underflow
474    *needResumeUntil &= !mBuffering;
475
476    return true;
477}
478
479void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
480    switch (msg->what()) {
481        case kWhatConnect:
482        {
483            onConnect(msg);
484            break;
485        }
486
487        case kWhatDisconnect:
488        {
489            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
490
491            if (mReconfigurationInProgress) {
492                break;
493            }
494
495            finishDisconnect();
496            break;
497        }
498
499        case kWhatSeek:
500        {
501            if (mReconfigurationInProgress) {
502                msg->post(50000);
503                break;
504            }
505
506            CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
507            mSeekReply = new AMessage;
508
509            onSeek(msg);
510            break;
511        }
512
513        case kWhatFetcherNotify:
514        {
515            int32_t what;
516            CHECK(msg->findInt32("what", &what));
517
518            switch (what) {
519                case PlaylistFetcher::kWhatStarted:
520                    break;
521                case PlaylistFetcher::kWhatPaused:
522                case PlaylistFetcher::kWhatStopped:
523                {
524                    AString uri;
525                    CHECK(msg->findString("uri", &uri));
526                    ssize_t index = mFetcherInfos.indexOfKey(uri);
527                    if (index < 0) {
528                        // ignore msgs from fetchers that's already gone
529                        break;
530                    }
531
532                    ALOGV("fetcher-%d %s",
533                            mFetcherInfos[index].mFetcher->getFetcherID(),
534                            what == PlaylistFetcher::kWhatPaused ?
535                                    "paused" : "stopped");
536
537                    if (what == PlaylistFetcher::kWhatStopped) {
538                        mFetcherLooper->unregisterHandler(
539                                mFetcherInfos[index].mFetcher->id());
540                        mFetcherInfos.removeItemsAt(index);
541                    } else if (what == PlaylistFetcher::kWhatPaused) {
542                        int32_t seekMode;
543                        CHECK(msg->findInt32("seekMode", &seekMode));
544                        for (size_t i = 0; i < kMaxStreams; ++i) {
545                            if (mStreams[i].mUri == uri) {
546                                mStreams[i].mSeekMode = (SeekMode) seekMode;
547                            }
548                        }
549                    }
550
551                    if (mContinuation != NULL) {
552                        CHECK_GT(mContinuationCounter, 0);
553                        if (--mContinuationCounter == 0) {
554                            mContinuation->post();
555                        }
556                        ALOGV("%zu fetcher(s) left", mContinuationCounter);
557                    }
558                    break;
559                }
560
561                case PlaylistFetcher::kWhatDurationUpdate:
562                {
563                    AString uri;
564                    CHECK(msg->findString("uri", &uri));
565
566                    int64_t durationUs;
567                    CHECK(msg->findInt64("durationUs", &durationUs));
568
569                    ssize_t index = mFetcherInfos.indexOfKey(uri);
570                    if (index >= 0) {
571                        FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
572                        info->mDurationUs = durationUs;
573                    }
574                    break;
575                }
576
577                case PlaylistFetcher::kWhatTargetDurationUpdate:
578                {
579                    int64_t targetDurationUs;
580                    CHECK(msg->findInt64("targetDurationUs", &targetDurationUs));
581                    mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4);
582                    mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4);
583                    mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs);
584                    break;
585                }
586
587                case PlaylistFetcher::kWhatError:
588                {
589                    status_t err;
590                    CHECK(msg->findInt32("err", &err));
591
592                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
593
594                    // handle EOS on subtitle tracks independently
595                    AString uri;
596                    if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
597                        ssize_t i = mFetcherInfos.indexOfKey(uri);
598                        if (i >= 0) {
599                            const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
600                            if (fetcher != NULL) {
601                                uint32_t type = fetcher->getStreamTypeMask();
602                                if (type == STREAMTYPE_SUBTITLES) {
603                                    mPacketSources.valueFor(
604                                            STREAMTYPE_SUBTITLES)->signalEOS(err);;
605                                    break;
606                                }
607                            }
608                        }
609                    }
610
611                    if (mInPreparationPhase) {
612                        postPrepared(err);
613                    }
614
615                    cancelBandwidthSwitch();
616
617                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
618
619                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
620
621                    mPacketSources.valueFor(
622                            STREAMTYPE_SUBTITLES)->signalEOS(err);
623
624                    postError(err);
625                    break;
626                }
627
628                case PlaylistFetcher::kWhatStopReached:
629                {
630                    ALOGV("kWhatStopReached");
631
632                    AString oldUri;
633                    CHECK(msg->findString("uri", &oldUri));
634
635                    ssize_t index = mFetcherInfos.indexOfKey(oldUri);
636                    if (index < 0) {
637                        break;
638                    }
639
640                    tryToFinishBandwidthSwitch(oldUri);
641                    break;
642                }
643
644                case PlaylistFetcher::kWhatStartedAt:
645                {
646                    int32_t switchGeneration;
647                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
648
649                    ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d",
650                            switchGeneration, mSwitchGeneration);
651
652                    if (switchGeneration != mSwitchGeneration) {
653                        break;
654                    }
655
656                    AString uri;
657                    CHECK(msg->findString("uri", &uri));
658
659                    // mark new fetcher mToBeResumed
660                    ssize_t index = mFetcherInfos.indexOfKey(uri);
661                    if (index >= 0) {
662                        mFetcherInfos.editValueAt(index).mToBeResumed = true;
663                    }
664
665                    // temporarily disable packet sources to be swapped to prevent
666                    // NuPlayerDecoder from dequeuing while we check progress
667                    for (size_t i = 0; i < mPacketSources.size(); ++i) {
668                        if ((mSwapMask & mPacketSources.keyAt(i))
669                                && uri == mStreams[i].mNewUri) {
670                            mPacketSources.editValueAt(i)->enable(false);
671                        }
672                    }
673                    bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex);
674                    // If switching up, require a cushion bigger than kUnderflowMark
675                    // to avoid buffering immediately after the switch.
676                    // (If we don't have that cushion we'd rather cancel and try again.)
677                    int64_t delayUs = switchUp ? (kUnderflowMarkUs + 1000000ll) : 0;
678                    bool needResumeUntil = false;
679                    sp<AMessage> stopParams = msg;
680                    if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) {
681                        // playback time hasn't passed startAt time
682                        if (!needResumeUntil) {
683                            ALOGV("finish switch");
684                            for (size_t i = 0; i < kMaxStreams; ++i) {
685                                if ((mSwapMask & indexToType(i))
686                                        && uri == mStreams[i].mNewUri) {
687                                    // have to make a copy of mStreams[i].mUri because
688                                    // tryToFinishBandwidthSwitch is modifying mStreams[]
689                                    AString oldURI = mStreams[i].mUri;
690                                    tryToFinishBandwidthSwitch(oldURI);
691                                    break;
692                                }
693                            }
694                        } else {
695                            // startAt time is after last enqueue time
696                            // Resume fetcher for the original variant; the resumed fetcher should
697                            // continue until the timestamps found in msg, which is stored by the
698                            // new fetcher to indicate where the new variant has started buffering.
699                            ALOGV("finish switch with resumeUntilAsync");
700                            for (size_t i = 0; i < mFetcherInfos.size(); i++) {
701                                const FetcherInfo &info = mFetcherInfos.valueAt(i);
702                                if (info.mToBeRemoved) {
703                                    info.mFetcher->resumeUntilAsync(stopParams);
704                                }
705                            }
706                        }
707                    } else {
708                        // playback time passed startAt time
709                        if (switchUp) {
710                            // if switching up, cancel and retry if condition satisfies again
711                            ALOGV("cancel up switch because we're too late");
712                            cancelBandwidthSwitch(true /* resume */);
713                        } else {
714                            ALOGV("retry down switch at next sample");
715                            resumeFetcher(uri, mSwapMask, -1, true /* newUri */);
716                        }
717                    }
718                    // re-enable all packet sources
719                    for (size_t i = 0; i < mPacketSources.size(); ++i) {
720                        mPacketSources.editValueAt(i)->enable(true);
721                    }
722
723                    break;
724                }
725
726                default:
727                    TRESPASS();
728            }
729
730            break;
731        }
732
733        case kWhatChangeConfiguration:
734        {
735            onChangeConfiguration(msg);
736            break;
737        }
738
739        case kWhatChangeConfiguration2:
740        {
741            onChangeConfiguration2(msg);
742            break;
743        }
744
745        case kWhatChangeConfiguration3:
746        {
747            onChangeConfiguration3(msg);
748            break;
749        }
750
751        case kWhatFinishDisconnect2:
752        {
753            onFinishDisconnect2();
754            break;
755        }
756
757        case kWhatPollBuffering:
758        {
759            int32_t generation;
760            CHECK(msg->findInt32("generation", &generation));
761            if (generation == mPollBufferingGeneration) {
762                onPollBuffering();
763            }
764            break;
765        }
766
767        default:
768            TRESPASS();
769            break;
770    }
771}
772
773// static
774int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
775    if (a->mBandwidth < b->mBandwidth) {
776        return -1;
777    } else if (a->mBandwidth == b->mBandwidth) {
778        return 0;
779    }
780
781    return 1;
782}
783
784// static
785LiveSession::StreamType LiveSession::indexToType(int idx) {
786    CHECK(idx >= 0 && idx < kMaxStreams);
787    return (StreamType)(1 << idx);
788}
789
790// static
791ssize_t LiveSession::typeToIndex(int32_t type) {
792    switch (type) {
793        case STREAMTYPE_AUDIO:
794            return 0;
795        case STREAMTYPE_VIDEO:
796            return 1;
797        case STREAMTYPE_SUBTITLES:
798            return 2;
799        default:
800            return -1;
801    };
802    return -1;
803}
804
805void LiveSession::onConnect(const sp<AMessage> &msg) {
806    AString url;
807    CHECK(msg->findString("url", &url));
808
809    KeyedVector<String8, String8> *headers = NULL;
810    if (!msg->findPointer("headers", (void **)&headers)) {
811        mExtraHeaders.clear();
812    } else {
813        mExtraHeaders = *headers;
814
815        delete headers;
816        headers = NULL;
817    }
818
819    // TODO currently we don't know if we are coming here from incognito mode
820    ALOGI("onConnect %s", uriDebugString(url).c_str());
821
822    mMasterURL = url;
823
824    bool dummy;
825    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
826
827    if (mPlaylist == NULL) {
828        ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str());
829
830        postPrepared(ERROR_IO);
831        return;
832    }
833
834    // create looper for fetchers
835    if (mFetcherLooper == NULL) {
836        mFetcherLooper = new ALooper();
837
838        mFetcherLooper->setName("Fetcher");
839        mFetcherLooper->start(false, false);
840    }
841
842    // We trust the content provider to make a reasonable choice of preferred
843    // initial bandwidth by listing it first in the variant playlist.
844    // At startup we really don't have a good estimate on the available
845    // network bandwidth since we haven't tranferred any data yet. Once
846    // we have we can make a better informed choice.
847    size_t initialBandwidth = 0;
848    size_t initialBandwidthIndex = 0;
849
850    if (mPlaylist->isVariantPlaylist()) {
851        Vector<BandwidthItem> itemsWithVideo;
852        for (size_t i = 0; i < mPlaylist->size(); ++i) {
853            BandwidthItem item;
854
855            item.mPlaylistIndex = i;
856
857            sp<AMessage> meta;
858            AString uri;
859            mPlaylist->itemAt(i, &uri, &meta);
860
861            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
862
863            mBandwidthItems.push(item);
864            if (mPlaylist->hasType(i, "video")) {
865                itemsWithVideo.push(item);
866            }
867        }
868        // remove the audio-only variants if we have at least one with video
869        if (!itemsWithVideo.empty()
870                && itemsWithVideo.size() < mBandwidthItems.size()) {
871            mBandwidthItems.clear();
872            for (size_t i = 0; i < itemsWithVideo.size(); ++i) {
873                mBandwidthItems.push(itemsWithVideo[i]);
874            }
875        }
876
877        CHECK_GT(mBandwidthItems.size(), 0u);
878        initialBandwidth = mBandwidthItems[0].mBandwidth;
879
880        mBandwidthItems.sort(SortByBandwidth);
881
882        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
883            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
884                initialBandwidthIndex = i;
885                break;
886            }
887        }
888    } else {
889        // dummy item.
890        BandwidthItem item;
891        item.mPlaylistIndex = 0;
892        item.mBandwidth = 0;
893        mBandwidthItems.push(item);
894    }
895
896    mPlaylist->pickRandomMediaItems();
897    changeConfiguration(
898            0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
899}
900
901void LiveSession::finishDisconnect() {
902    ALOGV("finishDisconnect");
903
904    // No reconfiguration is currently pending, make sure none will trigger
905    // during disconnection either.
906    cancelBandwidthSwitch();
907
908    // cancel buffer polling
909    cancelPollBuffering();
910
911    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
912        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
913    }
914
915    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, this);
916
917    mContinuationCounter = mFetcherInfos.size();
918    mContinuation = msg;
919
920    if (mContinuationCounter == 0) {
921        msg->post();
922    }
923}
924
925void LiveSession::onFinishDisconnect2() {
926    mContinuation.clear();
927
928    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
929    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
930
931    mPacketSources.valueFor(
932            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
933
934    sp<AMessage> response = new AMessage;
935    response->setInt32("err", OK);
936
937    response->postReply(mDisconnectReplyID);
938    mDisconnectReplyID.clear();
939}
940
941sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
942    ssize_t index = mFetcherInfos.indexOfKey(uri);
943
944    if (index >= 0) {
945        return NULL;
946    }
947
948    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this);
949    notify->setString("uri", uri);
950    notify->setInt32("switchGeneration", mSwitchGeneration);
951
952    FetcherInfo info;
953    info.mFetcher = new PlaylistFetcher(
954            notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration);
955    info.mDurationUs = -1ll;
956    info.mToBeRemoved = false;
957    info.mToBeResumed = false;
958    mFetcherLooper->registerHandler(info.mFetcher);
959
960    mFetcherInfos.add(uri, info);
961
962    return info.mFetcher;
963}
964
965/*
966 * Illustration of parameters:
967 *
968 * 0      `range_offset`
969 * +------------+-------------------------------------------------------+--+--+
970 * |            |                                 | next block to fetch |  |  |
971 * |            | `source` handle => `out` buffer |                     |  |  |
972 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
973 * |            |<----------- `range_length` / buffer capacity ----------->|  |
974 * |<------------------------------ file_size ------------------------------->|
975 *
976 * Special parameter values:
977 * - range_length == -1 means entire file
978 * - block_size == 0 means entire range
979 *
980 */
981ssize_t LiveSession::fetchFile(
982        const char *url, sp<ABuffer> *out,
983        int64_t range_offset, int64_t range_length,
984        uint32_t block_size, /* download block size */
985        sp<DataSource> *source, /* to return and reuse source */
986        String8 *actualUrl,
987        bool forceConnectHTTP /* force connect HTTP when resuing source */) {
988    off64_t size;
989    sp<DataSource> temp_source;
990    if (source == NULL) {
991        source = &temp_source;
992    }
993
994    if (*source == NULL || forceConnectHTTP) {
995        if (!strncasecmp(url, "file://", 7)) {
996            *source = new FileSource(url + 7);
997        } else if (strncasecmp(url, "http://", 7)
998                && strncasecmp(url, "https://", 8)) {
999            return ERROR_UNSUPPORTED;
1000        } else {
1001            KeyedVector<String8, String8> headers = mExtraHeaders;
1002            if (range_offset > 0 || range_length >= 0) {
1003                headers.add(
1004                        String8("Range"),
1005                        String8(
1006                            AStringPrintf(
1007                                "bytes=%lld-%s",
1008                                range_offset,
1009                                range_length < 0
1010                                    ? "" : AStringPrintf("%lld",
1011                                            range_offset + range_length - 1).c_str()).c_str()));
1012            }
1013
1014            HTTPBase* httpDataSource =
1015                    (*source == NULL) ? mHTTPDataSource.get() : (HTTPBase*)source->get();
1016            status_t err = httpDataSource->connect(url, &headers);
1017
1018            if (err != OK) {
1019                return err;
1020            }
1021
1022            if (*source == NULL) {
1023                *source = mHTTPDataSource;
1024            }
1025        }
1026    }
1027
1028    status_t getSizeErr = (*source)->getSize(&size);
1029    if (getSizeErr != OK) {
1030        size = 65536;
1031    }
1032
1033    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
1034    if (*out == NULL) {
1035        buffer->setRange(0, 0);
1036    }
1037
1038    ssize_t bytesRead = 0;
1039    // adjust range_length if only reading partial block
1040    if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
1041        range_length = buffer->size() + block_size;
1042    }
1043    for (;;) {
1044        // Only resize when we don't know the size.
1045        size_t bufferRemaining = buffer->capacity() - buffer->size();
1046        if (bufferRemaining == 0 && getSizeErr != OK) {
1047            size_t bufferIncrement = buffer->size() / 2;
1048            if (bufferIncrement < 32768) {
1049                bufferIncrement = 32768;
1050            }
1051            bufferRemaining = bufferIncrement;
1052
1053            ALOGV("increasing download buffer to %zu bytes",
1054                 buffer->size() + bufferRemaining);
1055
1056            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
1057            memcpy(copy->data(), buffer->data(), buffer->size());
1058            copy->setRange(0, buffer->size());
1059
1060            buffer = copy;
1061        }
1062
1063        size_t maxBytesToRead = bufferRemaining;
1064        if (range_length >= 0) {
1065            int64_t bytesLeftInRange = range_length - buffer->size();
1066            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
1067                maxBytesToRead = bytesLeftInRange;
1068
1069                if (bytesLeftInRange == 0) {
1070                    break;
1071                }
1072            }
1073        }
1074
1075        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
1076        // to help us break out of the loop.
1077        ssize_t n = (*source)->readAt(
1078                buffer->size(), buffer->data() + buffer->size(),
1079                maxBytesToRead);
1080
1081        if (n < 0) {
1082            return n;
1083        }
1084
1085        if (n == 0) {
1086            break;
1087        }
1088
1089        buffer->setRange(0, buffer->size() + (size_t)n);
1090        bytesRead += n;
1091    }
1092
1093    *out = buffer;
1094    if (actualUrl != NULL) {
1095        *actualUrl = (*source)->getUri();
1096        if (actualUrl->isEmpty()) {
1097            *actualUrl = url;
1098        }
1099    }
1100
1101    return bytesRead;
1102}
1103
1104sp<M3UParser> LiveSession::fetchPlaylist(
1105        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
1106    ALOGV("fetchPlaylist '%s'", url);
1107
1108    *unchanged = false;
1109
1110    sp<ABuffer> buffer;
1111    String8 actualUrl;
1112    ssize_t  err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
1113
1114    // close off the connection after use
1115    mHTTPDataSource->disconnect();
1116
1117    if (err <= 0) {
1118        return NULL;
1119    }
1120
1121    // MD5 functionality is not available on the simulator, treat all
1122    // playlists as changed.
1123
1124#if defined(HAVE_ANDROID_OS)
1125    uint8_t hash[16];
1126
1127    MD5_CTX m;
1128    MD5_Init(&m);
1129    MD5_Update(&m, buffer->data(), buffer->size());
1130
1131    MD5_Final(hash, &m);
1132
1133    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
1134        // playlist unchanged
1135        *unchanged = true;
1136
1137        return NULL;
1138    }
1139
1140    if (curPlaylistHash != NULL) {
1141        memcpy(curPlaylistHash, hash, sizeof(hash));
1142    }
1143#endif
1144
1145    sp<M3UParser> playlist =
1146        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
1147
1148    if (playlist->initCheck() != OK) {
1149        ALOGE("failed to parse .m3u8 playlist");
1150
1151        return NULL;
1152    }
1153
1154    return playlist;
1155}
1156
1157#if 0
1158static double uniformRand() {
1159    return (double)rand() / RAND_MAX;
1160}
1161#endif
1162
1163bool LiveSession::resumeFetcher(
1164        const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) {
1165    ssize_t index = mFetcherInfos.indexOfKey(uri);
1166    if (index < 0) {
1167        ALOGE("did not find fetcher for uri: %s", uri.c_str());
1168        return false;
1169    }
1170
1171    bool resume = false;
1172    sp<AnotherPacketSource> sources[kMaxStreams];
1173    for (size_t i = 0; i < kMaxStreams; ++i) {
1174        if ((streamMask & indexToType(i))
1175            && ((!newUri && uri == mStreams[i].mUri)
1176            || (newUri && uri == mStreams[i].mNewUri))) {
1177            resume = true;
1178            if (newUri) {
1179                sources[i] = mPacketSources2.valueFor(indexToType(i));
1180                sources[i]->clear();
1181            } else {
1182                sources[i] = mPacketSources.valueFor(indexToType(i));
1183            }
1184        }
1185    }
1186
1187    if (resume) {
1188        sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher;
1189        SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition;
1190
1191        ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d",
1192                fetcher->getFetcherID(), (long long)timeUs, seekMode);
1193
1194        fetcher->startAsync(
1195                sources[kAudioIndex],
1196                sources[kVideoIndex],
1197                sources[kSubtitleIndex],
1198                timeUs, -1, -1, seekMode);
1199    }
1200
1201    return resume;
1202}
1203
1204float LiveSession::getAbortThreshold(
1205        ssize_t currentBWIndex, ssize_t targetBWIndex) const {
1206    float abortThreshold = -1.0f;
1207    if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) {
1208        /*
1209           If we're switching down, we need to decide whether to
1210
1211           1) finish last segment of high-bandwidth variant, or
1212           2) abort last segment of high-bandwidth variant, and fetch an
1213              overlapping portion from low-bandwidth variant.
1214
1215           Here we try to maximize the amount of buffer left when the
1216           switch point is met. Given the following parameters:
1217
1218           B: our current buffering level in seconds
1219           T: target duration in seconds
1220           X: sample duration in seconds remain to fetch in last segment
1221           bw0: bandwidth of old variant (as specified in playlist)
1222           bw1: bandwidth of new variant (as specified in playlist)
1223           bw: measured bandwidth available
1224
1225           If we choose 1), when switch happens at the end of current
1226           segment, our buffering will be
1227                  B + X - X * bw0 / bw
1228
1229           If we choose 2), when switch happens where we aborted current
1230           segment, our buffering will be
1231                  B - (T - X) * bw1 / bw
1232
1233           We should only choose 1) if
1234                  X/T < bw1 / (bw1 + bw0 - bw)
1235        */
1236
1237        // Taking the measured current bandwidth at 50% face value only,
1238        // as our bandwidth estimation is a lagging indicator. Being
1239        // conservative on this, we prefer switching to lower bandwidth
1240        // unless we're really confident finishing up the last segment
1241        // of higher bandwidth will be fast.
1242        CHECK(mLastBandwidthBps >= 0);
1243        abortThreshold =
1244                (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1245             / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1246              + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth
1247              - (float)mLastBandwidthBps * 0.5f);
1248        if (abortThreshold < 0.0f) {
1249            abortThreshold = -1.0f; // do not abort
1250        }
1251        ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f",
1252                mBandwidthItems.itemAt(currentBWIndex).mBandwidth,
1253                mBandwidthItems.itemAt(targetBWIndex).mBandwidth,
1254                mLastBandwidthBps,
1255                abortThreshold);
1256    }
1257    return abortThreshold;
1258}
1259
1260void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) {
1261    mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs);
1262}
1263
1264size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) {
1265    if (mBandwidthItems.size() < 2) {
1266        // shouldn't be here if we only have 1 bandwidth, check
1267        // logic to get rid of redundant bandwidth polling
1268        ALOGW("getBandwidthIndex() called for single bandwidth playlist!");
1269        return 0;
1270    }
1271
1272#if 1
1273    char value[PROPERTY_VALUE_MAX];
1274    ssize_t index = -1;
1275    if (property_get("media.httplive.bw-index", value, NULL)) {
1276        char *end;
1277        index = strtol(value, &end, 10);
1278        CHECK(end > value && *end == '\0');
1279
1280        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
1281            index = mBandwidthItems.size() - 1;
1282        }
1283    }
1284
1285    if (index < 0) {
1286        char value[PROPERTY_VALUE_MAX];
1287        if (property_get("media.httplive.max-bw", value, NULL)) {
1288            char *end;
1289            long maxBw = strtoul(value, &end, 10);
1290            if (end > value && *end == '\0') {
1291                if (maxBw > 0 && bandwidthBps > maxBw) {
1292                    ALOGV("bandwidth capped to %ld bps", maxBw);
1293                    bandwidthBps = maxBw;
1294                }
1295            }
1296        }
1297
1298        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
1299
1300        index = mBandwidthItems.size() - 1;
1301        while (index > 0) {
1302            // be conservative (70%) to avoid overestimating and immediately
1303            // switching down again.
1304            size_t adjustedBandwidthBps = bandwidthBps * 7 / 10;
1305            if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) {
1306                break;
1307            }
1308            --index;
1309        }
1310    }
1311#elif 0
1312    // Change bandwidth at random()
1313    size_t index = uniformRand() * mBandwidthItems.size();
1314#elif 0
1315    // There's a 50% chance to stay on the current bandwidth and
1316    // a 50% chance to switch to the next higher bandwidth (wrapping around
1317    // to lowest)
1318    const size_t kMinIndex = 0;
1319
1320    static ssize_t mCurBandwidthIndex = -1;
1321
1322    size_t index;
1323    if (mCurBandwidthIndex < 0) {
1324        index = kMinIndex;
1325    } else if (uniformRand() < 0.5) {
1326        index = (size_t)mCurBandwidthIndex;
1327    } else {
1328        index = mCurBandwidthIndex + 1;
1329        if (index == mBandwidthItems.size()) {
1330            index = kMinIndex;
1331        }
1332    }
1333    mCurBandwidthIndex = index;
1334#elif 0
1335    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1336
1337    size_t index = mBandwidthItems.size() - 1;
1338    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1339        --index;
1340    }
1341#elif 1
1342    char value[PROPERTY_VALUE_MAX];
1343    size_t index;
1344    if (property_get("media.httplive.bw-index", value, NULL)) {
1345        char *end;
1346        index = strtoul(value, &end, 10);
1347        CHECK(end > value && *end == '\0');
1348
1349        if (index >= mBandwidthItems.size()) {
1350            index = mBandwidthItems.size() - 1;
1351        }
1352    } else {
1353        index = 0;
1354    }
1355#else
1356    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
1357#endif
1358
1359    CHECK_GE(index, 0);
1360
1361    return index;
1362}
1363
1364HLSTime LiveSession::latestMediaSegmentStartTime() const {
1365    HLSTime audioTime(mPacketSources.valueFor(
1366                    STREAMTYPE_AUDIO)->getLatestDequeuedMeta());
1367
1368    HLSTime videoTime(mPacketSources.valueFor(
1369                    STREAMTYPE_VIDEO)->getLatestDequeuedMeta());
1370
1371    return audioTime < videoTime ? videoTime : audioTime;
1372}
1373
1374void LiveSession::onSeek(const sp<AMessage> &msg) {
1375    int64_t timeUs;
1376    CHECK(msg->findInt64("timeUs", &timeUs));
1377    changeConfiguration(timeUs);
1378}
1379
1380status_t LiveSession::getDuration(int64_t *durationUs) const {
1381    int64_t maxDurationUs = -1ll;
1382    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1383        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1384
1385        if (fetcherDurationUs > maxDurationUs) {
1386            maxDurationUs = fetcherDurationUs;
1387        }
1388    }
1389
1390    *durationUs = maxDurationUs;
1391
1392    return OK;
1393}
1394
1395bool LiveSession::isSeekable() const {
1396    int64_t durationUs;
1397    return getDuration(&durationUs) == OK && durationUs >= 0;
1398}
1399
1400bool LiveSession::hasDynamicDuration() const {
1401    return false;
1402}
1403
1404size_t LiveSession::getTrackCount() const {
1405    if (mPlaylist == NULL) {
1406        return 0;
1407    } else {
1408        return mPlaylist->getTrackCount();
1409    }
1410}
1411
1412sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1413    if (mPlaylist == NULL) {
1414        return NULL;
1415    } else {
1416        return mPlaylist->getTrackInfo(trackIndex);
1417    }
1418}
1419
1420status_t LiveSession::selectTrack(size_t index, bool select) {
1421    if (mPlaylist == NULL) {
1422        return INVALID_OPERATION;
1423    }
1424
1425    ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++",
1426            index, select, mSubtitleGeneration);
1427
1428    ++mSubtitleGeneration;
1429    status_t err = mPlaylist->selectTrack(index, select);
1430    if (err == OK) {
1431        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this);
1432        msg->setInt32("pickTrack", select);
1433        msg->post();
1434    }
1435    return err;
1436}
1437
1438ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
1439    if (mPlaylist == NULL) {
1440        return -1;
1441    } else {
1442        return mPlaylist->getSelectedTrack(type);
1443    }
1444}
1445
1446void LiveSession::changeConfiguration(
1447        int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) {
1448    ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d",
1449          (long long)timeUs, bandwidthIndex, pickTrack);
1450
1451    cancelBandwidthSwitch();
1452
1453    CHECK(!mReconfigurationInProgress);
1454    mReconfigurationInProgress = true;
1455    if (bandwidthIndex >= 0) {
1456        mOrigBandwidthIndex = mCurBandwidthIndex;
1457        mCurBandwidthIndex = bandwidthIndex;
1458    }
1459    CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size());
1460    const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex);
1461
1462    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1463    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1464
1465    AString URIs[kMaxStreams];
1466    for (size_t i = 0; i < kMaxStreams; ++i) {
1467        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1468            streamMask |= indexToType(i);
1469        }
1470    }
1471
1472    // Step 1, stop and discard fetchers that are no longer needed.
1473    // Pause those that we'll reuse.
1474    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1475        // skip fetchers that are marked mToBeRemoved,
1476        // these are done and can't be reused
1477        if (mFetcherInfos[i].mToBeRemoved) {
1478            continue;
1479        }
1480
1481        const AString &uri = mFetcherInfos.keyAt(i);
1482        sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher;
1483
1484        bool discardFetcher = true, delayRemoval = false;
1485        for (size_t j = 0; j < kMaxStreams; ++j) {
1486            StreamType type = indexToType(j);
1487            if ((streamMask & type) && uri == URIs[j]) {
1488                resumeMask |= type;
1489                streamMask &= ~type;
1490                discardFetcher = false;
1491            }
1492        }
1493        // Delay fetcher removal if not picking tracks, AND old fetcher
1494        // has stream mask that overlaps new variant. (Okay to discard
1495        // old fetcher now, if completely no overlap.)
1496        if (discardFetcher && timeUs < 0ll && !pickTrack
1497                && (fetcher->getStreamTypeMask() & streamMask)) {
1498            discardFetcher = false;
1499            delayRemoval = true;
1500        }
1501
1502        if (discardFetcher) {
1503            ALOGV("discarding fetcher-%d", fetcher->getFetcherID());
1504            fetcher->stopAsync();
1505        } else {
1506            float threshold = -1.0f; // always finish fetching by default
1507            if (timeUs >= 0ll) {
1508                // seeking, no need to finish fetching
1509                threshold = 0.0f;
1510            } else if (delayRemoval) {
1511                // adapting, abort if remaining of current segment is over threshold
1512                threshold = getAbortThreshold(
1513                        mOrigBandwidthIndex, mCurBandwidthIndex);
1514            }
1515
1516            ALOGV("pausing fetcher-%d, threshold=%.2f",
1517                    fetcher->getFetcherID(), threshold);
1518            fetcher->pauseAsync(threshold);
1519        }
1520    }
1521
1522    sp<AMessage> msg;
1523    if (timeUs < 0ll) {
1524        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1525        msg = new AMessage(kWhatChangeConfiguration3, this);
1526    } else {
1527        msg = new AMessage(kWhatChangeConfiguration2, this);
1528    }
1529    msg->setInt32("streamMask", streamMask);
1530    msg->setInt32("resumeMask", resumeMask);
1531    msg->setInt32("pickTrack", pickTrack);
1532    msg->setInt64("timeUs", timeUs);
1533    for (size_t i = 0; i < kMaxStreams; ++i) {
1534        if ((streamMask | resumeMask) & indexToType(i)) {
1535            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1536        }
1537    }
1538
1539    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1540    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1541    // fetchers have completed their asynchronous operation, we'll post
1542    // mContinuation, which then is handled below in onChangeConfiguration2.
1543    mContinuationCounter = mFetcherInfos.size();
1544    mContinuation = msg;
1545
1546    if (mContinuationCounter == 0) {
1547        msg->post();
1548    }
1549}
1550
1551void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1552    ALOGV("onChangeConfiguration");
1553
1554    if (!mReconfigurationInProgress) {
1555        int32_t pickTrack = 0;
1556        msg->findInt32("pickTrack", &pickTrack);
1557        changeConfiguration(-1ll /* timeUs */, -1, pickTrack);
1558    } else {
1559        msg->post(1000000ll); // retry in 1 sec
1560    }
1561}
1562
1563void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1564    ALOGV("onChangeConfiguration2");
1565
1566    mContinuation.clear();
1567
1568    // All fetchers are either suspended or have been removed now.
1569
1570    // If we're seeking, clear all packet sources before we report
1571    // seek complete, to prevent decoder from pulling stale data.
1572    int64_t timeUs;
1573    CHECK(msg->findInt64("timeUs", &timeUs));
1574
1575    if (timeUs >= 0) {
1576        mLastSeekTimeUs = timeUs;
1577
1578        for (size_t i = 0; i < mPacketSources.size(); i++) {
1579            mPacketSources.editValueAt(i)->clear();
1580        }
1581
1582        for (size_t i = 0; i < kMaxStreams; ++i) {
1583            mStreams[i].mCurDiscontinuitySeq = 0;
1584        }
1585
1586        mDiscontinuityOffsetTimesUs.clear();
1587        mDiscontinuityAbsStartTimesUs.clear();
1588
1589        if (mSeekReplyID != NULL) {
1590            CHECK(mSeekReply != NULL);
1591            mSeekReply->setInt32("err", OK);
1592            mSeekReply->postReply(mSeekReplyID);
1593            mSeekReplyID.clear();
1594            mSeekReply.clear();
1595        }
1596
1597        // restart buffer polling after seek becauese previous
1598        // buffering position is no longer valid.
1599        restartPollBuffering();
1600    }
1601
1602    uint32_t streamMask, resumeMask;
1603    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1604    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1605
1606    streamMask |= resumeMask;
1607
1608    AString URIs[kMaxStreams];
1609    for (size_t i = 0; i < kMaxStreams; ++i) {
1610        if (streamMask & indexToType(i)) {
1611            const AString &uriKey = mStreams[i].uriKey();
1612            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1613            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1614        }
1615    }
1616
1617    uint32_t changedMask = 0;
1618    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1619        // stream URI could change even if onChangeConfiguration2 is only
1620        // used for seek. Seek could happen during a bw switch, in this
1621        // case bw switch will be cancelled, but the seekTo position will
1622        // fetch from the new URI.
1623        if ((mStreamMask & streamMask & indexToType(i))
1624                && !mStreams[i].mUri.empty()
1625                && !(URIs[i] == mStreams[i].mUri)) {
1626            ALOGV("stream %zu changed: oldURI %s, newURI %s", i,
1627                    mStreams[i].mUri.c_str(), URIs[i].c_str());
1628            sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i));
1629            source->queueDiscontinuity(
1630                    ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1631        }
1632        // Determine which decoders to shutdown on the player side,
1633        // a decoder has to be shutdown if its streamtype was active
1634        // before but now longer isn't.
1635        if ((mStreamMask & ~streamMask & indexToType(i))) {
1636            changedMask |= indexToType(i);
1637        }
1638    }
1639
1640    if (changedMask == 0) {
1641        // If nothing changed as far as the audio/video decoders
1642        // are concerned we can proceed.
1643        onChangeConfiguration3(msg);
1644        return;
1645    }
1646
1647    // Something changed, inform the player which will shutdown the
1648    // corresponding decoders and will post the reply once that's done.
1649    // Handling the reply will continue executing below in
1650    // onChangeConfiguration3.
1651    sp<AMessage> notify = mNotify->dup();
1652    notify->setInt32("what", kWhatStreamsChanged);
1653    notify->setInt32("changedMask", changedMask);
1654
1655    msg->setWhat(kWhatChangeConfiguration3);
1656    msg->setTarget(this);
1657
1658    notify->setMessage("reply", msg);
1659    notify->post();
1660}
1661
1662void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1663    mContinuation.clear();
1664    // All remaining fetchers are still suspended, the player has shutdown
1665    // any decoders that needed it.
1666
1667    uint32_t streamMask, resumeMask;
1668    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1669    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1670
1671    mNewStreamMask = streamMask | resumeMask;
1672
1673    int64_t timeUs;
1674    int32_t pickTrack;
1675    bool switching = false;
1676    CHECK(msg->findInt64("timeUs", &timeUs));
1677    CHECK(msg->findInt32("pickTrack", &pickTrack));
1678
1679    if (timeUs < 0ll) {
1680        if (!pickTrack) {
1681            // mSwapMask contains streams that are in both old and new variant,
1682            // (in mNewStreamMask & mStreamMask) but with different URIs
1683            // (not in resumeMask).
1684            // For example, old variant has video and audio in two separate
1685            // URIs, and new variant has only audio with unchanged URI. mSwapMask
1686            // should be 0 as there is nothing to swap. We only need to stop video,
1687            // and resume audio.
1688            mSwapMask =  mNewStreamMask & mStreamMask & ~resumeMask;
1689            switching = (mSwapMask != 0);
1690            if (!switching) {
1691                ALOGV("#### Finishing Bandwidth Switch Early: %zd => %zd",
1692                        mOrigBandwidthIndex, mCurBandwidthIndex);
1693            }
1694        }
1695        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
1696    } else {
1697        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1698    }
1699
1700    ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, "
1701            "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x",
1702            (long long)timeUs, switching, pickTrack,
1703            mStreamMask, mNewStreamMask, mSwapMask);
1704
1705    for (size_t i = 0; i < kMaxStreams; ++i) {
1706        if (streamMask & indexToType(i)) {
1707            if (switching) {
1708                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
1709            } else {
1710                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1711            }
1712        }
1713    }
1714
1715    // Of all existing fetchers:
1716    // * Resume fetchers that are still needed and assign them original packet sources.
1717    // * Mark otherwise unneeded fetchers for removal.
1718    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1719    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1720        const AString &uri = mFetcherInfos.keyAt(i);
1721        if (!resumeFetcher(uri, resumeMask, timeUs)) {
1722            ALOGV("marking fetcher-%d to be removed",
1723                    mFetcherInfos[i].mFetcher->getFetcherID());
1724
1725            mFetcherInfos.editValueAt(i).mToBeRemoved = true;
1726        }
1727    }
1728
1729    // streamMask now only contains the types that need a new fetcher created.
1730    if (streamMask != 0) {
1731        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1732    }
1733
1734    // Find out when the original fetchers have buffered up to and start the new fetchers
1735    // at a later timestamp.
1736    for (size_t i = 0; i < kMaxStreams; i++) {
1737        if (!(indexToType(i) & streamMask)) {
1738            continue;
1739        }
1740
1741        AString uri;
1742        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
1743
1744        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1745        CHECK(fetcher != NULL);
1746
1747        HLSTime startTime;
1748        SeekMode seekMode = kSeekModeExactPosition;
1749        sp<AnotherPacketSource> sources[kMaxStreams];
1750
1751        if (i == kSubtitleIndex || (!pickTrack && !switching)) {
1752            startTime = latestMediaSegmentStartTime();
1753        }
1754
1755        // TRICKY: looping from i as earlier streams are already removed from streamMask
1756        for (size_t j = i; j < kMaxStreams; ++j) {
1757            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
1758            if ((streamMask & indexToType(j)) && uri == streamUri) {
1759                sources[j] = mPacketSources.valueFor(indexToType(j));
1760
1761                if (timeUs >= 0) {
1762                    startTime.mTimeUs = timeUs;
1763                } else {
1764                    int32_t type;
1765                    sp<AMessage> meta;
1766                    if (!switching) {
1767                        // selecting, or adapting but no swap required
1768                        meta = sources[j]->getLatestDequeuedMeta();
1769                    } else {
1770                        // adapting and swap required
1771                        meta = sources[j]->getLatestEnqueuedMeta();
1772                        if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) {
1773                            // switching up
1774                            meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin);
1775                        }
1776                    }
1777
1778                    if (j != kSubtitleIndex && meta != NULL
1779                            && !meta->findInt32("discontinuity", &type)) {
1780                        HLSTime tmpTime(meta);
1781                        if (startTime < tmpTime) {
1782                            startTime = tmpTime;
1783                        }
1784                    }
1785
1786                    if (!switching) {
1787                        // selecting, or adapting but no swap required
1788                        sources[j]->clear();
1789                        if (j == kSubtitleIndex) {
1790                            break;
1791                        }
1792
1793                        ALOGV("stream[%zu]: queue format change", j);
1794                        sources[j]->queueDiscontinuity(
1795                                ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
1796                    } else {
1797                        // switching, queue discontinuities after resume
1798                        sources[j] = mPacketSources2.valueFor(indexToType(j));
1799                        sources[j]->clear();
1800                        // the new fetcher might be providing streams that used to be
1801                        // provided by two different fetchers,  if one of the fetcher
1802                        // paused in the middle while the other somehow paused in next
1803                        // seg, we have to start from next seg.
1804                        if (seekMode < mStreams[j].mSeekMode) {
1805                            seekMode = mStreams[j].mSeekMode;
1806                        }
1807                    }
1808                }
1809
1810                streamMask &= ~indexToType(j);
1811            }
1812        }
1813
1814        ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld "
1815                "segmentStartTimeUs %lld seekMode %d",
1816                fetcher->getFetcherID(),
1817                (long long)startTime.mTimeUs,
1818                (long long)mLastSeekTimeUs,
1819                (long long)startTime.getSegmentTimeUs(true /* midpoint */),
1820                seekMode);
1821
1822        // Set the target segment start time to the middle point of the
1823        // segment where the last sample was.
1824        // This gives a better guess if segments of the two variants are not
1825        // perfectly aligned. (If the corresponding segment in new variant
1826        // starts slightly later than that in the old variant, we still want
1827        // to pick that segment, not the one before)
1828        fetcher->startAsync(
1829                sources[kAudioIndex],
1830                sources[kVideoIndex],
1831                sources[kSubtitleIndex],
1832                startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs,
1833                startTime.getSegmentTimeUs(true /* midpoint */),
1834                startTime.mSeq,
1835                seekMode);
1836    }
1837
1838    // All fetchers have now been started, the configuration change
1839    // has completed.
1840
1841    mReconfigurationInProgress = false;
1842    if (switching) {
1843        mSwitchInProgress = true;
1844    } else {
1845        mStreamMask = mNewStreamMask;
1846        mOrigBandwidthIndex = mCurBandwidthIndex;
1847    }
1848
1849    ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x",
1850            mSwitchInProgress, mStreamMask);
1851
1852    if (mDisconnectReplyID != NULL) {
1853        finishDisconnect();
1854    }
1855}
1856
1857void LiveSession::swapPacketSource(StreamType stream) {
1858    ALOGV("[%s] swapPacketSource", getNameForStream(stream));
1859
1860    // transfer packets from source2 to source
1861    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
1862    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
1863
1864    // queue discontinuity in mPacketSource
1865    aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false);
1866
1867    // queue packets in mPacketSource2 to mPacketSource
1868    status_t finalResult = OK;
1869    sp<ABuffer> accessUnit;
1870    while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK &&
1871          OK == aps2->dequeueAccessUnit(&accessUnit)) {
1872        aps->queueAccessUnit(accessUnit);
1873    }
1874    aps2->clear();
1875}
1876
1877void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) {
1878    if (!mSwitchInProgress) {
1879        return;
1880    }
1881
1882    ssize_t index = mFetcherInfos.indexOfKey(oldUri);
1883    if (index < 0 || !mFetcherInfos[index].mToBeRemoved) {
1884        return;
1885    }
1886
1887    // Swap packet source of streams provided by old variant
1888    for (size_t idx = 0; idx < kMaxStreams; idx++) {
1889        StreamType stream = indexToType(idx);
1890        if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) {
1891            swapPacketSource(stream);
1892
1893            if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
1894                ALOGW("swapping stream type %d %s to empty stream",
1895                        stream, mStreams[idx].mUri.c_str());
1896            }
1897            mStreams[idx].mUri = mStreams[idx].mNewUri;
1898            mStreams[idx].mNewUri.clear();
1899
1900            mSwapMask &= ~stream;
1901        }
1902    }
1903
1904    mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */);
1905
1906    ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask);
1907    if (mSwapMask != 0) {
1908        return;
1909    }
1910
1911    // Check if new variant contains extra streams.
1912    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1913    while (extraStreams) {
1914        StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1));
1915        extraStreams &= ~stream;
1916
1917        swapPacketSource(stream);
1918
1919        ssize_t idx = typeToIndex(stream);
1920        CHECK(idx >= 0);
1921        if (mStreams[idx].mNewUri.empty()) {
1922            ALOGW("swapping extra stream type %d %s to empty stream",
1923                    stream, mStreams[idx].mUri.c_str());
1924        }
1925        mStreams[idx].mUri = mStreams[idx].mNewUri;
1926        mStreams[idx].mNewUri.clear();
1927    }
1928
1929    // Restart new fetcher (it was paused after the first 47k block)
1930    // and let it fetch into mPacketSources (not mPacketSources2)
1931    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1932        FetcherInfo &info = mFetcherInfos.editValueAt(i);
1933        if (info.mToBeResumed) {
1934            resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask);
1935            info.mToBeResumed = false;
1936        }
1937    }
1938
1939    ALOGI("#### Finished Bandwidth Switch: %zd => %zd",
1940            mOrigBandwidthIndex, mCurBandwidthIndex);
1941
1942    mStreamMask = mNewStreamMask;
1943    mSwitchInProgress = false;
1944    mOrigBandwidthIndex = mCurBandwidthIndex;
1945
1946    restartPollBuffering();
1947}
1948
1949void LiveSession::schedulePollBuffering() {
1950    sp<AMessage> msg = new AMessage(kWhatPollBuffering, this);
1951    msg->setInt32("generation", mPollBufferingGeneration);
1952    msg->post(1000000ll);
1953}
1954
1955void LiveSession::cancelPollBuffering() {
1956    ++mPollBufferingGeneration;
1957    mPrevBufferPercentage = -1;
1958}
1959
1960void LiveSession::restartPollBuffering() {
1961    cancelPollBuffering();
1962    onPollBuffering();
1963}
1964
1965void LiveSession::onPollBuffering() {
1966    ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
1967            "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x",
1968        mSwitchInProgress, mReconfigurationInProgress,
1969        mInPreparationPhase, mCurBandwidthIndex, mStreamMask);
1970
1971    bool underflow, ready, down, up;
1972    if (checkBuffering(underflow, ready, down, up)) {
1973        if (mInPreparationPhase && ready) {
1974            postPrepared(OK);
1975        }
1976
1977        // don't switch before we report prepared
1978        if (!mInPreparationPhase) {
1979            if (ready) {
1980                stopBufferingIfNecessary();
1981            } else if (underflow) {
1982                startBufferingIfNecessary();
1983            }
1984            switchBandwidthIfNeeded(up, down);
1985       }
1986
1987    }
1988
1989    schedulePollBuffering();
1990}
1991
1992void LiveSession::cancelBandwidthSwitch(bool resume) {
1993    ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd",
1994            mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex);
1995    if (!mSwitchInProgress) {
1996        return;
1997    }
1998
1999    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2000        FetcherInfo& info = mFetcherInfos.editValueAt(i);
2001        if (info.mToBeRemoved) {
2002            info.mToBeRemoved = false;
2003            if (resume) {
2004                resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask);
2005            }
2006        }
2007    }
2008
2009    for (size_t i = 0; i < kMaxStreams; ++i) {
2010        AString newUri = mStreams[i].mNewUri;
2011        if (!newUri.empty()) {
2012            // clear all mNewUri matching this newUri
2013            for (size_t j = i; j < kMaxStreams; ++j) {
2014                if (mStreams[j].mNewUri == newUri) {
2015                    mStreams[j].mNewUri.clear();
2016                }
2017            }
2018            ALOGV("stopping newUri = %s", newUri.c_str());
2019            ssize_t index = mFetcherInfos.indexOfKey(newUri);
2020            if (index < 0) {
2021                ALOGE("did not find fetcher for newUri: %s", newUri.c_str());
2022                continue;
2023            }
2024            FetcherInfo &info = mFetcherInfos.editValueAt(index);
2025            info.mToBeRemoved = true;
2026            info.mFetcher->stopAsync();
2027        }
2028    }
2029
2030    ALOGI("#### Canceled Bandwidth Switch: %zd => %zd",
2031            mOrigBandwidthIndex, mCurBandwidthIndex);
2032
2033    mSwitchGeneration++;
2034    mSwitchInProgress = false;
2035    mCurBandwidthIndex = mOrigBandwidthIndex;
2036    mSwapMask = 0;
2037}
2038
2039bool LiveSession::checkBuffering(
2040        bool &underflow, bool &ready, bool &down, bool &up) {
2041    underflow = ready = down = up = false;
2042
2043    if (mReconfigurationInProgress) {
2044        ALOGV("Switch/Reconfig in progress, defer buffer polling");
2045        return false;
2046    }
2047
2048    size_t activeCount, underflowCount, readyCount, downCount, upCount;
2049    activeCount = underflowCount = readyCount = downCount = upCount =0;
2050    int32_t minBufferPercent = -1;
2051    int64_t durationUs;
2052    if (getDuration(&durationUs) != OK) {
2053        durationUs = -1;
2054    }
2055    for (size_t i = 0; i < mPacketSources.size(); ++i) {
2056        // we don't check subtitles for buffering level
2057        if (!(mStreamMask & mPacketSources.keyAt(i)
2058                & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) {
2059            continue;
2060        }
2061        // ignore streams that never had any packet queued.
2062        // (it's possible that the variant only has audio or video)
2063        sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
2064        if (meta == NULL) {
2065            continue;
2066        }
2067
2068        int64_t bufferedDurationUs =
2069                mPacketSources[i]->getEstimatedDurationUs();
2070        ALOGV("[%s] buffered %lld us",
2071                getNameForStream(mPacketSources.keyAt(i)),
2072                (long long)bufferedDurationUs);
2073        if (durationUs >= 0) {
2074            int32_t percent;
2075            if (mPacketSources[i]->isFinished(0 /* duration */)) {
2076                percent = 100;
2077            } else {
2078                percent = (int32_t)(100.0 * (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs);
2079            }
2080            if (minBufferPercent < 0 || percent < minBufferPercent) {
2081                minBufferPercent = percent;
2082            }
2083        }
2084
2085        ++activeCount;
2086        int64_t readyMark = mInPreparationPhase ? kPrepareMarkUs : kReadyMarkUs;
2087        if (bufferedDurationUs > readyMark
2088                || mPacketSources[i]->isFinished(0)) {
2089            ++readyCount;
2090        }
2091        if (!mPacketSources[i]->isFinished(0)) {
2092            if (bufferedDurationUs < kUnderflowMarkUs) {
2093                ++underflowCount;
2094            }
2095            if (bufferedDurationUs > mUpSwitchMark) {
2096                ++upCount;
2097            }
2098            if (bufferedDurationUs < mDownSwitchMark) {
2099                ++downCount;
2100            }
2101        }
2102    }
2103
2104    if (minBufferPercent >= 0) {
2105        notifyBufferingUpdate(minBufferPercent);
2106    }
2107
2108    if (activeCount > 0) {
2109        up        = (upCount == activeCount);
2110        down      = (downCount > 0);
2111        ready     = (readyCount == activeCount);
2112        underflow = (underflowCount > 0);
2113        return true;
2114    }
2115
2116    return false;
2117}
2118
2119void LiveSession::startBufferingIfNecessary() {
2120    ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2121            mInPreparationPhase, mBuffering);
2122    if (!mBuffering) {
2123        mBuffering = true;
2124
2125        sp<AMessage> notify = mNotify->dup();
2126        notify->setInt32("what", kWhatBufferingStart);
2127        notify->post();
2128    }
2129}
2130
2131void LiveSession::stopBufferingIfNecessary() {
2132    ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2133            mInPreparationPhase, mBuffering);
2134
2135    if (mBuffering) {
2136        mBuffering = false;
2137
2138        sp<AMessage> notify = mNotify->dup();
2139        notify->setInt32("what", kWhatBufferingEnd);
2140        notify->post();
2141    }
2142}
2143
2144void LiveSession::notifyBufferingUpdate(int32_t percentage) {
2145    if (percentage < mPrevBufferPercentage) {
2146        percentage = mPrevBufferPercentage;
2147    } else if (percentage > 100) {
2148        percentage = 100;
2149    }
2150
2151    mPrevBufferPercentage = percentage;
2152
2153    ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage);
2154
2155    sp<AMessage> notify = mNotify->dup();
2156    notify->setInt32("what", kWhatBufferingUpdate);
2157    notify->setInt32("percentage", percentage);
2158    notify->post();
2159}
2160
2161void LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
2162    // no need to check bandwidth if we only have 1 bandwidth settings
2163    if (mSwitchInProgress || mBandwidthItems.size() < 2) {
2164        return;
2165    }
2166
2167    int32_t bandwidthBps;
2168    if (mBandwidthEstimator->estimateBandwidth(&bandwidthBps)) {
2169        ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
2170        mLastBandwidthBps = bandwidthBps;
2171    } else {
2172        ALOGV("no bandwidth estimate.");
2173        return;
2174    }
2175
2176    int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth;
2177    // canSwithDown and canSwitchUp can't both be true.
2178    // we only want to switch up when measured bw is 120% higher than current variant,
2179    // and we only want to switch down when measured bw is below current variant.
2180    bool canSwithDown = bufferLow
2181            && (bandwidthBps < (int32_t)curBandwidth);
2182    bool canSwitchUp = bufferHigh
2183            && (bandwidthBps > (int32_t)curBandwidth * 12 / 10);
2184
2185    if (canSwithDown || canSwitchUp) {
2186        ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps);
2187
2188        // it's possible that we're checking for canSwitchUp case, but the returned
2189        // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70%
2190        // of measured bw. In that case we don't want to do anything, since we have
2191        // both enough buffer and enough bw.
2192        if (bandwidthIndex == mCurBandwidthIndex
2193                || (canSwitchUp && bandwidthIndex < mCurBandwidthIndex)
2194                || (canSwithDown && bandwidthIndex > mCurBandwidthIndex)) {
2195            return;
2196        }
2197
2198        ALOGI("#### Starting Bandwidth Switch: %zd => %zd",
2199                mCurBandwidthIndex, bandwidthIndex);
2200        changeConfiguration(-1, bandwidthIndex, false);
2201    }
2202}
2203
2204void LiveSession::postError(status_t err) {
2205    // if we reached EOS, notify buffering of 100%
2206    if (err == ERROR_END_OF_STREAM) {
2207        notifyBufferingUpdate(100);
2208    }
2209    // we'll stop buffer polling now, before that notify
2210    // stop buffering to stop the spinning icon
2211    stopBufferingIfNecessary();
2212    cancelPollBuffering();
2213
2214    sp<AMessage> notify = mNotify->dup();
2215    notify->setInt32("what", kWhatError);
2216    notify->setInt32("err", err);
2217    notify->post();
2218}
2219
2220void LiveSession::postPrepared(status_t err) {
2221    CHECK(mInPreparationPhase);
2222
2223    sp<AMessage> notify = mNotify->dup();
2224    if (err == OK || err == ERROR_END_OF_STREAM) {
2225        notify->setInt32("what", kWhatPrepared);
2226    } else {
2227        cancelPollBuffering();
2228
2229        notify->setInt32("what", kWhatPreparationFailed);
2230        notify->setInt32("err", err);
2231    }
2232
2233    notify->post();
2234
2235    mInPreparationPhase = false;
2236}
2237
2238
2239}  // namespace android
2240
2241