LiveSession.cpp revision afcc4fcbb3a094ec2221d6e523772e76894d1f00
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/IMediaHTTPConnection.h>
31#include <media/IMediaHTTPService.h>
32#include <media/stagefright/foundation/hexdump.h>
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/DataSource.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaErrors.h>
39#include <media/stagefright/MediaHTTP.h>
40#include <media/stagefright/MetaData.h>
41#include <media/stagefright/Utils.h>
42
43#include <utils/Mutex.h>
44
45#include <ctype.h>
46#include <inttypes.h>
47#include <openssl/aes.h>
48#include <openssl/md5.h>
49
50namespace android {
51
52// Number of recently-read bytes to use for bandwidth estimation
53const size_t LiveSession::kBandwidthHistoryBytes = 200 * 1024;
54
55LiveSession::LiveSession(
56        const sp<AMessage> &notify, uint32_t flags,
57        const sp<IMediaHTTPService> &httpService)
58    : mNotify(notify),
59      mFlags(flags),
60      mHTTPService(httpService),
61      mInPreparationPhase(true),
62      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
63      mCurBandwidthIndex(-1),
64      mStreamMask(0),
65      mNewStreamMask(0),
66      mSwapMask(0),
67      mCheckBandwidthGeneration(0),
68      mSwitchGeneration(0),
69      mSubtitleGeneration(0),
70      mLastDequeuedTimeUs(0ll),
71      mRealTimeBaseUs(0ll),
72      mReconfigurationInProgress(false),
73      mSwitchInProgress(false),
74      mDisconnectReplyID(0),
75      mSeekReplyID(0),
76      mFirstTimeUsValid(false),
77      mFirstTimeUs(0),
78      mLastSeekTimeUs(0) {
79
80    mStreams[kAudioIndex] = StreamItem("audio");
81    mStreams[kVideoIndex] = StreamItem("video");
82    mStreams[kSubtitleIndex] = StreamItem("subtitles");
83
84    for (size_t i = 0; i < kMaxStreams; ++i) {
85        mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
86        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
87        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
88        mBuffering[i] = false;
89    }
90
91    size_t numHistoryItems = kBandwidthHistoryBytes /
92            PlaylistFetcher::kDownloadBlockSize + 1;
93    if (numHistoryItems < 5) {
94        numHistoryItems = 5;
95    }
96    mHTTPDataSource->setBandwidthHistorySize(numHistoryItems);
97}
98
99LiveSession::~LiveSession() {
100}
101
102sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
103    ABuffer *discontinuity = new ABuffer(0);
104    discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
105    discontinuity->meta()->setInt32("swapPacketSource", swap);
106    discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
107    discontinuity->meta()->setInt64("timeUs", -1);
108    return discontinuity;
109}
110
111void LiveSession::swapPacketSource(StreamType stream) {
112    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
113    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
114    sp<AnotherPacketSource> tmp = aps;
115    aps = aps2;
116    aps2 = tmp;
117    aps2->clear();
118}
119
120status_t LiveSession::dequeueAccessUnit(
121        StreamType stream, sp<ABuffer> *accessUnit) {
122    if (!(mStreamMask & stream)) {
123        // return -EWOULDBLOCK to avoid halting the decoder
124        // when switching between audio/video and audio only.
125        return -EWOULDBLOCK;
126    }
127
128    status_t finalResult;
129    sp<AnotherPacketSource> discontinuityQueue  = mDiscontinuities.valueFor(stream);
130    if (discontinuityQueue->hasBufferAvailable(&finalResult)) {
131        discontinuityQueue->dequeueAccessUnit(accessUnit);
132        // seeking, track switching
133        sp<AMessage> extra;
134        int64_t timeUs;
135        if ((*accessUnit)->meta()->findMessage("extra", &extra)
136                && extra != NULL
137                && extra->findInt64("timeUs", &timeUs)) {
138            // seeking only
139            mLastSeekTimeUs = timeUs;
140            mDiscontinuityOffsetTimesUs.clear();
141            mDiscontinuityAbsStartTimesUs.clear();
142        }
143        return INFO_DISCONTINUITY;
144    }
145
146    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
147
148    ssize_t idx = typeToIndex(stream);
149    if (!packetSource->hasBufferAvailable(&finalResult)) {
150        if (finalResult == OK) {
151            mBuffering[idx] = true;
152            return -EAGAIN;
153        } else {
154            return finalResult;
155        }
156    }
157
158    int32_t targetDuration = 0;
159    sp<AMessage> meta = packetSource->getLatestEnqueuedMeta();
160    if (meta != NULL) {
161        meta->findInt32("targetDuration", &targetDuration);
162    }
163
164    int64_t targetDurationUs = targetDuration * 1000000ll;
165    if (targetDurationUs == 0 ||
166            targetDurationUs > PlaylistFetcher::kMinBufferedDurationUs) {
167        // Fetchers limit buffering to
168        // min(3 * targetDuration, kMinBufferedDurationUs)
169        targetDurationUs = PlaylistFetcher::kMinBufferedDurationUs;
170    }
171
172    if (mBuffering[idx]) {
173        if (mSwitchInProgress
174                || packetSource->isFinished(0)
175                || packetSource->getEstimatedDurationUs() > targetDurationUs) {
176            mBuffering[idx] = false;
177        }
178    }
179
180    if (mBuffering[idx]) {
181        return -EAGAIN;
182    }
183
184    // wait for counterpart
185    sp<AnotherPacketSource> otherSource;
186    uint32_t mask = mNewStreamMask & mStreamMask;
187    uint32_t fetchersMask  = 0;
188    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
189        uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask();
190        fetchersMask |= fetcherMask;
191    }
192    mask &= fetchersMask;
193    if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) {
194        otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
195    } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) {
196        otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
197    }
198    if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) {
199        return finalResult == OK ? -EAGAIN : finalResult;
200    }
201
202    status_t err = packetSource->dequeueAccessUnit(accessUnit);
203
204    size_t streamIdx;
205    const char *streamStr;
206    switch (stream) {
207        case STREAMTYPE_AUDIO:
208            streamIdx = kAudioIndex;
209            streamStr = "audio";
210            break;
211        case STREAMTYPE_VIDEO:
212            streamIdx = kVideoIndex;
213            streamStr = "video";
214            break;
215        case STREAMTYPE_SUBTITLES:
216            streamIdx = kSubtitleIndex;
217            streamStr = "subs";
218            break;
219        default:
220            TRESPASS();
221    }
222
223    StreamItem& strm = mStreams[streamIdx];
224    if (err == INFO_DISCONTINUITY) {
225        // adaptive streaming, discontinuities in the playlist
226        int32_t type;
227        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
228
229        sp<AMessage> extra;
230        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
231            extra.clear();
232        }
233
234        ALOGI("[%s] read discontinuity of type %d, extra = %s",
235              streamStr,
236              type,
237              extra == NULL ? "NULL" : extra->debugString().c_str());
238
239        int32_t swap;
240        if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) {
241            int32_t switchGeneration;
242            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
243            {
244                Mutex::Autolock lock(mSwapMutex);
245                if (switchGeneration == mSwitchGeneration) {
246                    swapPacketSource(stream);
247                    sp<AMessage> msg = new AMessage(kWhatSwapped, id());
248                    msg->setInt32("stream", stream);
249                    msg->setInt32("switchGeneration", switchGeneration);
250                    msg->post();
251                }
252            }
253        } else {
254            size_t seq = strm.mCurDiscontinuitySeq;
255            int64_t offsetTimeUs;
256            if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
257                offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
258            } else {
259                offsetTimeUs = 0;
260            }
261
262            seq += 1;
263            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
264                int64_t firstTimeUs;
265                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
266                offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
267                offsetTimeUs += strm.mLastSampleDurationUs;
268            } else {
269                offsetTimeUs += strm.mLastSampleDurationUs;
270            }
271
272            mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
273        }
274    } else if (err == OK) {
275
276        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
277            int64_t timeUs;
278            int32_t discontinuitySeq = 0;
279            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
280            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
281            strm.mCurDiscontinuitySeq = discontinuitySeq;
282
283            int32_t discard = 0;
284            int64_t firstTimeUs;
285            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
286                int64_t durUs; // approximate sample duration
287                if (timeUs > strm.mLastDequeuedTimeUs) {
288                    durUs = timeUs - strm.mLastDequeuedTimeUs;
289                } else {
290                    durUs = strm.mLastDequeuedTimeUs - timeUs;
291                }
292                strm.mLastSampleDurationUs = durUs;
293                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
294            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
295                firstTimeUs = timeUs;
296            } else {
297                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
298                firstTimeUs = timeUs;
299            }
300
301            strm.mLastDequeuedTimeUs = timeUs;
302            if (timeUs >= firstTimeUs) {
303                timeUs -= firstTimeUs;
304            } else {
305                timeUs = 0;
306            }
307            timeUs += mLastSeekTimeUs;
308            if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
309                timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
310            }
311
312            ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs);
313            (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
314            mLastDequeuedTimeUs = timeUs;
315            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
316        } else if (stream == STREAMTYPE_SUBTITLES) {
317            int32_t subtitleGeneration;
318            if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
319                    && subtitleGeneration != mSubtitleGeneration) {
320               return -EAGAIN;
321            };
322            (*accessUnit)->meta()->setInt32(
323                    "trackIndex", mPlaylist->getSelectedIndex());
324            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
325        }
326    } else {
327        ALOGI("[%s] encountered error %d", streamStr, err);
328    }
329
330    return err;
331}
332
333status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
334    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
335    if (!(mStreamMask & stream)) {
336        return UNKNOWN_ERROR;
337    }
338
339    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
340
341    sp<MetaData> meta = packetSource->getFormat();
342
343    if (meta == NULL) {
344        return -EAGAIN;
345    }
346
347    return convertMetaDataToMessage(meta, format);
348}
349
350void LiveSession::connectAsync(
351        const char *url, const KeyedVector<String8, String8> *headers) {
352    sp<AMessage> msg = new AMessage(kWhatConnect, id());
353    msg->setString("url", url);
354
355    if (headers != NULL) {
356        msg->setPointer(
357                "headers",
358                new KeyedVector<String8, String8>(*headers));
359    }
360
361    msg->post();
362}
363
364status_t LiveSession::disconnect() {
365    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
366
367    sp<AMessage> response;
368    status_t err = msg->postAndAwaitResponse(&response);
369
370    return err;
371}
372
373status_t LiveSession::seekTo(int64_t timeUs) {
374    sp<AMessage> msg = new AMessage(kWhatSeek, id());
375    msg->setInt64("timeUs", timeUs);
376
377    sp<AMessage> response;
378    status_t err = msg->postAndAwaitResponse(&response);
379
380    return err;
381}
382
383void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
384    switch (msg->what()) {
385        case kWhatConnect:
386        {
387            onConnect(msg);
388            break;
389        }
390
391        case kWhatDisconnect:
392        {
393            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
394
395            if (mReconfigurationInProgress) {
396                break;
397            }
398
399            finishDisconnect();
400            break;
401        }
402
403        case kWhatSeek:
404        {
405            uint32_t seekReplyID;
406            CHECK(msg->senderAwaitsResponse(&seekReplyID));
407            mSeekReplyID = seekReplyID;
408            mSeekReply = new AMessage;
409
410            status_t err = onSeek(msg);
411
412            if (err != OK) {
413                msg->post(50000);
414            }
415            break;
416        }
417
418        case kWhatFetcherNotify:
419        {
420            int32_t what;
421            CHECK(msg->findInt32("what", &what));
422
423            switch (what) {
424                case PlaylistFetcher::kWhatStarted:
425                    break;
426                case PlaylistFetcher::kWhatPaused:
427                case PlaylistFetcher::kWhatStopped:
428                {
429                    if (what == PlaylistFetcher::kWhatStopped) {
430                        AString uri;
431                        CHECK(msg->findString("uri", &uri));
432                        if (mFetcherInfos.removeItem(uri) < 0) {
433                            // ignore duplicated kWhatStopped messages.
434                            break;
435                        }
436
437                        if (mSwitchInProgress) {
438                            tryToFinishBandwidthSwitch();
439                        }
440                    }
441
442                    if (mContinuation != NULL) {
443                        CHECK_GT(mContinuationCounter, 0);
444                        if (--mContinuationCounter == 0) {
445                            mContinuation->post();
446
447                            if (mSeekReplyID != 0) {
448                                CHECK(mSeekReply != NULL);
449                                mSeekReply->setInt32("err", OK);
450                                mSeekReply->postReply(mSeekReplyID);
451                                mSeekReplyID = 0;
452                                mSeekReply.clear();
453                            }
454                        }
455                    }
456                    break;
457                }
458
459                case PlaylistFetcher::kWhatDurationUpdate:
460                {
461                    AString uri;
462                    CHECK(msg->findString("uri", &uri));
463
464                    int64_t durationUs;
465                    CHECK(msg->findInt64("durationUs", &durationUs));
466
467                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
468                    info->mDurationUs = durationUs;
469                    break;
470                }
471
472                case PlaylistFetcher::kWhatError:
473                {
474                    status_t err;
475                    CHECK(msg->findInt32("err", &err));
476
477                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
478
479                    // handle EOS on subtitle tracks independently
480                    AString uri;
481                    if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
482                        ssize_t i = mFetcherInfos.indexOfKey(uri);
483                        if (i >= 0) {
484                            const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
485                            if (fetcher != NULL) {
486                                uint32_t type = fetcher->getStreamTypeMask();
487                                if (type == STREAMTYPE_SUBTITLES) {
488                                    mPacketSources.valueFor(
489                                            STREAMTYPE_SUBTITLES)->signalEOS(err);;
490                                    break;
491                                }
492                            }
493                        }
494                    }
495
496                    if (mInPreparationPhase) {
497                        postPrepared(err);
498                    }
499
500                    cancelBandwidthSwitch();
501
502                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
503
504                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
505
506                    mPacketSources.valueFor(
507                            STREAMTYPE_SUBTITLES)->signalEOS(err);
508
509                    sp<AMessage> notify = mNotify->dup();
510                    notify->setInt32("what", kWhatError);
511                    notify->setInt32("err", err);
512                    notify->post();
513                    break;
514                }
515
516                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
517                {
518                    AString uri;
519                    CHECK(msg->findString("uri", &uri));
520
521                    if (mFetcherInfos.indexOfKey(uri) < 0) {
522                        ALOGE("couldn't find uri");
523                        break;
524                    }
525                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
526                    info->mIsPrepared = true;
527
528                    if (mInPreparationPhase) {
529                        bool allFetchersPrepared = true;
530                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
531                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
532                                allFetchersPrepared = false;
533                                break;
534                            }
535                        }
536
537                        if (allFetchersPrepared) {
538                            postPrepared(OK);
539                        }
540                    }
541                    break;
542                }
543
544                case PlaylistFetcher::kWhatStartedAt:
545                {
546                    int32_t switchGeneration;
547                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
548
549                    if (switchGeneration != mSwitchGeneration) {
550                        break;
551                    }
552
553                    // Resume fetcher for the original variant; the resumed fetcher should
554                    // continue until the timestamps found in msg, which is stored by the
555                    // new fetcher to indicate where the new variant has started buffering.
556                    for (size_t i = 0; i < mFetcherInfos.size(); i++) {
557                        const FetcherInfo info = mFetcherInfos.valueAt(i);
558                        if (info.mToBeRemoved) {
559                            info.mFetcher->resumeUntilAsync(msg);
560                        }
561                    }
562                    break;
563                }
564
565                default:
566                    TRESPASS();
567            }
568
569            break;
570        }
571
572        case kWhatCheckBandwidth:
573        {
574            int32_t generation;
575            CHECK(msg->findInt32("generation", &generation));
576
577            if (generation != mCheckBandwidthGeneration) {
578                break;
579            }
580
581            onCheckBandwidth(msg);
582            break;
583        }
584
585        case kWhatChangeConfiguration:
586        {
587            onChangeConfiguration(msg);
588            break;
589        }
590
591        case kWhatChangeConfiguration2:
592        {
593            onChangeConfiguration2(msg);
594            break;
595        }
596
597        case kWhatChangeConfiguration3:
598        {
599            onChangeConfiguration3(msg);
600            break;
601        }
602
603        case kWhatFinishDisconnect2:
604        {
605            onFinishDisconnect2();
606            break;
607        }
608
609        case kWhatSwapped:
610        {
611            onSwapped(msg);
612            break;
613        }
614
615        case kWhatCheckSwitchDown:
616        {
617            onCheckSwitchDown();
618            break;
619        }
620
621        case kWhatSwitchDown:
622        {
623            onSwitchDown();
624            break;
625        }
626
627        default:
628            TRESPASS();
629            break;
630    }
631}
632
633// static
634int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
635    if (a->mBandwidth < b->mBandwidth) {
636        return -1;
637    } else if (a->mBandwidth == b->mBandwidth) {
638        return 0;
639    }
640
641    return 1;
642}
643
644// static
645LiveSession::StreamType LiveSession::indexToType(int idx) {
646    CHECK(idx >= 0 && idx < kMaxStreams);
647    return (StreamType)(1 << idx);
648}
649
650// static
651ssize_t LiveSession::typeToIndex(int32_t type) {
652    switch (type) {
653        case STREAMTYPE_AUDIO:
654            return 0;
655        case STREAMTYPE_VIDEO:
656            return 1;
657        case STREAMTYPE_SUBTITLES:
658            return 2;
659        default:
660            return -1;
661    };
662    return -1;
663}
664
665void LiveSession::onConnect(const sp<AMessage> &msg) {
666    AString url;
667    CHECK(msg->findString("url", &url));
668
669    KeyedVector<String8, String8> *headers = NULL;
670    if (!msg->findPointer("headers", (void **)&headers)) {
671        mExtraHeaders.clear();
672    } else {
673        mExtraHeaders = *headers;
674
675        delete headers;
676        headers = NULL;
677    }
678
679    // TODO currently we don't know if we are coming here from incognito mode
680    ALOGI("onConnect %s", uriDebugString(url).c_str());
681
682    mMasterURL = url;
683
684    bool dummy;
685    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
686
687    if (mPlaylist == NULL) {
688        ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str());
689
690        postPrepared(ERROR_IO);
691        return;
692    }
693
694    // We trust the content provider to make a reasonable choice of preferred
695    // initial bandwidth by listing it first in the variant playlist.
696    // At startup we really don't have a good estimate on the available
697    // network bandwidth since we haven't tranferred any data yet. Once
698    // we have we can make a better informed choice.
699    size_t initialBandwidth = 0;
700    size_t initialBandwidthIndex = 0;
701
702    if (mPlaylist->isVariantPlaylist()) {
703        for (size_t i = 0; i < mPlaylist->size(); ++i) {
704            BandwidthItem item;
705
706            item.mPlaylistIndex = i;
707
708            sp<AMessage> meta;
709            AString uri;
710            mPlaylist->itemAt(i, &uri, &meta);
711
712            unsigned long bandwidth;
713            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
714
715            if (initialBandwidth == 0) {
716                initialBandwidth = item.mBandwidth;
717            }
718
719            mBandwidthItems.push(item);
720        }
721
722        CHECK_GT(mBandwidthItems.size(), 0u);
723
724        mBandwidthItems.sort(SortByBandwidth);
725
726        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
727            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
728                initialBandwidthIndex = i;
729                break;
730            }
731        }
732    } else {
733        // dummy item.
734        BandwidthItem item;
735        item.mPlaylistIndex = 0;
736        item.mBandwidth = 0;
737        mBandwidthItems.push(item);
738    }
739
740    mPlaylist->pickRandomMediaItems();
741    changeConfiguration(
742            0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
743}
744
745void LiveSession::finishDisconnect() {
746    // No reconfiguration is currently pending, make sure none will trigger
747    // during disconnection either.
748    cancelCheckBandwidthEvent();
749
750    // Protect mPacketSources from a swapPacketSource race condition through disconnect.
751    // (finishDisconnect, onFinishDisconnect2)
752    cancelBandwidthSwitch();
753
754    // cancel switch down monitor
755    mSwitchDownMonitor.clear();
756
757    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
758        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
759    }
760
761    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
762
763    mContinuationCounter = mFetcherInfos.size();
764    mContinuation = msg;
765
766    if (mContinuationCounter == 0) {
767        msg->post();
768    }
769}
770
771void LiveSession::onFinishDisconnect2() {
772    mContinuation.clear();
773
774    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
775    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
776
777    mPacketSources.valueFor(
778            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
779
780    sp<AMessage> response = new AMessage;
781    response->setInt32("err", OK);
782
783    response->postReply(mDisconnectReplyID);
784    mDisconnectReplyID = 0;
785}
786
787sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
788    ssize_t index = mFetcherInfos.indexOfKey(uri);
789
790    if (index >= 0) {
791        return NULL;
792    }
793
794    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
795    notify->setString("uri", uri);
796    notify->setInt32("switchGeneration", mSwitchGeneration);
797
798    FetcherInfo info;
799    info.mFetcher = new PlaylistFetcher(notify, this, uri, mSubtitleGeneration);
800    info.mDurationUs = -1ll;
801    info.mIsPrepared = false;
802    info.mToBeRemoved = false;
803    looper()->registerHandler(info.mFetcher);
804
805    mFetcherInfos.add(uri, info);
806
807    return info.mFetcher;
808}
809
810/*
811 * Illustration of parameters:
812 *
813 * 0      `range_offset`
814 * +------------+-------------------------------------------------------+--+--+
815 * |            |                                 | next block to fetch |  |  |
816 * |            | `source` handle => `out` buffer |                     |  |  |
817 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
818 * |            |<----------- `range_length` / buffer capacity ----------->|  |
819 * |<------------------------------ file_size ------------------------------->|
820 *
821 * Special parameter values:
822 * - range_length == -1 means entire file
823 * - block_size == 0 means entire range
824 *
825 */
826ssize_t LiveSession::fetchFile(
827        const char *url, sp<ABuffer> *out,
828        int64_t range_offset, int64_t range_length,
829        uint32_t block_size, /* download block size */
830        sp<DataSource> *source, /* to return and reuse source */
831        String8 *actualUrl) {
832    off64_t size;
833    sp<DataSource> temp_source;
834    if (source == NULL) {
835        source = &temp_source;
836    }
837
838    if (*source == NULL) {
839        if (!strncasecmp(url, "file://", 7)) {
840            *source = new FileSource(url + 7);
841        } else if (strncasecmp(url, "http://", 7)
842                && strncasecmp(url, "https://", 8)) {
843            return ERROR_UNSUPPORTED;
844        } else {
845            KeyedVector<String8, String8> headers = mExtraHeaders;
846            if (range_offset > 0 || range_length >= 0) {
847                headers.add(
848                        String8("Range"),
849                        String8(
850                            StringPrintf(
851                                "bytes=%lld-%s",
852                                range_offset,
853                                range_length < 0
854                                    ? "" : StringPrintf("%lld",
855                                            range_offset + range_length - 1).c_str()).c_str()));
856            }
857            status_t err = mHTTPDataSource->connect(url, &headers);
858
859            if (err != OK) {
860                return err;
861            }
862
863            *source = mHTTPDataSource;
864        }
865    }
866
867    status_t getSizeErr = (*source)->getSize(&size);
868    if (getSizeErr != OK) {
869        size = 65536;
870    }
871
872    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
873    if (*out == NULL) {
874        buffer->setRange(0, 0);
875    }
876
877    ssize_t bytesRead = 0;
878    // adjust range_length if only reading partial block
879    if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
880        range_length = buffer->size() + block_size;
881    }
882    for (;;) {
883        // Only resize when we don't know the size.
884        size_t bufferRemaining = buffer->capacity() - buffer->size();
885        if (bufferRemaining == 0 && getSizeErr != OK) {
886            size_t bufferIncrement = buffer->size() / 2;
887            if (bufferIncrement < 32768) {
888                bufferIncrement = 32768;
889            }
890            bufferRemaining = bufferIncrement;
891
892            ALOGV("increasing download buffer to %zu bytes",
893                 buffer->size() + bufferRemaining);
894
895            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
896            memcpy(copy->data(), buffer->data(), buffer->size());
897            copy->setRange(0, buffer->size());
898
899            buffer = copy;
900        }
901
902        size_t maxBytesToRead = bufferRemaining;
903        if (range_length >= 0) {
904            int64_t bytesLeftInRange = range_length - buffer->size();
905            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
906                maxBytesToRead = bytesLeftInRange;
907
908                if (bytesLeftInRange == 0) {
909                    break;
910                }
911            }
912        }
913
914        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
915        // to help us break out of the loop.
916        ssize_t n = (*source)->readAt(
917                buffer->size(), buffer->data() + buffer->size(),
918                maxBytesToRead);
919
920        if (n < 0) {
921            return n;
922        }
923
924        if (n == 0) {
925            break;
926        }
927
928        buffer->setRange(0, buffer->size() + (size_t)n);
929        bytesRead += n;
930    }
931
932    *out = buffer;
933    if (actualUrl != NULL) {
934        *actualUrl = (*source)->getUri();
935        if (actualUrl->isEmpty()) {
936            *actualUrl = url;
937        }
938    }
939
940    return bytesRead;
941}
942
943sp<M3UParser> LiveSession::fetchPlaylist(
944        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
945    ALOGV("fetchPlaylist '%s'", url);
946
947    *unchanged = false;
948
949    sp<ABuffer> buffer;
950    String8 actualUrl;
951    ssize_t  err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
952
953    if (err <= 0) {
954        return NULL;
955    }
956
957    // MD5 functionality is not available on the simulator, treat all
958    // playlists as changed.
959
960#if defined(HAVE_ANDROID_OS)
961    uint8_t hash[16];
962
963    MD5_CTX m;
964    MD5_Init(&m);
965    MD5_Update(&m, buffer->data(), buffer->size());
966
967    MD5_Final(hash, &m);
968
969    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
970        // playlist unchanged
971        *unchanged = true;
972
973        return NULL;
974    }
975
976    if (curPlaylistHash != NULL) {
977        memcpy(curPlaylistHash, hash, sizeof(hash));
978    }
979#endif
980
981    sp<M3UParser> playlist =
982        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
983
984    if (playlist->initCheck() != OK) {
985        ALOGE("failed to parse .m3u8 playlist");
986
987        return NULL;
988    }
989
990    return playlist;
991}
992
// Returns the next rand() value scaled by RAND_MAX, i.e. a value in [0, 1].
static double uniformRand() {
    const double sample = rand();
    return sample / RAND_MAX;
}
996
997size_t LiveSession::getBandwidthIndex() {
998    if (mBandwidthItems.size() == 0) {
999        return 0;
1000    }
1001
1002#if 1
1003    char value[PROPERTY_VALUE_MAX];
1004    ssize_t index = -1;
1005    if (property_get("media.httplive.bw-index", value, NULL)) {
1006        char *end;
1007        index = strtol(value, &end, 10);
1008        CHECK(end > value && *end == '\0');
1009
1010        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
1011            index = mBandwidthItems.size() - 1;
1012        }
1013    }
1014
1015    if (index < 0) {
1016        int32_t bandwidthBps;
1017        if (mHTTPDataSource != NULL
1018                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
1019            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
1020        } else {
1021            ALOGV("no bandwidth estimate.");
1022            return 0;  // Pick the lowest bandwidth stream by default.
1023        }
1024
1025        char value[PROPERTY_VALUE_MAX];
1026        if (property_get("media.httplive.max-bw", value, NULL)) {
1027            char *end;
1028            long maxBw = strtoul(value, &end, 10);
1029            if (end > value && *end == '\0') {
1030                if (maxBw > 0 && bandwidthBps > maxBw) {
1031                    ALOGV("bandwidth capped to %ld bps", maxBw);
1032                    bandwidthBps = maxBw;
1033                }
1034            }
1035        }
1036
1037        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
1038
1039        index = mBandwidthItems.size() - 1;
1040        while (index > 0) {
1041            // consider only 80% of the available bandwidth, but if we are switching up,
1042            // be even more conservative (70%) to avoid overestimating and immediately
1043            // switching back.
1044            size_t adjustedBandwidthBps = bandwidthBps;
1045            if (index > mCurBandwidthIndex) {
1046                adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10;
1047            } else {
1048                adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10;
1049            }
1050            if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) {
1051                break;
1052            }
1053            --index;
1054        }
1055    }
1056#elif 0
1057    // Change bandwidth at random()
1058    size_t index = uniformRand() * mBandwidthItems.size();
1059#elif 0
1060    // There's a 50% chance to stay on the current bandwidth and
1061    // a 50% chance to switch to the next higher bandwidth (wrapping around
1062    // to lowest)
1063    const size_t kMinIndex = 0;
1064
1065    static ssize_t mCurBandwidthIndex = -1;
1066
1067    size_t index;
1068    if (mCurBandwidthIndex < 0) {
1069        index = kMinIndex;
1070    } else if (uniformRand() < 0.5) {
1071        index = (size_t)mCurBandwidthIndex;
1072    } else {
1073        index = mCurBandwidthIndex + 1;
1074        if (index == mBandwidthItems.size()) {
1075            index = kMinIndex;
1076        }
1077    }
1078    mCurBandwidthIndex = index;
1079#elif 0
1080    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1081
1082    size_t index = mBandwidthItems.size() - 1;
1083    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1084        --index;
1085    }
1086#elif 1
1087    char value[PROPERTY_VALUE_MAX];
1088    size_t index;
1089    if (property_get("media.httplive.bw-index", value, NULL)) {
1090        char *end;
1091        index = strtoul(value, &end, 10);
1092        CHECK(end > value && *end == '\0');
1093
1094        if (index >= mBandwidthItems.size()) {
1095            index = mBandwidthItems.size() - 1;
1096        }
1097    } else {
1098        index = 0;
1099    }
1100#else
1101    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
1102#endif
1103
1104    CHECK_GE(index, 0);
1105
1106    return index;
1107}
1108
1109int64_t LiveSession::latestMediaSegmentStartTimeUs() {
1110    sp<AMessage> audioMeta = mPacketSources.valueFor(STREAMTYPE_AUDIO)->getLatestDequeuedMeta();
1111    int64_t minSegmentStartTimeUs = -1, videoSegmentStartTimeUs = -1;
1112    if (audioMeta != NULL) {
1113        audioMeta->findInt64("segmentStartTimeUs", &minSegmentStartTimeUs);
1114    }
1115
1116    sp<AMessage> videoMeta = mPacketSources.valueFor(STREAMTYPE_VIDEO)->getLatestDequeuedMeta();
1117    if (videoMeta != NULL
1118            && videoMeta->findInt64("segmentStartTimeUs", &videoSegmentStartTimeUs)) {
1119        if (minSegmentStartTimeUs < 0 || videoSegmentStartTimeUs < minSegmentStartTimeUs) {
1120            minSegmentStartTimeUs = videoSegmentStartTimeUs;
1121        }
1122
1123    }
1124    return minSegmentStartTimeUs;
1125}
1126
1127status_t LiveSession::onSeek(const sp<AMessage> &msg) {
1128    int64_t timeUs;
1129    CHECK(msg->findInt64("timeUs", &timeUs));
1130
1131    if (!mReconfigurationInProgress) {
1132        changeConfiguration(timeUs, mCurBandwidthIndex);
1133        return OK;
1134    } else {
1135        return -EWOULDBLOCK;
1136    }
1137}
1138
1139status_t LiveSession::getDuration(int64_t *durationUs) const {
1140    int64_t maxDurationUs = -1ll;
1141    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1142        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1143
1144        if (fetcherDurationUs > maxDurationUs) {
1145            maxDurationUs = fetcherDurationUs;
1146        }
1147    }
1148
1149    *durationUs = maxDurationUs;
1150
1151    return OK;
1152}
1153
1154bool LiveSession::isSeekable() const {
1155    int64_t durationUs;
1156    return getDuration(&durationUs) == OK && durationUs >= 0;
1157}
1158
1159bool LiveSession::hasDynamicDuration() const {
1160    return false;
1161}
1162
1163size_t LiveSession::getTrackCount() const {
1164    if (mPlaylist == NULL) {
1165        return 0;
1166    } else {
1167        return mPlaylist->getTrackCount();
1168    }
1169}
1170
1171sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1172    if (mPlaylist == NULL) {
1173        return NULL;
1174    } else {
1175        return mPlaylist->getTrackInfo(trackIndex);
1176    }
1177}
1178
1179status_t LiveSession::selectTrack(size_t index, bool select) {
1180    if (mPlaylist == NULL) {
1181        return INVALID_OPERATION;
1182    }
1183
1184    ++mSubtitleGeneration;
1185    status_t err = mPlaylist->selectTrack(index, select);
1186    if (err == OK) {
1187        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id());
1188        msg->setInt32("bandwidthIndex", mCurBandwidthIndex);
1189        msg->setInt32("pickTrack", select);
1190        msg->post();
1191    }
1192    return err;
1193}
1194
1195ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
1196    if (mPlaylist == NULL) {
1197        return -1;
1198    } else {
1199        return mPlaylist->getSelectedTrack(type);
1200    }
1201}
1202
1203bool LiveSession::canSwitchUp() {
1204    // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
1205    status_t err = OK;
1206    for (size_t i = 0; i < mPacketSources.size(); ++i) {
1207        sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
1208        int64_t dur = source->getBufferedDurationUs(&err);
1209        if (err == OK && dur > 10000000) {
1210            return true;
1211        }
1212    }
1213    return false;
1214}
1215
1216void LiveSession::changeConfiguration(
1217        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
1218    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
1219    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
1220    cancelBandwidthSwitch();
1221
1222    CHECK(!mReconfigurationInProgress);
1223    mReconfigurationInProgress = true;
1224
1225    mCurBandwidthIndex = bandwidthIndex;
1226
1227    ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
1228          timeUs, bandwidthIndex, pickTrack);
1229
1230    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
1231    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
1232
1233    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1234    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1235
1236    AString URIs[kMaxStreams];
1237    for (size_t i = 0; i < kMaxStreams; ++i) {
1238        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1239            streamMask |= indexToType(i);
1240        }
1241    }
1242
1243    // Step 1, stop and discard fetchers that are no longer needed.
1244    // Pause those that we'll reuse.
1245    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1246        const AString &uri = mFetcherInfos.keyAt(i);
1247
1248        bool discardFetcher = true;
1249
1250        // If we're seeking all current fetchers are discarded.
1251        if (timeUs < 0ll) {
1252            // delay fetcher removal if not picking tracks
1253            discardFetcher = pickTrack;
1254
1255            for (size_t j = 0; j < kMaxStreams; ++j) {
1256                StreamType type = indexToType(j);
1257                if ((streamMask & type) && uri == URIs[j]) {
1258                    resumeMask |= type;
1259                    streamMask &= ~type;
1260                    discardFetcher = false;
1261                }
1262            }
1263        }
1264
1265        if (discardFetcher) {
1266            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1267        } else {
1268            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
1269        }
1270    }
1271
1272    sp<AMessage> msg;
1273    if (timeUs < 0ll) {
1274        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1275        msg = new AMessage(kWhatChangeConfiguration3, id());
1276    } else {
1277        msg = new AMessage(kWhatChangeConfiguration2, id());
1278    }
1279    msg->setInt32("streamMask", streamMask);
1280    msg->setInt32("resumeMask", resumeMask);
1281    msg->setInt32("pickTrack", pickTrack);
1282    msg->setInt64("timeUs", timeUs);
1283    for (size_t i = 0; i < kMaxStreams; ++i) {
1284        if ((streamMask | resumeMask) & indexToType(i)) {
1285            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1286        }
1287    }
1288
1289    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1290    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1291    // fetchers have completed their asynchronous operation, we'll post
1292    // mContinuation, which then is handled below in onChangeConfiguration2.
1293    mContinuationCounter = mFetcherInfos.size();
1294    mContinuation = msg;
1295
1296    if (mContinuationCounter == 0) {
1297        msg->post();
1298
1299        if (mSeekReplyID != 0) {
1300            CHECK(mSeekReply != NULL);
1301            mSeekReply->setInt32("err", OK);
1302            mSeekReply->postReply(mSeekReplyID);
1303            mSeekReplyID = 0;
1304            mSeekReply.clear();
1305        }
1306    }
1307}
1308
1309void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1310    if (!mReconfigurationInProgress) {
1311        int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex;
1312        msg->findInt32("pickTrack", &pickTrack);
1313        msg->findInt32("bandwidthIndex", &bandwidthIndex);
1314        changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack);
1315    } else {
1316        msg->post(1000000ll); // retry in 1 sec
1317    }
1318}
1319
1320void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1321    mContinuation.clear();
1322
1323    // All fetchers are either suspended or have been removed now.
1324
1325    uint32_t streamMask, resumeMask;
1326    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1327    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1328
1329    // currently onChangeConfiguration2 is only called for seeking;
1330    // remove the following CHECK if using it else where.
1331    CHECK_EQ(resumeMask, 0);
1332    streamMask |= resumeMask;
1333
1334    AString URIs[kMaxStreams];
1335    for (size_t i = 0; i < kMaxStreams; ++i) {
1336        if (streamMask & indexToType(i)) {
1337            const AString &uriKey = mStreams[i].uriKey();
1338            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1339            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1340        }
1341    }
1342
1343    // Determine which decoders to shutdown on the player side,
1344    // a decoder has to be shutdown if either
1345    // 1) its streamtype was active before but now longer isn't.
1346    // or
1347    // 2) its streamtype was already active and still is but the URI
1348    //    has changed.
1349    uint32_t changedMask = 0;
1350    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1351        if (((mStreamMask & streamMask & indexToType(i))
1352                && !(URIs[i] == mStreams[i].mUri))
1353                || (mStreamMask & ~streamMask & indexToType(i))) {
1354            changedMask |= indexToType(i);
1355        }
1356    }
1357
1358    if (changedMask == 0) {
1359        // If nothing changed as far as the audio/video decoders
1360        // are concerned we can proceed.
1361        onChangeConfiguration3(msg);
1362        return;
1363    }
1364
1365    // Something changed, inform the player which will shutdown the
1366    // corresponding decoders and will post the reply once that's done.
1367    // Handling the reply will continue executing below in
1368    // onChangeConfiguration3.
1369    sp<AMessage> notify = mNotify->dup();
1370    notify->setInt32("what", kWhatStreamsChanged);
1371    notify->setInt32("changedMask", changedMask);
1372
1373    msg->setWhat(kWhatChangeConfiguration3);
1374    msg->setTarget(id());
1375
1376    notify->setMessage("reply", msg);
1377    notify->post();
1378}
1379
// Last stage of a configuration change. Reads "streamMask", "resumeMask",
// "timeUs" and "pickTrack" from |msg|, restarts the paused fetchers that are
// still needed, creates and starts fetchers for the streams left in
// streamMask, and finally clears mReconfigurationInProgress.
//
// timeUs >= 0 is handled as a seek to that position. With timeUs < 0 the
// change is either a track selection (pickTrack != 0) or an adaptation to
// another variant (pickTrack == 0, tracked locally as |switching|).
1380void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1381    mContinuation.clear();
1382    // All remaining fetchers are still suspended, the player has shutdown
1383    // any decoders that needed it.
1384
1385    uint32_t streamMask, resumeMask;
1386    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1387    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1388
1389    int64_t timeUs;
1390    int32_t pickTrack;
1391    bool switching = false;
1392    CHECK(msg->findInt64("timeUs", &timeUs));
1393    CHECK(msg->findInt32("pickTrack", &pickTrack));
1394
        // Re-anchor mRealTimeBaseUs: against the last dequeued time when not
        // seeking, against the seek target otherwise.
