LiveSession.cpp revision 9d7fc5c5fab0c7c967a625d22fffda046f9d5c29
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/IMediaHTTPConnection.h>
31#include <media/IMediaHTTPService.h>
32#include <media/stagefright/foundation/hexdump.h>
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/DataSource.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaErrors.h>
39#include <media/stagefright/MediaHTTP.h>
40#include <media/stagefright/MetaData.h>
41#include <media/stagefright/Utils.h>
42
43#include <utils/Mutex.h>
44
45#include <ctype.h>
46#include <inttypes.h>
47#include <openssl/aes.h>
48#include <openssl/md5.h>
49
50namespace android {
51
52LiveSession::LiveSession(
53        const sp<AMessage> &notify, uint32_t flags,
54        const sp<IMediaHTTPService> &httpService)
55    : mNotify(notify),
56      mFlags(flags),
57      mHTTPService(httpService),
58      mInPreparationPhase(true),
59      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
60      mCurBandwidthIndex(-1),
61      mStreamMask(0),
62      mNewStreamMask(0),
63      mSwapMask(0),
64      mCheckBandwidthGeneration(0),
65      mSwitchGeneration(0),
66      mLastDequeuedTimeUs(0ll),
67      mRealTimeBaseUs(0ll),
68      mReconfigurationInProgress(false),
69      mSwitchInProgress(false),
70      mDisconnectReplyID(0),
71      mSeekReplyID(0),
72      mFirstTimeUsValid(false),
73      mFirstTimeUs(0),
74      mLastSeekTimeUs(0) {
75
76    mStreams[kAudioIndex] = StreamItem("audio");
77    mStreams[kVideoIndex] = StreamItem("video");
78    mStreams[kSubtitleIndex] = StreamItem("subtitles");
79
80    for (size_t i = 0; i < kMaxStreams; ++i) {
81        mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
82        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
83        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
84    }
85}
86
87LiveSession::~LiveSession() {
88}
89
90sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
91    ABuffer *discontinuity = new ABuffer(0);
92    discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
93    discontinuity->meta()->setInt32("swapPacketSource", swap);
94    discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
95    discontinuity->meta()->setInt64("timeUs", -1);
96    return discontinuity;
97}
98
99void LiveSession::swapPacketSource(StreamType stream) {
100    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
101    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
102    sp<AnotherPacketSource> tmp = aps;
103    aps = aps2;
104    aps2 = tmp;
105    aps2->clear();
106}
107
108status_t LiveSession::dequeueAccessUnit(
109        StreamType stream, sp<ABuffer> *accessUnit) {
110    if (!(mStreamMask & stream)) {
111        // return -EWOULDBLOCK to avoid halting the decoder
112        // when switching between audio/video and audio only.
113        return -EWOULDBLOCK;
114    }
115
116    status_t finalResult;
117    sp<AnotherPacketSource> discontinuityQueue  = mDiscontinuities.valueFor(stream);
118    if (discontinuityQueue->hasBufferAvailable(&finalResult)) {
119        discontinuityQueue->dequeueAccessUnit(accessUnit);
120        // seeking, track switching
121        sp<AMessage> extra;
122        int64_t timeUs;
123        if ((*accessUnit)->meta()->findMessage("extra", &extra)
124                && extra != NULL
125                && extra->findInt64("timeUs", &timeUs)) {
126            // seeking only
127            mLastSeekTimeUs = timeUs;
128            mDiscontinuityOffsetTimesUs.clear();
129            mDiscontinuityAbsStartTimesUs.clear();
130        }
131        return INFO_DISCONTINUITY;
132    }
133
134    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
135
136    if (!packetSource->hasBufferAvailable(&finalResult)) {
137        return finalResult == OK ? -EAGAIN : finalResult;
138    }
139
140    // wait for counterpart
141    sp<AnotherPacketSource> otherSource;
142    if (stream == STREAMTYPE_AUDIO && (mStreamMask & STREAMTYPE_VIDEO)) {
143        otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
144    } else if (stream == STREAMTYPE_VIDEO && (mStreamMask & STREAMTYPE_AUDIO)) {
145        otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
146    }
147    if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) {
148        return finalResult == OK ? -EAGAIN : finalResult;
149    }
150
151    status_t err = packetSource->dequeueAccessUnit(accessUnit);
152
153    size_t streamIdx;
154    const char *streamStr;
155    switch (stream) {
156        case STREAMTYPE_AUDIO:
157            streamIdx = kAudioIndex;
158            streamStr = "audio";
159            break;
160        case STREAMTYPE_VIDEO:
161            streamIdx = kVideoIndex;
162            streamStr = "video";
163            break;
164        case STREAMTYPE_SUBTITLES:
165            streamIdx = kSubtitleIndex;
166            streamStr = "subs";
167            break;
168        default:
169            TRESPASS();
170    }
171
172    StreamItem& strm = mStreams[streamIdx];
173    if (err == INFO_DISCONTINUITY) {
174        // adaptive streaming, discontinuities in the playlist
175        int32_t type;
176        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
177
178        sp<AMessage> extra;
179        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
180            extra.clear();
181        }
182
183        ALOGI("[%s] read discontinuity of type %d, extra = %s",
184              streamStr,
185              type,
186              extra == NULL ? "NULL" : extra->debugString().c_str());
187
188        int32_t swap;
189        if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) {
190            int32_t switchGeneration;
191            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
192            {
193                Mutex::Autolock lock(mSwapMutex);
194                if (switchGeneration == mSwitchGeneration) {
195                    swapPacketSource(stream);
196                    sp<AMessage> msg = new AMessage(kWhatSwapped, id());
197                    msg->setInt32("stream", stream);
198                    msg->setInt32("switchGeneration", switchGeneration);
199                    msg->post();
200                }
201            }
202        } else {
203            size_t seq = strm.mCurDiscontinuitySeq;
204            int64_t offsetTimeUs;
205            if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
206                offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
207            } else {
208                offsetTimeUs = 0;
209            }
210
211            seq += 1;
212            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
213                int64_t firstTimeUs;
214                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
215                offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
216                offsetTimeUs += strm.mLastSampleDurationUs;
217            } else {
218                offsetTimeUs += strm.mLastSampleDurationUs;
219            }
220
221            mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
222        }
223    } else if (err == OK) {
224
225        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
226            int64_t timeUs;
227            int32_t discontinuitySeq = 0;
228            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
229            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
230            strm.mCurDiscontinuitySeq = discontinuitySeq;
231
232            int32_t discard = 0;
233            int64_t firstTimeUs;
234            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
235                int64_t durUs; // approximate sample duration
236                if (timeUs > strm.mLastDequeuedTimeUs) {
237                    durUs = timeUs - strm.mLastDequeuedTimeUs;
238                } else {
239                    durUs = strm.mLastDequeuedTimeUs - timeUs;
240                }
241                strm.mLastSampleDurationUs = durUs;
242                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
243            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
244                firstTimeUs = timeUs;
245            } else {
246                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
247                firstTimeUs = timeUs;
248            }
249
250            strm.mLastDequeuedTimeUs = timeUs;
251            if (timeUs >= firstTimeUs) {
252                timeUs -= firstTimeUs;
253            } else {
254                timeUs = 0;
255            }
256            timeUs += mLastSeekTimeUs;
257            if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
258                timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
259            }
260
261            ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs);
262            (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
263            mLastDequeuedTimeUs = timeUs;
264            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
265        } else if (stream == STREAMTYPE_SUBTITLES) {
266            (*accessUnit)->meta()->setInt32(
267                    "trackIndex", mPlaylist->getSelectedIndex());
268            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
269        }
270    } else {
271        ALOGI("[%s] encountered error %d", streamStr, err);
272    }
273
274    return err;
275}
276
277status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
278    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
279    if (!(mStreamMask & stream)) {
280        return UNKNOWN_ERROR;
281    }
282
283    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
284
285    sp<MetaData> meta = packetSource->getFormat();
286
287    if (meta == NULL) {
288        return -EAGAIN;
289    }
290
291    return convertMetaDataToMessage(meta, format);
292}
293
294void LiveSession::connectAsync(
295        const char *url, const KeyedVector<String8, String8> *headers) {
296    sp<AMessage> msg = new AMessage(kWhatConnect, id());
297    msg->setString("url", url);
298
299    if (headers != NULL) {
300        msg->setPointer(
301                "headers",
302                new KeyedVector<String8, String8>(*headers));
303    }
304
305    msg->post();
306}
307
308status_t LiveSession::disconnect() {
309    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
310
311    sp<AMessage> response;
312    status_t err = msg->postAndAwaitResponse(&response);
313
314    return err;
315}
316
317status_t LiveSession::seekTo(int64_t timeUs) {
318    sp<AMessage> msg = new AMessage(kWhatSeek, id());
319    msg->setInt64("timeUs", timeUs);
320
321    sp<AMessage> response;
322    status_t err = msg->postAndAwaitResponse(&response);
323
324    uint32_t replyID;
325    CHECK(response == mSeekReply && 0 != mSeekReplyID);
326    mSeekReply.clear();
327    mSeekReplyID = 0;
328    return err;
329}
330
331void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
332    switch (msg->what()) {
333        case kWhatConnect:
334        {
335            onConnect(msg);
336            break;
337        }
338
339        case kWhatDisconnect:
340        {
341            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
342
343            if (mReconfigurationInProgress) {
344                break;
345            }
346
347            finishDisconnect();
348            break;
349        }
350
351        case kWhatSeek:
352        {
353            CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
354
355            status_t err = onSeek(msg);
356
357            mSeekReply = new AMessage;
358            mSeekReply->setInt32("err", err);
359            break;
360        }
361
362        case kWhatFetcherNotify:
363        {
364            int32_t what;
365            CHECK(msg->findInt32("what", &what));
366
367            switch (what) {
368                case PlaylistFetcher::kWhatStarted:
369                    break;
370                case PlaylistFetcher::kWhatPaused:
371                case PlaylistFetcher::kWhatStopped:
372                {
373                    if (what == PlaylistFetcher::kWhatStopped) {
374                        AString uri;
375                        CHECK(msg->findString("uri", &uri));
376                        if (mFetcherInfos.removeItem(uri) < 0) {
377                            // ignore duplicated kWhatStopped messages.
378                            break;
379                        }
380
381                        if (mSwitchInProgress) {
382                            tryToFinishBandwidthSwitch();
383                        }
384                    }
385
386                    if (mContinuation != NULL) {
387                        CHECK_GT(mContinuationCounter, 0);
388                        if (--mContinuationCounter == 0) {
389                            mContinuation->post();
390
391                            if (mSeekReplyID != 0) {
392                                CHECK(mSeekReply != NULL);
393                                mSeekReply->postReply(mSeekReplyID);
394                            }
395                        }
396                    }
397                    break;
398                }
399
400                case PlaylistFetcher::kWhatDurationUpdate:
401                {
402                    AString uri;
403                    CHECK(msg->findString("uri", &uri));
404
405                    int64_t durationUs;
406                    CHECK(msg->findInt64("durationUs", &durationUs));
407
408                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
409                    info->mDurationUs = durationUs;
410                    break;
411                }
412
413                case PlaylistFetcher::kWhatError:
414                {
415                    status_t err;
416                    CHECK(msg->findInt32("err", &err));
417
418                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
419
420                    if (mInPreparationPhase) {
421                        postPrepared(err);
422                    }
423
424                    cancelBandwidthSwitch();
425
426                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
427
428                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
429
430                    mPacketSources.valueFor(
431                            STREAMTYPE_SUBTITLES)->signalEOS(err);
432
433                    sp<AMessage> notify = mNotify->dup();
434                    notify->setInt32("what", kWhatError);
435                    notify->setInt32("err", err);
436                    notify->post();
437                    break;
438                }
439
440                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
441                {
442                    AString uri;
443                    CHECK(msg->findString("uri", &uri));
444
445                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
446                    info->mIsPrepared = true;
447
448                    if (mInPreparationPhase) {
449                        bool allFetchersPrepared = true;
450                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
451                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
452                                allFetchersPrepared = false;
453                                break;
454                            }
455                        }
456
457                        if (allFetchersPrepared) {
458                            postPrepared(OK);
459                        }
460                    }
461                    break;
462                }
463
464                case PlaylistFetcher::kWhatStartedAt:
465                {
466                    int32_t switchGeneration;
467                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
468
469                    if (switchGeneration != mSwitchGeneration) {
470                        break;
471                    }
472
473                    // Resume fetcher for the original variant; the resumed fetcher should
474                    // continue until the timestamps found in msg, which is stored by the
475                    // new fetcher to indicate where the new variant has started buffering.
476                    for (size_t i = 0; i < mFetcherInfos.size(); i++) {
477                        const FetcherInfo info = mFetcherInfos.valueAt(i);
478                        if (info.mToBeRemoved) {
479                            info.mFetcher->resumeUntilAsync(msg);
480                        }
481                    }
482                    break;
483                }
484
485                default:
486                    TRESPASS();
487            }
488
489            break;
490        }
491
492        case kWhatCheckBandwidth:
493        {
494            int32_t generation;
495            CHECK(msg->findInt32("generation", &generation));
496
497            if (generation != mCheckBandwidthGeneration) {
498                break;
499            }
500
501            onCheckBandwidth();
502            break;
503        }
504
505        case kWhatChangeConfiguration:
506        {
507            onChangeConfiguration(msg);
508            break;
509        }
510
511        case kWhatChangeConfiguration2:
512        {
513            onChangeConfiguration2(msg);
514            break;
515        }
516
517        case kWhatChangeConfiguration3:
518        {
519            onChangeConfiguration3(msg);
520            break;
521        }
522
523        case kWhatFinishDisconnect2:
524        {
525            onFinishDisconnect2();
526            break;
527        }
528
529        case kWhatSwapped:
530        {
531            onSwapped(msg);
532            break;
533        }
534        default:
535            TRESPASS();
536            break;
537    }
538}
539
540// static
541int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
542    if (a->mBandwidth < b->mBandwidth) {
543        return -1;
544    } else if (a->mBandwidth == b->mBandwidth) {
545        return 0;
546    }
547
548    return 1;
549}
550
551// static
552LiveSession::StreamType LiveSession::indexToType(int idx) {
553    CHECK(idx >= 0 && idx < kMaxStreams);
554    return (StreamType)(1 << idx);
555}
556
557void LiveSession::onConnect(const sp<AMessage> &msg) {
558    AString url;
559    CHECK(msg->findString("url", &url));
560
561    KeyedVector<String8, String8> *headers = NULL;
562    if (!msg->findPointer("headers", (void **)&headers)) {
563        mExtraHeaders.clear();
564    } else {
565        mExtraHeaders = *headers;
566
567        delete headers;
568        headers = NULL;
569    }
570
571    // TODO currently we don't know if we are coming here from incognito mode
572    ALOGI("onConnect %s", uriDebugString(url).c_str());
573
574    mMasterURL = url;
575
576    bool dummy;
577    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
578
579    if (mPlaylist == NULL) {
580        ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str());
581
582        postPrepared(ERROR_IO);
583        return;
584    }
585
586    // We trust the content provider to make a reasonable choice of preferred
587    // initial bandwidth by listing it first in the variant playlist.
588    // At startup we really don't have a good estimate on the available
589    // network bandwidth since we haven't tranferred any data yet. Once
590    // we have we can make a better informed choice.
591    size_t initialBandwidth = 0;
592    size_t initialBandwidthIndex = 0;
593
594    if (mPlaylist->isVariantPlaylist()) {
595        for (size_t i = 0; i < mPlaylist->size(); ++i) {
596            BandwidthItem item;
597
598            item.mPlaylistIndex = i;
599
600            sp<AMessage> meta;
601            AString uri;
602            mPlaylist->itemAt(i, &uri, &meta);
603
604            unsigned long bandwidth;
605            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
606
607            if (initialBandwidth == 0) {
608                initialBandwidth = item.mBandwidth;
609            }
610
611            mBandwidthItems.push(item);
612        }
613
614        CHECK_GT(mBandwidthItems.size(), 0u);
615
616        mBandwidthItems.sort(SortByBandwidth);
617
618        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
619            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
620                initialBandwidthIndex = i;
621                break;
622            }
623        }
624    } else {
625        // dummy item.
626        BandwidthItem item;
627        item.mPlaylistIndex = 0;
628        item.mBandwidth = 0;
629        mBandwidthItems.push(item);
630    }
631
632    mPlaylist->pickRandomMediaItems();
633    changeConfiguration(
634            0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
635}
636
637void LiveSession::finishDisconnect() {
638    // No reconfiguration is currently pending, make sure none will trigger
639    // during disconnection either.
640    cancelCheckBandwidthEvent();
641
642    // Protect mPacketSources from a swapPacketSource race condition through disconnect.
643    // (finishDisconnect, onFinishDisconnect2)
644    cancelBandwidthSwitch();
645
646    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
647        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
648    }
649
650    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
651
652    mContinuationCounter = mFetcherInfos.size();
653    mContinuation = msg;
654
655    if (mContinuationCounter == 0) {
656        msg->post();
657    }
658}
659
660void LiveSession::onFinishDisconnect2() {
661    mContinuation.clear();
662
663    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
664    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
665
666    mPacketSources.valueFor(
667            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
668
669    sp<AMessage> response = new AMessage;
670    response->setInt32("err", OK);
671
672    response->postReply(mDisconnectReplyID);
673    mDisconnectReplyID = 0;
674}
675
676sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
677    ssize_t index = mFetcherInfos.indexOfKey(uri);
678
679    if (index >= 0) {
680        return NULL;
681    }
682
683    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
684    notify->setString("uri", uri);
685    notify->setInt32("switchGeneration", mSwitchGeneration);
686
687    FetcherInfo info;
688    info.mFetcher = new PlaylistFetcher(notify, this, uri);
689    info.mDurationUs = -1ll;
690    info.mIsPrepared = false;
691    info.mToBeRemoved = false;
692    looper()->registerHandler(info.mFetcher);
693
694    mFetcherInfos.add(uri, info);
695
696    return info.mFetcher;
697}
698
699/*
700 * Illustration of parameters:
701 *
702 * 0      `range_offset`
703 * +------------+-------------------------------------------------------+--+--+
704 * |            |                                 | next block to fetch |  |  |
705 * |            | `source` handle => `out` buffer |                     |  |  |
706 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
707 * |            |<----------- `range_length` / buffer capacity ----------->|  |
708 * |<------------------------------ file_size ------------------------------->|
709 *
710 * Special parameter values:
711 * - range_length == -1 means entire file
712 * - block_size == 0 means entire range
713 *
714 */
715ssize_t LiveSession::fetchFile(
716        const char *url, sp<ABuffer> *out,
717        int64_t range_offset, int64_t range_length,
718        uint32_t block_size, /* download block size */
719        sp<DataSource> *source, /* to return and reuse source */
720        String8 *actualUrl) {
721    off64_t size;
722    sp<DataSource> temp_source;
723    if (source == NULL) {
724        source = &temp_source;
725    }
726
727    if (*source == NULL) {
728        if (!strncasecmp(url, "file://", 7)) {
729            *source = new FileSource(url + 7);
730        } else if (strncasecmp(url, "http://", 7)
731                && strncasecmp(url, "https://", 8)) {
732            return ERROR_UNSUPPORTED;
733        } else {
734            KeyedVector<String8, String8> headers = mExtraHeaders;
735            if (range_offset > 0 || range_length >= 0) {
736                headers.add(
737                        String8("Range"),
738                        String8(
739                            StringPrintf(
740                                "bytes=%lld-%s",
741                                range_offset,
742                                range_length < 0
743                                    ? "" : StringPrintf("%lld",
744                                            range_offset + range_length - 1).c_str()).c_str()));
745            }
746            status_t err = mHTTPDataSource->connect(url, &headers);
747
748            if (err != OK) {
749                return err;
750            }
751
752            *source = mHTTPDataSource;
753        }
754    }
755
756    status_t getSizeErr = (*source)->getSize(&size);
757    if (getSizeErr != OK) {
758        size = 65536;
759    }
760
761    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
762    if (*out == NULL) {
763        buffer->setRange(0, 0);
764    }
765
766    ssize_t bytesRead = 0;
767    // adjust range_length if only reading partial block
768    if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
769        range_length = buffer->size() + block_size;
770    }
771    for (;;) {
772        // Only resize when we don't know the size.
773        size_t bufferRemaining = buffer->capacity() - buffer->size();
774        if (bufferRemaining == 0 && getSizeErr != OK) {
775            bufferRemaining = 32768;
776
777            ALOGV("increasing download buffer to %zu bytes",
778                 buffer->size() + bufferRemaining);
779
780            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
781            memcpy(copy->data(), buffer->data(), buffer->size());
782            copy->setRange(0, buffer->size());
783
784            buffer = copy;
785        }
786
787        size_t maxBytesToRead = bufferRemaining;
788        if (range_length >= 0) {
789            int64_t bytesLeftInRange = range_length - buffer->size();
790            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
791                maxBytesToRead = bytesLeftInRange;
792
793                if (bytesLeftInRange == 0) {
794                    break;
795                }
796            }
797        }
798
799        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
800        // to help us break out of the loop.
801        ssize_t n = (*source)->readAt(
802                buffer->size(), buffer->data() + buffer->size(),
803                maxBytesToRead);
804
805        if (n < 0) {
806            return n;
807        }
808
809        if (n == 0) {
810            break;
811        }
812
813        buffer->setRange(0, buffer->size() + (size_t)n);
814        bytesRead += n;
815    }
816
817    *out = buffer;
818    if (actualUrl != NULL) {
819        *actualUrl = (*source)->getUri();
820        if (actualUrl->isEmpty()) {
821            *actualUrl = url;
822        }
823    }
824
825    return bytesRead;
826}
827
828sp<M3UParser> LiveSession::fetchPlaylist(
829        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
830    ALOGV("fetchPlaylist '%s'", url);
831
832    *unchanged = false;
833
834    sp<ABuffer> buffer;
835    String8 actualUrl;
836    ssize_t  err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
837
838    if (err <= 0) {
839        return NULL;
840    }
841
842    // MD5 functionality is not available on the simulator, treat all
843    // playlists as changed.
844
845#if defined(HAVE_ANDROID_OS)
846    uint8_t hash[16];
847
848    MD5_CTX m;
849    MD5_Init(&m);
850    MD5_Update(&m, buffer->data(), buffer->size());
851
852    MD5_Final(hash, &m);
853
854    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
855        // playlist unchanged
856        *unchanged = true;
857
858        return NULL;
859    }
860
861    if (curPlaylistHash != NULL) {
862        memcpy(curPlaylistHash, hash, sizeof(hash));
863    }
864#endif
865
866    sp<M3UParser> playlist =
867        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
868
869    if (playlist->initCheck() != OK) {
870        ALOGE("failed to parse .m3u8 playlist");
871
872        return NULL;
873    }
874
875    return playlist;
876}
877
// Returns the next rand() value scaled by RAND_MAX, i.e. a double in [0, 1].
static double uniformRand() {
    const double sample = static_cast<double>(rand());
    return sample / RAND_MAX;
}
881
882size_t LiveSession::getBandwidthIndex() {
883    if (mBandwidthItems.size() == 0) {
884        return 0;
885    }
886
887#if 1
888    char value[PROPERTY_VALUE_MAX];
889    ssize_t index = -1;
890    if (property_get("media.httplive.bw-index", value, NULL)) {
891        char *end;
892        index = strtol(value, &end, 10);
893        CHECK(end > value && *end == '\0');
894
895        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
896            index = mBandwidthItems.size() - 1;
897        }
898    }
899
900    if (index < 0) {
901        int32_t bandwidthBps;
902        if (mHTTPDataSource != NULL
903                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
904            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
905        } else {
906            ALOGV("no bandwidth estimate.");
907            return 0;  // Pick the lowest bandwidth stream by default.
908        }
909
910        char value[PROPERTY_VALUE_MAX];
911        if (property_get("media.httplive.max-bw", value, NULL)) {
912            char *end;
913            long maxBw = strtoul(value, &end, 10);
914            if (end > value && *end == '\0') {
915                if (maxBw > 0 && bandwidthBps > maxBw) {
916                    ALOGV("bandwidth capped to %ld bps", maxBw);
917                    bandwidthBps = maxBw;
918                }
919            }
920        }
921
922        // Consider only 80% of the available bandwidth usable.
923        bandwidthBps = (bandwidthBps * 8) / 10;
924
925        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
926
927        index = mBandwidthItems.size() - 1;
928        while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth
929                                > (size_t)bandwidthBps) {
930            --index;
931        }
932    }
933#elif 0
934    // Change bandwidth at random()
935    size_t index = uniformRand() * mBandwidthItems.size();
936#elif 0
937    // There's a 50% chance to stay on the current bandwidth and
938    // a 50% chance to switch to the next higher bandwidth (wrapping around
939    // to lowest)
940    const size_t kMinIndex = 0;
941
942    static ssize_t mCurBandwidthIndex = -1;
943
944    size_t index;
945    if (mCurBandwidthIndex < 0) {
946        index = kMinIndex;
947    } else if (uniformRand() < 0.5) {
948        index = (size_t)mCurBandwidthIndex;
949    } else {
950        index = mCurBandwidthIndex + 1;
951        if (index == mBandwidthItems.size()) {
952            index = kMinIndex;
953        }
954    }
955    mCurBandwidthIndex = index;
956#elif 0
957    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
958
959    size_t index = mBandwidthItems.size() - 1;
960    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
961        --index;
962    }
963#elif 1
964    char value[PROPERTY_VALUE_MAX];
965    size_t index;
966    if (property_get("media.httplive.bw-index", value, NULL)) {
967        char *end;
968        index = strtoul(value, &end, 10);
969        CHECK(end > value && *end == '\0');
970
971        if (index >= mBandwidthItems.size()) {
972            index = mBandwidthItems.size() - 1;
973        }
974    } else {
975        index = 0;
976    }
977#else
978    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
979#endif
980
981    CHECK_GE(index, 0);
982
983    return index;
984}
985
986status_t LiveSession::onSeek(const sp<AMessage> &msg) {
987    int64_t timeUs;
988    CHECK(msg->findInt64("timeUs", &timeUs));
989
990    if (!mReconfigurationInProgress) {
991        changeConfiguration(timeUs, getBandwidthIndex());
992    }
993
994    return OK;
995}
996
997status_t LiveSession::getDuration(int64_t *durationUs) const {
998    int64_t maxDurationUs = 0ll;
999    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1000        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1001
1002        if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) {
1003            maxDurationUs = fetcherDurationUs;
1004        }
1005    }
1006
1007    *durationUs = maxDurationUs;
1008
1009    return OK;
1010}
1011
1012bool LiveSession::isSeekable() const {
1013    int64_t durationUs;
1014    return getDuration(&durationUs) == OK && durationUs >= 0;
1015}
1016
1017bool LiveSession::hasDynamicDuration() const {
1018    return false;
1019}
1020
1021size_t LiveSession::getTrackCount() const {
1022    if (mPlaylist == NULL) {
1023        return 0;
1024    } else {
1025        return mPlaylist->getTrackCount();
1026    }
1027}
1028
1029sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1030    if (mPlaylist == NULL) {
1031        return NULL;
1032    } else {
1033        return mPlaylist->getTrackInfo(trackIndex);
1034    }
1035}
1036
1037status_t LiveSession::selectTrack(size_t index, bool select) {
1038    status_t err = mPlaylist->selectTrack(index, select);
1039    if (err == OK) {
1040        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id());
1041        msg->setInt32("bandwidthIndex", mCurBandwidthIndex);
1042        msg->setInt32("pickTrack", select);
1043        msg->post();
1044    }
1045    return err;
1046}
1047
1048bool LiveSession::canSwitchUp() {
1049    // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
1050    status_t err = OK;
1051    for (size_t i = 0; i < mPacketSources.size(); ++i) {
1052        sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
1053        int64_t dur = source->getBufferedDurationUs(&err);
1054        if (err == OK && dur > 10000000) {
1055            return true;
1056        }
1057    }
1058    return false;
1059}
1060
1061void LiveSession::changeConfiguration(
1062        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
1063    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
1064    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
1065    cancelBandwidthSwitch();
1066
1067    CHECK(!mReconfigurationInProgress);
1068    mReconfigurationInProgress = true;
1069
1070    mCurBandwidthIndex = bandwidthIndex;
1071
1072    ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
1073          timeUs, bandwidthIndex, pickTrack);
1074
1075    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
1076    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
1077
1078    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1079    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1080
1081    AString URIs[kMaxStreams];
1082    for (size_t i = 0; i < kMaxStreams; ++i) {
1083        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1084            streamMask |= indexToType(i);
1085        }
1086    }
1087
1088    // Step 1, stop and discard fetchers that are no longer needed.
1089    // Pause those that we'll reuse.
1090    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1091        const AString &uri = mFetcherInfos.keyAt(i);
1092
1093        bool discardFetcher = true;
1094
1095        // If we're seeking all current fetchers are discarded.
1096        if (timeUs < 0ll) {
1097            // delay fetcher removal if not picking tracks
1098            discardFetcher = pickTrack;
1099
1100            for (size_t j = 0; j < kMaxStreams; ++j) {
1101                StreamType type = indexToType(j);
1102                if ((streamMask & type) && uri == URIs[j]) {
1103                    resumeMask |= type;
1104                    streamMask &= ~type;
1105                    discardFetcher = false;
1106                }
1107            }
1108        }
1109
1110        if (discardFetcher) {
1111            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1112        } else {
1113            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
1114        }
1115    }
1116
1117    sp<AMessage> msg;
1118    if (timeUs < 0ll) {
1119        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1120        msg = new AMessage(kWhatChangeConfiguration3, id());
1121    } else {
1122        msg = new AMessage(kWhatChangeConfiguration2, id());
1123    }
1124    msg->setInt32("streamMask", streamMask);
1125    msg->setInt32("resumeMask", resumeMask);
1126    msg->setInt32("pickTrack", pickTrack);
1127    msg->setInt64("timeUs", timeUs);
1128    for (size_t i = 0; i < kMaxStreams; ++i) {
1129        if ((streamMask | resumeMask) & indexToType(i)) {
1130            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1131        }
1132    }
1133
1134    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1135    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1136    // fetchers have completed their asynchronous operation, we'll post
1137    // mContinuation, which then is handled below in onChangeConfiguration2.
1138    mContinuationCounter = mFetcherInfos.size();
1139    mContinuation = msg;
1140
1141    if (mContinuationCounter == 0) {
1142        msg->post();
1143
1144        if (mSeekReplyID != 0) {
1145            CHECK(mSeekReply != NULL);
1146            mSeekReply->postReply(mSeekReplyID);
1147        }
1148    }
1149}
1150
1151void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1152    if (!mReconfigurationInProgress) {
1153        int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex;
1154        msg->findInt32("pickTrack", &pickTrack);
1155        msg->findInt32("bandwidthIndex", &bandwidthIndex);
1156        changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack);
1157    } else {
1158        msg->post(1000000ll); // retry in 1 sec
1159    }
1160}
1161
1162void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1163    mContinuation.clear();
1164
1165    // All fetchers are either suspended or have been removed now.
1166
1167    uint32_t streamMask, resumeMask;
1168    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1169    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1170
1171    // currently onChangeConfiguration2 is only called for seeking;
1172    // remove the following CHECK if using it else where.
1173    CHECK_EQ(resumeMask, 0);
1174    streamMask |= resumeMask;
1175
1176    AString URIs[kMaxStreams];
1177    for (size_t i = 0; i < kMaxStreams; ++i) {
1178        if (streamMask & indexToType(i)) {
1179            const AString &uriKey = mStreams[i].uriKey();
1180            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1181            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1182        }
1183    }
1184
1185    // Determine which decoders to shutdown on the player side,
1186    // a decoder has to be shutdown if either
1187    // 1) its streamtype was active before but now longer isn't.
1188    // or
1189    // 2) its streamtype was already active and still is but the URI
1190    //    has changed.
1191    uint32_t changedMask = 0;
1192    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1193        if (((mStreamMask & streamMask & indexToType(i))
1194                && !(URIs[i] == mStreams[i].mUri))
1195                || (mStreamMask & ~streamMask & indexToType(i))) {
1196            changedMask |= indexToType(i);
1197        }
1198    }
1199
1200    if (changedMask == 0) {
1201        // If nothing changed as far as the audio/video decoders
1202        // are concerned we can proceed.
1203        onChangeConfiguration3(msg);
1204        return;
1205    }
1206
1207    // Something changed, inform the player which will shutdown the
1208    // corresponding decoders and will post the reply once that's done.
1209    // Handling the reply will continue executing below in
1210    // onChangeConfiguration3.
1211    sp<AMessage> notify = mNotify->dup();
1212    notify->setInt32("what", kWhatStreamsChanged);
1213    notify->setInt32("changedMask", changedMask);
1214
1215    msg->setWhat(kWhatChangeConfiguration3);
1216    msg->setTarget(id());
1217
1218    notify->setMessage("reply", msg);
1219    notify->post();
1220}
1221
// Final stage of a configuration change.
//
// Reads "streamMask", "resumeMask", "timeUs" and "pickTrack" from |msg|, stores
// the new URI of every stream in streamMask into mStreams, restarts the
// existing fetchers that still serve a stream in resumeMask (marking all other
// existing fetchers for removal), creates and starts a fetcher for every
// remaining stream in streamMask, schedules the next bandwidth check and
// clears mReconfigurationInProgress. A pending disconnect is finished at the end.
1222void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1223    mContinuation.clear();
1224    // All remaining fetchers are still suspended, the player has shutdown
1225    // any decoders that needed it.
1226
1227    uint32_t streamMask, resumeMask;
1228    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1229    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1230
    // Adopt the new URI for every stream that gets a new fetcher.
