LiveSession.cpp revision 00598ec0b15426197494aaf9e5ec0bc88507c762
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/IMediaHTTPConnection.h>
31#include <media/IMediaHTTPService.h>
32#include <media/stagefright/foundation/hexdump.h>
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/DataSource.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaErrors.h>
39#include <media/stagefright/MediaHTTP.h>
40#include <media/stagefright/MetaData.h>
41#include <media/stagefright/Utils.h>
42
43#include <utils/Mutex.h>
44
45#include <ctype.h>
46#include <inttypes.h>
47#include <openssl/aes.h>
48#include <openssl/md5.h>
49
50namespace android {
51
52LiveSession::LiveSession(
53        const sp<AMessage> &notify, uint32_t flags,
54        const sp<IMediaHTTPService> &httpService)
55    : mNotify(notify),
56      mFlags(flags),
57      mHTTPService(httpService),
58      mInPreparationPhase(true),
59      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
60      mCurBandwidthIndex(-1),
61      mStreamMask(0),
62      mNewStreamMask(0),
63      mSwapMask(0),
64      mCheckBandwidthGeneration(0),
65      mSwitchGeneration(0),
66      mLastDequeuedTimeUs(0ll),
67      mRealTimeBaseUs(0ll),
68      mReconfigurationInProgress(false),
69      mSwitchInProgress(false),
70      mDisconnectReplyID(0),
71      mSeekReplyID(0),
72      mFirstTimeUsValid(false),
73      mFirstTimeUs(0),
74      mLastSeekTimeUs(0) {
75
76    mStreams[kAudioIndex] = StreamItem("audio");
77    mStreams[kVideoIndex] = StreamItem("video");
78    mStreams[kSubtitleIndex] = StreamItem("subtitles");
79
80    for (size_t i = 0; i < kMaxStreams; ++i) {
81        mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
82        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
83        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
84    }
85}
86
87LiveSession::~LiveSession() {
88}
89
90sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
91    ABuffer *discontinuity = new ABuffer(0);
92    discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
93    discontinuity->meta()->setInt32("swapPacketSource", swap);
94    discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
95    discontinuity->meta()->setInt64("timeUs", -1);
96    return discontinuity;
97}
98
99void LiveSession::swapPacketSource(StreamType stream) {
100    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
101    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
102    sp<AnotherPacketSource> tmp = aps;
103    aps = aps2;
104    aps2 = tmp;
105    aps2->clear();
106}
107
108status_t LiveSession::dequeueAccessUnit(
109        StreamType stream, sp<ABuffer> *accessUnit) {
110    if (!(mStreamMask & stream)) {
111        // return -EWOULDBLOCK to avoid halting the decoder
112        // when switching between audio/video and audio only.
113        return -EWOULDBLOCK;
114    }
115
116    status_t finalResult;
117    sp<AnotherPacketSource> discontinuityQueue  = mDiscontinuities.valueFor(stream);
118    if (discontinuityQueue->hasBufferAvailable(&finalResult)) {
119        discontinuityQueue->dequeueAccessUnit(accessUnit);
120        // seeking, track switching
121        sp<AMessage> extra;
122        int64_t timeUs;
123        if ((*accessUnit)->meta()->findMessage("extra", &extra)
124                && extra != NULL
125                && extra->findInt64("timeUs", &timeUs)) {
126            // seeking only
127            mLastSeekTimeUs = timeUs;
128            mDiscontinuityOffsetTimesUs.clear();
129            mDiscontinuityAbsStartTimesUs.clear();
130        }
131        return INFO_DISCONTINUITY;
132    }
133
134    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
135
136    if (!packetSource->hasBufferAvailable(&finalResult)) {
137        return finalResult == OK ? -EAGAIN : finalResult;
138    }
139
140    // wait for counterpart
141    sp<AnotherPacketSource> otherSource;
142    if (stream == STREAMTYPE_AUDIO && (mStreamMask & STREAMTYPE_VIDEO)) {
143        otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
144    } else if (stream == STREAMTYPE_VIDEO && (mStreamMask & STREAMTYPE_AUDIO)) {
145        otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
146    }
147    if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) {
148        return finalResult == OK ? -EAGAIN : finalResult;
149    }
150
151    status_t err = packetSource->dequeueAccessUnit(accessUnit);
152
153    size_t streamIdx;
154    const char *streamStr;
155    switch (stream) {
156        case STREAMTYPE_AUDIO:
157            streamIdx = kAudioIndex;
158            streamStr = "audio";
159            break;
160        case STREAMTYPE_VIDEO:
161            streamIdx = kVideoIndex;
162            streamStr = "video";
163            break;
164        case STREAMTYPE_SUBTITLES:
165            streamIdx = kSubtitleIndex;
166            streamStr = "subs";
167            break;
168        default:
169            TRESPASS();
170    }
171
172    StreamItem& strm = mStreams[streamIdx];
173    if (err == INFO_DISCONTINUITY) {
174        // adaptive streaming, discontinuities in the playlist
175        int32_t type;
176        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
177
178        sp<AMessage> extra;
179        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
180            extra.clear();
181        }
182
183        ALOGI("[%s] read discontinuity of type %d, extra = %s",
184              streamStr,
185              type,
186              extra == NULL ? "NULL" : extra->debugString().c_str());
187
188        int32_t swap;
189        if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) {
190            int32_t switchGeneration;
191            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
192            {
193                Mutex::Autolock lock(mSwapMutex);
194                if (switchGeneration == mSwitchGeneration) {
195                    swapPacketSource(stream);
196                    sp<AMessage> msg = new AMessage(kWhatSwapped, id());
197                    msg->setInt32("stream", stream);
198                    msg->setInt32("switchGeneration", switchGeneration);
199                    msg->post();
200                }
201            }
202        } else {
203            size_t seq = strm.mCurDiscontinuitySeq;
204            int64_t offsetTimeUs;
205            if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
206                offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
207            } else {
208                offsetTimeUs = 0;
209            }
210
211            seq += 1;
212            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
213                int64_t firstTimeUs;
214                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
215                offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
216                offsetTimeUs += strm.mLastSampleDurationUs;
217            } else {
218                offsetTimeUs += strm.mLastSampleDurationUs;
219            }
220
221            mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
222        }
223    } else if (err == OK) {
224
225        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
226            int64_t timeUs;
227            int32_t discontinuitySeq = 0;
228            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
229            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
230            strm.mCurDiscontinuitySeq = discontinuitySeq;
231
232            int32_t discard = 0;
233            int64_t firstTimeUs;
234            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
235                int64_t durUs; // approximate sample duration
236                if (timeUs > strm.mLastDequeuedTimeUs) {
237                    durUs = timeUs - strm.mLastDequeuedTimeUs;
238                } else {
239                    durUs = strm.mLastDequeuedTimeUs - timeUs;
240                }
241                strm.mLastSampleDurationUs = durUs;
242                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
243            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
244                firstTimeUs = timeUs;
245            } else {
246                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
247                firstTimeUs = timeUs;
248            }
249
250            strm.mLastDequeuedTimeUs = timeUs;
251            if (timeUs >= firstTimeUs) {
252                timeUs -= firstTimeUs;
253            } else {
254                timeUs = 0;
255            }
256            timeUs += mLastSeekTimeUs;
257            if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
258                timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
259            }
260
261            ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs);
262            (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
263            mLastDequeuedTimeUs = timeUs;
264            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
265        } else if (stream == STREAMTYPE_SUBTITLES) {
266            (*accessUnit)->meta()->setInt32(
267                    "trackIndex", mPlaylist->getSelectedIndex());
268            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
269        }
270    } else {
271        ALOGI("[%s] encountered error %d", streamStr, err);
272    }
273
274    return err;
275}
276
277status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
278    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
279    if (!(mStreamMask & stream)) {
280        return UNKNOWN_ERROR;
281    }
282
283    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
284
285    sp<MetaData> meta = packetSource->getFormat();
286
287    if (meta == NULL) {
288        return -EAGAIN;
289    }
290
291    return convertMetaDataToMessage(meta, format);
292}
293
294void LiveSession::connectAsync(
295        const char *url, const KeyedVector<String8, String8> *headers) {
296    sp<AMessage> msg = new AMessage(kWhatConnect, id());
297    msg->setString("url", url);
298
299    if (headers != NULL) {
300        msg->setPointer(
301                "headers",
302                new KeyedVector<String8, String8>(*headers));
303    }
304
305    msg->post();
306}
307
308status_t LiveSession::disconnect() {
309    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
310
311    sp<AMessage> response;
312    status_t err = msg->postAndAwaitResponse(&response);
313
314    return err;
315}
316
317status_t LiveSession::seekTo(int64_t timeUs) {
318    sp<AMessage> msg = new AMessage(kWhatSeek, id());
319    msg->setInt64("timeUs", timeUs);
320
321    sp<AMessage> response;
322    status_t err = msg->postAndAwaitResponse(&response);
323
324    uint32_t replyID;
325    CHECK(response == mSeekReply && 0 != mSeekReplyID);
326    mSeekReply.clear();
327    mSeekReplyID = 0;
328    return err;
329}
330
331void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
332    switch (msg->what()) {
333        case kWhatConnect:
334        {
335            onConnect(msg);
336            break;
337        }
338
339        case kWhatDisconnect:
340        {
341            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
342
343            if (mReconfigurationInProgress) {
344                break;
345            }
346
347            finishDisconnect();
348            break;
349        }
350
351        case kWhatSeek:
352        {
353            CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
354
355            status_t err = onSeek(msg);
356
357            mSeekReply = new AMessage;
358            mSeekReply->setInt32("err", err);
359            break;
360        }
361
362        case kWhatFetcherNotify:
363        {
364            int32_t what;
365            CHECK(msg->findInt32("what", &what));
366
367            switch (what) {
368                case PlaylistFetcher::kWhatStarted:
369                    break;
370                case PlaylistFetcher::kWhatPaused:
371                case PlaylistFetcher::kWhatStopped:
372                {
373                    if (what == PlaylistFetcher::kWhatStopped) {
374                        AString uri;
375                        CHECK(msg->findString("uri", &uri));
376                        if (mFetcherInfos.removeItem(uri) < 0) {
377                            // ignore duplicated kWhatStopped messages.
378                            break;
379                        }
380
381                        if (mSwitchInProgress) {
382                            tryToFinishBandwidthSwitch();
383                        }
384                    }
385
386                    if (mContinuation != NULL) {
387                        CHECK_GT(mContinuationCounter, 0);
388                        if (--mContinuationCounter == 0) {
389                            mContinuation->post();
390
391                            if (mSeekReplyID != 0) {
392                                CHECK(mSeekReply != NULL);
393                                mSeekReply->postReply(mSeekReplyID);
394                            }
395                        }
396                    }
397                    break;
398                }
399
400                case PlaylistFetcher::kWhatDurationUpdate:
401                {
402                    AString uri;
403                    CHECK(msg->findString("uri", &uri));
404
405                    int64_t durationUs;
406                    CHECK(msg->findInt64("durationUs", &durationUs));
407
408                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
409                    info->mDurationUs = durationUs;
410                    break;
411                }
412
413                case PlaylistFetcher::kWhatError:
414                {
415                    status_t err;
416                    CHECK(msg->findInt32("err", &err));
417
418                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
419
420                    if (mInPreparationPhase) {
421                        postPrepared(err);
422                    }
423
424                    cancelBandwidthSwitch();
425
426                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
427
428                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
429
430                    mPacketSources.valueFor(
431                            STREAMTYPE_SUBTITLES)->signalEOS(err);
432
433                    sp<AMessage> notify = mNotify->dup();
434                    notify->setInt32("what", kWhatError);
435                    notify->setInt32("err", err);
436                    notify->post();
437                    break;
438                }
439
440                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
441                {
442                    AString uri;
443                    CHECK(msg->findString("uri", &uri));
444
445                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
446                    info->mIsPrepared = true;
447
448                    if (mInPreparationPhase) {
449                        bool allFetchersPrepared = true;
450                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
451                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
452                                allFetchersPrepared = false;
453                                break;
454                            }
455                        }
456
457                        if (allFetchersPrepared) {
458                            postPrepared(OK);
459                        }
460                    }
461                    break;
462                }
463
464                case PlaylistFetcher::kWhatStartedAt:
465                {
466                    int32_t switchGeneration;
467                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
468
469                    if (switchGeneration != mSwitchGeneration) {
470                        break;
471                    }
472
473                    // Resume fetcher for the original variant; the resumed fetcher should
474                    // continue until the timestamps found in msg, which is stored by the
475                    // new fetcher to indicate where the new variant has started buffering.
476                    for (size_t i = 0; i < mFetcherInfos.size(); i++) {
477                        const FetcherInfo info = mFetcherInfos.valueAt(i);
478                        if (info.mToBeRemoved) {
479                            info.mFetcher->resumeUntilAsync(msg);
480                        }
481                    }
482                    break;
483                }
484
485                default:
486                    TRESPASS();
487            }
488
489            break;
490        }
491
492        case kWhatCheckBandwidth:
493        {
494            int32_t generation;
495            CHECK(msg->findInt32("generation", &generation));
496
497            if (generation != mCheckBandwidthGeneration) {
498                break;
499            }
500
501            onCheckBandwidth(msg);
502            break;
503        }
504
505        case kWhatChangeConfiguration:
506        {
507            onChangeConfiguration(msg);
508            break;
509        }
510
511        case kWhatChangeConfiguration2:
512        {
513            onChangeConfiguration2(msg);
514            break;
515        }
516
517        case kWhatChangeConfiguration3:
518        {
519            onChangeConfiguration3(msg);
520            break;
521        }
522
523        case kWhatFinishDisconnect2:
524        {
525            onFinishDisconnect2();
526            break;
527        }
528
529        case kWhatSwapped:
530        {
531            onSwapped(msg);
532            break;
533        }
534        default:
535            TRESPASS();
536            break;
537    }
538}
539
540// static
541int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
542    if (a->mBandwidth < b->mBandwidth) {
543        return -1;
544    } else if (a->mBandwidth == b->mBandwidth) {
545        return 0;
546    }
547
548    return 1;
549}
550
551// static
552LiveSession::StreamType LiveSession::indexToType(int idx) {
553    CHECK(idx >= 0 && idx < kMaxStreams);
554    return (StreamType)(1 << idx);
555}
556
557void LiveSession::onConnect(const sp<AMessage> &msg) {
558    AString url;
559    CHECK(msg->findString("url", &url));
560
561    KeyedVector<String8, String8> *headers = NULL;
562    if (!msg->findPointer("headers", (void **)&headers)) {
563        mExtraHeaders.clear();
564    } else {
565        mExtraHeaders = *headers;
566
567        delete headers;
568        headers = NULL;
569    }
570
571    // TODO currently we don't know if we are coming here from incognito mode
572    ALOGI("onConnect %s", uriDebugString(url).c_str());
573
574    mMasterURL = url;
575
576    bool dummy;
577    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
578
579    if (mPlaylist == NULL) {
580        ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str());
581
582        postPrepared(ERROR_IO);
583        return;
584    }
585
586    // We trust the content provider to make a reasonable choice of preferred
587    // initial bandwidth by listing it first in the variant playlist.
588    // At startup we really don't have a good estimate on the available
589    // network bandwidth since we haven't tranferred any data yet. Once
590    // we have we can make a better informed choice.
591    size_t initialBandwidth = 0;
592    size_t initialBandwidthIndex = 0;
593
594    if (mPlaylist->isVariantPlaylist()) {
595        for (size_t i = 0; i < mPlaylist->size(); ++i) {
596            BandwidthItem item;
597
598            item.mPlaylistIndex = i;
599
600            sp<AMessage> meta;
601            AString uri;
602            mPlaylist->itemAt(i, &uri, &meta);
603
604            unsigned long bandwidth;
605            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
606
607            if (initialBandwidth == 0) {
608                initialBandwidth = item.mBandwidth;
609            }
610
611            mBandwidthItems.push(item);
612        }
613
614        CHECK_GT(mBandwidthItems.size(), 0u);
615
616        mBandwidthItems.sort(SortByBandwidth);
617
618        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
619            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
620                initialBandwidthIndex = i;
621                break;
622            }
623        }
624    } else {
625        // dummy item.
626        BandwidthItem item;
627        item.mPlaylistIndex = 0;
628        item.mBandwidth = 0;
629        mBandwidthItems.push(item);
630    }
631
632    mPlaylist->pickRandomMediaItems();
633    changeConfiguration(
634            0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
635}
636
637void LiveSession::finishDisconnect() {
638    // No reconfiguration is currently pending, make sure none will trigger
639    // during disconnection either.
640    cancelCheckBandwidthEvent();
641
642    // Protect mPacketSources from a swapPacketSource race condition through disconnect.
643    // (finishDisconnect, onFinishDisconnect2)
644    cancelBandwidthSwitch();
645
646    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
647        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
648    }
649
650    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
651
652    mContinuationCounter = mFetcherInfos.size();
653    mContinuation = msg;
654
655    if (mContinuationCounter == 0) {
656        msg->post();
657    }
658}
659
660void LiveSession::onFinishDisconnect2() {
661    mContinuation.clear();
662
663    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
664    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
665
666    mPacketSources.valueFor(
667            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
668
669    sp<AMessage> response = new AMessage;
670    response->setInt32("err", OK);
671
672    response->postReply(mDisconnectReplyID);
673    mDisconnectReplyID = 0;
674}
675
676sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
677    ssize_t index = mFetcherInfos.indexOfKey(uri);
678
679    if (index >= 0) {
680        return NULL;
681    }
682
683    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
684    notify->setString("uri", uri);
685    notify->setInt32("switchGeneration", mSwitchGeneration);
686
687    FetcherInfo info;
688    info.mFetcher = new PlaylistFetcher(notify, this, uri);
689    info.mDurationUs = -1ll;
690    info.mIsPrepared = false;
691    info.mToBeRemoved = false;
692    looper()->registerHandler(info.mFetcher);
693
694    mFetcherInfos.add(uri, info);
695
696    return info.mFetcher;
697}
698
699/*
700 * Illustration of parameters:
701 *
702 * 0      `range_offset`
703 * +------------+-------------------------------------------------------+--+--+
704 * |            |                                 | next block to fetch |  |  |
705 * |            | `source` handle => `out` buffer |                     |  |  |
706 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
707 * |            |<----------- `range_length` / buffer capacity ----------->|  |
708 * |<------------------------------ file_size ------------------------------->|
709 *
710 * Special parameter values:
711 * - range_length == -1 means entire file
712 * - block_size == 0 means entire range
713 *
714 */
715ssize_t LiveSession::fetchFile(
716        const char *url, sp<ABuffer> *out,
717        int64_t range_offset, int64_t range_length,
718        uint32_t block_size, /* download block size */
719        sp<DataSource> *source, /* to return and reuse source */
720        String8 *actualUrl) {
721    off64_t size;
722    sp<DataSource> temp_source;
723    if (source == NULL) {
724        source = &temp_source;
725    }
726
727    if (*source == NULL) {
728        if (!strncasecmp(url, "file://", 7)) {
729            *source = new FileSource(url + 7);
730        } else if (strncasecmp(url, "http://", 7)
731                && strncasecmp(url, "https://", 8)) {
732            return ERROR_UNSUPPORTED;
733        } else {
734            KeyedVector<String8, String8> headers = mExtraHeaders;
735            if (range_offset > 0 || range_length >= 0) {
736                headers.add(
737                        String8("Range"),
738                        String8(
739                            StringPrintf(
740                                "bytes=%lld-%s",
741                                range_offset,
742                                range_length < 0
743                                    ? "" : StringPrintf("%lld",
744                                            range_offset + range_length - 1).c_str()).c_str()));
745            }
746            status_t err = mHTTPDataSource->connect(url, &headers);
747
748            if (err != OK) {
749                return err;
750            }
751
752            *source = mHTTPDataSource;
753        }
754    }
755
756    status_t getSizeErr = (*source)->getSize(&size);
757    if (getSizeErr != OK) {
758        size = 65536;
759    }
760
761    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
762    if (*out == NULL) {
763        buffer->setRange(0, 0);
764    }
765
766    ssize_t bytesRead = 0;
767    // adjust range_length if only reading partial block
768    if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
769        range_length = buffer->size() + block_size;
770    }
771    for (;;) {
772        // Only resize when we don't know the size.
773        size_t bufferRemaining = buffer->capacity() - buffer->size();
774        if (bufferRemaining == 0 && getSizeErr != OK) {
775            bufferRemaining = 32768;
776
777            ALOGV("increasing download buffer to %zu bytes",
778                 buffer->size() + bufferRemaining);
779
780            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
781            memcpy(copy->data(), buffer->data(), buffer->size());
782            copy->setRange(0, buffer->size());
783
784            buffer = copy;
785        }
786
787        size_t maxBytesToRead = bufferRemaining;
788        if (range_length >= 0) {
789            int64_t bytesLeftInRange = range_length - buffer->size();
790            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
791                maxBytesToRead = bytesLeftInRange;
792
793                if (bytesLeftInRange == 0) {
794                    break;
795                }
796            }
797        }
798
799        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
800        // to help us break out of the loop.
801        ssize_t n = (*source)->readAt(
802                buffer->size(), buffer->data() + buffer->size(),
803                maxBytesToRead);
804
805        if (n < 0) {
806            return n;
807        }
808
809        if (n == 0) {
810            break;
811        }
812
813        buffer->setRange(0, buffer->size() + (size_t)n);
814        bytesRead += n;
815    }
816
817    *out = buffer;
818    if (actualUrl != NULL) {
819        *actualUrl = (*source)->getUri();
820        if (actualUrl->isEmpty()) {
821            *actualUrl = url;
822        }
823    }
824
825    return bytesRead;
826}
827
828sp<M3UParser> LiveSession::fetchPlaylist(
829        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
830    ALOGV("fetchPlaylist '%s'", url);
831
832    *unchanged = false;
833
834    sp<ABuffer> buffer;
835    String8 actualUrl;
836    ssize_t  err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
837
838    if (err <= 0) {
839        return NULL;
840    }
841
842    // MD5 functionality is not available on the simulator, treat all
843    // playlists as changed.
844
845#if defined(HAVE_ANDROID_OS)
846    uint8_t hash[16];
847
848    MD5_CTX m;
849    MD5_Init(&m);
850    MD5_Update(&m, buffer->data(), buffer->size());
851
852    MD5_Final(hash, &m);
853
854    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
855        // playlist unchanged
856        *unchanged = true;
857
858        return NULL;
859    }
860
861    if (curPlaylistHash != NULL) {
862        memcpy(curPlaylistHash, hash, sizeof(hash));
863    }
864#endif
865
866    sp<M3UParser> playlist =
867        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
868
869    if (playlist->initCheck() != OK) {
870        ALOGE("failed to parse .m3u8 playlist");
871
872        return NULL;
873    }
874
875    return playlist;
876}
877
878static double uniformRand() {
879    return (double)rand() / RAND_MAX;
880}
881
882size_t LiveSession::getBandwidthIndex() {
883    if (mBandwidthItems.size() == 0) {
884        return 0;
885    }
886
887#if 1
888    char value[PROPERTY_VALUE_MAX];
889    ssize_t index = -1;
890    if (property_get("media.httplive.bw-index", value, NULL)) {
891        char *end;
892        index = strtol(value, &end, 10);
893        CHECK(end > value && *end == '\0');
894
895        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
896            index = mBandwidthItems.size() - 1;
897        }
898    }
899
900    if (index < 0) {
901        int32_t bandwidthBps;
902        if (mHTTPDataSource != NULL
903                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
904            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
905        } else {
906            ALOGV("no bandwidth estimate.");
907            return 0;  // Pick the lowest bandwidth stream by default.
908        }
909
910        char value[PROPERTY_VALUE_MAX];
911        if (property_get("media.httplive.max-bw", value, NULL)) {
912            char *end;
913            long maxBw = strtoul(value, &end, 10);
914            if (end > value && *end == '\0') {
915                if (maxBw > 0 && bandwidthBps > maxBw) {
916                    ALOGV("bandwidth capped to %ld bps", maxBw);
917                    bandwidthBps = maxBw;
918                }
919            }
920        }
921
922        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
923
924        index = mBandwidthItems.size() - 1;
925        while (index > 0) {
926            // consider only 80% of the available bandwidth, but if we are switching up,
927            // be even more conservative (70%) to avoid overestimating and immediately
928            // switching back.
929            size_t adjustedBandwidthBps = bandwidthBps;
930            if (index > mCurBandwidthIndex) {
931                adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10;
932            } else {
933                adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10;
934            }
935            if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) {
936                break;
937            }
938            --index;
939        }
940    }
941#elif 0
942    // Change bandwidth at random()
943    size_t index = uniformRand() * mBandwidthItems.size();
944#elif 0
945    // There's a 50% chance to stay on the current bandwidth and
946    // a 50% chance to switch to the next higher bandwidth (wrapping around
947    // to lowest)
948    const size_t kMinIndex = 0;
949
950    static ssize_t mCurBandwidthIndex = -1;
951
952    size_t index;
953    if (mCurBandwidthIndex < 0) {
954        index = kMinIndex;
955    } else if (uniformRand() < 0.5) {
956        index = (size_t)mCurBandwidthIndex;
957    } else {
958        index = mCurBandwidthIndex + 1;
959        if (index == mBandwidthItems.size()) {
960            index = kMinIndex;
961        }
962    }
963    mCurBandwidthIndex = index;
964#elif 0
965    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
966
967    size_t index = mBandwidthItems.size() - 1;
968    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
969        --index;
970    }
971#elif 1
972    char value[PROPERTY_VALUE_MAX];
973    size_t index;
974    if (property_get("media.httplive.bw-index", value, NULL)) {
975        char *end;
976        index = strtoul(value, &end, 10);
977        CHECK(end > value && *end == '\0');
978
979        if (index >= mBandwidthItems.size()) {
980            index = mBandwidthItems.size() - 1;
981        }
982    } else {
983        index = 0;
984    }
985#else
986    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
987#endif
988
989    CHECK_GE(index, 0);
990
991    return index;
992}
993
994status_t LiveSession::onSeek(const sp<AMessage> &msg) {
995    int64_t timeUs;
996    CHECK(msg->findInt64("timeUs", &timeUs));
997
998    if (!mReconfigurationInProgress) {
999        changeConfiguration(timeUs, getBandwidthIndex());
1000    }
1001
1002    return OK;
1003}
1004
1005status_t LiveSession::getDuration(int64_t *durationUs) const {
1006    int64_t maxDurationUs = 0ll;
1007    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1008        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1009
1010        if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) {
1011            maxDurationUs = fetcherDurationUs;
1012        }
1013    }
1014
1015    *durationUs = maxDurationUs;
1016
1017    return OK;
1018}
1019
1020bool LiveSession::isSeekable() const {
1021    int64_t durationUs;
1022    return getDuration(&durationUs) == OK && durationUs >= 0;
1023}
1024
1025bool LiveSession::hasDynamicDuration() const {
1026    return false;
1027}
1028
1029size_t LiveSession::getTrackCount() const {
1030    if (mPlaylist == NULL) {
1031        return 0;
1032    } else {
1033        return mPlaylist->getTrackCount();
1034    }
1035}
1036
1037sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1038    if (mPlaylist == NULL) {
1039        return NULL;
1040    } else {
1041        return mPlaylist->getTrackInfo(trackIndex);
1042    }
1043}
1044
1045status_t LiveSession::selectTrack(size_t index, bool select) {
1046    status_t err = mPlaylist->selectTrack(index, select);
1047    if (err == OK) {
1048        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id());
1049        msg->setInt32("bandwidthIndex", mCurBandwidthIndex);
1050        msg->setInt32("pickTrack", select);
1051        msg->post();
1052    }
1053    return err;
1054}
1055
1056bool LiveSession::canSwitchUp() {
1057    // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
1058    status_t err = OK;
1059    for (size_t i = 0; i < mPacketSources.size(); ++i) {
1060        sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
1061        int64_t dur = source->getBufferedDurationUs(&err);
1062        if (err == OK && dur > 10000000) {
1063            return true;
1064        }
1065    }
1066    return false;
1067}
1068
1069void LiveSession::changeConfiguration(
1070        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
1071    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
1072    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
1073    cancelBandwidthSwitch();
1074
1075    CHECK(!mReconfigurationInProgress);
1076    mReconfigurationInProgress = true;
1077
1078    mCurBandwidthIndex = bandwidthIndex;
1079
1080    ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
1081          timeUs, bandwidthIndex, pickTrack);
1082
1083    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
1084    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
1085
1086    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1087    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1088
1089    AString URIs[kMaxStreams];
1090    for (size_t i = 0; i < kMaxStreams; ++i) {
1091        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1092            streamMask |= indexToType(i);
1093        }
1094    }
1095
1096    // Step 1, stop and discard fetchers that are no longer needed.
1097    // Pause those that we'll reuse.
1098    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1099        const AString &uri = mFetcherInfos.keyAt(i);
1100
1101        bool discardFetcher = true;
1102
1103        // If we're seeking all current fetchers are discarded.
1104        if (timeUs < 0ll) {
1105            // delay fetcher removal if not picking tracks
1106            discardFetcher = pickTrack;
1107
1108            for (size_t j = 0; j < kMaxStreams; ++j) {
1109                StreamType type = indexToType(j);
1110                if ((streamMask & type) && uri == URIs[j]) {
1111                    resumeMask |= type;
1112                    streamMask &= ~type;
1113                    discardFetcher = false;
1114                }
1115            }
1116        }
1117
1118        if (discardFetcher) {
1119            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1120        } else {
1121            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
1122        }
1123    }
1124
1125    sp<AMessage> msg;
1126    if (timeUs < 0ll) {
1127        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1128        msg = new AMessage(kWhatChangeConfiguration3, id());
1129    } else {
1130        msg = new AMessage(kWhatChangeConfiguration2, id());
1131    }
1132    msg->setInt32("streamMask", streamMask);
1133    msg->setInt32("resumeMask", resumeMask);
1134    msg->setInt32("pickTrack", pickTrack);
1135    msg->setInt64("timeUs", timeUs);
1136    for (size_t i = 0; i < kMaxStreams; ++i) {
1137        if ((streamMask | resumeMask) & indexToType(i)) {
1138            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1139        }
1140    }
1141
1142    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1143    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1144    // fetchers have completed their asynchronous operation, we'll post
1145    // mContinuation, which then is handled below in onChangeConfiguration2.
1146    mContinuationCounter = mFetcherInfos.size();
1147    mContinuation = msg;
1148
1149    if (mContinuationCounter == 0) {
1150        msg->post();
1151
1152        if (mSeekReplyID != 0) {
1153            CHECK(mSeekReply != NULL);
1154            mSeekReply->postReply(mSeekReplyID);
1155        }
1156    }
1157}
1158
1159void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1160    if (!mReconfigurationInProgress) {
1161        int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex;
1162        msg->findInt32("pickTrack", &pickTrack);
1163        msg->findInt32("bandwidthIndex", &bandwidthIndex);
1164        changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack);
1165    } else {
1166        msg->post(1000000ll); // retry in 1 sec
1167    }
1168}
1169
1170void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1171    mContinuation.clear();
1172
1173    // All fetchers are either suspended or have been removed now.
1174
1175    uint32_t streamMask, resumeMask;
1176    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1177    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1178
1179    // currently onChangeConfiguration2 is only called for seeking;
1180    // remove the following CHECK if using it else where.
1181    CHECK_EQ(resumeMask, 0);
1182    streamMask |= resumeMask;
1183
1184    AString URIs[kMaxStreams];
1185    for (size_t i = 0; i < kMaxStreams; ++i) {
1186        if (streamMask & indexToType(i)) {
1187            const AString &uriKey = mStreams[i].uriKey();
1188            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1189            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1190        }
1191    }
1192
1193    // Determine which decoders to shutdown on the player side,
1194    // a decoder has to be shutdown if either
1195    // 1) its streamtype was active before but now longer isn't.
1196    // or
1197    // 2) its streamtype was already active and still is but the URI
1198    //    has changed.
1199    uint32_t changedMask = 0;
1200    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1201        if (((mStreamMask & streamMask & indexToType(i))
1202                && !(URIs[i] == mStreams[i].mUri))
1203                || (mStreamMask & ~streamMask & indexToType(i))) {
1204            changedMask |= indexToType(i);
1205        }
1206    }
1207
1208    if (changedMask == 0) {
1209        // If nothing changed as far as the audio/video decoders
1210        // are concerned we can proceed.
1211        onChangeConfiguration3(msg);
1212        return;
1213    }
1214
1215    // Something changed, inform the player which will shutdown the
1216    // corresponding decoders and will post the reply once that's done.
1217    // Handling the reply will continue executing below in
1218    // onChangeConfiguration3.
1219    sp<AMessage> notify = mNotify->dup();
1220    notify->setInt32("what", kWhatStreamsChanged);
1221    notify->setInt32("changedMask", changedMask);
1222
1223    msg->setWhat(kWhatChangeConfiguration3);
1224    msg->setTarget(id());
1225
1226    notify->setMessage("reply", msg);
1227    notify->post();
1228}
1229
1230void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1231    mContinuation.clear();
1232    // All remaining fetchers are still suspended, the player has shutdown
1233    // any decoders that needed it.
1234
1235    uint32_t streamMask, resumeMask;
1236    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1237    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1238
1239    for (size_t i = 0; i < kMaxStreams; ++i) {
1240        if (streamMask & indexToType(i)) {
1241            CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1242        }
1243    }
1244
1245    int64_t timeUs;
1246    int32_t pickTrack;
1247    bool switching = false;
1248    CHECK(msg->findInt64("timeUs", &timeUs));
1249    CHECK(msg->findInt32("pickTrack", &pickTrack));
1250
1251    if (timeUs < 0ll) {
1252        if (!pickTrack) {
1253            switching = true;
1254        }
1255        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
1256    } else {
1257        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1258    }
1259
1260    mNewStreamMask = streamMask | resumeMask;
1261
1262    // Of all existing fetchers:
1263    // * Resume fetchers that are still needed and assign them original packet sources.
1264    // * Mark otherwise unneeded fetchers for removal.
1265    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1266    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1267        const AString &uri = mFetcherInfos.keyAt(i);
1268
1269        sp<AnotherPacketSource> sources[kMaxStreams];
1270        for (size_t j = 0; j < kMaxStreams; ++j) {
1271            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
1272                sources[j] = mPacketSources.valueFor(indexToType(j));
1273
1274                if (j != kSubtitleIndex) {
1275                    ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j));
1276                    sp<AnotherPacketSource> discontinuityQueue;
1277                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1278                    discontinuityQueue->queueDiscontinuity(
1279                            ATSParser::DISCONTINUITY_NONE,
1280                            NULL,
1281                            true);
1282                }
1283            }
1284        }
1285
1286        FetcherInfo &info = mFetcherInfos.editValueAt(i);
1287        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
1288                || sources[kSubtitleIndex] != NULL) {
1289            info.mFetcher->startAsync(
1290                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
1291        } else {
1292            info.mToBeRemoved = true;
1293        }
1294    }
1295
1296    // streamMask now only contains the types that need a new fetcher created.
1297
1298    if (streamMask != 0) {
1299        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1300    }
1301
1302    // Find out when the original fetchers have buffered up to and start the new fetchers
1303    // at a later timestamp.
1304    for (size_t i = 0; i < kMaxStreams; i++) {
1305        if (!(indexToType(i) & streamMask)) {
1306            continue;
1307        }
1308
1309        AString uri;
1310        uri = mStreams[i].mUri;
1311
1312        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1313        CHECK(fetcher != NULL);
1314
1315        int32_t latestSeq = -1;
1316        int64_t startTimeUs = -1;
1317        int64_t segmentStartTimeUs = -1ll;
1318        int32_t discontinuitySeq = -1;
1319        sp<AnotherPacketSource> sources[kMaxStreams];
1320
1321        // TRICKY: looping from i as earlier streams are already removed from streamMask
1322        for (size_t j = i; j < kMaxStreams; ++j) {
1323            if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) {
1324                sources[j] = mPacketSources.valueFor(indexToType(j));
1325
1326                if (timeUs >= 0) {
1327                    sources[j]->clear();
1328                    startTimeUs = timeUs;
1329
1330                    sp<AnotherPacketSource> discontinuityQueue;
1331                    sp<AMessage> extra = new AMessage;
1332                    extra->setInt64("timeUs", timeUs);
1333                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1334                    discontinuityQueue->queueDiscontinuity(
1335                            ATSParser::DISCONTINUITY_SEEK, extra, true);
1336                } else {
1337                    int32_t type;
1338                    int64_t srcSegmentStartTimeUs;
1339                    sp<AMessage> meta;
1340                    if (pickTrack) {
1341                        // selecting
1342                        meta = sources[j]->getLatestDequeuedMeta();
1343                    } else {
1344                        // adapting
1345                        meta = sources[j]->getLatestEnqueuedMeta();
1346                    }
1347
1348                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
1349                        int64_t tmpUs;
1350                        CHECK(meta->findInt64("timeUs", &tmpUs));
1351                        if (startTimeUs < 0 || tmpUs < startTimeUs) {
1352                            startTimeUs = tmpUs;
1353                        }
1354
1355                        CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs));
1356                        if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) {
1357                            segmentStartTimeUs = tmpUs;
1358                        }
1359
1360                        int32_t seq;
1361                        CHECK(meta->findInt32("discontinuitySeq", &seq));
1362                        if (discontinuitySeq < 0 || seq < discontinuitySeq) {
1363                            discontinuitySeq = seq;
1364                        }
1365                    }
1366
1367                    if (pickTrack) {
1368                        // selecting track, queue discontinuities before content
1369                        sources[j]->clear();
1370                        if (j == kSubtitleIndex) {
1371                            break;
1372                        }
1373                        sp<AnotherPacketSource> discontinuityQueue;
1374                        discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1375                        discontinuityQueue->queueDiscontinuity(
1376                                ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1377                    } else {
1378                        // adapting, queue discontinuities after resume
1379                        sources[j] = mPacketSources2.valueFor(indexToType(j));
1380                        sources[j]->clear();
1381                        uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1382                        if (extraStreams & indexToType(j)) {
1383                            sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false));
1384                        }
1385                    }
1386                }
1387
1388                streamMask &= ~indexToType(j);
1389            }
1390        }
1391
1392        fetcher->startAsync(
1393                sources[kAudioIndex],
1394                sources[kVideoIndex],
1395                sources[kSubtitleIndex],
1396                startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs,
1397                segmentStartTimeUs,
1398                discontinuitySeq,
1399                switching);
1400    }
1401
1402    // All fetchers have now been started, the configuration change
1403    // has completed.
1404
1405    cancelCheckBandwidthEvent();
1406    scheduleCheckBandwidthEvent();
1407
1408    ALOGV("XXX configuration change completed.");
1409    mReconfigurationInProgress = false;
1410    if (switching) {
1411        mSwitchInProgress = true;
1412        mSwapMask = streamMask;
1413    } else {
1414        mStreamMask = mNewStreamMask;
1415    }
1416
1417    if (mDisconnectReplyID != 0) {
1418        finishDisconnect();
1419    }
1420}
1421
1422void LiveSession::onSwapped(const sp<AMessage> &msg) {
1423    int32_t switchGeneration;
1424    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
1425    if (switchGeneration != mSwitchGeneration) {
1426        return;
1427    }
1428
1429    int32_t stream;
1430    CHECK(msg->findInt32("stream", &stream));
1431    mSwapMask &= ~stream;
1432    if (mSwapMask != 0) {
1433        return;
1434    }
1435
1436    // Check if new variant contains extra streams.
1437    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1438    while (extraStreams) {
1439        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
1440        swapPacketSource(extraStream);
1441        extraStreams &= ~extraStream;
1442    }
1443
1444    tryToFinishBandwidthSwitch();
1445}
1446
1447// Mark switch done when:
1448//   1. all old buffers are swapped out
1449void LiveSession::tryToFinishBandwidthSwitch() {
1450    if (!mSwitchInProgress) {
1451        return;
1452    }
1453
1454    bool needToRemoveFetchers = false;
1455    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1456        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
1457            needToRemoveFetchers = true;
1458            break;
1459        }
1460    }
1461
1462    if (!needToRemoveFetchers && mSwapMask == 0) {
1463        ALOGI("mSwitchInProgress = false");
1464        mStreamMask = mNewStreamMask;
1465        mSwitchInProgress = false;
1466    }
1467}
1468
1469void LiveSession::scheduleCheckBandwidthEvent() {
1470    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1471    msg->setInt32("generation", mCheckBandwidthGeneration);
1472    msg->post(10000000ll);
1473}
1474
1475void LiveSession::cancelCheckBandwidthEvent() {
1476    ++mCheckBandwidthGeneration;
1477}
1478
1479void LiveSession::cancelBandwidthSwitch() {
1480    Mutex::Autolock lock(mSwapMutex);
1481    mSwitchGeneration++;
1482    mSwitchInProgress = false;
1483    mSwapMask = 0;
1484}
1485
1486bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
1487    if (mReconfigurationInProgress || mSwitchInProgress) {
1488        return false;
1489    }
1490
1491    if (mCurBandwidthIndex < 0) {
1492        return true;
1493    }
1494
1495    if (bandwidthIndex == (size_t)mCurBandwidthIndex) {
1496        return false;
1497    } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) {
1498        return canSwitchUp();
1499    } else {
1500        return true;
1501    }
1502}
1503
1504void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) {
1505    size_t bandwidthIndex = getBandwidthIndex();
1506    if (canSwitchBandwidthTo(bandwidthIndex)) {
1507        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1508    } else {
1509        // Come back and check again 10 seconds later in case there is nothing to do now.
1510        // If we DO change configuration, once that completes it'll schedule a new
1511        // check bandwidth event with an incremented mCheckBandwidthGeneration.
1512        msg->post(10000000ll);
1513    }
1514}
1515
1516void LiveSession::postPrepared(status_t err) {
1517    CHECK(mInPreparationPhase);
1518
1519    sp<AMessage> notify = mNotify->dup();
1520    if (err == OK || err == ERROR_END_OF_STREAM) {
1521        notify->setInt32("what", kWhatPrepared);
1522    } else {
1523        notify->setInt32("what", kWhatPreparationFailed);
1524        notify->setInt32("err", err);
1525    }
1526
1527    notify->post();
1528
1529    mInPreparationPhase = false;
1530}
1531
1532}  // namespace android
1533
1534