1395    if (timeUs < 0ll) {
1396        if (!pickTrack) {
1397            switching = true;
1398        }
1399        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
1400    } else {
1401        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1402    }
1403
        // Record the URI of every stream that needs a new fetcher. While
        // switching it is stored in mNewUri and mUri is left untouched
        // (onSwapped() copies mNewUri into mUri later).
1404    for (size_t i = 0; i < kMaxStreams; ++i) {
1405        if (streamMask & indexToType(i)) {
1406            if (switching) {
1407                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
1408            } else {
1409                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1410            }
1411        }
1412    }
1413
        // mSwapMask: currently active streams that are not being resumed;
        // onSwapped() clears one bit per swapped stream.
1414    mNewStreamMask = streamMask | resumeMask;
1415    if (switching) {
1416        mSwapMask = mStreamMask & ~resumeMask;
1417    }
1418
1419    // Of all existing fetchers:
1420    // * Resume fetchers that are still needed and assign them original packet sources.
1421    // * Mark otherwise unneeded fetchers for removal.
1422    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1423    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1424        const AString &uri = mFetcherInfos.keyAt(i);
1425
1426        sp<AnotherPacketSource> sources[kMaxStreams];
1427        for (size_t j = 0; j < kMaxStreams; ++j) {
1428            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
1429                sources[j] = mPacketSources.valueFor(indexToType(j));
1430
                    // Audio/video streams additionally get a
                    // DISCONTINUITY_NONE entry in their discontinuity queue.
1431                if (j != kSubtitleIndex) {
1432                    ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j));
1433                    sp<AnotherPacketSource> discontinuityQueue;
1434                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1435                    discontinuityQueue->queueDiscontinuity(
1436                            ATSParser::DISCONTINUITY_NONE,
1437                            NULL,
1438                            true);
1439                }
1440            }
1441        }
1442
            // A fetcher that got at least one source is restarted; one that
            // got none is only flagged via mToBeRemoved here.
