LiveSession.cpp revision 6f9c5e26c710dbee50e57316f1c460dda4850fa5
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22#include "HTTPDownloader.h"
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "mpeg2ts/AnotherPacketSource.h"
27
28#include <cutils/properties.h>
29#include <media/IMediaHTTPService.h>
30#include <media/stagefright/foundation/ABuffer.h>
31#include <media/stagefright/foundation/ADebug.h>
32#include <media/stagefright/foundation/AMessage.h>
33#include <media/stagefright/foundation/AUtils.h>
34#include <media/stagefright/MediaDefs.h>
35#include <media/stagefright/MetaData.h>
36#include <media/stagefright/Utils.h>
37
38#include <utils/Mutex.h>
39
40#include <ctype.h>
41#include <inttypes.h>
42
43namespace android {
44
45// static
46// Bandwidth Switch Mark Defaults
47const int64_t LiveSession::kUpSwitchMarkUs = 15000000ll;
48const int64_t LiveSession::kDownSwitchMarkUs = 20000000ll;
49const int64_t LiveSession::kUpSwitchMarginUs = 5000000ll;
50const int64_t LiveSession::kResumeThresholdUs = 100000ll;
51
52// Buffer Prepare/Ready/Underflow Marks
53const int64_t LiveSession::kReadyMarkUs = 5000000ll;
54const int64_t LiveSession::kPrepareMarkUs = 1500000ll;
55const int64_t LiveSession::kUnderflowMarkUs = 1000000ll;
56
57struct LiveSession::BandwidthEstimator : public RefBase {
58    BandwidthEstimator();
59
60    void addBandwidthMeasurement(size_t numBytes, int64_t delayUs);
61    bool estimateBandwidth(int32_t *bandwidth, bool *isStable = NULL);
62
63private:
64    // Bandwidth estimation parameters
65    static const int32_t kMinBandwidthHistoryItems = 20;
66    static const int64_t kMinBandwidthHistoryWindowUs = 5000000ll; // 5 sec
67    static const int64_t kMaxBandwidthHistoryWindowUs = 30000000ll; // 30 sec
68
69    struct BandwidthEntry {
70        int64_t mDelayUs;
71        size_t mNumBytes;
72    };
73
74    Mutex mLock;
75    List<BandwidthEntry> mBandwidthHistory;
76    List<int32_t> mPrevEstimates;
77    bool mHasNewSample;
78    bool mIsStable;
79    int64_t mTotalTransferTimeUs;
80    size_t mTotalTransferBytes;
81
82    DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator);
83};
84
85LiveSession::BandwidthEstimator::BandwidthEstimator() :
86    mHasNewSample(false),
87    mIsStable(true),
88    mTotalTransferTimeUs(0),
89    mTotalTransferBytes(0) {
90}
91
92void LiveSession::BandwidthEstimator::addBandwidthMeasurement(
93        size_t numBytes, int64_t delayUs) {
94    AutoMutex autoLock(mLock);
95
96    BandwidthEntry entry;
97    entry.mDelayUs = delayUs;
98    entry.mNumBytes = numBytes;
99    mTotalTransferTimeUs += delayUs;
100    mTotalTransferBytes += numBytes;
101    mBandwidthHistory.push_back(entry);
102    mHasNewSample = true;
103
104    // Remove no more than 10% of total transfer time at a time
105    // to avoid sudden jump on bandwidth estimation. There might
106    // be long blocking reads that takes up signification time,
107    // we have to keep a longer window in that case.
108    int64_t bandwidthHistoryWindowUs = mTotalTransferTimeUs * 9 / 10;
109    if (bandwidthHistoryWindowUs < kMinBandwidthHistoryWindowUs) {
110        bandwidthHistoryWindowUs = kMinBandwidthHistoryWindowUs;
111    } else if (bandwidthHistoryWindowUs > kMaxBandwidthHistoryWindowUs) {
112        bandwidthHistoryWindowUs = kMaxBandwidthHistoryWindowUs;
113    }
114    // trim old samples, keeping at least kMaxBandwidthHistoryItems samples,
115    // and total transfer time at least kMaxBandwidthHistoryWindowUs.
116    while (mBandwidthHistory.size() > kMinBandwidthHistoryItems) {
117        List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
118        if (mTotalTransferTimeUs - it->mDelayUs < bandwidthHistoryWindowUs) {
119            break;
120        }
121        mTotalTransferTimeUs -= it->mDelayUs;
122        mTotalTransferBytes -= it->mNumBytes;
123        mBandwidthHistory.erase(mBandwidthHistory.begin());
124    }
125}
126
127bool LiveSession::BandwidthEstimator::estimateBandwidth(
128        int32_t *bandwidthBps, bool *isStable) {
129    AutoMutex autoLock(mLock);
130
131    if (mBandwidthHistory.size() < 2) {
132        return false;
133    }
134
135    if (!mHasNewSample) {
136        *bandwidthBps = *(--mPrevEstimates.end());
137        if (isStable) {
138            *isStable = mIsStable;
139        }
140        return true;
141    }
142
143    *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs);
144    mPrevEstimates.push_back(*bandwidthBps);
145    while (mPrevEstimates.size() > 3) {
146        mPrevEstimates.erase(mPrevEstimates.begin());
147    }
148    mHasNewSample = false;
149
150    int32_t minEstimate = -1, maxEstimate = -1;
151    List<int32_t>::iterator it;
152    for (it = mPrevEstimates.begin(); it != mPrevEstimates.end(); it++) {
153        int32_t estimate = *it;
154        if (minEstimate < 0 || minEstimate > estimate) {
155            minEstimate = estimate;
156        }
157        if (maxEstimate < 0 || maxEstimate < estimate) {
158            maxEstimate = estimate;
159        }
160    }
161    mIsStable = (maxEstimate <= minEstimate * 4 / 3);
162    if (isStable) {
163       *isStable = mIsStable;
164    }
165#if 0
166    {
167        char dumpStr[1024] = {0};
168        size_t itemIdx = 0;
169        size_t histSize = mBandwidthHistory.size();
170        sprintf(dumpStr, "estimate bps=%d stable=%d history (n=%d): {",
171            *bandwidthBps, mIsStable, histSize);
172        List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
173        for (; it != mBandwidthHistory.end(); ++it) {
174            if (itemIdx > 50) {
175                sprintf(dumpStr + strlen(dumpStr),
176                        "...(%zd more items)... }", histSize - itemIdx);
177                break;
178            }
179            sprintf(dumpStr + strlen(dumpStr), "%dk/%.3fs%s",
180                it->mNumBytes / 1024,
181                (double)it->mDelayUs * 1.0e-6,
182                (it == (--mBandwidthHistory.end())) ? "}" : ", ");
183            itemIdx++;
184        }
185        ALOGE(dumpStr);
186    }
187#endif
188    return true;
189}
190
191//static
192const char *LiveSession::getKeyForStream(StreamType type) {
193    switch (type) {
194        case STREAMTYPE_VIDEO:
195            return "timeUsVideo";
196        case STREAMTYPE_AUDIO:
197            return "timeUsAudio";
198        case STREAMTYPE_SUBTITLES:
199            return "timeUsSubtitle";
200        case STREAMTYPE_METADATA:
201            return "timeUsMetadata"; // unused
202        default:
203            TRESPASS();
204    }
205    return NULL;
206}
207
208//static
209const char *LiveSession::getNameForStream(StreamType type) {
210    switch (type) {
211        case STREAMTYPE_VIDEO:
212            return "video";
213        case STREAMTYPE_AUDIO:
214            return "audio";
215        case STREAMTYPE_SUBTITLES:
216            return "subs";
217        case STREAMTYPE_METADATA:
218            return "metadata";
219        default:
220            break;
221    }
222    return "unknown";
223}
224
225//static
226ATSParser::SourceType LiveSession::getSourceTypeForStream(StreamType type) {
227    switch (type) {
228        case STREAMTYPE_VIDEO:
229            return ATSParser::VIDEO;
230        case STREAMTYPE_AUDIO:
231            return ATSParser::AUDIO;
232        case STREAMTYPE_METADATA:
233            return ATSParser::META;
234        case STREAMTYPE_SUBTITLES:
235        default:
236            TRESPASS();
237    }
238    return ATSParser::NUM_SOURCE_TYPES; // should not reach here
239}
240
241LiveSession::LiveSession(
242        const sp<AMessage> &notify, uint32_t flags,
243        const sp<IMediaHTTPService> &httpService)
244    : mNotify(notify),
245      mFlags(flags),
246      mHTTPService(httpService),
247      mBuffering(false),
248      mInPreparationPhase(true),
249      mPollBufferingGeneration(0),
250      mPrevBufferPercentage(-1),
251      mCurBandwidthIndex(-1),
252      mOrigBandwidthIndex(-1),
253      mLastBandwidthBps(-1ll),
254      mBandwidthEstimator(new BandwidthEstimator()),
255      mMaxWidth(720),
256      mMaxHeight(480),
257      mStreamMask(0),
258      mNewStreamMask(0),
259      mSwapMask(0),
260      mSwitchGeneration(0),
261      mSubtitleGeneration(0),
262      mLastDequeuedTimeUs(0ll),
263      mRealTimeBaseUs(0ll),
264      mReconfigurationInProgress(false),
265      mSwitchInProgress(false),
266      mUpSwitchMark(kUpSwitchMarkUs),
267      mDownSwitchMark(kDownSwitchMarkUs),
268      mUpSwitchMargin(kUpSwitchMarginUs),
269      mFirstTimeUsValid(false),
270      mFirstTimeUs(0),
271      mLastSeekTimeUs(0),
272      mHasMetadata(false) {
273    mStreams[kAudioIndex] = StreamItem("audio");
274    mStreams[kVideoIndex] = StreamItem("video");
275    mStreams[kSubtitleIndex] = StreamItem("subtitles");
276
277    for (size_t i = 0; i < kNumSources; ++i) {
278        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
279        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
280    }
281}
282
283LiveSession::~LiveSession() {
284    if (mFetcherLooper != NULL) {
285        mFetcherLooper->stop();
286    }
287}
288
289int64_t LiveSession::calculateMediaTimeUs(
290        int64_t firstTimeUs, int64_t timeUs, int32_t discontinuitySeq) {
291    if (timeUs >= firstTimeUs) {
292        timeUs -= firstTimeUs;
293    } else {
294        timeUs = 0;
295    }
296    timeUs += mLastSeekTimeUs;
297    if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
298        timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
299    }
300    return timeUs;
301}
302
303status_t LiveSession::dequeueAccessUnit(
304        StreamType stream, sp<ABuffer> *accessUnit) {
305    status_t finalResult = OK;
306    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
307
308    ssize_t streamIdx = typeToIndex(stream);
309    if (streamIdx < 0) {
310        return BAD_VALUE;
311    }
312    const char *streamStr = getNameForStream(stream);
313    // Do not let client pull data if we don't have data packets yet.
314    // We might only have a format discontinuity queued without data.
315    // When NuPlayerDecoder dequeues the format discontinuity, it will
316    // immediately try to getFormat. If we return NULL, NuPlayerDecoder
317    // thinks it can do seamless change, so will not shutdown decoder.
318    // When the actual format arrives, it can't handle it and get stuck.
319    if (!packetSource->hasDataBufferAvailable(&finalResult)) {
320        ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)",
321                streamStr, finalResult);
322
323        if (finalResult == OK) {
324            return -EAGAIN;
325        } else {
326            return finalResult;
327        }
328    }
329
330    // Let the client dequeue as long as we have buffers available
331    // Do not make pause/resume decisions here.
332
333    status_t err = packetSource->dequeueAccessUnit(accessUnit);
334
335    if (err == INFO_DISCONTINUITY) {
336        // adaptive streaming, discontinuities in the playlist
337        int32_t type;
338        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
339
340        sp<AMessage> extra;
341        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
342            extra.clear();
343        }
344
345        ALOGI("[%s] read discontinuity of type %d, extra = %s",
346              streamStr,
347              type,
348              extra == NULL ? "NULL" : extra->debugString().c_str());
349    } else if (err == OK) {
350
351        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
352            int64_t timeUs, originalTimeUs;
353            int32_t discontinuitySeq = 0;
354            StreamItem& strm = mStreams[streamIdx];
355            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
356            originalTimeUs = timeUs;
357            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
358            if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) {
359                int64_t offsetTimeUs;
360                if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
361                    offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq);
362                } else {
363                    offsetTimeUs = 0;
364                }
365
366                if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0
367                        && strm.mLastDequeuedTimeUs >= 0) {
368                    int64_t firstTimeUs;
369                    firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
370                    offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
371                    offsetTimeUs += strm.mLastSampleDurationUs;
372                } else {
373                    offsetTimeUs += strm.mLastSampleDurationUs;
374                }
375
376                mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs);
377                strm.mCurDiscontinuitySeq = discontinuitySeq;
378            }
379
380            int32_t discard = 0;
381            int64_t firstTimeUs;
382            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
383                int64_t durUs; // approximate sample duration
384                if (timeUs > strm.mLastDequeuedTimeUs) {
385                    durUs = timeUs - strm.mLastDequeuedTimeUs;
386                } else {
387                    durUs = strm.mLastDequeuedTimeUs - timeUs;
388                }
389                strm.mLastSampleDurationUs = durUs;
390                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
391            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
392                firstTimeUs = timeUs;
393            } else {
394                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
395                firstTimeUs = timeUs;
396            }
397
398            strm.mLastDequeuedTimeUs = timeUs;
399            timeUs = calculateMediaTimeUs(firstTimeUs, timeUs, discontinuitySeq);
400
401            ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us",
402                    streamStr, (long long)timeUs, (long long)originalTimeUs);
403            (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
404            mLastDequeuedTimeUs = timeUs;
405            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
406        } else if (stream == STREAMTYPE_SUBTITLES) {
407            int32_t subtitleGeneration;
408            if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
409                    && subtitleGeneration != mSubtitleGeneration) {
410               return -EAGAIN;
411            };
412            (*accessUnit)->meta()->setInt32(
413                    "trackIndex", mPlaylist->getSelectedIndex());
414            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
415        } else if (stream == STREAMTYPE_METADATA) {
416            HLSTime mdTime((*accessUnit)->meta());
417            if (mDiscontinuityAbsStartTimesUs.indexOfKey(mdTime.mSeq) < 0) {
418                packetSource->requeueAccessUnit((*accessUnit));
419                return -EAGAIN;
420            } else {
421                int64_t firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(mdTime.mSeq);
422                int64_t timeUs = calculateMediaTimeUs(firstTimeUs, mdTime.mTimeUs, mdTime.mSeq);
423                (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
424                (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
425            }
426        }
427    } else {
428        ALOGI("[%s] encountered error %d", streamStr, err);
429    }
430
431    return err;
432}
433
434status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
435    if (!(mStreamMask & stream)) {
436        return UNKNOWN_ERROR;
437    }
438
439    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
440
441    sp<MetaData> meta = packetSource->getFormat();
442
443    if (meta == NULL) {
444        return -EAGAIN;
445    }
446
447    if (stream == STREAMTYPE_AUDIO) {
448        // set AAC input buffer size to 32K bytes (256kbps x 1sec)
449        meta->setInt32(kKeyMaxInputSize, 32 * 1024);
450    } else if (stream == STREAMTYPE_VIDEO) {
451        meta->setInt32(kKeyMaxWidth, mMaxWidth);
452        meta->setInt32(kKeyMaxHeight, mMaxHeight);
453    }
454
455    return convertMetaDataToMessage(meta, format);
456}
457
458sp<HTTPDownloader> LiveSession::getHTTPDownloader() {
459    return new HTTPDownloader(mHTTPService, mExtraHeaders);
460}
461
462void LiveSession::connectAsync(
463        const char *url, const KeyedVector<String8, String8> *headers) {
464    sp<AMessage> msg = new AMessage(kWhatConnect, this);
465    msg->setString("url", url);
466
467    if (headers != NULL) {
468        msg->setPointer(
469                "headers",
470                new KeyedVector<String8, String8>(*headers));
471    }
472
473    msg->post();
474}
475
476status_t LiveSession::disconnect() {
477    sp<AMessage> msg = new AMessage(kWhatDisconnect, this);
478
479    sp<AMessage> response;
480    status_t err = msg->postAndAwaitResponse(&response);
481
482    return err;
483}
484
485status_t LiveSession::seekTo(int64_t timeUs) {
486    sp<AMessage> msg = new AMessage(kWhatSeek, this);
487    msg->setInt64("timeUs", timeUs);
488
489    sp<AMessage> response;
490    status_t err = msg->postAndAwaitResponse(&response);
491
492    return err;
493}
494
495bool LiveSession::checkSwitchProgress(
496        sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) {
497    AString newUri;
498    CHECK(stopParams->findString("uri", &newUri));
499
500    *needResumeUntil = false;
501    sp<AMessage> firstNewMeta[kMaxStreams];
502    for (size_t i = 0; i < kMaxStreams; ++i) {
503        StreamType stream = indexToType(i);
504        if (!(mSwapMask & mNewStreamMask & stream)
505            || (mStreams[i].mNewUri != newUri)) {
506            continue;
507        }
508        if (stream == STREAMTYPE_SUBTITLES) {
509            continue;
510        }
511        sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i);
512
513        // First, get latest dequeued meta, which is where the decoder is at.
514        // (when upswitching, we take the meta after a certain delay, so that
515        // the decoder is left with some cushion)
516        sp<AMessage> lastDequeueMeta, lastEnqueueMeta;
517        if (delayUs > 0) {
518            lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs);
519            if (lastDequeueMeta == NULL) {
520                // this means we don't have enough cushion, try again later
521                ALOGV("[%s] up switching failed due to insufficient buffer",
522                        getNameForStream(stream));
523                return false;
524            }
525        } else {
526            // It's okay for lastDequeueMeta to be NULL here, it means the
527            // decoder hasn't even started dequeueing
528            lastDequeueMeta = source->getLatestDequeuedMeta();
529        }
530        // Then, trim off packets at beginning of mPacketSources2 that's before
531        // the latest dequeued time. These samples are definitely too late.
532        firstNewMeta[i] = mPacketSources2.editValueAt(i)
533                            ->trimBuffersBeforeMeta(lastDequeueMeta);
534
535        // Now firstNewMeta[i] is the first sample after the trim.
536        // If it's NULL, we failed because dequeue already past all samples
537        // in mPacketSource2, we have to try again.
538        if (firstNewMeta[i] == NULL) {
539            HLSTime dequeueTime(lastDequeueMeta);
540            ALOGV("[%s] dequeue time (%d, %lld) past start time",
541                    getNameForStream(stream),
542                    dequeueTime.mSeq, (long long) dequeueTime.mTimeUs);
543            return false;
544        }
545
546        // Otherwise, we check if mPacketSources2 overlaps with what old fetcher
547        // already fetched, and see if we need to resumeUntil
548        lastEnqueueMeta = source->getLatestEnqueuedMeta();
549        // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity
550        // boundary, no need to resume as the content will look different anyways
551        if (lastEnqueueMeta != NULL) {
552            HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]);
553
554            // no need to resume old fetcher if new fetcher started in different
555            // discontinuity sequence, as the content will look different.
556            *needResumeUntil |= (startTime.mSeq == lastTime.mSeq
557                    && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs);
558
559            // update the stopTime for resumeUntil
560            stopParams->setInt32("discontinuitySeq", startTime.mSeq);
561            stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs);
562        }
563    }
564
565    // if we're here, it means dequeue progress hasn't passed some samples in
566    // mPacketSource2, we can trim off the excess in mPacketSource.
567    // (old fetcher might still need to resumeUntil the start time of new fetcher)
568    for (size_t i = 0; i < kMaxStreams; ++i) {
569        StreamType stream = indexToType(i);
570        if (!(mSwapMask & mNewStreamMask & stream)
571            || (newUri != mStreams[i].mNewUri)
572            || stream == STREAMTYPE_SUBTITLES) {
573            continue;
574        }
575        mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]);
576    }
577
578    // no resumeUntil if already underflow
579    *needResumeUntil &= !mBuffering;
580
581    return true;
582}
583
584void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
585    switch (msg->what()) {
586        case kWhatConnect:
587        {
588            onConnect(msg);
589            break;
590        }
591
592        case kWhatDisconnect:
593        {
594            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
595
596            if (mReconfigurationInProgress) {
597                break;
598            }
599
600            finishDisconnect();
601            break;
602        }
603
604        case kWhatSeek:
605        {
606            if (mReconfigurationInProgress) {
607                msg->post(50000);
608                break;
609            }
610
611            CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
612            mSeekReply = new AMessage;
613
614            onSeek(msg);
615            break;
616        }
617
618        case kWhatFetcherNotify:
619        {
620            int32_t what;
621            CHECK(msg->findInt32("what", &what));
622
623            switch (what) {
624                case PlaylistFetcher::kWhatStarted:
625                    break;
626                case PlaylistFetcher::kWhatPaused:
627                case PlaylistFetcher::kWhatStopped:
628                {
629                    AString uri;
630                    CHECK(msg->findString("uri", &uri));
631                    ssize_t index = mFetcherInfos.indexOfKey(uri);
632                    if (index < 0) {
633                        // ignore msgs from fetchers that's already gone
634                        break;
635                    }
636
637                    ALOGV("fetcher-%d %s",
638                            mFetcherInfos[index].mFetcher->getFetcherID(),
639                            what == PlaylistFetcher::kWhatPaused ?
640                                    "paused" : "stopped");
641
642                    if (what == PlaylistFetcher::kWhatStopped) {
643                        mFetcherLooper->unregisterHandler(
644                                mFetcherInfos[index].mFetcher->id());
645                        mFetcherInfos.removeItemsAt(index);
646                    } else if (what == PlaylistFetcher::kWhatPaused) {
647                        int32_t seekMode;
648                        CHECK(msg->findInt32("seekMode", &seekMode));
649                        for (size_t i = 0; i < kMaxStreams; ++i) {
650                            if (mStreams[i].mUri == uri) {
651                                mStreams[i].mSeekMode = (SeekMode) seekMode;
652                            }
653                        }
654                    }
655
656                    if (mContinuation != NULL) {
657                        CHECK_GT(mContinuationCounter, 0);
658                        if (--mContinuationCounter == 0) {
659                            mContinuation->post();
660                        }
661                        ALOGV("%zu fetcher(s) left", mContinuationCounter);
662                    }
663                    break;
664                }
665
666                case PlaylistFetcher::kWhatDurationUpdate:
667                {
668                    AString uri;
669                    CHECK(msg->findString("uri", &uri));
670
671                    int64_t durationUs;
672                    CHECK(msg->findInt64("durationUs", &durationUs));
673
674                    ssize_t index = mFetcherInfos.indexOfKey(uri);
675                    if (index >= 0) {
676                        FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
677                        info->mDurationUs = durationUs;
678                    }
679                    break;
680                }
681
682                case PlaylistFetcher::kWhatTargetDurationUpdate:
683                {
684                    int64_t targetDurationUs;
685                    CHECK(msg->findInt64("targetDurationUs", &targetDurationUs));
686                    mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4);
687                    mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4);
688                    mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs);
689                    break;
690                }
691
692                case PlaylistFetcher::kWhatError:
693                {
694                    status_t err;
695                    CHECK(msg->findInt32("err", &err));
696
697                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
698
699                    // handle EOS on subtitle tracks independently
700                    AString uri;
701                    if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
702                        ssize_t i = mFetcherInfos.indexOfKey(uri);
703                        if (i >= 0) {
704                            const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
705                            if (fetcher != NULL) {
706                                uint32_t type = fetcher->getStreamTypeMask();
707                                if (type == STREAMTYPE_SUBTITLES) {
708                                    mPacketSources.valueFor(
709                                            STREAMTYPE_SUBTITLES)->signalEOS(err);;
710                                    break;
711                                }
712                            }
713                        }
714                    }
715
716                    if (mInPreparationPhase) {
717                        postPrepared(err);
718                    }
719
720                    cancelBandwidthSwitch();
721
722                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
723
724                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
725
726                    mPacketSources.valueFor(
727                            STREAMTYPE_SUBTITLES)->signalEOS(err);
728
729                    postError(err);
730                    break;
731                }
732
733                case PlaylistFetcher::kWhatStopReached:
734                {
735                    ALOGV("kWhatStopReached");
736
737                    AString oldUri;
738                    CHECK(msg->findString("uri", &oldUri));
739
740                    ssize_t index = mFetcherInfos.indexOfKey(oldUri);
741                    if (index < 0) {
742                        break;
743                    }
744
745                    tryToFinishBandwidthSwitch(oldUri);
746                    break;
747                }
748
749                case PlaylistFetcher::kWhatStartedAt:
750                {
751                    int32_t switchGeneration;
752                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
753
754                    ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d",
755                            switchGeneration, mSwitchGeneration);
756
757                    if (switchGeneration != mSwitchGeneration) {
758                        break;
759                    }
760
761                    AString uri;
762                    CHECK(msg->findString("uri", &uri));
763
764                    // mark new fetcher mToBeResumed
765                    ssize_t index = mFetcherInfos.indexOfKey(uri);
766                    if (index >= 0) {
767                        mFetcherInfos.editValueAt(index).mToBeResumed = true;
768                    }
769
770                    // temporarily disable packet sources to be swapped to prevent
771                    // NuPlayerDecoder from dequeuing while we check progress
772                    for (size_t i = 0; i < mPacketSources.size(); ++i) {
773                        if ((mSwapMask & mPacketSources.keyAt(i))
774                                && uri == mStreams[i].mNewUri) {
775                            mPacketSources.editValueAt(i)->enable(false);
776                        }
777                    }
778                    bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex);
779                    // If switching up, require a cushion bigger than kUnderflowMark
780                    // to avoid buffering immediately after the switch.
781                    // (If we don't have that cushion we'd rather cancel and try again.)
782                    int64_t delayUs = switchUp ? (kUnderflowMarkUs + 1000000ll) : 0;
783                    bool needResumeUntil = false;
784                    sp<AMessage> stopParams = msg;
785                    if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) {
786                        // playback time hasn't passed startAt time
787                        if (!needResumeUntil) {
788                            ALOGV("finish switch");
789                            for (size_t i = 0; i < kMaxStreams; ++i) {
790                                if ((mSwapMask & indexToType(i))
791                                        && uri == mStreams[i].mNewUri) {
792                                    // have to make a copy of mStreams[i].mUri because
793                                    // tryToFinishBandwidthSwitch is modifying mStreams[]
794                                    AString oldURI = mStreams[i].mUri;
795                                    tryToFinishBandwidthSwitch(oldURI);
796                                    break;
797                                }
798                            }
799                        } else {
800                            // startAt time is after last enqueue time
801                            // Resume fetcher for the original variant; the resumed fetcher should
802                            // continue until the timestamps found in msg, which is stored by the
803                            // new fetcher to indicate where the new variant has started buffering.
804                            ALOGV("finish switch with resumeUntilAsync");
805                            for (size_t i = 0; i < mFetcherInfos.size(); i++) {
806                                const FetcherInfo &info = mFetcherInfos.valueAt(i);
807                                if (info.mToBeRemoved) {
808                                    info.mFetcher->resumeUntilAsync(stopParams);
809                                }
810                            }
811                        }
812                    } else {
813                        // playback time passed startAt time
814                        if (switchUp) {
815                            // if switching up, cancel and retry if condition satisfies again
816                            ALOGV("cancel up switch because we're too late");
817                            cancelBandwidthSwitch(true /* resume */);
818                        } else {
819                            ALOGV("retry down switch at next sample");
820                            resumeFetcher(uri, mSwapMask, -1, true /* newUri */);
821                        }
822                    }
823                    // re-enable all packet sources
824                    for (size_t i = 0; i < mPacketSources.size(); ++i) {
825                        mPacketSources.editValueAt(i)->enable(true);
826                    }
827
828                    break;
829                }
830
831                case PlaylistFetcher::kWhatPlaylistFetched:
832                {
833                    onMasterPlaylistFetched(msg);
834                    break;
835                }
836
837                case PlaylistFetcher::kWhatMetadataDetected:
838                {
839                    if (!mHasMetadata) {
840                        mHasMetadata = true;
841                        sp<AMessage> notify = mNotify->dup();
842                        notify->setInt32("what", kWhatMetadataDetected);
843                        notify->post();
844                    }
845                    break;
846                }
847
848                default:
849                    TRESPASS();
850            }
851
852            break;
853        }
854
855        case kWhatChangeConfiguration:
856        {
857            onChangeConfiguration(msg);
858            break;
859        }
860
861        case kWhatChangeConfiguration2:
862        {
863            onChangeConfiguration2(msg);
864            break;
865        }
866
867        case kWhatChangeConfiguration3:
868        {
869            onChangeConfiguration3(msg);
870            break;
871        }
872
873        case kWhatPollBuffering:
874        {
875            int32_t generation;
876            CHECK(msg->findInt32("generation", &generation));
877            if (generation == mPollBufferingGeneration) {
878                onPollBuffering();
879            }
880            break;
881        }
882
883        default:
884            TRESPASS();
885            break;
886    }
887}
888
889// static
890int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
891    if (a->mBandwidth < b->mBandwidth) {
892        return -1;
893    } else if (a->mBandwidth == b->mBandwidth) {
894        return 0;
895    }
896
897    return 1;
898}
899
900// static
901LiveSession::StreamType LiveSession::indexToType(int idx) {
902    CHECK(idx >= 0 && idx < kNumSources);
903    return (StreamType)(1 << idx);
904}
905
906// static
907ssize_t LiveSession::typeToIndex(int32_t type) {
908    switch (type) {
909        case STREAMTYPE_AUDIO:
910            return 0;
911        case STREAMTYPE_VIDEO:
912            return 1;
913        case STREAMTYPE_SUBTITLES:
914            return 2;
915        case STREAMTYPE_METADATA:
916            return 3;
917        default:
918            return -1;
919    };
920    return -1;
921}
922
923void LiveSession::onConnect(const sp<AMessage> &msg) {
924    CHECK(msg->findString("url", &mMasterURL));
925
926    // TODO currently we don't know if we are coming here from incognito mode
927    ALOGI("onConnect %s", uriDebugString(mMasterURL).c_str());
928
929    KeyedVector<String8, String8> *headers = NULL;
930    if (!msg->findPointer("headers", (void **)&headers)) {
931        mExtraHeaders.clear();
932    } else {
933        mExtraHeaders = *headers;
934
935        delete headers;
936        headers = NULL;
937    }
938
939    // create looper for fetchers
940    if (mFetcherLooper == NULL) {
941        mFetcherLooper = new ALooper();
942
943        mFetcherLooper->setName("Fetcher");
944        mFetcherLooper->start(false, false);
945    }
946
947    // create fetcher to fetch the master playlist
948    addFetcher(mMasterURL.c_str())->fetchPlaylistAsync();
949}
950
951void LiveSession::onMasterPlaylistFetched(const sp<AMessage> &msg) {
952    AString uri;
953    CHECK(msg->findString("uri", &uri));
954    ssize_t index = mFetcherInfos.indexOfKey(uri);
955    if (index < 0) {
956        ALOGW("fetcher for master playlist is gone.");
957        return;
958    }
959
960    // no longer useful, remove
961    mFetcherLooper->unregisterHandler(mFetcherInfos[index].mFetcher->id());
962    mFetcherInfos.removeItemsAt(index);
963
964    CHECK(msg->findObject("playlist", (sp<RefBase> *)&mPlaylist));
965    if (mPlaylist == NULL) {
966        ALOGE("unable to fetch master playlist %s.",
967                uriDebugString(mMasterURL).c_str());
968
969        postPrepared(ERROR_IO);
970        return;
971    }
972    // We trust the content provider to make a reasonable choice of preferred
973    // initial bandwidth by listing it first in the variant playlist.
974    // At startup we really don't have a good estimate on the available
975    // network bandwidth since we haven't tranferred any data yet. Once
976    // we have we can make a better informed choice.
977    size_t initialBandwidth = 0;
978    size_t initialBandwidthIndex = 0;
979
980    int32_t maxWidth = 0;
981    int32_t maxHeight = 0;
982
983    if (mPlaylist->isVariantPlaylist()) {
984        Vector<BandwidthItem> itemsWithVideo;
985        for (size_t i = 0; i < mPlaylist->size(); ++i) {
986            BandwidthItem item;
987
988            item.mPlaylistIndex = i;
989
990            sp<AMessage> meta;
991            AString uri;
992            mPlaylist->itemAt(i, &uri, &meta);
993
994            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
995
996            int32_t width, height;
997            if (meta->findInt32("width", &width)) {
998                maxWidth = max(maxWidth, width);
999            }
1000            if (meta->findInt32("height", &height)) {
1001                maxHeight = max(maxHeight, height);
1002            }
1003
1004            mBandwidthItems.push(item);
1005            if (mPlaylist->hasType(i, "video")) {
1006                itemsWithVideo.push(item);
1007            }
1008        }
1009        // remove the audio-only variants if we have at least one with video
1010        if (!itemsWithVideo.empty()
1011                && itemsWithVideo.size() < mBandwidthItems.size()) {
1012            mBandwidthItems.clear();
1013            for (size_t i = 0; i < itemsWithVideo.size(); ++i) {
1014                mBandwidthItems.push(itemsWithVideo[i]);
1015            }
1016        }
1017
1018        CHECK_GT(mBandwidthItems.size(), 0u);
1019        initialBandwidth = mBandwidthItems[0].mBandwidth;
1020
1021        mBandwidthItems.sort(SortByBandwidth);
1022
1023        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
1024            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
1025                initialBandwidthIndex = i;
1026                break;
1027            }
1028        }
1029    } else {
1030        // dummy item.
1031        BandwidthItem item;
1032        item.mPlaylistIndex = 0;
1033        item.mBandwidth = 0;
1034        mBandwidthItems.push(item);
1035    }
1036
1037    mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth;
1038    mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight;
1039
1040    mPlaylist->pickRandomMediaItems();
1041    changeConfiguration(
1042            0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
1043}
1044
1045void LiveSession::finishDisconnect() {
1046    ALOGV("finishDisconnect");
1047
1048    // No reconfiguration is currently pending, make sure none will trigger
1049    // during disconnection either.
1050    cancelBandwidthSwitch();
1051
1052    // cancel buffer polling
1053    cancelPollBuffering();
1054
1055    // TRICKY: don't wait for all fetcher to be stopped when disconnecting
1056    //
1057    // Some fetchers might be stuck in connect/getSize at this point. These
1058    // operations will eventually timeout (as we have a timeout set in
1059    // MediaHTTPConnection), but we don't want to block the main UI thread
1060    // until then. Here we just need to make sure we clear all references
1061    // to the fetchers, so that when they finally exit from the blocking
1062    // operation, they can be destructed.
1063    //
1064    // There is one very tricky point though. For this scheme to work, the
1065    // fecther must hold a reference to LiveSession, so that LiveSession is
1066    // destroyed after fetcher. Otherwise LiveSession would get stuck in its
1067    // own destructor when it waits for mFetcherLooper to stop, which still
1068    // blocks main UI thread.
1069    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1070        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1071        mFetcherLooper->unregisterHandler(
1072                mFetcherInfos.valueAt(i).mFetcher->id());
1073    }
1074    mFetcherInfos.clear();
1075
1076    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
1077    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
1078
1079    mPacketSources.valueFor(
1080            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
1081
1082    sp<AMessage> response = new AMessage;
1083    response->setInt32("err", OK);
1084
1085    response->postReply(mDisconnectReplyID);
1086    mDisconnectReplyID.clear();
1087}
1088
1089sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
1090    ssize_t index = mFetcherInfos.indexOfKey(uri);
1091
1092    if (index >= 0) {
1093        return NULL;
1094    }
1095
1096    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this);
1097    notify->setString("uri", uri);
1098    notify->setInt32("switchGeneration", mSwitchGeneration);
1099
1100    FetcherInfo info;
1101    info.mFetcher = new PlaylistFetcher(
1102            notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration);
1103    info.mDurationUs = -1ll;
1104    info.mToBeRemoved = false;
1105    info.mToBeResumed = false;
1106    mFetcherLooper->registerHandler(info.mFetcher);
1107
1108    mFetcherInfos.add(uri, info);
1109
1110    return info.mFetcher;
1111}
1112
1113#if 0
1114static double uniformRand() {
1115    return (double)rand() / RAND_MAX;
1116}
1117#endif
1118
1119bool LiveSession::UriIsSameAsIndex(const AString &uri, int32_t i, bool newUri) {
1120    ALOGI("[timed_id3] i %d UriIsSameAsIndex newUri %s, %s", i,
1121            newUri ? "true" : "false",
1122            newUri ? mStreams[i].mNewUri.c_str() : mStreams[i].mUri.c_str());
1123    return i >= 0
1124            && ((!newUri && uri == mStreams[i].mUri)
1125            || (newUri && uri == mStreams[i].mNewUri));
1126}
1127
1128sp<AnotherPacketSource> LiveSession::getPacketSourceForStreamIndex(
1129        size_t trackIndex, bool newUri) {
1130    StreamType type = indexToType(trackIndex);
1131    sp<AnotherPacketSource> source = NULL;
1132    if (newUri) {
1133        source = mPacketSources2.valueFor(type);
1134        source->clear();
1135    } else {
1136        source = mPacketSources.valueFor(type);
1137    };
1138    return source;
1139}
1140
1141sp<AnotherPacketSource> LiveSession::getMetadataSource(
1142        sp<AnotherPacketSource> sources[kNumSources], uint32_t streamMask, bool newUri) {
1143    // todo: One case where the following strategy can fail is when audio and video
1144    // are in separate playlists, both are transport streams, and the metadata
1145    // is actually contained in the audio stream.
1146    ALOGV("[timed_id3] getMetadataSourceForUri streamMask %x newUri %s",
1147            streamMask, newUri ? "true" : "false");
1148
1149    if ((sources[kVideoIndex] != NULL) // video fetcher; or ...
1150            || (!(streamMask & STREAMTYPE_VIDEO) && sources[kAudioIndex] != NULL)) {
1151            // ... audio fetcher for audio only variant
1152        return getPacketSourceForStreamIndex(kMetaDataIndex, newUri);
1153    }
1154
1155    return NULL;
1156}
1157
1158bool LiveSession::resumeFetcher(
1159        const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) {
1160    ssize_t index = mFetcherInfos.indexOfKey(uri);
1161    if (index < 0) {
1162        ALOGE("did not find fetcher for uri: %s", uri.c_str());
1163        return false;
1164    }
1165
1166    bool resume = false;
1167    sp<AnotherPacketSource> sources[kNumSources];
1168    for (size_t i = 0; i < kMaxStreams; ++i) {
1169        if ((streamMask & indexToType(i)) && UriIsSameAsIndex(uri, i, newUri)) {
1170            resume = true;
1171            sources[i] = getPacketSourceForStreamIndex(i, newUri);
1172        }
1173    }
1174
1175    if (resume) {
1176        sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher;
1177        SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition;
1178
1179        ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d",
1180                fetcher->getFetcherID(), (long long)timeUs, seekMode);
1181
1182        fetcher->startAsync(
1183                sources[kAudioIndex],
1184                sources[kVideoIndex],
1185                sources[kSubtitleIndex],
1186                getMetadataSource(sources, streamMask, newUri),
1187                timeUs, -1, -1, seekMode);
1188    }
1189
1190    return resume;
1191}
1192
1193float LiveSession::getAbortThreshold(
1194        ssize_t currentBWIndex, ssize_t targetBWIndex) const {
1195    float abortThreshold = -1.0f;
1196    if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) {
1197        /*
1198           If we're switching down, we need to decide whether to
1199
1200           1) finish last segment of high-bandwidth variant, or
1201           2) abort last segment of high-bandwidth variant, and fetch an
1202              overlapping portion from low-bandwidth variant.
1203
1204           Here we try to maximize the amount of buffer left when the
1205           switch point is met. Given the following parameters:
1206
1207           B: our current buffering level in seconds
1208           T: target duration in seconds
1209           X: sample duration in seconds remain to fetch in last segment
1210           bw0: bandwidth of old variant (as specified in playlist)
1211           bw1: bandwidth of new variant (as specified in playlist)
1212           bw: measured bandwidth available
1213
1214           If we choose 1), when switch happens at the end of current
1215           segment, our buffering will be
1216                  B + X - X * bw0 / bw
1217
1218           If we choose 2), when switch happens where we aborted current
1219           segment, our buffering will be
1220                  B - (T - X) * bw1 / bw
1221
1222           We should only choose 1) if
1223                  X/T < bw1 / (bw1 + bw0 - bw)
1224        */
1225
1226        // Taking the measured current bandwidth at 50% face value only,
1227        // as our bandwidth estimation is a lagging indicator. Being
1228        // conservative on this, we prefer switching to lower bandwidth
1229        // unless we're really confident finishing up the last segment
1230        // of higher bandwidth will be fast.
1231        CHECK(mLastBandwidthBps >= 0);
1232        abortThreshold =
1233                (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1234             / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1235              + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth
1236              - (float)mLastBandwidthBps * 0.5f);
1237        if (abortThreshold < 0.0f) {
1238            abortThreshold = -1.0f; // do not abort
1239        }
1240        ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f",
1241                mBandwidthItems.itemAt(currentBWIndex).mBandwidth,
1242                mBandwidthItems.itemAt(targetBWIndex).mBandwidth,
1243                mLastBandwidthBps,
1244                abortThreshold);
1245    }
1246    return abortThreshold;
1247}
1248
1249void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) {
1250    mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs);
1251}
1252
1253size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) {
1254    if (mBandwidthItems.size() < 2) {
1255        // shouldn't be here if we only have 1 bandwidth, check
1256        // logic to get rid of redundant bandwidth polling
1257        ALOGW("getBandwidthIndex() called for single bandwidth playlist!");
1258        return 0;
1259    }
1260
1261#if 1
1262    char value[PROPERTY_VALUE_MAX];
1263    ssize_t index = -1;
1264    if (property_get("media.httplive.bw-index", value, NULL)) {
1265        char *end;
1266        index = strtol(value, &end, 10);
1267        CHECK(end > value && *end == '\0');
1268
1269        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
1270            index = mBandwidthItems.size() - 1;
1271        }
1272    }
1273
1274    if (index < 0) {
1275        char value[PROPERTY_VALUE_MAX];
1276        if (property_get("media.httplive.max-bw", value, NULL)) {
1277            char *end;
1278            long maxBw = strtoul(value, &end, 10);
1279            if (end > value && *end == '\0') {
1280                if (maxBw > 0 && bandwidthBps > maxBw) {
1281                    ALOGV("bandwidth capped to %ld bps", maxBw);
1282                    bandwidthBps = maxBw;
1283                }
1284            }
1285        }
1286
1287        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
1288
1289        index = mBandwidthItems.size() - 1;
1290        while (index > 0) {
1291            // be conservative (70%) to avoid overestimating and immediately
1292            // switching down again.
1293            size_t adjustedBandwidthBps = bandwidthBps * 7 / 10;
1294            if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) {
1295                break;
1296            }
1297            --index;
1298        }
1299    }
1300#elif 0
1301    // Change bandwidth at random()
1302    size_t index = uniformRand() * mBandwidthItems.size();
1303#elif 0
1304    // There's a 50% chance to stay on the current bandwidth and
1305    // a 50% chance to switch to the next higher bandwidth (wrapping around
1306    // to lowest)
1307    const size_t kMinIndex = 0;
1308
1309    static ssize_t mCurBandwidthIndex = -1;
1310
1311    size_t index;
1312    if (mCurBandwidthIndex < 0) {
1313        index = kMinIndex;
1314    } else if (uniformRand() < 0.5) {
1315        index = (size_t)mCurBandwidthIndex;
1316    } else {
1317        index = mCurBandwidthIndex + 1;
1318        if (index == mBandwidthItems.size()) {
1319            index = kMinIndex;
1320        }
1321    }
1322    mCurBandwidthIndex = index;
1323#elif 0
1324    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1325
1326    size_t index = mBandwidthItems.size() - 1;
1327    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1328        --index;
1329    }
1330#elif 1
1331    char value[PROPERTY_VALUE_MAX];
1332    size_t index;
1333    if (property_get("media.httplive.bw-index", value, NULL)) {
1334        char *end;
1335        index = strtoul(value, &end, 10);
1336        CHECK(end > value && *end == '\0');
1337
1338        if (index >= mBandwidthItems.size()) {
1339            index = mBandwidthItems.size() - 1;
1340        }
1341    } else {
1342        index = 0;
1343    }
1344#else
1345    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
1346#endif
1347
1348    CHECK_GE(index, 0);
1349
1350    return index;
1351}
1352
1353HLSTime LiveSession::latestMediaSegmentStartTime() const {
1354    HLSTime audioTime(mPacketSources.valueFor(
1355                    STREAMTYPE_AUDIO)->getLatestDequeuedMeta());
1356
1357    HLSTime videoTime(mPacketSources.valueFor(
1358                    STREAMTYPE_VIDEO)->getLatestDequeuedMeta());
1359
1360    return audioTime < videoTime ? videoTime : audioTime;
1361}
1362
1363void LiveSession::onSeek(const sp<AMessage> &msg) {
1364    int64_t timeUs;
1365    CHECK(msg->findInt64("timeUs", &timeUs));
1366    changeConfiguration(timeUs);
1367}
1368
1369status_t LiveSession::getDuration(int64_t *durationUs) const {
1370    int64_t maxDurationUs = -1ll;
1371    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1372        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1373
1374        if (fetcherDurationUs > maxDurationUs) {
1375            maxDurationUs = fetcherDurationUs;
1376        }
1377    }
1378
1379    *durationUs = maxDurationUs;
1380
1381    return OK;
1382}
1383
1384bool LiveSession::isSeekable() const {
1385    int64_t durationUs;
1386    return getDuration(&durationUs) == OK && durationUs >= 0;
1387}
1388
1389bool LiveSession::hasDynamicDuration() const {
1390    return false;
1391}
1392
1393size_t LiveSession::getTrackCount() const {
1394    if (mPlaylist == NULL) {
1395        return 0;
1396    } else {
1397        return mPlaylist->getTrackCount() + (mHasMetadata ? 1 : 0);
1398    }
1399}
1400
1401sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1402    if (mPlaylist == NULL) {
1403        return NULL;
1404    } else {
1405        if (trackIndex == mPlaylist->getTrackCount() && mHasMetadata) {
1406            sp<AMessage> format = new AMessage();
1407            format->setInt32("type", MEDIA_TRACK_TYPE_METADATA);
1408            format->setString("language", "und");
1409            format->setString("mime", MEDIA_MIMETYPE_DATA_TIMED_ID3);
1410            return format;
1411        }
1412        return mPlaylist->getTrackInfo(trackIndex);
1413    }
1414}
1415
1416status_t LiveSession::selectTrack(size_t index, bool select) {
1417    if (mPlaylist == NULL) {
1418        return INVALID_OPERATION;
1419    }
1420
1421    ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++",
1422            index, select, mSubtitleGeneration);
1423
1424    ++mSubtitleGeneration;
1425    status_t err = mPlaylist->selectTrack(index, select);
1426    if (err == OK) {
1427        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this);
1428        msg->setInt32("pickTrack", select);
1429        msg->post();
1430    }
1431    return err;
1432}
1433
1434ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
1435    if (mPlaylist == NULL) {
1436        return -1;
1437    } else {
1438        return mPlaylist->getSelectedTrack(type);
1439    }
1440}
1441
1442void LiveSession::changeConfiguration(
1443        int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) {
1444    ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d",
1445          (long long)timeUs, bandwidthIndex, pickTrack);
1446
1447    cancelBandwidthSwitch();
1448
1449    CHECK(!mReconfigurationInProgress);
1450    mReconfigurationInProgress = true;
1451    if (bandwidthIndex >= 0) {
1452        mOrigBandwidthIndex = mCurBandwidthIndex;
1453        mCurBandwidthIndex = bandwidthIndex;
1454        if (mOrigBandwidthIndex != mCurBandwidthIndex) {
1455            ALOGI("#### Starting Bandwidth Switch: %zd => %zd",
1456                    mOrigBandwidthIndex, mCurBandwidthIndex);
1457        }
1458    }
1459    CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size());
1460    const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex);
1461
1462    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1463    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1464
1465    AString URIs[kMaxStreams];
1466    for (size_t i = 0; i < kMaxStreams; ++i) {
1467        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1468            streamMask |= indexToType(i);
1469        }
1470    }
1471
1472    // Step 1, stop and discard fetchers that are no longer needed.
1473    // Pause those that we'll reuse.
1474    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1475        // skip fetchers that are marked mToBeRemoved,
1476        // these are done and can't be reused
1477        if (mFetcherInfos[i].mToBeRemoved) {
1478            continue;
1479        }
1480
1481        const AString &uri = mFetcherInfos.keyAt(i);
1482        sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher;
1483
1484        bool discardFetcher = true, delayRemoval = false;
1485        for (size_t j = 0; j < kMaxStreams; ++j) {
1486            StreamType type = indexToType(j);
1487            if ((streamMask & type) && uri == URIs[j]) {
1488                resumeMask |= type;
1489                streamMask &= ~type;
1490                discardFetcher = false;
1491            }
1492        }
1493        // Delay fetcher removal if not picking tracks, AND old fetcher
1494        // has stream mask that overlaps new variant. (Okay to discard
1495        // old fetcher now, if completely no overlap.)
1496        if (discardFetcher && timeUs < 0ll && !pickTrack
1497                && (fetcher->getStreamTypeMask() & streamMask)) {
1498            discardFetcher = false;
1499            delayRemoval = true;
1500        }
1501
1502        if (discardFetcher) {
1503            ALOGV("discarding fetcher-%d", fetcher->getFetcherID());
1504            fetcher->stopAsync();
1505        } else {
1506            float threshold = 0.0f; // default to pause after current block (47Kbytes)
1507            bool disconnect = false;
1508            if (timeUs >= 0ll) {
1509                // seeking, no need to finish fetching
1510                disconnect = true;
1511            } else if (delayRemoval) {
1512                // adapting, abort if remaining of current segment is over threshold
1513                threshold = getAbortThreshold(
1514                        mOrigBandwidthIndex, mCurBandwidthIndex);
1515            }
1516
1517            ALOGV("pausing fetcher-%d, threshold=%.2f",
1518                    fetcher->getFetcherID(), threshold);
1519            fetcher->pauseAsync(threshold, disconnect);
1520        }
1521    }
1522
1523    sp<AMessage> msg;
1524    if (timeUs < 0ll) {
1525        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1526        msg = new AMessage(kWhatChangeConfiguration3, this);
1527    } else {
1528        msg = new AMessage(kWhatChangeConfiguration2, this);
1529    }
1530    msg->setInt32("streamMask", streamMask);
1531    msg->setInt32("resumeMask", resumeMask);
1532    msg->setInt32("pickTrack", pickTrack);
1533    msg->setInt64("timeUs", timeUs);
1534    for (size_t i = 0; i < kMaxStreams; ++i) {
1535        if ((streamMask | resumeMask) & indexToType(i)) {
1536            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1537        }
1538    }
1539
1540    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1541    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1542    // fetchers have completed their asynchronous operation, we'll post
1543    // mContinuation, which then is handled below in onChangeConfiguration2.
1544    mContinuationCounter = mFetcherInfos.size();
1545    mContinuation = msg;
1546
1547    if (mContinuationCounter == 0) {
1548        msg->post();
1549    }
1550}
1551
1552void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1553    ALOGV("onChangeConfiguration");
1554
1555    if (!mReconfigurationInProgress) {
1556        int32_t pickTrack = 0;
1557        msg->findInt32("pickTrack", &pickTrack);
1558        changeConfiguration(-1ll /* timeUs */, -1, pickTrack);
1559    } else {
1560        msg->post(1000000ll); // retry in 1 sec
1561    }
1562}
1563
1564void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1565    ALOGV("onChangeConfiguration2");
1566
1567    mContinuation.clear();
1568
1569    // All fetchers are either suspended or have been removed now.
1570
1571    // If we're seeking, clear all packet sources before we report
1572    // seek complete, to prevent decoder from pulling stale data.
1573    int64_t timeUs;
1574    CHECK(msg->findInt64("timeUs", &timeUs));
1575
1576    if (timeUs >= 0) {
1577        mLastSeekTimeUs = timeUs;
1578        mLastDequeuedTimeUs = timeUs;
1579
1580        for (size_t i = 0; i < mPacketSources.size(); i++) {
1581            mPacketSources.editValueAt(i)->clear();
1582        }
1583
1584        for (size_t i = 0; i < kMaxStreams; ++i) {
1585            mStreams[i].reset();
1586        }
1587
1588        mDiscontinuityOffsetTimesUs.clear();
1589        mDiscontinuityAbsStartTimesUs.clear();
1590
1591        if (mSeekReplyID != NULL) {
1592            CHECK(mSeekReply != NULL);
1593            mSeekReply->setInt32("err", OK);
1594            mSeekReply->postReply(mSeekReplyID);
1595            mSeekReplyID.clear();
1596            mSeekReply.clear();
1597        }
1598
1599        // restart buffer polling after seek becauese previous
1600        // buffering position is no longer valid.
1601        restartPollBuffering();
1602    }
1603
1604    uint32_t streamMask, resumeMask;
1605    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1606    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1607
1608    streamMask |= resumeMask;
1609
1610    AString URIs[kMaxStreams];
1611    for (size_t i = 0; i < kMaxStreams; ++i) {
1612        if (streamMask & indexToType(i)) {
1613            const AString &uriKey = mStreams[i].uriKey();
1614            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1615            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1616        }
1617    }
1618
1619    uint32_t changedMask = 0;
1620    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1621        // stream URI could change even if onChangeConfiguration2 is only
1622        // used for seek. Seek could happen during a bw switch, in this
1623        // case bw switch will be cancelled, but the seekTo position will
1624        // fetch from the new URI.
1625        if ((mStreamMask & streamMask & indexToType(i))
1626                && !mStreams[i].mUri.empty()
1627                && !(URIs[i] == mStreams[i].mUri)) {
1628            ALOGV("stream %zu changed: oldURI %s, newURI %s", i,
1629                    mStreams[i].mUri.c_str(), URIs[i].c_str());
1630            sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i));
1631            if (source->getLatestDequeuedMeta() != NULL) {
1632                source->queueDiscontinuity(
1633                        ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1634            }
1635        }
1636        // Determine which decoders to shutdown on the player side,
1637        // a decoder has to be shutdown if its streamtype was active
1638        // before but now longer isn't.
1639        if ((mStreamMask & ~streamMask & indexToType(i))) {
1640            changedMask |= indexToType(i);
1641        }
1642    }
1643
1644    if (changedMask == 0) {
1645        // If nothing changed as far as the audio/video decoders
1646        // are concerned we can proceed.
1647        onChangeConfiguration3(msg);
1648        return;
1649    }
1650
1651    // Something changed, inform the player which will shutdown the
1652    // corresponding decoders and will post the reply once that's done.
1653    // Handling the reply will continue executing below in
1654    // onChangeConfiguration3.
1655    sp<AMessage> notify = mNotify->dup();
1656    notify->setInt32("what", kWhatStreamsChanged);
1657    notify->setInt32("changedMask", changedMask);
1658
1659    msg->setWhat(kWhatChangeConfiguration3);
1660    msg->setTarget(this);
1661
1662    notify->setMessage("reply", msg);
1663    notify->post();
1664}
1665
1666void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1667    mContinuation.clear();
1668    // All remaining fetchers are still suspended, the player has shutdown
1669    // any decoders that needed it.
1670
1671    uint32_t streamMask, resumeMask;
1672    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1673    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1674
1675    mNewStreamMask = streamMask | resumeMask;
1676
1677    int64_t timeUs;
1678    int32_t pickTrack;
1679    bool switching = false;
1680    CHECK(msg->findInt64("timeUs", &timeUs));
1681    CHECK(msg->findInt32("pickTrack", &pickTrack));
1682
1683    if (timeUs < 0ll) {
1684        if (!pickTrack) {
1685            // mSwapMask contains streams that are in both old and new variant,
1686            // (in mNewStreamMask & mStreamMask) but with different URIs
1687            // (not in resumeMask).
1688            // For example, old variant has video and audio in two separate
1689            // URIs, and new variant has only audio with unchanged URI. mSwapMask
1690            // should be 0 as there is nothing to swap. We only need to stop video,
1691            // and resume audio.
1692            mSwapMask =  mNewStreamMask & mStreamMask & ~resumeMask;
1693            switching = (mSwapMask != 0);
1694        }
1695        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
1696    } else {
1697        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1698    }
1699
1700    ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, "
1701            "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x",
1702            (long long)timeUs, switching, pickTrack,
1703            mStreamMask, mNewStreamMask, mSwapMask);
1704
1705    for (size_t i = 0; i < kMaxStreams; ++i) {
1706        if (streamMask & indexToType(i)) {
1707            if (switching) {
1708                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
1709            } else {
1710                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1711            }
1712        }
1713    }
1714
1715    // Of all existing fetchers:
1716    // * Resume fetchers that are still needed and assign them original packet sources.
1717    // * Mark otherwise unneeded fetchers for removal.
1718    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1719    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1720        const AString &uri = mFetcherInfos.keyAt(i);
1721        if (!resumeFetcher(uri, resumeMask, timeUs)) {
1722            ALOGV("marking fetcher-%d to be removed",
1723                    mFetcherInfos[i].mFetcher->getFetcherID());
1724
1725            mFetcherInfos.editValueAt(i).mToBeRemoved = true;
1726        }
1727    }
1728
1729    // streamMask now only contains the types that need a new fetcher created.
1730    if (streamMask != 0) {
1731        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1732    }
1733
1734    // Find out when the original fetchers have buffered up to and start the new fetchers
1735    // at a later timestamp.
1736    for (size_t i = 0; i < kMaxStreams; i++) {
1737        if (!(indexToType(i) & streamMask)) {
1738            continue;
1739        }
1740
1741        AString uri;
1742        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
1743
1744        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1745        CHECK(fetcher != NULL);
1746
1747        HLSTime startTime;
1748        SeekMode seekMode = kSeekModeExactPosition;
1749        sp<AnotherPacketSource> sources[kNumSources];
1750
1751        if (i == kSubtitleIndex || (!pickTrack && !switching)) {
1752            startTime = latestMediaSegmentStartTime();
1753        }
1754
1755        // TRICKY: looping from i as earlier streams are already removed from streamMask
1756        for (size_t j = i; j < kMaxStreams; ++j) {
1757            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
1758            if ((streamMask & indexToType(j)) && uri == streamUri) {
1759                sources[j] = mPacketSources.valueFor(indexToType(j));
1760
1761                if (timeUs >= 0) {
1762                    startTime.mTimeUs = timeUs;
1763                } else {
1764                    int32_t type;
1765                    sp<AMessage> meta;
1766                    if (!switching) {
1767                        // selecting, or adapting but no swap required
1768                        meta = sources[j]->getLatestDequeuedMeta();
1769                    } else {
1770                        // adapting and swap required
1771                        meta = sources[j]->getLatestEnqueuedMeta();
1772                        if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) {
1773                            // switching up
1774                            meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin);
1775                        }
1776                    }
1777
1778                    if ((j == kAudioIndex || j == kVideoIndex)
1779                            && meta != NULL && !meta->findInt32("discontinuity", &type)) {
1780                        HLSTime tmpTime(meta);
1781                        if (startTime < tmpTime) {
1782                            startTime = tmpTime;
1783                        }
1784                    }
1785
1786                    if (!switching) {
1787                        // selecting, or adapting but no swap required
1788                        sources[j]->clear();
1789                        if (j == kSubtitleIndex) {
1790                            break;
1791                        }
1792
1793                        ALOGV("stream[%zu]: queue format change", j);
1794                        sources[j]->queueDiscontinuity(
1795                                ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
1796                    } else {
1797                        // switching, queue discontinuities after resume
1798                        sources[j] = mPacketSources2.valueFor(indexToType(j));
1799                        sources[j]->clear();
1800                        // the new fetcher might be providing streams that used to be
1801                        // provided by two different fetchers,  if one of the fetcher
1802                        // paused in the middle while the other somehow paused in next
1803                        // seg, we have to start from next seg.
1804                        if (seekMode < mStreams[j].mSeekMode) {
1805                            seekMode = mStreams[j].mSeekMode;
1806                        }
1807                    }
1808                }
1809
1810                streamMask &= ~indexToType(j);
1811            }
1812        }
1813
1814        ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld "
1815                "segmentStartTimeUs %lld seekMode %d",
1816                fetcher->getFetcherID(),
1817                (long long)startTime.mTimeUs,
1818                (long long)mLastSeekTimeUs,
1819                (long long)startTime.getSegmentTimeUs(),
1820                seekMode);
1821
1822        // Set the target segment start time to the middle point of the
1823        // segment where the last sample was.
1824        // This gives a better guess if segments of the two variants are not
1825        // perfectly aligned. (If the corresponding segment in new variant
1826        // starts slightly later than that in the old variant, we still want
1827        // to pick that segment, not the one before)
1828        fetcher->startAsync(
1829                sources[kAudioIndex],
1830                sources[kVideoIndex],
1831                sources[kSubtitleIndex],
1832                getMetadataSource(sources, mNewStreamMask, switching),
1833                startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs,
1834                startTime.getSegmentTimeUs(),
1835                startTime.mSeq,
1836                seekMode);
1837    }
1838
1839    // All fetchers have now been started, the configuration change
1840    // has completed.
1841
1842    mReconfigurationInProgress = false;
1843    if (switching) {
1844        mSwitchInProgress = true;
1845    } else {
1846        mStreamMask = mNewStreamMask;
1847        if (mOrigBandwidthIndex != mCurBandwidthIndex) {
1848            ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd",
1849                    mOrigBandwidthIndex, mCurBandwidthIndex);
1850            mOrigBandwidthIndex = mCurBandwidthIndex;
1851        }
1852    }
1853
1854    ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x",
1855            mSwitchInProgress, mStreamMask);
1856
1857    if (mDisconnectReplyID != NULL) {
1858        finishDisconnect();
1859    }
1860}
1861
1862void LiveSession::swapPacketSource(StreamType stream) {
1863    ALOGV("[%s] swapPacketSource", getNameForStream(stream));
1864
1865    // transfer packets from source2 to source
1866    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
1867    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
1868
1869    // queue discontinuity in mPacketSource
1870    aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false);
1871
1872    // queue packets in mPacketSource2 to mPacketSource
1873    status_t finalResult = OK;
1874    sp<ABuffer> accessUnit;
1875    while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK &&
1876          OK == aps2->dequeueAccessUnit(&accessUnit)) {
1877        aps->queueAccessUnit(accessUnit);
1878    }
1879    aps2->clear();
1880}
1881
1882void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) {
1883    if (!mSwitchInProgress) {
1884        return;
1885    }
1886
1887    ssize_t index = mFetcherInfos.indexOfKey(oldUri);
1888    if (index < 0 || !mFetcherInfos[index].mToBeRemoved) {
1889        return;
1890    }
1891
1892    // Swap packet source of streams provided by old variant
1893    for (size_t idx = 0; idx < kMaxStreams; idx++) {
1894        StreamType stream = indexToType(idx);
1895        if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) {
1896            swapPacketSource(stream);
1897
1898            if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
1899                ALOGW("swapping stream type %d %s to empty stream",
1900                        stream, mStreams[idx].mUri.c_str());
1901            }
1902            mStreams[idx].mUri = mStreams[idx].mNewUri;
1903            mStreams[idx].mNewUri.clear();
1904
1905            mSwapMask &= ~stream;
1906        }
1907    }
1908
1909    mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */);
1910
1911    ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask);
1912    if (mSwapMask != 0) {
1913        return;
1914    }
1915
1916    // Check if new variant contains extra streams.
1917    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1918    while (extraStreams) {
1919        StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1));
1920        extraStreams &= ~stream;
1921
1922        swapPacketSource(stream);
1923
1924        ssize_t idx = typeToIndex(stream);
1925        CHECK(idx >= 0);
1926        if (mStreams[idx].mNewUri.empty()) {
1927            ALOGW("swapping extra stream type %d %s to empty stream",
1928                    stream, mStreams[idx].mUri.c_str());
1929        }
1930        mStreams[idx].mUri = mStreams[idx].mNewUri;
1931        mStreams[idx].mNewUri.clear();
1932    }
1933
1934    // Restart new fetcher (it was paused after the first 47k block)
1935    // and let it fetch into mPacketSources (not mPacketSources2)
1936    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1937        FetcherInfo &info = mFetcherInfos.editValueAt(i);
1938        if (info.mToBeResumed) {
1939            resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask);
1940            info.mToBeResumed = false;
1941        }
1942    }
1943
1944    ALOGI("#### Finished Bandwidth Switch: %zd => %zd",
1945            mOrigBandwidthIndex, mCurBandwidthIndex);
1946
1947    mStreamMask = mNewStreamMask;
1948    mSwitchInProgress = false;
1949    mOrigBandwidthIndex = mCurBandwidthIndex;
1950
1951    restartPollBuffering();
1952}
1953
1954void LiveSession::schedulePollBuffering() {
1955    sp<AMessage> msg = new AMessage(kWhatPollBuffering, this);
1956    msg->setInt32("generation", mPollBufferingGeneration);
1957    msg->post(1000000ll);
1958}
1959
1960void LiveSession::cancelPollBuffering() {
1961    ++mPollBufferingGeneration;
1962    mPrevBufferPercentage = -1;
1963}
1964
1965void LiveSession::restartPollBuffering() {
1966    cancelPollBuffering();
1967    onPollBuffering();
1968}
1969
1970void LiveSession::onPollBuffering() {
1971    ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
1972            "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x",
1973        mSwitchInProgress, mReconfigurationInProgress,
1974        mInPreparationPhase, mCurBandwidthIndex, mStreamMask);
1975
1976    bool underflow, ready, down, up;
1977    if (checkBuffering(underflow, ready, down, up)) {
1978        if (mInPreparationPhase) {
1979            // Allow down switch even if we're still preparing.
1980            //
1981            // Some streams have a high bandwidth index as default,
1982            // when bandwidth is low, it takes a long time to buffer
1983            // to ready mark, then it immediately pauses after start
1984            // as we have to do a down switch. It's better experience
1985            // to restart from a lower index, if we detect low bw.
1986            if (!switchBandwidthIfNeeded(false /* up */, down) && ready) {
1987                postPrepared(OK);
1988            }
1989        }
1990
1991        if (!mInPreparationPhase) {
1992            if (ready) {
1993                stopBufferingIfNecessary();
1994            } else if (underflow) {
1995                startBufferingIfNecessary();
1996            }
1997            switchBandwidthIfNeeded(up, down);
1998        }
1999    }
2000
2001    schedulePollBuffering();
2002}
2003
2004void LiveSession::cancelBandwidthSwitch(bool resume) {
2005    ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd",
2006            mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex);
2007    if (!mSwitchInProgress) {
2008        return;
2009    }
2010
2011    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2012        FetcherInfo& info = mFetcherInfos.editValueAt(i);
2013        if (info.mToBeRemoved) {
2014            info.mToBeRemoved = false;
2015            if (resume) {
2016                resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask);
2017            }
2018        }
2019    }
2020
2021    for (size_t i = 0; i < kMaxStreams; ++i) {
2022        AString newUri = mStreams[i].mNewUri;
2023        if (!newUri.empty()) {
2024            // clear all mNewUri matching this newUri
2025            for (size_t j = i; j < kMaxStreams; ++j) {
2026                if (mStreams[j].mNewUri == newUri) {
2027                    mStreams[j].mNewUri.clear();
2028                }
2029            }
2030            ALOGV("stopping newUri = %s", newUri.c_str());
2031            ssize_t index = mFetcherInfos.indexOfKey(newUri);
2032            if (index < 0) {
2033                ALOGE("did not find fetcher for newUri: %s", newUri.c_str());
2034                continue;
2035            }
2036            FetcherInfo &info = mFetcherInfos.editValueAt(index);
2037            info.mToBeRemoved = true;
2038            info.mFetcher->stopAsync();
2039        }
2040    }
2041
2042    ALOGI("#### Canceled Bandwidth Switch: %zd => %zd",
2043            mOrigBandwidthIndex, mCurBandwidthIndex);
2044
2045    mSwitchGeneration++;
2046    mSwitchInProgress = false;
2047    mCurBandwidthIndex = mOrigBandwidthIndex;
2048    mSwapMask = 0;
2049}
2050
2051bool LiveSession::checkBuffering(
2052        bool &underflow, bool &ready, bool &down, bool &up) {
2053    underflow = ready = down = up = false;
2054
2055    if (mReconfigurationInProgress) {
2056        ALOGV("Switch/Reconfig in progress, defer buffer polling");
2057        return false;
2058    }
2059
2060    size_t activeCount, underflowCount, readyCount, downCount, upCount;
2061    activeCount = underflowCount = readyCount = downCount = upCount =0;
2062    int32_t minBufferPercent = -1;
2063    int64_t durationUs;
2064    if (getDuration(&durationUs) != OK) {
2065        durationUs = -1;
2066    }
2067    for (size_t i = 0; i < mPacketSources.size(); ++i) {
2068        // we don't check subtitles for buffering level
2069        if (!(mStreamMask & mPacketSources.keyAt(i)
2070                & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) {
2071            continue;
2072        }
2073        // ignore streams that never had any packet queued.
2074        // (it's possible that the variant only has audio or video)
2075        sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
2076        if (meta == NULL) {
2077            continue;
2078        }
2079
2080        status_t finalResult;
2081        int64_t bufferedDurationUs =
2082                mPacketSources[i]->getBufferedDurationUs(&finalResult);
2083        ALOGV("[%s] buffered %lld us",
2084                getNameForStream(mPacketSources.keyAt(i)),
2085                (long long)bufferedDurationUs);
2086        if (durationUs >= 0) {
2087            int32_t percent;
2088            if (mPacketSources[i]->isFinished(0 /* duration */)) {
2089                percent = 100;
2090            } else {
2091                percent = (int32_t)(100.0 *
2092                        (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs);
2093            }
2094            if (minBufferPercent < 0 || percent < minBufferPercent) {
2095                minBufferPercent = percent;
2096            }
2097        }
2098
2099        ++activeCount;
2100        int64_t readyMark = mInPreparationPhase ? kPrepareMarkUs : kReadyMarkUs;
2101        if (bufferedDurationUs > readyMark
2102                || mPacketSources[i]->isFinished(0)) {
2103            ++readyCount;
2104        }
2105        if (!mPacketSources[i]->isFinished(0)) {
2106            if (bufferedDurationUs < kUnderflowMarkUs) {
2107                ++underflowCount;
2108            }
2109            if (bufferedDurationUs > mUpSwitchMark) {
2110                ++upCount;
2111            }
2112            if (bufferedDurationUs < mDownSwitchMark) {
2113                ++downCount;
2114            }
2115        }
2116    }
2117
2118    if (minBufferPercent >= 0) {
2119        notifyBufferingUpdate(minBufferPercent);
2120    }
2121
2122    if (activeCount > 0) {
2123        up        = (upCount == activeCount);
2124        down      = (downCount > 0);
2125        ready     = (readyCount == activeCount);
2126        underflow = (underflowCount > 0);
2127        return true;
2128    }
2129
2130    return false;
2131}
2132
2133void LiveSession::startBufferingIfNecessary() {
2134    ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2135            mInPreparationPhase, mBuffering);
2136    if (!mBuffering) {
2137        mBuffering = true;
2138
2139        sp<AMessage> notify = mNotify->dup();
2140        notify->setInt32("what", kWhatBufferingStart);
2141        notify->post();
2142    }
2143}
2144
2145void LiveSession::stopBufferingIfNecessary() {
2146    ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2147            mInPreparationPhase, mBuffering);
2148
2149    if (mBuffering) {
2150        mBuffering = false;
2151
2152        sp<AMessage> notify = mNotify->dup();
2153        notify->setInt32("what", kWhatBufferingEnd);
2154        notify->post();
2155    }
2156}
2157
2158void LiveSession::notifyBufferingUpdate(int32_t percentage) {
2159    if (percentage < mPrevBufferPercentage) {
2160        percentage = mPrevBufferPercentage;
2161    } else if (percentage > 100) {
2162        percentage = 100;
2163    }
2164
2165    mPrevBufferPercentage = percentage;
2166
2167    ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage);
2168
2169    sp<AMessage> notify = mNotify->dup();
2170    notify->setInt32("what", kWhatBufferingUpdate);
2171    notify->setInt32("percentage", percentage);
2172    notify->post();
2173}
2174
2175/*
2176 * returns true if a bandwidth switch is actually needed (and started),
2177 * returns false otherwise
2178 */
2179bool LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
2180    // no need to check bandwidth if we only have 1 bandwidth settings
2181    if (mSwitchInProgress || mBandwidthItems.size() < 2) {
2182        return false;
2183    }
2184
2185    int32_t bandwidthBps;
2186    bool isStable;
2187    if (mBandwidthEstimator->estimateBandwidth(&bandwidthBps, &isStable)) {
2188        ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
2189        mLastBandwidthBps = bandwidthBps;
2190    } else {
2191        ALOGV("no bandwidth estimate.");
2192        return false;
2193    }
2194
2195    int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth;
2196    // canSwithDown and canSwitchUp can't both be true.
2197    // we only want to switch up when measured bw is 120% higher than current variant,
2198    // and we only want to switch down when measured bw is below current variant.
2199    bool canSwitchDown = bufferLow
2200            && (bandwidthBps < (int32_t)curBandwidth);
2201    bool canSwitchUp = bufferHigh
2202            && (bandwidthBps > (int32_t)curBandwidth * 12 / 10);
2203
2204    if (canSwitchDown || canSwitchUp) {
2205        // bandwidth estimating has some delay, if we have to downswitch when
2206        // it hasn't stabilized, be very conservative on bandwidth.
2207        if (!isStable && canSwitchDown) {
2208            bandwidthBps /= 2;
2209        }
2210
2211        ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps);
2212
2213        // it's possible that we're checking for canSwitchUp case, but the returned
2214        // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70%
2215        // of measured bw. In that case we don't want to do anything, since we have
2216        // both enough buffer and enough bw.
2217        if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex)
2218         || (canSwitchDown && bandwidthIndex < mCurBandwidthIndex)) {
2219            // if not yet prepared, just restart again with new bw index.
2220            // this is faster and playback experience is cleaner.
2221            changeConfiguration(
2222                    mInPreparationPhase ? 0 : -1ll, bandwidthIndex);
2223            return true;
2224        }
2225    }
2226    return false;
2227}
2228
2229void LiveSession::postError(status_t err) {
2230    // if we reached EOS, notify buffering of 100%
2231    if (err == ERROR_END_OF_STREAM) {
2232        notifyBufferingUpdate(100);
2233    }
2234    // we'll stop buffer polling now, before that notify
2235    // stop buffering to stop the spinning icon
2236    stopBufferingIfNecessary();
2237    cancelPollBuffering();
2238
2239    sp<AMessage> notify = mNotify->dup();
2240    notify->setInt32("what", kWhatError);
2241    notify->setInt32("err", err);
2242    notify->post();
2243}
2244
2245void LiveSession::postPrepared(status_t err) {
2246    CHECK(mInPreparationPhase);
2247
2248    sp<AMessage> notify = mNotify->dup();
2249    if (err == OK || err == ERROR_END_OF_STREAM) {
2250        notify->setInt32("what", kWhatPrepared);
2251    } else {
2252        cancelPollBuffering();
2253
2254        notify->setInt32("what", kWhatPreparationFailed);
2255        notify->setInt32("err", err);
2256    }
2257
2258    notify->post();
2259
2260    mInPreparationPhase = false;
2261}
2262
2263
2264}  // namespace android
2265
2266