1231    for (size_t i = 0; i < kMaxStreams; ++i) {
1232        if (streamMask & indexToType(i)) {
1233            CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1234        }
1235    }
1236
1237    int64_t timeUs;
1238    int32_t pickTrack;
1239    bool switching = false;
1240    CHECK(msg->findInt64("timeUs", &timeUs));
1241    CHECK(msg->findInt32("pickTrack", &pickTrack));
1242
    // A negative timeUs means this is not a seek. Without a track selection
    // the change is then treated as a bandwidth switch ("switching").
1243    if (timeUs < 0ll) {
1244        if (!pickTrack) {
1245            switching = true;
1246        }
1247        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
1248    } else {
1249        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1250    }
1251
1252    mNewStreamMask = streamMask | resumeMask;
1253
1254    // Of all existing fetchers:
1255    // * Resume fetchers that are still needed and assign them original packet sources.
1256    // * Mark otherwise unneeded fetchers for removal.
1257    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1258    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1259        const AString &uri = mFetcherInfos.keyAt(i);
1260
1261        sp<AnotherPacketSource> sources[kMaxStreams];
1262        for (size_t j = 0; j < kMaxStreams; ++j) {
1263            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
1264                sources[j] = mPacketSources.valueFor(indexToType(j));
1265
1266                if (j != kSubtitleIndex) {
1267                    ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j));
1268                    sp<AnotherPacketSource> discontinuityQueue;
1269                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1270                    discontinuityQueue->queueDiscontinuity(
1271                            ATSParser::DISCONTINUITY_NONE,
1272                            NULL,
1273                            true);
1274                }
1275            }
1276        }
1277
        // A fetcher that was handed at least one source is restarted; any
        // other fetcher is only flagged, not removed here.