1443        FetcherInfo &info = mFetcherInfos.editValueAt(i);
1444        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
1445                || sources[kSubtitleIndex] != NULL) {
1446            info.mFetcher->startAsync(
1447                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
1448        } else {
1449            info.mToBeRemoved = true;
1450        }
1451    }
1452
1453    // streamMask now only contains the types that need a new fetcher created.
1454
1455    if (streamMask != 0) {
1456        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1457    }
1458
1459    // Find out when the original fetchers have buffered up to and start the new fetchers
1460    // at a later timestamp.
1461    for (size_t i = 0; i < kMaxStreams; i++) {
1462        if (!(indexToType(i) & streamMask)) {
1463            continue;
1464        }
1465
1466        AString uri;
1467        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
1468
1469        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1470        CHECK(fetcher != NULL);
1471
            // NOTE(review): latestSeq is never read in this function.
1472        int32_t latestSeq = -1;
1473        int64_t startTimeUs = -1;
1474        int64_t segmentStartTimeUs = -1ll;
1475        int32_t discontinuitySeq = -1;
1476        sp<AnotherPacketSource> sources[kMaxStreams];
1477
            // A fetcher created for the subtitle stream starts from the
            // segment start time of the audio/video sources.
1478        if (i == kSubtitleIndex) {
1479            segmentStartTimeUs = latestMediaSegmentStartTimeUs();
1480        }
1481
            // One fetcher serves every remaining stream that shares its URI.
1482        // TRICKY: looping from i as earlier streams are already removed from streamMask
1483        for (size_t j = i; j < kMaxStreams; ++j) {
1484            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
1485            if ((streamMask & indexToType(j)) && uri == streamUri) {
1486                sources[j] = mPacketSources.valueFor(indexToType(j));
1487
1488                if (timeUs >= 0) {
                        // Seeking: drop buffered data and queue a time
                        // discontinuity that carries the seek target.
1489                    sources[j]->clear();
1490                    startTimeUs = timeUs;
1491
1492                    sp<AnotherPacketSource> discontinuityQueue;
1493                    sp<AMessage> extra = new AMessage;
1494                    extra->setInt64("timeUs", timeUs);
1495                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1496                    discontinuityQueue->queueDiscontinuity(
1497                            ATSParser::DISCONTINUITY_TIME, extra, true);
1498                } else {
1499                    int32_t type;
                        // NOTE(review): srcSegmentStartTimeUs is never read.
1500                    int64_t srcSegmentStartTimeUs;
1501                    sp<AMessage> meta;
1502                    if (pickTrack) {
1503                        // selecting
1504                        meta = sources[j]->getLatestDequeuedMeta();
1505                    } else {
1506                        // adapting
1507                        meta = sources[j]->getLatestEnqueuedMeta();
1508                    }
1509
                        // Only metadata without a "discontinuity" entry is
                        // used to derive the start position.
1510                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
1511                        int64_t tmpUs;
1512                        int64_t tmpSegmentUs;
1513
1514                        CHECK(meta->findInt64("timeUs", &tmpUs));
1515                        CHECK(meta->findInt64("segmentStartTimeUs", &tmpSegmentUs));
                            // Keep the earliest segment start seen so far and,
                            // for equal segment starts, the smaller timeUs.
1516                        if (startTimeUs < 0 || tmpSegmentUs < segmentStartTimeUs) {
1517                            startTimeUs = tmpUs;
1518                            segmentStartTimeUs = tmpSegmentUs;
1519                        } else if (tmpSegmentUs == segmentStartTimeUs && tmpUs < startTimeUs) {
1520                            startTimeUs = tmpUs;
1521                        }
1522
                            // Keep the smallest discontinuity sequence number.
1523                        int32_t seq;
1524                        CHECK(meta->findInt32("discontinuitySeq", &seq));
1525                        if (discontinuitySeq < 0 || seq < discontinuitySeq) {
1526                            discontinuitySeq = seq;
1527                        }
1528                    }
1529
1530                    if (pickTrack) {
1531                        // selecting track, queue discontinuities before content
1532                        sources[j]->clear();
                            // NOTE(review): this break leaves the j loop
                            // before the subtitle bit is cleared from
                            // streamMask below - presumably harmless if
                            // kSubtitleIndex is the last index; confirm.
1533                        if (j == kSubtitleIndex) {
1534                            break;
1535                        }
1536                        sp<AnotherPacketSource> discontinuityQueue;
1537                        discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1538                        discontinuityQueue->queueDiscontinuity(
1539                                ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1540                    } else {
1541                        // adapting, queue discontinuities after resume
                            // The new fetcher writes into the secondary
                            // packet source (mPacketSources2). A stream that
                            // is in mNewStreamMask but not in mStreamMask
                            // starts with a format change buffer.
1542                        sources[j] = mPacketSources2.valueFor(indexToType(j));
1543                        sources[j]->clear();
1544                        uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1545                        if (extraStreams & indexToType(j)) {
1546                            sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false));
1547                        }
1548                    }
1549                }
1550
1551                streamMask &= ~indexToType(j);
1552            }
1553        }
1554
            // mLastSeekTimeUs is the fallback when no start time was derived.
