LiveSession.cpp revision 0852843d304006e3ab333081fddda13b07193de8
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/IMediaHTTPConnection.h>
31#include <media/IMediaHTTPService.h>
32#include <media/stagefright/foundation/hexdump.h>
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/foundation/AUtils.h>
37#include <media/stagefright/DataSource.h>
38#include <media/stagefright/FileSource.h>
39#include <media/stagefright/MediaErrors.h>
40#include <media/stagefright/MediaHTTP.h>
41#include <media/stagefright/MediaDefs.h>
42#include <media/stagefright/MetaData.h>
43#include <media/stagefright/Utils.h>
44
45#include <utils/Mutex.h>
46
47#include <ctype.h>
48#include <inttypes.h>
49#include <openssl/aes.h>
50#include <openssl/md5.h>
51
52namespace android {
53
54// static
55// Bandwidth Switch Mark Defaults
56const int64_t LiveSession::kUpSwitchMarkUs = 15000000ll;
57const int64_t LiveSession::kDownSwitchMarkUs = 20000000ll;
58const int64_t LiveSession::kUpSwitchMarginUs = 5000000ll;
59const int64_t LiveSession::kResumeThresholdUs = 100000ll;
60
61// Buffer Prepare/Ready/Underflow Marks
62const int64_t LiveSession::kReadyMarkUs = 5000000ll;
63const int64_t LiveSession::kPrepareMarkUs = 1500000ll;
64const int64_t LiveSession::kUnderflowMarkUs = 1000000ll;
65
66struct LiveSession::BandwidthEstimator : public RefBase {
67    BandwidthEstimator();
68
69    void addBandwidthMeasurement(size_t numBytes, int64_t delayUs);
70    bool estimateBandwidth(int32_t *bandwidth);
71
72private:
73    // Bandwidth estimation parameters
74    static const int32_t kMaxBandwidthHistoryItems = 20;
75    static const int64_t kMaxBandwidthHistoryWindowUs = 5000000ll; // 5 sec
76
77    struct BandwidthEntry {
78        int64_t mDelayUs;
79        size_t mNumBytes;
80    };
81
82    Mutex mLock;
83    List<BandwidthEntry> mBandwidthHistory;
84    int64_t mTotalTransferTimeUs;
85    size_t mTotalTransferBytes;
86
87    DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator);
88};
89
90LiveSession::BandwidthEstimator::BandwidthEstimator() :
91    mTotalTransferTimeUs(0),
92    mTotalTransferBytes(0) {
93}
94
95void LiveSession::BandwidthEstimator::addBandwidthMeasurement(
96        size_t numBytes, int64_t delayUs) {
97    AutoMutex autoLock(mLock);
98
99    BandwidthEntry entry;
100    entry.mDelayUs = delayUs;
101    entry.mNumBytes = numBytes;
102    mTotalTransferTimeUs += delayUs;
103    mTotalTransferBytes += numBytes;
104    mBandwidthHistory.push_back(entry);
105
106    // trim old samples, keeping at least kMaxBandwidthHistoryItems samples,
107    // and total transfer time at least kMaxBandwidthHistoryWindowUs.
108    while (mBandwidthHistory.size() > kMaxBandwidthHistoryItems) {
109        List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
110        if (mTotalTransferTimeUs - it->mDelayUs < kMaxBandwidthHistoryWindowUs) {
111            break;
112        }
113        mTotalTransferTimeUs -= it->mDelayUs;
114        mTotalTransferBytes -= it->mNumBytes;
115        mBandwidthHistory.erase(mBandwidthHistory.begin());
116    }
117}
118
119bool LiveSession::BandwidthEstimator::estimateBandwidth(int32_t *bandwidthBps) {
120    AutoMutex autoLock(mLock);
121
122    if (mBandwidthHistory.size() < 2) {
123        return false;
124    }
125
126    *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs);
127    return true;
128}
129
130//static
131const char *LiveSession::getKeyForStream(StreamType type) {
132    switch (type) {
133        case STREAMTYPE_VIDEO:
134            return "timeUsVideo";
135        case STREAMTYPE_AUDIO:
136            return "timeUsAudio";
137        case STREAMTYPE_SUBTITLES:
138            return "timeUsSubtitle";
139        case STREAMTYPE_METADATA:
140            return "timeUsMetadata"; // unused
141        default:
142            TRESPASS();
143    }
144    return NULL;
145}
146
147//static
148const char *LiveSession::getNameForStream(StreamType type) {
149    switch (type) {
150        case STREAMTYPE_VIDEO:
151            return "video";
152        case STREAMTYPE_AUDIO:
153            return "audio";
154        case STREAMTYPE_SUBTITLES:
155            return "subs";
156        case STREAMTYPE_METADATA:
157            return "metadata";
158        default:
159            break;
160    }
161    return "unknown";
162}
163
164//static
165ATSParser::SourceType LiveSession::getSourceTypeForStream(StreamType type) {
166    switch (type) {
167        case STREAMTYPE_VIDEO:
168            return ATSParser::VIDEO;
169        case STREAMTYPE_AUDIO:
170            return ATSParser::AUDIO;
171        case STREAMTYPE_METADATA:
172            return ATSParser::META;
173        case STREAMTYPE_SUBTITLES:
174        default:
175            TRESPASS();
176    }
177    return ATSParser::NUM_SOURCE_TYPES; // should not reach here
178}
179
180LiveSession::LiveSession(
181        const sp<AMessage> &notify, uint32_t flags,
182        const sp<IMediaHTTPService> &httpService)
183    : mNotify(notify),
184      mFlags(flags),
185      mHTTPService(httpService),
186      mBuffering(false),
187      mInPreparationPhase(true),
188      mPollBufferingGeneration(0),
189      mPrevBufferPercentage(-1),
190      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
191      mCurBandwidthIndex(-1),
192      mOrigBandwidthIndex(-1),
193      mLastBandwidthBps(-1ll),
194      mBandwidthEstimator(new BandwidthEstimator()),
195      mMaxWidth(720),
196      mMaxHeight(480),
197      mStreamMask(0),
198      mNewStreamMask(0),
199      mSwapMask(0),
200      mSwitchGeneration(0),
201      mSubtitleGeneration(0),
202      mLastDequeuedTimeUs(0ll),
203      mRealTimeBaseUs(0ll),
204      mReconfigurationInProgress(false),
205      mSwitchInProgress(false),
206      mUpSwitchMark(kUpSwitchMarkUs),
207      mDownSwitchMark(kDownSwitchMarkUs),
208      mUpSwitchMargin(kUpSwitchMarginUs),
209      mFirstTimeUsValid(false),
210      mFirstTimeUs(0),
211      mLastSeekTimeUs(0),
212      mHasMetadata(false) {
213    mStreams[kAudioIndex] = StreamItem("audio");
214    mStreams[kVideoIndex] = StreamItem("video");
215    mStreams[kSubtitleIndex] = StreamItem("subtitles");
216
217    for (size_t i = 0; i < kNumSources; ++i) {
218        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
219        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
220    }
221}
222
223LiveSession::~LiveSession() {
224    if (mFetcherLooper != NULL) {
225        mFetcherLooper->stop();
226    }
227}
228
229int64_t LiveSession::calculateMediaTimeUs(
230        int64_t firstTimeUs, int64_t timeUs, int32_t discontinuitySeq) {
231    if (timeUs >= firstTimeUs) {
232        timeUs -= firstTimeUs;
233    } else {
234        timeUs = 0;
235    }
236    timeUs += mLastSeekTimeUs;
237    if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
238        timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
239    }
240    return timeUs;
241}
242
243status_t LiveSession::dequeueAccessUnit(
244        StreamType stream, sp<ABuffer> *accessUnit) {
245    status_t finalResult = OK;
246    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
247
248    ssize_t streamIdx = typeToIndex(stream);
249    if (streamIdx < 0) {
250        return INVALID_VALUE;
251    }
252    const char *streamStr = getNameForStream(stream);
253    // Do not let client pull data if we don't have data packets yet.
254    // We might only have a format discontinuity queued without data.
255    // When NuPlayerDecoder dequeues the format discontinuity, it will
256    // immediately try to getFormat. If we return NULL, NuPlayerDecoder
257    // thinks it can do seamless change, so will not shutdown decoder.
258    // When the actual format arrives, it can't handle it and get stuck.
259    if (!packetSource->hasDataBufferAvailable(&finalResult)) {
260        ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)",
261                streamStr, finalResult);
262
263        if (finalResult == OK) {
264            return -EAGAIN;
265        } else {
266            return finalResult;
267        }
268    }
269
270    // Let the client dequeue as long as we have buffers available
271    // Do not make pause/resume decisions here.
272
273    status_t err = packetSource->dequeueAccessUnit(accessUnit);
274
275    if (err == INFO_DISCONTINUITY) {
276        // adaptive streaming, discontinuities in the playlist
277        int32_t type;
278        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
279
280        sp<AMessage> extra;
281        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
282            extra.clear();
283        }
284
285        ALOGI("[%s] read discontinuity of type %d, extra = %s",
286              streamStr,
287              type,
288              extra == NULL ? "NULL" : extra->debugString().c_str());
289    } else if (err == OK) {
290
291        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
292            int64_t timeUs, originalTimeUs;
293            int32_t discontinuitySeq = 0;
294            StreamItem& strm = mStreams[streamIdx];
295            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
296            originalTimeUs = timeUs;
297            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
298            if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) {
299                int64_t offsetTimeUs;
300                if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
301                    offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq);
302                } else {
303                    offsetTimeUs = 0;
304                }
305
306                if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
307                    int64_t firstTimeUs;
308                    firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
309                    offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
310                    offsetTimeUs += strm.mLastSampleDurationUs;
311                } else {
312                    offsetTimeUs += strm.mLastSampleDurationUs;
313                }
314
315                mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs);
316                strm.mCurDiscontinuitySeq = discontinuitySeq;
317            }
318
319            int32_t discard = 0;
320            int64_t firstTimeUs;
321            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
322                int64_t durUs; // approximate sample duration
323                if (timeUs > strm.mLastDequeuedTimeUs) {
324                    durUs = timeUs - strm.mLastDequeuedTimeUs;
325                } else {
326                    durUs = strm.mLastDequeuedTimeUs - timeUs;
327                }
328                strm.mLastSampleDurationUs = durUs;
329                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
330            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
331                firstTimeUs = timeUs;
332            } else {
333                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
334                firstTimeUs = timeUs;
335            }
336
337            strm.mLastDequeuedTimeUs = timeUs;
338            timeUs = calculateMediaTimeUs(firstTimeUs, timeUs, discontinuitySeq);
339
340            ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us",
341                    streamStr, (long long)timeUs, (long long)originalTimeUs);
342            (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
343            mLastDequeuedTimeUs = timeUs;
344            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
345        } else if (stream == STREAMTYPE_SUBTITLES) {
346            int32_t subtitleGeneration;
347            if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
348                    && subtitleGeneration != mSubtitleGeneration) {
349               return -EAGAIN;
350            };
351            (*accessUnit)->meta()->setInt32(
352                    "trackIndex", mPlaylist->getSelectedIndex());
353            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
354        } else if (stream == STREAMTYPE_METADATA) {
355            HLSTime mdTime((*accessUnit)->meta());
356            if (mDiscontinuityAbsStartTimesUs.indexOfKey(mdTime.mSeq) < 0) {
357                packetSource->requeueAccessUnit((*accessUnit));
358                return -EAGAIN;
359            } else {
360                int64_t firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(mdTime.mSeq);
361                int64_t timeUs = calculateMediaTimeUs(firstTimeUs, mdTime.mTimeUs, mdTime.mSeq);
362                (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
363                (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
364            }
365        }
366    } else {
367        ALOGI("[%s] encountered error %d", streamStr, err);
368    }
369
370    return err;
371}
372
373status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
374    if (!(mStreamMask & stream)) {
375        return UNKNOWN_ERROR;
376    }
377
378    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
379
380    sp<MetaData> meta = packetSource->getFormat();
381
382    if (meta == NULL) {
383        return -EAGAIN;
384    }
385
386    if (stream == STREAMTYPE_AUDIO) {
387        // set AAC input buffer size to 32K bytes (256kbps x 1sec)
388        meta->setInt32(kKeyMaxInputSize, 32 * 1024);
389    } else if (stream == STREAMTYPE_VIDEO) {
390        meta->setInt32(kKeyMaxWidth, mMaxWidth);
391        meta->setInt32(kKeyMaxHeight, mMaxHeight);
392    }
393
394    return convertMetaDataToMessage(meta, format);
395}
396
397sp<HTTPBase> LiveSession::getHTTPDataSource() {
398    return new MediaHTTP(mHTTPService->makeHTTPConnection());
399}
400
401void LiveSession::connectAsync(
402        const char *url, const KeyedVector<String8, String8> *headers) {
403    sp<AMessage> msg = new AMessage(kWhatConnect, this);
404    msg->setString("url", url);
405
406    if (headers != NULL) {
407        msg->setPointer(
408                "headers",
409                new KeyedVector<String8, String8>(*headers));
410    }
411
412    msg->post();
413}
414
415status_t LiveSession::disconnect() {
416    sp<AMessage> msg = new AMessage(kWhatDisconnect, this);
417
418    sp<AMessage> response;
419    status_t err = msg->postAndAwaitResponse(&response);
420
421    return err;
422}
423
424status_t LiveSession::seekTo(int64_t timeUs) {
425    sp<AMessage> msg = new AMessage(kWhatSeek, this);
426    msg->setInt64("timeUs", timeUs);
427
428    sp<AMessage> response;
429    status_t err = msg->postAndAwaitResponse(&response);
430
431    return err;
432}
433
434bool LiveSession::checkSwitchProgress(
435        sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) {
436    AString newUri;
437    CHECK(stopParams->findString("uri", &newUri));
438
439    *needResumeUntil = false;
440    sp<AMessage> firstNewMeta[kMaxStreams];
441    for (size_t i = 0; i < kMaxStreams; ++i) {
442        StreamType stream = indexToType(i);
443        if (!(mSwapMask & mNewStreamMask & stream)
444            || (mStreams[i].mNewUri != newUri)) {
445            continue;
446        }
447        if (stream == STREAMTYPE_SUBTITLES) {
448            continue;
449        }
450        sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i);
451
452        // First, get latest dequeued meta, which is where the decoder is at.
453        // (when upswitching, we take the meta after a certain delay, so that
454        // the decoder is left with some cushion)
455        sp<AMessage> lastDequeueMeta, lastEnqueueMeta;
456        if (delayUs > 0) {
457            lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs);
458            if (lastDequeueMeta == NULL) {
459                // this means we don't have enough cushion, try again later
460                ALOGV("[%s] up switching failed due to insufficient buffer",
461                        getNameForStream(stream));
462                return false;
463            }
464        } else {
465            // It's okay for lastDequeueMeta to be NULL here, it means the
466            // decoder hasn't even started dequeueing
467            lastDequeueMeta = source->getLatestDequeuedMeta();
468        }
469        // Then, trim off packets at beginning of mPacketSources2 that's before
470        // the latest dequeued time. These samples are definitely too late.
471        firstNewMeta[i] = mPacketSources2.editValueAt(i)
472                            ->trimBuffersBeforeMeta(lastDequeueMeta);
473
474        // Now firstNewMeta[i] is the first sample after the trim.
475        // If it's NULL, we failed because dequeue already past all samples
476        // in mPacketSource2, we have to try again.
477        if (firstNewMeta[i] == NULL) {
478            HLSTime dequeueTime(lastDequeueMeta);
479            ALOGV("[%s] dequeue time (%d, %lld) past start time",
480                    getNameForStream(stream),
481                    dequeueTime.mSeq, (long long) dequeueTime.mTimeUs);
482            return false;
483        }
484
485        // Otherwise, we check if mPacketSources2 overlaps with what old fetcher
486        // already fetched, and see if we need to resumeUntil
487        lastEnqueueMeta = source->getLatestEnqueuedMeta();
488        // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity
489        // boundary, no need to resume as the content will look different anyways
490        if (lastEnqueueMeta != NULL) {
491            HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]);
492
493            // no need to resume old fetcher if new fetcher started in different
494            // discontinuity sequence, as the content will look different.
495            *needResumeUntil |= (startTime.mSeq == lastTime.mSeq
496                    && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs);
497
498            // update the stopTime for resumeUntil
499            stopParams->setInt32("discontinuitySeq", startTime.mSeq);
500            stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs);
501        }
502    }
503
504    // if we're here, it means dequeue progress hasn't passed some samples in
505    // mPacketSource2, we can trim off the excess in mPacketSource.
506    // (old fetcher might still need to resumeUntil the start time of new fetcher)
507    for (size_t i = 0; i < kMaxStreams; ++i) {
508        StreamType stream = indexToType(i);
509        if (!(mSwapMask & mNewStreamMask & stream)
510            || (newUri != mStreams[i].mNewUri)
511            || stream == STREAMTYPE_SUBTITLES) {
512            continue;
513        }
514        mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]);
515    }
516
517    // no resumeUntil if already underflow
518    *needResumeUntil &= !mBuffering;
519
520    return true;
521}
522
523void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
524    switch (msg->what()) {
525        case kWhatConnect:
526        {
527            onConnect(msg);
528            break;
529        }
530
531        case kWhatDisconnect:
532        {
533            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
534
535            if (mReconfigurationInProgress) {
536                break;
537            }
538
539            finishDisconnect();
540            break;
541        }
542
543        case kWhatSeek:
544        {
545            if (mReconfigurationInProgress) {
546                msg->post(50000);
547                break;
548            }
549
550            CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
551            mSeekReply = new AMessage;
552
553            onSeek(msg);
554            break;
555        }
556
557        case kWhatFetcherNotify:
558        {
559            int32_t what;
560            CHECK(msg->findInt32("what", &what));
561
562            switch (what) {
563                case PlaylistFetcher::kWhatStarted:
564                    break;
565                case PlaylistFetcher::kWhatPaused:
566                case PlaylistFetcher::kWhatStopped:
567                {
568                    AString uri;
569                    CHECK(msg->findString("uri", &uri));
570                    ssize_t index = mFetcherInfos.indexOfKey(uri);
571                    if (index < 0) {
572                        // ignore msgs from fetchers that's already gone
573                        break;
574                    }
575
576                    ALOGV("fetcher-%d %s",
577                            mFetcherInfos[index].mFetcher->getFetcherID(),
578                            what == PlaylistFetcher::kWhatPaused ?
579                                    "paused" : "stopped");
580
581                    if (what == PlaylistFetcher::kWhatStopped) {
582                        mFetcherLooper->unregisterHandler(
583                                mFetcherInfos[index].mFetcher->id());
584                        mFetcherInfos.removeItemsAt(index);
585                    } else if (what == PlaylistFetcher::kWhatPaused) {
586                        int32_t seekMode;
587                        CHECK(msg->findInt32("seekMode", &seekMode));
588                        for (size_t i = 0; i < kMaxStreams; ++i) {
589                            if (mStreams[i].mUri == uri) {
590                                mStreams[i].mSeekMode = (SeekMode) seekMode;
591                            }
592                        }
593                    }
594
595                    if (mContinuation != NULL) {
596                        CHECK_GT(mContinuationCounter, 0);
597                        if (--mContinuationCounter == 0) {
598                            mContinuation->post();
599                        }
600                        ALOGV("%zu fetcher(s) left", mContinuationCounter);
601                    }
602                    break;
603                }
604
605                case PlaylistFetcher::kWhatDurationUpdate:
606                {
607                    AString uri;
608                    CHECK(msg->findString("uri", &uri));
609
610                    int64_t durationUs;
611                    CHECK(msg->findInt64("durationUs", &durationUs));
612
613                    ssize_t index = mFetcherInfos.indexOfKey(uri);
614                    if (index >= 0) {
615                        FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
616                        info->mDurationUs = durationUs;
617                    }
618                    break;
619                }
620
621                case PlaylistFetcher::kWhatTargetDurationUpdate:
622                {
623                    int64_t targetDurationUs;
624                    CHECK(msg->findInt64("targetDurationUs", &targetDurationUs));
625                    mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4);
626                    mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4);
627                    mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs);
628                    break;
629                }
630
631                case PlaylistFetcher::kWhatError:
632                {
633                    status_t err;
634                    CHECK(msg->findInt32("err", &err));
635
636                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
637
638                    // handle EOS on subtitle tracks independently
639                    AString uri;
640                    if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
641                        ssize_t i = mFetcherInfos.indexOfKey(uri);
642                        if (i >= 0) {
643                            const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
644                            if (fetcher != NULL) {
645                                uint32_t type = fetcher->getStreamTypeMask();
646                                if (type == STREAMTYPE_SUBTITLES) {
647                                    mPacketSources.valueFor(
648                                            STREAMTYPE_SUBTITLES)->signalEOS(err);;
649                                    break;
650                                }
651                            }
652                        }
653                    }
654
655                    if (mInPreparationPhase) {
656                        postPrepared(err);
657                    }
658
659                    cancelBandwidthSwitch();
660
661                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
662
663                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
664
665                    mPacketSources.valueFor(
666                            STREAMTYPE_SUBTITLES)->signalEOS(err);
667
668                    postError(err);
669                    break;
670                }
671
672                case PlaylistFetcher::kWhatStopReached:
673                {
674                    ALOGV("kWhatStopReached");
675
676                    AString oldUri;
677                    CHECK(msg->findString("uri", &oldUri));
678
679                    ssize_t index = mFetcherInfos.indexOfKey(oldUri);
680                    if (index < 0) {
681                        break;
682                    }
683
684                    tryToFinishBandwidthSwitch(oldUri);
685                    break;
686                }
687
688                case PlaylistFetcher::kWhatStartedAt:
689                {
690                    int32_t switchGeneration;
691                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
692
693                    ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d",
694                            switchGeneration, mSwitchGeneration);
695
696                    if (switchGeneration != mSwitchGeneration) {
697                        break;
698                    }
699
700                    AString uri;
701                    CHECK(msg->findString("uri", &uri));
702
703                    // mark new fetcher mToBeResumed
704                    ssize_t index = mFetcherInfos.indexOfKey(uri);
705                    if (index >= 0) {
706                        mFetcherInfos.editValueAt(index).mToBeResumed = true;
707                    }
708
709                    // temporarily disable packet sources to be swapped to prevent
710                    // NuPlayerDecoder from dequeuing while we check progress
711                    for (size_t i = 0; i < mPacketSources.size(); ++i) {
712                        if ((mSwapMask & mPacketSources.keyAt(i))
713                                && uri == mStreams[i].mNewUri) {
714                            mPacketSources.editValueAt(i)->enable(false);
715                        }
716                    }
717                    bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex);
718                    // If switching up, require a cushion bigger than kUnderflowMark
719                    // to avoid buffering immediately after the switch.
720                    // (If we don't have that cushion we'd rather cancel and try again.)
721                    int64_t delayUs = switchUp ? (kUnderflowMarkUs + 1000000ll) : 0;
722                    bool needResumeUntil = false;
723                    sp<AMessage> stopParams = msg;
724                    if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) {
725                        // playback time hasn't passed startAt time
726                        if (!needResumeUntil) {
727                            ALOGV("finish switch");
728                            for (size_t i = 0; i < kMaxStreams; ++i) {
729                                if ((mSwapMask & indexToType(i))
730                                        && uri == mStreams[i].mNewUri) {
731                                    // have to make a copy of mStreams[i].mUri because
732                                    // tryToFinishBandwidthSwitch is modifying mStreams[]
733                                    AString oldURI = mStreams[i].mUri;
734                                    tryToFinishBandwidthSwitch(oldURI);
735                                    break;
736                                }
737                            }
738                        } else {
739                            // startAt time is after last enqueue time
740                            // Resume fetcher for the original variant; the resumed fetcher should
741                            // continue until the timestamps found in msg, which is stored by the
742                            // new fetcher to indicate where the new variant has started buffering.
743                            ALOGV("finish switch with resumeUntilAsync");
744                            for (size_t i = 0; i < mFetcherInfos.size(); i++) {
745                                const FetcherInfo &info = mFetcherInfos.valueAt(i);
746                                if (info.mToBeRemoved) {
747                                    info.mFetcher->resumeUntilAsync(stopParams);
748                                }
749                            }
750                        }
751                    } else {
752                        // playback time passed startAt time
753                        if (switchUp) {
754                            // if switching up, cancel and retry if condition satisfies again
755                            ALOGV("cancel up switch because we're too late");
756                            cancelBandwidthSwitch(true /* resume */);
757                        } else {
758                            ALOGV("retry down switch at next sample");
759                            resumeFetcher(uri, mSwapMask, -1, true /* newUri */);
760                        }
761                    }
762                    // re-enable all packet sources
763                    for (size_t i = 0; i < mPacketSources.size(); ++i) {
764                        mPacketSources.editValueAt(i)->enable(true);
765                    }
766
767                    break;
768                }
769
770                case PlaylistFetcher::kWhatMetadataDetected:
771                {
772                    if (!mHasMetadata) {
773                        mHasMetadata = true;
774                        sp<AMessage> notify = mNotify->dup();
775                        notify->setInt32("what", kWhatMetadataDetected);
776                        notify->post();
777                    }
778                    break;
779                }
780
781                default:
782                    TRESPASS();
783            }
784
785            break;
786        }
787
788        case kWhatChangeConfiguration:
789        {
790            onChangeConfiguration(msg);
791            break;
792        }
793
794        case kWhatChangeConfiguration2:
795        {
796            onChangeConfiguration2(msg);
797            break;
798        }
799
800        case kWhatChangeConfiguration3:
801        {
802            onChangeConfiguration3(msg);
803            break;
804        }
805
806        case kWhatFinishDisconnect2:
807        {
808            onFinishDisconnect2();
809            break;
810        }
811
812        case kWhatPollBuffering:
813        {
814            int32_t generation;
815            CHECK(msg->findInt32("generation", &generation));
816            if (generation == mPollBufferingGeneration) {
817                onPollBuffering();
818            }
819            break;
820        }
821
822        default:
823            TRESPASS();
824            break;
825    }
826}
827
828// static
829int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
830    if (a->mBandwidth < b->mBandwidth) {
831        return -1;
832    } else if (a->mBandwidth == b->mBandwidth) {
833        return 0;
834    }
835
836    return 1;
837}
838
839// static
840LiveSession::StreamType LiveSession::indexToType(int idx) {
841    CHECK(idx >= 0 && idx < kNumSources);
842    return (StreamType)(1 << idx);
843}
844
845// static
846ssize_t LiveSession::typeToIndex(int32_t type) {
847    switch (type) {
848        case STREAMTYPE_AUDIO:
849            return 0;
850        case STREAMTYPE_VIDEO:
851            return 1;
852        case STREAMTYPE_SUBTITLES:
853            return 2;
854        case STREAMTYPE_METADATA:
855            return 3;
856        default:
857            return -1;
858    };
859    return -1;
860}
861
862void LiveSession::onConnect(const sp<AMessage> &msg) {
863    AString url;
864    CHECK(msg->findString("url", &url));
865
866    KeyedVector<String8, String8> *headers = NULL;
867    if (!msg->findPointer("headers", (void **)&headers)) {
868        mExtraHeaders.clear();
869    } else {
870        mExtraHeaders = *headers;
871
872        delete headers;
873        headers = NULL;
874    }
875
876    // TODO currently we don't know if we are coming here from incognito mode
877    ALOGI("onConnect %s", uriDebugString(url).c_str());
878
879    mMasterURL = url;
880
881    bool dummy;
882    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
883
884    if (mPlaylist == NULL) {
885        ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str());
886
887        postPrepared(ERROR_IO);
888        return;
889    }
890
891    // create looper for fetchers
892    if (mFetcherLooper == NULL) {
893        mFetcherLooper = new ALooper();
894
895        mFetcherLooper->setName("Fetcher");
896        mFetcherLooper->start(false, false);
897    }
898
899    // We trust the content provider to make a reasonable choice of preferred
900    // initial bandwidth by listing it first in the variant playlist.
901    // At startup we really don't have a good estimate on the available
902    // network bandwidth since we haven't tranferred any data yet. Once
903    // we have we can make a better informed choice.
904    size_t initialBandwidth = 0;
905    size_t initialBandwidthIndex = 0;
906
907    int32_t maxWidth = 0;
908    int32_t maxHeight = 0;
909
910    if (mPlaylist->isVariantPlaylist()) {
911        Vector<BandwidthItem> itemsWithVideo;
912        for (size_t i = 0; i < mPlaylist->size(); ++i) {
913            BandwidthItem item;
914
915            item.mPlaylistIndex = i;
916
917            sp<AMessage> meta;
918            AString uri;
919            mPlaylist->itemAt(i, &uri, &meta);
920
921            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
922
923            int32_t width, height;
924            if (meta->findInt32("width", &width)) {
925                maxWidth = max(maxWidth, width);
926            }
927            if (meta->findInt32("height", &height)) {
928                maxHeight = max(maxHeight, height);
929            }
930
931            mBandwidthItems.push(item);
932            if (mPlaylist->hasType(i, "video")) {
933                itemsWithVideo.push(item);
934            }
935        }
936        // remove the audio-only variants if we have at least one with video
937        if (!itemsWithVideo.empty()
938                && itemsWithVideo.size() < mBandwidthItems.size()) {
939            mBandwidthItems.clear();
940            for (size_t i = 0; i < itemsWithVideo.size(); ++i) {
941                mBandwidthItems.push(itemsWithVideo[i]);
942            }
943        }
944
945        CHECK_GT(mBandwidthItems.size(), 0u);
946        initialBandwidth = mBandwidthItems[0].mBandwidth;
947
948        mBandwidthItems.sort(SortByBandwidth);
949
950        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
951            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
952                initialBandwidthIndex = i;
953                break;
954            }
955        }
956    } else {
957        // dummy item.
958        BandwidthItem item;
959        item.mPlaylistIndex = 0;
960        item.mBandwidth = 0;
961        mBandwidthItems.push(item);
962    }
963
964    mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth;
965    mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight;
966
967    mPlaylist->pickRandomMediaItems();
968    changeConfiguration(
969            0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
970}
971
972void LiveSession::finishDisconnect() {
973    ALOGV("finishDisconnect");
974
975    // No reconfiguration is currently pending, make sure none will trigger
976    // during disconnection either.
977    cancelBandwidthSwitch();
978
979    // cancel buffer polling
980    cancelPollBuffering();
981
982    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
983        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
984    }
985
986    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, this);
987
988    mContinuationCounter = mFetcherInfos.size();
989    mContinuation = msg;
990
991    if (mContinuationCounter == 0) {
992        msg->post();
993    }
994}
995
996void LiveSession::onFinishDisconnect2() {
997    mContinuation.clear();
998
999    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
1000    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
1001
1002    mPacketSources.valueFor(
1003            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
1004
1005    sp<AMessage> response = new AMessage;
1006    response->setInt32("err", OK);
1007
1008    response->postReply(mDisconnectReplyID);
1009    mDisconnectReplyID.clear();
1010}
1011
1012sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
1013    ssize_t index = mFetcherInfos.indexOfKey(uri);
1014
1015    if (index >= 0) {
1016        return NULL;
1017    }
1018
1019    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this);
1020    notify->setString("uri", uri);
1021    notify->setInt32("switchGeneration", mSwitchGeneration);
1022
1023    FetcherInfo info;
1024    info.mFetcher = new PlaylistFetcher(
1025            notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration);
1026    info.mDurationUs = -1ll;
1027    info.mToBeRemoved = false;
1028    info.mToBeResumed = false;
1029    mFetcherLooper->registerHandler(info.mFetcher);
1030
1031    mFetcherInfos.add(uri, info);
1032
1033    return info.mFetcher;
1034}
1035
1036/*
1037 * Illustration of parameters:
1038 *
1039 * 0      `range_offset`
1040 * +------------+-------------------------------------------------------+--+--+
1041 * |            |                                 | next block to fetch |  |  |
1042 * |            | `source` handle => `out` buffer |                     |  |  |
1043 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
1044 * |            |<----------- `range_length` / buffer capacity ----------->|  |
1045 * |<------------------------------ file_size ------------------------------->|
1046 *
1047 * Special parameter values:
1048 * - range_length == -1 means entire file
1049 * - block_size == 0 means entire range
1050 *
1051 */
1052ssize_t LiveSession::fetchFile(
1053        const char *url, sp<ABuffer> *out,
1054        int64_t range_offset, int64_t range_length,
1055        uint32_t block_size, /* download block size */
1056        sp<DataSource> *source, /* to return and reuse source */
1057        String8 *actualUrl,
1058        bool forceConnectHTTP /* force connect HTTP when resuing source */) {
1059    off64_t size;
1060    sp<DataSource> temp_source;
1061    if (source == NULL) {
1062        source = &temp_source;
1063    }
1064
1065    if (*source == NULL || forceConnectHTTP) {
1066        if (!strncasecmp(url, "file://", 7)) {
1067            *source = new FileSource(url + 7);
1068        } else if (strncasecmp(url, "http://", 7)
1069                && strncasecmp(url, "https://", 8)) {
1070            return ERROR_UNSUPPORTED;
1071        } else {
1072            KeyedVector<String8, String8> headers = mExtraHeaders;
1073            if (range_offset > 0 || range_length >= 0) {
1074                headers.add(
1075                        String8("Range"),
1076                        String8(
1077                            AStringPrintf(
1078                                "bytes=%lld-%s",
1079                                range_offset,
1080                                range_length < 0
1081                                    ? "" : AStringPrintf("%lld",
1082                                            range_offset + range_length - 1).c_str()).c_str()));
1083            }
1084
1085            HTTPBase* httpDataSource =
1086                    (*source == NULL) ? mHTTPDataSource.get() : (HTTPBase*)source->get();
1087            status_t err = httpDataSource->connect(url, &headers);
1088
1089            if (err != OK) {
1090                return err;
1091            }
1092
1093            if (*source == NULL) {
1094                *source = mHTTPDataSource;
1095            }
1096        }
1097    }
1098
1099    status_t getSizeErr = (*source)->getSize(&size);
1100    if (getSizeErr != OK) {
1101        size = 65536;
1102    }
1103
1104    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
1105    if (*out == NULL) {
1106        buffer->setRange(0, 0);
1107    }
1108
1109    ssize_t bytesRead = 0;
1110    // adjust range_length if only reading partial block
1111    if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
1112        range_length = buffer->size() + block_size;
1113    }
1114    for (;;) {
1115        // Only resize when we don't know the size.
1116        size_t bufferRemaining = buffer->capacity() - buffer->size();
1117        if (bufferRemaining == 0 && getSizeErr != OK) {
1118            size_t bufferIncrement = buffer->size() / 2;
1119            if (bufferIncrement < 32768) {
1120                bufferIncrement = 32768;
1121            }
1122            bufferRemaining = bufferIncrement;
1123
1124            ALOGV("increasing download buffer to %zu bytes",
1125                 buffer->size() + bufferRemaining);
1126
1127            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
1128            memcpy(copy->data(), buffer->data(), buffer->size());
1129            copy->setRange(0, buffer->size());
1130
1131            buffer = copy;
1132        }
1133
1134        size_t maxBytesToRead = bufferRemaining;
1135        if (range_length >= 0) {
1136            int64_t bytesLeftInRange = range_length - buffer->size();
1137            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
1138                maxBytesToRead = bytesLeftInRange;
1139
1140                if (bytesLeftInRange == 0) {
1141                    break;
1142                }
1143            }
1144        }
1145
1146        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
1147        // to help us break out of the loop.
1148        ssize_t n = (*source)->readAt(
1149                buffer->size(), buffer->data() + buffer->size(),
1150                maxBytesToRead);
1151
1152        if (n < 0) {
1153            return n;
1154        }
1155
1156        if (n == 0) {
1157            break;
1158        }
1159
1160        buffer->setRange(0, buffer->size() + (size_t)n);
1161        bytesRead += n;
1162    }
1163
1164    *out = buffer;
1165    if (actualUrl != NULL) {
1166        *actualUrl = (*source)->getUri();
1167        if (actualUrl->isEmpty()) {
1168            *actualUrl = url;
1169        }
1170    }
1171
1172    return bytesRead;
1173}
1174
1175sp<M3UParser> LiveSession::fetchPlaylist(
1176        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
1177    ALOGV("fetchPlaylist '%s'", url);
1178
1179    *unchanged = false;
1180
1181    sp<ABuffer> buffer;
1182    String8 actualUrl;
1183    ssize_t  err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
1184
1185    // close off the connection after use
1186    mHTTPDataSource->disconnect();
1187
1188    if (err <= 0) {
1189        return NULL;
1190    }
1191
1192    // MD5 functionality is not available on the simulator, treat all
1193    // playlists as changed.
1194
1195#if defined(HAVE_ANDROID_OS)
1196    uint8_t hash[16];
1197
1198    MD5_CTX m;
1199    MD5_Init(&m);
1200    MD5_Update(&m, buffer->data(), buffer->size());
1201
1202    MD5_Final(hash, &m);
1203
1204    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
1205        // playlist unchanged
1206        *unchanged = true;
1207
1208        return NULL;
1209    }
1210
1211    if (curPlaylistHash != NULL) {
1212        memcpy(curPlaylistHash, hash, sizeof(hash));
1213    }
1214#endif
1215
1216    sp<M3UParser> playlist =
1217        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
1218
1219    if (playlist->initCheck() != OK) {
1220        ALOGE("failed to parse .m3u8 playlist");
1221
1222        return NULL;
1223    }
1224
1225    return playlist;
1226}
1227
1228#if 0
1229static double uniformRand() {
1230    return (double)rand() / RAND_MAX;
1231}
1232#endif
1233
1234bool LiveSession::UriIsSameAsIndex(const AString &uri, int32_t i, bool newUri) {
1235    ALOGI("[timed_id3] i %d UriIsSameAsIndex newUri %s, %s", i,
1236            newUri ? "true" : "false",
1237            newUri ? mStreams[i].mNewUri.c_str() : mStreams[i].mUri.c_str());
1238    return i >= 0
1239            && ((!newUri && uri == mStreams[i].mUri)
1240            || (newUri && uri == mStreams[i].mNewUri));
1241}
1242
1243sp<AnotherPacketSource> LiveSession::getPacketSourceForStreamIndex(
1244        size_t trackIndex, bool newUri) {
1245    StreamType type = indexToType(trackIndex);
1246    sp<AnotherPacketSource> source = NULL;
1247    if (newUri) {
1248        source = mPacketSources2.valueFor(type);
1249        source->clear();
1250    } else {
1251        source = mPacketSources.valueFor(type);
1252    };
1253    return source;
1254}
1255
1256sp<AnotherPacketSource> LiveSession::getMetadataSource(
1257        sp<AnotherPacketSource> sources[kNumSources], uint32_t streamMask, bool newUri) {
1258    // todo: One case where the following strategy can fail is when audio and video
1259    // are in separate playlists, both are transport streams, and the metadata
1260    // is actually contained in the audio stream.
1261    ALOGV("[timed_id3] getMetadataSourceForUri streamMask %x newUri %s",
1262            streamMask, newUri ? "true" : "false");
1263
1264    if ((sources[kVideoIndex] != NULL) // video fetcher; or ...
1265            || (!(streamMask & STREAMTYPE_VIDEO) && sources[kAudioIndex] != NULL)) {
1266            // ... audio fetcher for audio only variant
1267        return getPacketSourceForStreamIndex(kMetaDataIndex, newUri);
1268    }
1269
1270    return NULL;
1271}
1272
1273bool LiveSession::resumeFetcher(
1274        const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) {
1275    ssize_t index = mFetcherInfos.indexOfKey(uri);
1276    if (index < 0) {
1277        ALOGE("did not find fetcher for uri: %s", uri.c_str());
1278        return false;
1279    }
1280
1281    bool resume = false;
1282    sp<AnotherPacketSource> sources[kNumSources];
1283    for (size_t i = 0; i < kMaxStreams; ++i) {
1284        if ((streamMask & indexToType(i)) && UriIsSameAsIndex(uri, i, newUri)) {
1285            resume = true;
1286            sources[i] = getPacketSourceForStreamIndex(i, newUri);
1287        }
1288    }
1289
1290    if (resume) {
1291        sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher;
1292        SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition;
1293
1294        ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d",
1295                fetcher->getFetcherID(), (long long)timeUs, seekMode);
1296
1297        fetcher->startAsync(
1298                sources[kAudioIndex],
1299                sources[kVideoIndex],
1300                sources[kSubtitleIndex],
1301                getMetadataSource(sources, streamMask, newUri),
1302                timeUs, -1, -1, seekMode);
1303    }
1304
1305    return resume;
1306}
1307
1308float LiveSession::getAbortThreshold(
1309        ssize_t currentBWIndex, ssize_t targetBWIndex) const {
1310    float abortThreshold = -1.0f;
1311    if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) {
1312        /*
1313           If we're switching down, we need to decide whether to
1314
1315           1) finish last segment of high-bandwidth variant, or
1316           2) abort last segment of high-bandwidth variant, and fetch an
1317              overlapping portion from low-bandwidth variant.
1318
1319           Here we try to maximize the amount of buffer left when the
1320           switch point is met. Given the following parameters:
1321
1322           B: our current buffering level in seconds
1323           T: target duration in seconds
1324           X: sample duration in seconds remain to fetch in last segment
1325           bw0: bandwidth of old variant (as specified in playlist)
1326           bw1: bandwidth of new variant (as specified in playlist)
1327           bw: measured bandwidth available
1328
1329           If we choose 1), when switch happens at the end of current
1330           segment, our buffering will be
1331                  B + X - X * bw0 / bw
1332
1333           If we choose 2), when switch happens where we aborted current
1334           segment, our buffering will be
1335                  B - (T - X) * bw1 / bw
1336
1337           We should only choose 1) if
1338                  X/T < bw1 / (bw1 + bw0 - bw)
1339        */
1340
1341        // Taking the measured current bandwidth at 50% face value only,
1342        // as our bandwidth estimation is a lagging indicator. Being
1343        // conservative on this, we prefer switching to lower bandwidth
1344        // unless we're really confident finishing up the last segment
1345        // of higher bandwidth will be fast.
1346        CHECK(mLastBandwidthBps >= 0);
1347        abortThreshold =
1348                (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1349             / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1350              + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth
1351              - (float)mLastBandwidthBps * 0.5f);
1352        if (abortThreshold < 0.0f) {
1353            abortThreshold = -1.0f; // do not abort
1354        }
1355        ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f",
1356                mBandwidthItems.itemAt(currentBWIndex).mBandwidth,
1357                mBandwidthItems.itemAt(targetBWIndex).mBandwidth,
1358                mLastBandwidthBps,
1359                abortThreshold);
1360    }
1361    return abortThreshold;
1362}
1363
1364void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) {
1365    mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs);
1366}
1367
1368size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) {
1369    if (mBandwidthItems.size() < 2) {
1370        // shouldn't be here if we only have 1 bandwidth, check
1371        // logic to get rid of redundant bandwidth polling
1372        ALOGW("getBandwidthIndex() called for single bandwidth playlist!");
1373        return 0;
1374    }
1375
1376#if 1
1377    char value[PROPERTY_VALUE_MAX];
1378    ssize_t index = -1;
1379    if (property_get("media.httplive.bw-index", value, NULL)) {
1380        char *end;
1381        index = strtol(value, &end, 10);
1382        CHECK(end > value && *end == '\0');
1383
1384        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
1385            index = mBandwidthItems.size() - 1;
1386        }
1387    }
1388
1389    if (index < 0) {
1390        char value[PROPERTY_VALUE_MAX];
1391        if (property_get("media.httplive.max-bw", value, NULL)) {
1392            char *end;
1393            long maxBw = strtoul(value, &end, 10);
1394            if (end > value && *end == '\0') {
1395                if (maxBw > 0 && bandwidthBps > maxBw) {
1396                    ALOGV("bandwidth capped to %ld bps", maxBw);
1397                    bandwidthBps = maxBw;
1398                }
1399            }
1400        }
1401
1402        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
1403
1404        index = mBandwidthItems.size() - 1;
1405        while (index > 0) {
1406            // be conservative (70%) to avoid overestimating and immediately
1407            // switching down again.
1408            size_t adjustedBandwidthBps = bandwidthBps * 7 / 10;
1409            if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) {
1410                break;
1411            }
1412            --index;
1413        }
1414    }
1415#elif 0
1416    // Change bandwidth at random()
1417    size_t index = uniformRand() * mBandwidthItems.size();
1418#elif 0
1419    // There's a 50% chance to stay on the current bandwidth and
1420    // a 50% chance to switch to the next higher bandwidth (wrapping around
1421    // to lowest)
1422    const size_t kMinIndex = 0;
1423
1424    static ssize_t mCurBandwidthIndex = -1;
1425
1426    size_t index;
1427    if (mCurBandwidthIndex < 0) {
1428        index = kMinIndex;
1429    } else if (uniformRand() < 0.5) {
1430        index = (size_t)mCurBandwidthIndex;
1431    } else {
1432        index = mCurBandwidthIndex + 1;
1433        if (index == mBandwidthItems.size()) {
1434            index = kMinIndex;
1435        }
1436    }
1437    mCurBandwidthIndex = index;
1438#elif 0
1439    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1440
1441    size_t index = mBandwidthItems.size() - 1;
1442    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1443        --index;
1444    }
1445#elif 1
1446    char value[PROPERTY_VALUE_MAX];
1447    size_t index;
1448    if (property_get("media.httplive.bw-index", value, NULL)) {
1449        char *end;
1450        index = strtoul(value, &end, 10);
1451        CHECK(end > value && *end == '\0');
1452
1453        if (index >= mBandwidthItems.size()) {
1454            index = mBandwidthItems.size() - 1;
1455        }
1456    } else {
1457        index = 0;
1458    }
1459#else
1460    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
1461#endif
1462
1463    CHECK_GE(index, 0);
1464
1465    return index;
1466}
1467
1468HLSTime LiveSession::latestMediaSegmentStartTime() const {
1469    HLSTime audioTime(mPacketSources.valueFor(
1470                    STREAMTYPE_AUDIO)->getLatestDequeuedMeta());
1471
1472    HLSTime videoTime(mPacketSources.valueFor(
1473                    STREAMTYPE_VIDEO)->getLatestDequeuedMeta());
1474
1475    return audioTime < videoTime ? videoTime : audioTime;
1476}
1477
1478void LiveSession::onSeek(const sp<AMessage> &msg) {
1479    int64_t timeUs;
1480    CHECK(msg->findInt64("timeUs", &timeUs));
1481    changeConfiguration(timeUs);
1482}
1483
1484status_t LiveSession::getDuration(int64_t *durationUs) const {
1485    int64_t maxDurationUs = -1ll;
1486    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1487        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1488
1489        if (fetcherDurationUs > maxDurationUs) {
1490            maxDurationUs = fetcherDurationUs;
1491        }
1492    }
1493
1494    *durationUs = maxDurationUs;
1495
1496    return OK;
1497}
1498
1499bool LiveSession::isSeekable() const {
1500    int64_t durationUs;
1501    return getDuration(&durationUs) == OK && durationUs >= 0;
1502}
1503
1504bool LiveSession::hasDynamicDuration() const {
1505    return false;
1506}
1507
1508size_t LiveSession::getTrackCount() const {
1509    if (mPlaylist == NULL) {
1510        return 0;
1511    } else {
1512        return mPlaylist->getTrackCount() + (mHasMetadata ? 1 : 0);
1513    }
1514}
1515
1516sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1517    if (mPlaylist == NULL) {
1518        return NULL;
1519    } else {
1520        if (trackIndex == mPlaylist->getTrackCount() && mHasMetadata) {
1521            sp<AMessage> format = new AMessage();
1522            format->setInt32("type", MEDIA_TRACK_TYPE_METADATA);
1523            format->setString("language", "und");
1524            format->setString("mime", MEDIA_MIMETYPE_DATA_METADATA);
1525            return format;
1526        }
1527        return mPlaylist->getTrackInfo(trackIndex);
1528    }
1529}
1530
1531status_t LiveSession::selectTrack(size_t index, bool select) {
1532    if (mPlaylist == NULL) {
1533        return INVALID_OPERATION;
1534    }
1535
1536    ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++",
1537            index, select, mSubtitleGeneration);
1538
1539    ++mSubtitleGeneration;
1540    status_t err = mPlaylist->selectTrack(index, select);
1541    if (err == OK) {
1542        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this);
1543        msg->setInt32("pickTrack", select);
1544        msg->post();
1545    }
1546    return err;
1547}
1548
1549ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
1550    if (mPlaylist == NULL) {
1551        return -1;
1552    } else {
1553        return mPlaylist->getSelectedTrack(type);
1554    }
1555}
1556
1557void LiveSession::changeConfiguration(
1558        int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) {
1559    ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d",
1560          (long long)timeUs, bandwidthIndex, pickTrack);
1561
1562    cancelBandwidthSwitch();
1563
1564    CHECK(!mReconfigurationInProgress);
1565    mReconfigurationInProgress = true;
1566    if (bandwidthIndex >= 0) {
1567        mOrigBandwidthIndex = mCurBandwidthIndex;
1568        mCurBandwidthIndex = bandwidthIndex;
1569        if (mOrigBandwidthIndex != mCurBandwidthIndex) {
1570            ALOGI("#### Starting Bandwidth Switch: %zd => %zd",
1571                    mOrigBandwidthIndex, mCurBandwidthIndex);
1572        }
1573    }
1574    CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size());
1575    const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex);
1576
1577    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1578    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1579
1580    AString URIs[kMaxStreams];
1581    for (size_t i = 0; i < kMaxStreams; ++i) {
1582        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1583            streamMask |= indexToType(i);
1584        }
1585    }
1586
1587    // Step 1, stop and discard fetchers that are no longer needed.
1588    // Pause those that we'll reuse.
1589    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1590        // skip fetchers that are marked mToBeRemoved,
1591        // these are done and can't be reused
1592        if (mFetcherInfos[i].mToBeRemoved) {
1593            continue;
1594        }
1595
1596        const AString &uri = mFetcherInfos.keyAt(i);
1597        sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher;
1598
1599        bool discardFetcher = true, delayRemoval = false;
1600        for (size_t j = 0; j < kMaxStreams; ++j) {
1601            StreamType type = indexToType(j);
1602            if ((streamMask & type) && uri == URIs[j]) {
1603                resumeMask |= type;
1604                streamMask &= ~type;
1605                discardFetcher = false;
1606            }
1607        }
1608        // Delay fetcher removal if not picking tracks, AND old fetcher
1609        // has stream mask that overlaps new variant. (Okay to discard
1610        // old fetcher now, if completely no overlap.)
1611        if (discardFetcher && timeUs < 0ll && !pickTrack
1612                && (fetcher->getStreamTypeMask() & streamMask)) {
1613            discardFetcher = false;
1614            delayRemoval = true;
1615        }
1616
1617        if (discardFetcher) {
1618            ALOGV("discarding fetcher-%d", fetcher->getFetcherID());
1619            fetcher->stopAsync();
1620        } else {
1621            float threshold = -1.0f; // always finish fetching by default
1622            if (timeUs >= 0ll) {
1623                // seeking, no need to finish fetching
1624                threshold = 0.0f;
1625            } else if (delayRemoval) {
1626                // adapting, abort if remaining of current segment is over threshold
1627                threshold = getAbortThreshold(
1628                        mOrigBandwidthIndex, mCurBandwidthIndex);
1629            }
1630
1631            ALOGV("pausing fetcher-%d, threshold=%.2f",
1632                    fetcher->getFetcherID(), threshold);
1633            fetcher->pauseAsync(threshold);
1634        }
1635    }
1636
1637    sp<AMessage> msg;
1638    if (timeUs < 0ll) {
1639        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1640        msg = new AMessage(kWhatChangeConfiguration3, this);
1641    } else {
1642        msg = new AMessage(kWhatChangeConfiguration2, this);
1643    }
1644    msg->setInt32("streamMask", streamMask);
1645    msg->setInt32("resumeMask", resumeMask);
1646    msg->setInt32("pickTrack", pickTrack);
1647    msg->setInt64("timeUs", timeUs);
1648    for (size_t i = 0; i < kMaxStreams; ++i) {
1649        if ((streamMask | resumeMask) & indexToType(i)) {
1650            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1651        }
1652    }
1653
1654    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1655    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1656    // fetchers have completed their asynchronous operation, we'll post
1657    // mContinuation, which then is handled below in onChangeConfiguration2.
1658    mContinuationCounter = mFetcherInfos.size();
1659    mContinuation = msg;
1660
1661    if (mContinuationCounter == 0) {
1662        msg->post();
1663    }
1664}
1665
1666void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1667    ALOGV("onChangeConfiguration");
1668
1669    if (!mReconfigurationInProgress) {
1670        int32_t pickTrack = 0;
1671        msg->findInt32("pickTrack", &pickTrack);
1672        changeConfiguration(-1ll /* timeUs */, -1, pickTrack);
1673    } else {
1674        msg->post(1000000ll); // retry in 1 sec
1675    }
1676}
1677
1678void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1679    ALOGV("onChangeConfiguration2");
1680
1681    mContinuation.clear();
1682
1683    // All fetchers are either suspended or have been removed now.
1684
1685    // If we're seeking, clear all packet sources before we report
1686    // seek complete, to prevent decoder from pulling stale data.
1687    int64_t timeUs;
1688    CHECK(msg->findInt64("timeUs", &timeUs));
1689
1690    if (timeUs >= 0) {
1691        mLastSeekTimeUs = timeUs;
1692        mLastDequeuedTimeUs = timeUs;
1693
1694        for (size_t i = 0; i < mPacketSources.size(); i++) {
1695            mPacketSources.editValueAt(i)->clear();
1696        }
1697
1698        for (size_t i = 0; i < kMaxStreams; ++i) {
1699            mStreams[i].mCurDiscontinuitySeq = 0;
1700        }
1701
1702        mDiscontinuityOffsetTimesUs.clear();
1703        mDiscontinuityAbsStartTimesUs.clear();
1704
1705        if (mSeekReplyID != NULL) {
1706            CHECK(mSeekReply != NULL);
1707            mSeekReply->setInt32("err", OK);
1708            mSeekReply->postReply(mSeekReplyID);
1709            mSeekReplyID.clear();
1710            mSeekReply.clear();
1711        }
1712
1713        // restart buffer polling after seek becauese previous
1714        // buffering position is no longer valid.
1715        restartPollBuffering();
1716    }
1717
1718    uint32_t streamMask, resumeMask;
1719    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1720    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1721
1722    streamMask |= resumeMask;
1723
1724    AString URIs[kMaxStreams];
1725    for (size_t i = 0; i < kMaxStreams; ++i) {
1726        if (streamMask & indexToType(i)) {
1727            const AString &uriKey = mStreams[i].uriKey();
1728            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1729            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1730        }
1731    }
1732
1733    uint32_t changedMask = 0;
1734    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1735        // stream URI could change even if onChangeConfiguration2 is only
1736        // used for seek. Seek could happen during a bw switch, in this
1737        // case bw switch will be cancelled, but the seekTo position will
1738        // fetch from the new URI.
1739        if ((mStreamMask & streamMask & indexToType(i))
1740                && !mStreams[i].mUri.empty()
1741                && !(URIs[i] == mStreams[i].mUri)) {
1742            ALOGV("stream %zu changed: oldURI %s, newURI %s", i,
1743                    mStreams[i].mUri.c_str(), URIs[i].c_str());
1744            sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i));
1745            if (source->getLatestDequeuedMeta() != NULL) {
1746                source->queueDiscontinuity(
1747                        ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1748            }
1749        }
1750        // Determine which decoders to shutdown on the player side,
1751        // a decoder has to be shutdown if its streamtype was active
1752        // before but now longer isn't.
1753        if ((mStreamMask & ~streamMask & indexToType(i))) {
1754            changedMask |= indexToType(i);
1755        }
1756    }
1757
1758    if (changedMask == 0) {
1759        // If nothing changed as far as the audio/video decoders
1760        // are concerned we can proceed.
1761        onChangeConfiguration3(msg);
1762        return;
1763    }
1764
1765    // Something changed, inform the player which will shutdown the
1766    // corresponding decoders and will post the reply once that's done.
1767    // Handling the reply will continue executing below in
1768    // onChangeConfiguration3.
1769    sp<AMessage> notify = mNotify->dup();
1770    notify->setInt32("what", kWhatStreamsChanged);
1771    notify->setInt32("changedMask", changedMask);
1772
1773    msg->setWhat(kWhatChangeConfiguration3);
1774    msg->setTarget(this);
1775
1776    notify->setMessage("reply", msg);
1777    notify->post();
1778}
1779
1780void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1781    mContinuation.clear();
1782    // All remaining fetchers are still suspended, the player has shutdown
1783    // any decoders that needed it.
1784
1785    uint32_t streamMask, resumeMask;
1786    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1787    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1788
1789    mNewStreamMask = streamMask | resumeMask;
1790
1791    int64_t timeUs;
1792    int32_t pickTrack;
1793    bool switching = false;
1794    CHECK(msg->findInt64("timeUs", &timeUs));
1795    CHECK(msg->findInt32("pickTrack", &pickTrack));
1796
1797    if (timeUs < 0ll) {
1798        if (!pickTrack) {
1799            // mSwapMask contains streams that are in both old and new variant,
1800            // (in mNewStreamMask & mStreamMask) but with different URIs
1801            // (not in resumeMask).
1802            // For example, old variant has video and audio in two separate
1803            // URIs, and new variant has only audio with unchanged URI. mSwapMask
1804            // should be 0 as there is nothing to swap. We only need to stop video,
1805            // and resume audio.
1806            mSwapMask =  mNewStreamMask & mStreamMask & ~resumeMask;
1807            switching = (mSwapMask != 0);
1808        }
1809        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
1810    } else {
1811        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1812    }
1813
1814    ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, "
1815            "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x",
1816            (long long)timeUs, switching, pickTrack,
1817            mStreamMask, mNewStreamMask, mSwapMask);
1818
1819    for (size_t i = 0; i < kMaxStreams; ++i) {
1820        if (streamMask & indexToType(i)) {
1821            if (switching) {
1822                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
1823            } else {
1824                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1825            }
1826        }
1827    }
1828
1829    // Of all existing fetchers:
1830    // * Resume fetchers that are still needed and assign them original packet sources.
1831    // * Mark otherwise unneeded fetchers for removal.
1832    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1833    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1834        const AString &uri = mFetcherInfos.keyAt(i);
1835        if (!resumeFetcher(uri, resumeMask, timeUs)) {
1836            ALOGV("marking fetcher-%d to be removed",
1837                    mFetcherInfos[i].mFetcher->getFetcherID());
1838
1839            mFetcherInfos.editValueAt(i).mToBeRemoved = true;
1840        }
1841    }
1842
1843    // streamMask now only contains the types that need a new fetcher created.
1844    if (streamMask != 0) {
1845        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1846    }
1847
1848    // Find out when the original fetchers have buffered up to and start the new fetchers
1849    // at a later timestamp.
1850    for (size_t i = 0; i < kMaxStreams; i++) {
1851        if (!(indexToType(i) & streamMask)) {
1852            continue;
1853        }
1854
1855        AString uri;
1856        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
1857
1858        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1859        CHECK(fetcher != NULL);
1860
1861        HLSTime startTime;
1862        SeekMode seekMode = kSeekModeExactPosition;
1863        sp<AnotherPacketSource> sources[kNumSources];
1864
1865        if (i == kSubtitleIndex || (!pickTrack && !switching)) {
1866            startTime = latestMediaSegmentStartTime();
1867        }
1868
1869        // TRICKY: looping from i as earlier streams are already removed from streamMask
1870        for (size_t j = i; j < kMaxStreams; ++j) {
1871            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
1872            if ((streamMask & indexToType(j)) && uri == streamUri) {
1873                sources[j] = mPacketSources.valueFor(indexToType(j));
1874
1875                if (timeUs >= 0) {
1876                    startTime.mTimeUs = timeUs;
1877                } else {
1878                    int32_t type;
1879                    sp<AMessage> meta;
1880                    if (!switching) {
1881                        // selecting, or adapting but no swap required
1882                        meta = sources[j]->getLatestDequeuedMeta();
1883                    } else {
1884                        // adapting and swap required
1885                        meta = sources[j]->getLatestEnqueuedMeta();
1886                        if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) {
1887                            // switching up
1888                            meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin);
1889                        }
1890                    }
1891
1892                    if ((j == kAudioIndex || j == kVideoIndex)
1893                            && meta != NULL && !meta->findInt32("discontinuity", &type)) {
1894                        HLSTime tmpTime(meta);
1895                        if (startTime < tmpTime) {
1896                            startTime = tmpTime;
1897                        }
1898                    }
1899
1900                    if (!switching) {
1901                        // selecting, or adapting but no swap required
1902                        sources[j]->clear();
1903                        if (j == kSubtitleIndex) {
1904                            break;
1905                        }
1906
1907                        ALOGV("stream[%zu]: queue format change", j);
1908                        sources[j]->queueDiscontinuity(
1909                                ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
1910                    } else {
1911                        // switching, queue discontinuities after resume
1912                        sources[j] = mPacketSources2.valueFor(indexToType(j));
1913                        sources[j]->clear();
1914                        // the new fetcher might be providing streams that used to be
1915                        // provided by two different fetchers,  if one of the fetcher
1916                        // paused in the middle while the other somehow paused in next
1917                        // seg, we have to start from next seg.
1918                        if (seekMode < mStreams[j].mSeekMode) {
1919                            seekMode = mStreams[j].mSeekMode;
1920                        }
1921                    }
1922                }
1923
1924                streamMask &= ~indexToType(j);
1925            }
1926        }
1927
1928        ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld "
1929                "segmentStartTimeUs %lld seekMode %d",
1930                fetcher->getFetcherID(),
1931                (long long)startTime.mTimeUs,
1932                (long long)mLastSeekTimeUs,
1933                (long long)startTime.getSegmentTimeUs(true /* midpoint */),
1934                seekMode);
1935
1936        // Set the target segment start time to the middle point of the
1937        // segment where the last sample was.
1938        // This gives a better guess if segments of the two variants are not
1939        // perfectly aligned. (If the corresponding segment in new variant
1940        // starts slightly later than that in the old variant, we still want
1941        // to pick that segment, not the one before)
1942        fetcher->startAsync(
1943                sources[kAudioIndex],
1944                sources[kVideoIndex],
1945                sources[kSubtitleIndex],
1946                getMetadataSource(sources, mNewStreamMask, switching),
1947                startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs,
1948                startTime.getSegmentTimeUs(true /* midpoint */),
1949                startTime.mSeq,
1950                seekMode);
1951    }
1952
1953    // All fetchers have now been started, the configuration change
1954    // has completed.
1955
1956    mReconfigurationInProgress = false;
1957    if (switching) {
1958        mSwitchInProgress = true;
1959    } else {
1960        mStreamMask = mNewStreamMask;
1961        if (mOrigBandwidthIndex != mCurBandwidthIndex) {
1962            ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd",
1963                    mOrigBandwidthIndex, mCurBandwidthIndex);
1964            mOrigBandwidthIndex = mCurBandwidthIndex;
1965        }
1966    }
1967
1968    ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x",
1969            mSwitchInProgress, mStreamMask);
1970
1971    if (mDisconnectReplyID != NULL) {
1972        finishDisconnect();
1973    }
1974}
1975
1976void LiveSession::swapPacketSource(StreamType stream) {
1977    ALOGV("[%s] swapPacketSource", getNameForStream(stream));
1978
1979    // transfer packets from source2 to source
1980    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
1981    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
1982
1983    // queue discontinuity in mPacketSource
1984    aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false);
1985
1986    // queue packets in mPacketSource2 to mPacketSource
1987    status_t finalResult = OK;
1988    sp<ABuffer> accessUnit;
1989    while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK &&
1990          OK == aps2->dequeueAccessUnit(&accessUnit)) {
1991        aps->queueAccessUnit(accessUnit);
1992    }
1993    aps2->clear();
1994}
1995
1996void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) {
1997    if (!mSwitchInProgress) {
1998        return;
1999    }
2000
2001    ssize_t index = mFetcherInfos.indexOfKey(oldUri);
2002    if (index < 0 || !mFetcherInfos[index].mToBeRemoved) {
2003        return;
2004    }
2005
2006    // Swap packet source of streams provided by old variant
2007    for (size_t idx = 0; idx < kMaxStreams; idx++) {
2008        StreamType stream = indexToType(idx);
2009        if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) {
2010            swapPacketSource(stream);
2011
2012            if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
2013                ALOGW("swapping stream type %d %s to empty stream",
2014                        stream, mStreams[idx].mUri.c_str());
2015            }
2016            mStreams[idx].mUri = mStreams[idx].mNewUri;
2017            mStreams[idx].mNewUri.clear();
2018
2019            mSwapMask &= ~stream;
2020        }
2021    }
2022
2023    mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */);
2024
2025    ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask);
2026    if (mSwapMask != 0) {
2027        return;
2028    }
2029
2030    // Check if new variant contains extra streams.
2031    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
2032    while (extraStreams) {
2033        StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1));
2034        extraStreams &= ~stream;
2035
2036        swapPacketSource(stream);
2037
2038        ssize_t idx = typeToIndex(stream);
2039        CHECK(idx >= 0);
2040        if (mStreams[idx].mNewUri.empty()) {
2041            ALOGW("swapping extra stream type %d %s to empty stream",
2042                    stream, mStreams[idx].mUri.c_str());
2043        }
2044        mStreams[idx].mUri = mStreams[idx].mNewUri;
2045        mStreams[idx].mNewUri.clear();
2046    }
2047
2048    // Restart new fetcher (it was paused after the first 47k block)
2049    // and let it fetch into mPacketSources (not mPacketSources2)
2050    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2051        FetcherInfo &info = mFetcherInfos.editValueAt(i);
2052        if (info.mToBeResumed) {
2053            resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask);
2054            info.mToBeResumed = false;
2055        }
2056    }
2057
2058    ALOGI("#### Finished Bandwidth Switch: %zd => %zd",
2059            mOrigBandwidthIndex, mCurBandwidthIndex);
2060
2061    mStreamMask = mNewStreamMask;
2062    mSwitchInProgress = false;
2063    mOrigBandwidthIndex = mCurBandwidthIndex;
2064
2065    restartPollBuffering();
2066}
2067
2068void LiveSession::schedulePollBuffering() {
2069    sp<AMessage> msg = new AMessage(kWhatPollBuffering, this);
2070    msg->setInt32("generation", mPollBufferingGeneration);
2071    msg->post(1000000ll);
2072}
2073
2074void LiveSession::cancelPollBuffering() {
2075    ++mPollBufferingGeneration;
2076    mPrevBufferPercentage = -1;
2077}
2078
2079void LiveSession::restartPollBuffering() {
2080    cancelPollBuffering();
2081    onPollBuffering();
2082}
2083
2084void LiveSession::onPollBuffering() {
2085    ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
2086            "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x",
2087        mSwitchInProgress, mReconfigurationInProgress,
2088        mInPreparationPhase, mCurBandwidthIndex, mStreamMask);
2089
2090    bool underflow, ready, down, up;
2091    if (checkBuffering(underflow, ready, down, up)) {
2092        if (mInPreparationPhase) {
2093            // Allow down switch even if we're still preparing.
2094            //
2095            // Some streams have a high bandwidth index as default,
2096            // when bandwidth is low, it takes a long time to buffer
2097            // to ready mark, then it immediately pauses after start
2098            // as we have to do a down switch. It's better experience
2099            // to restart from a lower index, if we detect low bw.
2100            if (!switchBandwidthIfNeeded(false /* up */, down) && ready) {
2101                postPrepared(OK);
2102            }
2103        }
2104
2105        if (!mInPreparationPhase) {
2106            if (ready) {
2107                stopBufferingIfNecessary();
2108            } else if (underflow) {
2109                startBufferingIfNecessary();
2110            }
2111            switchBandwidthIfNeeded(up, down);
2112        }
2113    }
2114
2115    schedulePollBuffering();
2116}
2117
2118void LiveSession::cancelBandwidthSwitch(bool resume) {
2119    ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd",
2120            mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex);
2121    if (!mSwitchInProgress) {
2122        return;
2123    }
2124
2125    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2126        FetcherInfo& info = mFetcherInfos.editValueAt(i);
2127        if (info.mToBeRemoved) {
2128            info.mToBeRemoved = false;
2129            if (resume) {
2130                resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask);
2131            }
2132        }
2133    }
2134
2135    for (size_t i = 0; i < kMaxStreams; ++i) {
2136        AString newUri = mStreams[i].mNewUri;
2137        if (!newUri.empty()) {
2138            // clear all mNewUri matching this newUri
2139            for (size_t j = i; j < kMaxStreams; ++j) {
2140                if (mStreams[j].mNewUri == newUri) {
2141                    mStreams[j].mNewUri.clear();
2142                }
2143            }
2144            ALOGV("stopping newUri = %s", newUri.c_str());
2145            ssize_t index = mFetcherInfos.indexOfKey(newUri);
2146            if (index < 0) {
2147                ALOGE("did not find fetcher for newUri: %s", newUri.c_str());
2148                continue;
2149            }
2150            FetcherInfo &info = mFetcherInfos.editValueAt(index);
2151            info.mToBeRemoved = true;
2152            info.mFetcher->stopAsync();
2153        }
2154    }
2155
2156    ALOGI("#### Canceled Bandwidth Switch: %zd => %zd",
2157            mOrigBandwidthIndex, mCurBandwidthIndex);
2158
2159    mSwitchGeneration++;
2160    mSwitchInProgress = false;
2161    mCurBandwidthIndex = mOrigBandwidthIndex;
2162    mSwapMask = 0;
2163}
2164
2165bool LiveSession::checkBuffering(
2166        bool &underflow, bool &ready, bool &down, bool &up) {
2167    underflow = ready = down = up = false;
2168
2169    if (mReconfigurationInProgress) {
2170        ALOGV("Switch/Reconfig in progress, defer buffer polling");
2171        return false;
2172    }
2173
2174    size_t activeCount, underflowCount, readyCount, downCount, upCount;
2175    activeCount = underflowCount = readyCount = downCount = upCount =0;
2176    int32_t minBufferPercent = -1;
2177    int64_t durationUs;
2178    if (getDuration(&durationUs) != OK) {
2179        durationUs = -1;
2180    }
2181    for (size_t i = 0; i < mPacketSources.size(); ++i) {
2182        // we don't check subtitles for buffering level
2183        if (!(mStreamMask & mPacketSources.keyAt(i)
2184                & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) {
2185            continue;
2186        }
2187        // ignore streams that never had any packet queued.
2188        // (it's possible that the variant only has audio or video)
2189        sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
2190        if (meta == NULL) {
2191            continue;
2192        }
2193
2194        int64_t bufferedDurationUs =
2195                mPacketSources[i]->getEstimatedDurationUs();
2196        ALOGV("[%s] buffered %lld us",
2197                getNameForStream(mPacketSources.keyAt(i)),
2198                (long long)bufferedDurationUs);
2199        if (durationUs >= 0) {
2200            int32_t percent;
2201            if (mPacketSources[i]->isFinished(0 /* duration */)) {
2202                percent = 100;
2203            } else {
2204                percent = (int32_t)(100.0 *
2205                        (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs);
2206            }
2207            if (minBufferPercent < 0 || percent < minBufferPercent) {
2208                minBufferPercent = percent;
2209            }
2210        }
2211
2212        ++activeCount;
2213        int64_t readyMark = mInPreparationPhase ? kPrepareMarkUs : kReadyMarkUs;
2214        if (bufferedDurationUs > readyMark
2215                || mPacketSources[i]->isFinished(0)) {
2216            ++readyCount;
2217        }
2218        if (!mPacketSources[i]->isFinished(0)) {
2219            if (bufferedDurationUs < kUnderflowMarkUs) {
2220                ++underflowCount;
2221            }
2222            if (bufferedDurationUs > mUpSwitchMark) {
2223                ++upCount;
2224            }
2225            if (bufferedDurationUs < mDownSwitchMark) {
2226                ++downCount;
2227            }
2228        }
2229    }
2230
2231    if (minBufferPercent >= 0) {
2232        notifyBufferingUpdate(minBufferPercent);
2233    }
2234
2235    if (activeCount > 0) {
2236        up        = (upCount == activeCount);
2237        down      = (downCount > 0);
2238        ready     = (readyCount == activeCount);
2239        underflow = (underflowCount > 0);
2240        return true;
2241    }
2242
2243    return false;
2244}
2245
2246void LiveSession::startBufferingIfNecessary() {
2247    ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2248            mInPreparationPhase, mBuffering);
2249    if (!mBuffering) {
2250        mBuffering = true;
2251
2252        sp<AMessage> notify = mNotify->dup();
2253        notify->setInt32("what", kWhatBufferingStart);
2254        notify->post();
2255    }
2256}
2257
2258void LiveSession::stopBufferingIfNecessary() {
2259    ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2260            mInPreparationPhase, mBuffering);
2261
2262    if (mBuffering) {
2263        mBuffering = false;
2264
2265        sp<AMessage> notify = mNotify->dup();
2266        notify->setInt32("what", kWhatBufferingEnd);
2267        notify->post();
2268    }
2269}
2270
2271void LiveSession::notifyBufferingUpdate(int32_t percentage) {
2272    if (percentage < mPrevBufferPercentage) {
2273        percentage = mPrevBufferPercentage;
2274    } else if (percentage > 100) {
2275        percentage = 100;
2276    }
2277
2278    mPrevBufferPercentage = percentage;
2279
2280    ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage);
2281
2282    sp<AMessage> notify = mNotify->dup();
2283    notify->setInt32("what", kWhatBufferingUpdate);
2284    notify->setInt32("percentage", percentage);
2285    notify->post();
2286}
2287
2288/*
2289 * returns true if a bandwidth switch is actually needed (and started),
2290 * returns false otherwise
2291 */
2292bool LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
2293    // no need to check bandwidth if we only have 1 bandwidth settings
2294    if (mSwitchInProgress || mBandwidthItems.size() < 2) {
2295        return false;
2296    }
2297
2298    int32_t bandwidthBps;
2299    if (mBandwidthEstimator->estimateBandwidth(&bandwidthBps)) {
2300        ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
2301        mLastBandwidthBps = bandwidthBps;
2302    } else {
2303        ALOGV("no bandwidth estimate.");
2304        return false;
2305    }
2306
2307    int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth;
2308    // canSwithDown and canSwitchUp can't both be true.
2309    // we only want to switch up when measured bw is 120% higher than current variant,
2310    // and we only want to switch down when measured bw is below current variant.
2311    bool canSwithDown = bufferLow
2312            && (bandwidthBps < (int32_t)curBandwidth);
2313    bool canSwitchUp = bufferHigh
2314            && (bandwidthBps > (int32_t)curBandwidth * 12 / 10);
2315
2316    if (canSwithDown || canSwitchUp) {
2317        ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps);
2318
2319        // it's possible that we're checking for canSwitchUp case, but the returned
2320        // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70%
2321        // of measured bw. In that case we don't want to do anything, since we have
2322        // both enough buffer and enough bw.
2323        if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex)
2324         || (canSwithDown && bandwidthIndex < mCurBandwidthIndex)) {
2325            // if not yet prepared, just restart again with new bw index.
2326            // this is faster and playback experience is cleaner.
2327            changeConfiguration(
2328                    mInPreparationPhase ? 0 : -1ll, bandwidthIndex);
2329            return true;
2330        }
2331    }
2332    return false;
2333}
2334
2335void LiveSession::postError(status_t err) {
2336    // if we reached EOS, notify buffering of 100%
2337    if (err == ERROR_END_OF_STREAM) {
2338        notifyBufferingUpdate(100);
2339    }
2340    // we'll stop buffer polling now, before that notify
2341    // stop buffering to stop the spinning icon
2342    stopBufferingIfNecessary();
2343    cancelPollBuffering();
2344
2345    sp<AMessage> notify = mNotify->dup();
2346    notify->setInt32("what", kWhatError);
2347    notify->setInt32("err", err);
2348    notify->post();
2349}
2350
2351void LiveSession::postPrepared(status_t err) {
2352    CHECK(mInPreparationPhase);
2353
2354    sp<AMessage> notify = mNotify->dup();
2355    if (err == OK || err == ERROR_END_OF_STREAM) {
2356        notify->setInt32("what", kWhatPrepared);
2357    } else {
2358        cancelPollBuffering();
2359
2360        notify->setInt32("what", kWhatPreparationFailed);
2361        notify->setInt32("err", err);
2362    }
2363
2364    notify->post();
2365
2366    mInPreparationPhase = false;
2367}
2368
2369
2370}  // namespace android
2371
2372