1278        FetcherInfo &info = mFetcherInfos.editValueAt(i);
1279        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
1280                || sources[kSubtitleIndex] != NULL) {
1281            info.mFetcher->startAsync(
1282                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
1283        } else {
1284            info.mToBeRemoved = true;
1285        }
1286    }
1287
1288    // streamMask now only contains the types that need a new fetcher created.
1289
1290    if (streamMask != 0) {
1291        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1292    }
1293
1294    // Find out when the original fetchers have buffered up to and start the new fetchers
1295    // at a later timestamp.
1296    for (size_t i = 0; i < kMaxStreams; i++) {
1297        if (!(indexToType(i) & streamMask)) {
1298            continue;
1299        }
1300
1301        AString uri;
1302        uri = mStreams[i].mUri;
1303
1304        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1305        CHECK(fetcher != NULL);
1306
        // NOTE(review): latestSeq is never read or written after this
        // initialization within this function.
1307        int32_t latestSeq = -1;
1308        int64_t startTimeUs = -1;
1309        int64_t segmentStartTimeUs = -1ll;
1310        int32_t discontinuitySeq = -1;
1311        sp<AnotherPacketSource> sources[kMaxStreams];
1312
1313        // TRICKY: looping from i as earlier streams are already removed from streamMask
1314        for (size_t j = i; j < kMaxStreams; ++j) {
1315            if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) {
1316                sources[j] = mPacketSources.valueFor(indexToType(j));
1317
1318                if (timeUs >= 0) {
                    // Seeking: drop buffered data and queue a seek
                    // discontinuity carrying the target time.
1319                    sources[j]->clear();
1320                    startTimeUs = timeUs;
1321
1322                    sp<AnotherPacketSource> discontinuityQueue;
1323                    sp<AMessage> extra = new AMessage;
1324                    extra->setInt64("timeUs", timeUs);
1325                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1326                    discontinuityQueue->queueDiscontinuity(
1327                            ATSParser::DISCONTINUITY_SEEK, extra, true);
1328                } else {
1329                    int32_t type;
1330                    int64_t srcSegmentStartTimeUs;
1331                    sp<AMessage> meta;
1332                    if (pickTrack) {
1333                        // selecting
1334                        meta = sources[j]->getLatestDequeuedMeta();
1335                    } else {
1336                        // adapting
1337                        meta = sources[j]->getLatestEnqueuedMeta();
1338                    }
1339
                    // Across all sources sharing this fetcher keep the
                    // smallest start time, segment start time and
                    // discontinuity sequence; metadata carrying a
                    // "discontinuity" entry is skipped.
1340                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
1341                        int64_t tmpUs;
1342                        CHECK(meta->findInt64("timeUs", &tmpUs));
1343                        if (startTimeUs < 0 || tmpUs < startTimeUs) {
1344                            startTimeUs = tmpUs;
1345                        }
1346
1347                        CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs));
1348                        if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) {
1349                            segmentStartTimeUs = tmpUs;
1350                        }
1351
1352                        int32_t seq;
1353                        CHECK(meta->findInt32("discontinuitySeq", &seq));
1354                        if (discontinuitySeq < 0 || seq < discontinuitySeq) {
1355                            discontinuitySeq = seq;
1356                        }
1357                    }
1358
1359                    if (pickTrack) {
1360                        // selecting track, queue discontinuities before content
1361                        sources[j]->clear();
                        // NOTE(review): this break leaves the inner loop before
                        // the subtitle bit is cleared from streamMask below.
1362                        if (j == kSubtitleIndex) {
1363                            break;
1364                        }
1365                        sp<AnotherPacketSource> discontinuityQueue;
1366                        discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1367                        discontinuityQueue->queueDiscontinuity(
1368                                ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1369                    } else {
1370                        // adapting, queue discontinuities after resume
1371                        sources[j] = mPacketSources2.valueFor(indexToType(j));
1372                        sources[j]->clear();
1373                        uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1374                        if (extraStreams & indexToType(j)) {
1375                            sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false));
1376                        }
1377                    }
1378                }
1379
1380                streamMask &= ~indexToType(j);
1381            }
1382        }
1383
        // With no usable start time, fall back to the last seek position.