1555        fetcher->startAsync(
1556                sources[kAudioIndex],
1557                sources[kVideoIndex],
1558                sources[kSubtitleIndex],
1559                startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs,
1560                segmentStartTimeUs,
1561                discontinuitySeq,
1562                switching);
1563    }
1564
1565    // All fetchers have now been started, the configuration change
1566    // has completed.
1567
1568    cancelCheckBandwidthEvent();
1569    scheduleCheckBandwidthEvent();
1570
1571    ALOGV("XXX configuration change completed.");
1572    mReconfigurationInProgress = false;
        // While switching, mStreamMask is updated later by
        // tryToFinishBandwidthSwitch(); otherwise the new mask takes effect
        // immediately.
1573    if (switching) {
1574        mSwitchInProgress = true;
1575    } else {
1576        mStreamMask = mNewStreamMask;
1577    }
1578
        // Presumably a disconnect was requested while the reconfiguration
        // was running - TODO confirm against the disconnect handler.
1579    if (mDisconnectReplyID != 0) {
1580        finishDisconnect();
1581    }
1582}
1583
1584void LiveSession::onSwapped(const sp<AMessage> &msg) {
1585    int32_t switchGeneration;
1586    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
1587    if (switchGeneration != mSwitchGeneration) {
1588        return;
1589    }
1590
1591    int32_t stream;
1592    CHECK(msg->findInt32("stream", &stream));
1593
1594    ssize_t idx = typeToIndex(stream);
1595    CHECK(idx >= 0);
1596    if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
1597        ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str());
1598    }
1599    mStreams[idx].mUri = mStreams[idx].mNewUri;
1600    mStreams[idx].mNewUri.clear();
1601
1602    mSwapMask &= ~stream;
1603    if (mSwapMask != 0) {
1604        return;
1605    }
1606
1607    // Check if new variant contains extra streams.
1608    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1609    while (extraStreams) {
1610        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
1611        swapPacketSource(extraStream);
1612        extraStreams &= ~extraStream;
1613
1614        idx = typeToIndex(extraStream);
1615        CHECK(idx >= 0);
1616        if (mStreams[idx].mNewUri.empty()) {
1617            ALOGW("swapping extra stream type %d %s to empty stream",
1618                    extraStream, mStreams[idx].mUri.c_str());
1619        }
1620        mStreams[idx].mUri = mStreams[idx].mNewUri;
1621        mStreams[idx].mNewUri.clear();
1622    }
1623
1624    tryToFinishBandwidthSwitch();
1625}
1626
1627void LiveSession::onCheckSwitchDown() {
1628    if (mSwitchDownMonitor == NULL) {
1629        return;
1630    }
1631
1632    for (size_t i = 0; i < kMaxStreams; ++i) {
1633        int32_t targetDuration;
1634        sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i));
1635        sp<AMessage> meta = packetSource->getLatestDequeuedMeta();
1636
1637        if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) {
1638            int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs();
1639            int64_t targetDurationUs = targetDuration * 1000000ll;
1640
1641            if (bufferedDurationUs < targetDurationUs / 3) {
1642                (new AMessage(kWhatSwitchDown, id()))->post();
1643                break;
1644            }
1645        }
1646    }
1647
1648    mSwitchDownMonitor->post(1000000ll);
1649}
1650
1651void LiveSession::onSwitchDown() {
1652    if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) {
1653        return;
1654    }
1655
1656    ssize_t bandwidthIndex = getBandwidthIndex();
1657    if (bandwidthIndex < mCurBandwidthIndex) {
1658        changeConfiguration(-1, bandwidthIndex, false);
1659        return;
1660    }
1661
1662    changeConfiguration(-1, mCurBandwidthIndex - 1, false);
1663}
1664
1665// Mark switch done when:
1666//   1. all old buffers are swapped out
1667void LiveSession::tryToFinishBandwidthSwitch() {
1668    if (!mSwitchInProgress) {
1669        return;
1670    }
1671
1672    bool needToRemoveFetchers = false;
1673    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1674        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
1675            needToRemoveFetchers = true;
1676            break;
1677        }
1678    }
1679
1680    if (!needToRemoveFetchers && mSwapMask == 0) {
1681        ALOGI("mSwitchInProgress = false");
1682        mStreamMask = mNewStreamMask;
1683        mSwitchInProgress = false;
1684    }
1685}
1686
1687void LiveSession::scheduleCheckBandwidthEvent() {
1688    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1689    msg->setInt32("generation", mCheckBandwidthGeneration);
1690    msg->post(10000000ll);
1691}
1692
1693void LiveSession::cancelCheckBandwidthEvent() {
1694    ++mCheckBandwidthGeneration;
1695}
1696
1697void LiveSession::cancelBandwidthSwitch() {
1698    Mutex::Autolock lock(mSwapMutex);
1699    mSwitchGeneration++;
1700    mSwitchInProgress = false;
1701    mSwapMask = 0;
1702
1703    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1704        FetcherInfo& info = mFetcherInfos.editValueAt(i);
1705        if (info.mToBeRemoved) {
1706            info.mToBeRemoved = false;
1707        }
1708    }
1709
1710    for (size_t i = 0; i < kMaxStreams; ++i) {
1711        if (!mStreams[i].mNewUri.empty()) {
1712            ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri);
1713            if (j < 0) {
1714                mStreams[i].mNewUri.clear();
1715                continue;
1716            }
1717
1718            const FetcherInfo &info = mFetcherInfos.valueAt(j);
1719            info.mFetcher->stopAsync();
1720            mFetcherInfos.removeItemsAt(j);
1721            mStreams[i].mNewUri.clear();
1722        }
1723    }
1724}
1725
1726bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
1727    if (mReconfigurationInProgress || mSwitchInProgress) {
1728        return false;
1729    }
1730
1731    if (mCurBandwidthIndex < 0) {
1732        return true;
1733    }
1734
1735    if (bandwidthIndex == (size_t)mCurBandwidthIndex) {
1736        return false;
1737    } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) {
1738        return canSwitchUp();
1739    } else {
1740        return true;
1741    }
1742}
1743
1744void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) {
1745    size_t bandwidthIndex = getBandwidthIndex();
1746    if (canSwitchBandwidthTo(bandwidthIndex)) {
1747        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1748    } else {
1749        // Come back and check again 10 seconds later in case there is nothing to do now.
1750        // If we DO change configuration, once that completes it'll schedule a new
1751        // check bandwidth event with an incremented mCheckBandwidthGeneration.
1752        msg->post(10000000ll);
1753    }
1754}
1755
1756void LiveSession::postPrepared(status_t err) {
1757    CHECK(mInPreparationPhase);
1758
1759    sp<AMessage> notify = mNotify->dup();
1760    if (err == OK || err == ERROR_END_OF_STREAM) {
1761        notify->setInt32("what", kWhatPrepared);
1762    } else {
1763        notify->setInt32("what", kWhatPreparationFailed);
1764        notify->setInt32("err", err);
1765    }
1766
1767    notify->post();
1768
1769    mInPreparationPhase = false;
1770
1771    mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id());
1772    mSwitchDownMonitor->post();
1773}
1774
1775}  // namespace android
1776
1777