1384        fetcher->startAsync(
1385                sources[kAudioIndex],
1386                sources[kVideoIndex],
1387                sources[kSubtitleIndex],
1388                startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs,
1389                segmentStartTimeUs,
1390                discontinuitySeq,
1391                switching);
1392    }
1393
1394    // All fetchers have now been started, the configuration change
1395    // has completed.
1396
1397    scheduleCheckBandwidthEvent();
1398
1399    ALOGV("XXX configuration change completed.");
1400    mReconfigurationInProgress = false;
1401    if (switching) {
1402        mSwitchInProgress = true;
        // NOTE(review): the loop above clears every handled bit from
        // streamMask, so the value stored here looks like it is always 0 on
        // this path -- confirm whether the pre-loop mask was intended.
1403        mSwapMask = streamMask;
1404    } else {
1405        mStreamMask = mNewStreamMask;
1406    }
1407
1408    if (mDisconnectReplyID != 0) {
1409        finishDisconnect();
1410    }
1411}
1412
1413void LiveSession::onSwapped(const sp<AMessage> &msg) {
1414    int32_t switchGeneration;
1415    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
1416    if (switchGeneration != mSwitchGeneration) {
1417        return;
1418    }
1419
1420    int32_t stream;
1421    CHECK(msg->findInt32("stream", &stream));
1422    mSwapMask &= ~stream;
1423    if (mSwapMask != 0) {
1424        return;
1425    }
1426
1427    // Check if new variant contains extra streams.
1428    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1429    while (extraStreams) {
1430        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
1431        swapPacketSource(extraStream);
1432        extraStreams &= ~extraStream;
1433    }
1434
1435    tryToFinishBandwidthSwitch();
1436}
1437
1438// Mark switch done when:
1439//   1. all old buffers are swapped out
1440void LiveSession::tryToFinishBandwidthSwitch() {
1441    if (!mSwitchInProgress) {
1442        return;
1443    }
1444
1445    bool needToRemoveFetchers = false;
1446    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1447        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
1448            needToRemoveFetchers = true;
1449            break;
1450        }
1451    }
1452
1453    if (!needToRemoveFetchers && mSwapMask == 0) {
1454        ALOGI("mSwitchInProgress = false");
1455        mStreamMask = mNewStreamMask;
1456        mSwitchInProgress = false;
1457    }
1458}
1459
1460void LiveSession::scheduleCheckBandwidthEvent() {
1461    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1462    msg->setInt32("generation", mCheckBandwidthGeneration);
1463    msg->post(10000000ll);
1464}
1465
1466void LiveSession::cancelCheckBandwidthEvent() {
1467    ++mCheckBandwidthGeneration;
1468}
1469
1470void LiveSession::cancelBandwidthSwitch() {
1471    Mutex::Autolock lock(mSwapMutex);
1472    mSwitchGeneration++;
1473    mSwitchInProgress = false;
1474    mSwapMask = 0;
1475}
1476
1477bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
1478    if (mReconfigurationInProgress || mSwitchInProgress) {
1479        return false;
1480    }
1481
1482    if (mCurBandwidthIndex < 0) {
1483        return true;
1484    }
1485
1486    if (bandwidthIndex == (size_t)mCurBandwidthIndex) {
1487        return false;
1488    } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) {
1489        return canSwitchUp();
1490    } else {
1491        return true;
1492    }
1493}
1494
1495void LiveSession::onCheckBandwidth() {
1496    size_t bandwidthIndex = getBandwidthIndex();
1497    if (canSwitchBandwidthTo(bandwidthIndex)) {
1498        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1499    } else {
1500        scheduleCheckBandwidthEvent();
1501    }
1502
1503    // Handling the kWhatCheckBandwidth even here does _not_ automatically
1504    // schedule another one on return, only an explicit call to
1505    // scheduleCheckBandwidthEvent will do that.
1506    // This ensures that only one configuration change is ongoing at any
1507    // one time, once that completes it'll schedule another check bandwidth
1508    // event.
1509}
1510
1511void LiveSession::postPrepared(status_t err) {
1512    CHECK(mInPreparationPhase);
1513
1514    sp<AMessage> notify = mNotify->dup();
1515    if (err == OK || err == ERROR_END_OF_STREAM) {
1516        notify->setInt32("what", kWhatPrepared);
1517    } else {
1518        notify->setInt32("what", kWhatPreparationFailed);
1519        notify->setInt32("err", err);
1520    }
1521
1522    notify->post();
1523
1524    mInPreparationPhase = false;
1525}
1526
1527}  // namespace android
1528
1529