LiveSession.cpp revision 0ad776d2e4c6b4968d9dcd9bf34b962366b312a9
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/IMediaHTTPConnection.h>
31#include <media/IMediaHTTPService.h>
32#include <media/stagefright/foundation/hexdump.h>
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/DataSource.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaErrors.h>
39#include <media/stagefright/MediaHTTP.h>
40#include <media/stagefright/MetaData.h>
41#include <media/stagefright/Utils.h>
42
43#include <utils/Mutex.h>
44
45#include <ctype.h>
46#include <inttypes.h>
47#include <openssl/aes.h>
48#include <openssl/md5.h>
49
50namespace android {
51
52LiveSession::LiveSession(
53        const sp<AMessage> &notify, uint32_t flags,
54        const sp<IMediaHTTPService> &httpService)
55    : mNotify(notify),
56      mFlags(flags),
57      mHTTPService(httpService),
58      mInPreparationPhase(true),
59      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
60      mCurBandwidthIndex(-1),
61      mStreamMask(0),
62      mNewStreamMask(0),
63      mSwapMask(0),
64      mCheckBandwidthGeneration(0),
65      mSwitchGeneration(0),
66      mLastDequeuedTimeUs(0ll),
67      mRealTimeBaseUs(0ll),
68      mReconfigurationInProgress(false),
69      mSwitchInProgress(false),
70      mDisconnectReplyID(0),
71      mSeekReplyID(0),
72      mFirstTimeUsValid(false),
73      mFirstTimeUs(0),
74      mLastSeekTimeUs(0) {
75
76    mStreams[kAudioIndex] = StreamItem("audio");
77    mStreams[kVideoIndex] = StreamItem("video");
78    mStreams[kSubtitleIndex] = StreamItem("subtitles");
79
80    for (size_t i = 0; i < kMaxStreams; ++i) {
81        mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
82        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
83        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
84    }
85}
86
87LiveSession::~LiveSession() {
88}
89
90sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
91    ABuffer *discontinuity = new ABuffer(0);
92    discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
93    discontinuity->meta()->setInt32("swapPacketSource", swap);
94    discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
95    discontinuity->meta()->setInt64("timeUs", -1);
96    return discontinuity;
97}
98
99void LiveSession::swapPacketSource(StreamType stream) {
100    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
101    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
102    sp<AnotherPacketSource> tmp = aps;
103    aps = aps2;
104    aps2 = tmp;
105    aps2->clear();
106}
107
108status_t LiveSession::dequeueAccessUnit(
109        StreamType stream, sp<ABuffer> *accessUnit) {
110    if (!(mStreamMask & stream)) {
111        // return -EWOULDBLOCK to avoid halting the decoder
112        // when switching between audio/video and audio only.
113        return -EWOULDBLOCK;
114    }
115
116    status_t finalResult;
117    sp<AnotherPacketSource> discontinuityQueue  = mDiscontinuities.valueFor(stream);
118    if (discontinuityQueue->hasBufferAvailable(&finalResult)) {
119        discontinuityQueue->dequeueAccessUnit(accessUnit);
120        // seeking, track switching
121        sp<AMessage> extra;
122        int64_t timeUs;
123        if ((*accessUnit)->meta()->findMessage("extra", &extra)
124                && extra != NULL
125                && extra->findInt64("timeUs", &timeUs)) {
126            // seeking only
127            mLastSeekTimeUs = timeUs;
128            mDiscontinuityOffsetTimesUs.clear();
129            mDiscontinuityAbsStartTimesUs.clear();
130        }
131        return INFO_DISCONTINUITY;
132    }
133
134    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
135
136    if (!packetSource->hasBufferAvailable(&finalResult)) {
137        return finalResult == OK ? -EAGAIN : finalResult;
138    }
139
140    // wait for counterpart
141    sp<AnotherPacketSource> otherSource;
142    if (stream == STREAMTYPE_AUDIO && (mStreamMask & STREAMTYPE_VIDEO)) {
143        otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
144    } else if (stream == STREAMTYPE_VIDEO && (mStreamMask & STREAMTYPE_AUDIO)) {
145        otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
146    }
147    if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) {
148        return finalResult == OK ? -EAGAIN : finalResult;
149    }
150
151    status_t err = packetSource->dequeueAccessUnit(accessUnit);
152
153    size_t streamIdx;
154    const char *streamStr;
155    switch (stream) {
156        case STREAMTYPE_AUDIO:
157            streamIdx = kAudioIndex;
158            streamStr = "audio";
159            break;
160        case STREAMTYPE_VIDEO:
161            streamIdx = kVideoIndex;
162            streamStr = "video";
163            break;
164        case STREAMTYPE_SUBTITLES:
165            streamIdx = kSubtitleIndex;
166            streamStr = "subs";
167            break;
168        default:
169            TRESPASS();
170    }
171
172    StreamItem& strm = mStreams[streamIdx];
173    if (err == INFO_DISCONTINUITY) {
174        // adaptive streaming, discontinuities in the playlist
175        int32_t type;
176        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
177
178        sp<AMessage> extra;
179        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
180            extra.clear();
181        }
182
183        ALOGI("[%s] read discontinuity of type %d, extra = %s",
184              streamStr,
185              type,
186              extra == NULL ? "NULL" : extra->debugString().c_str());
187
188        int32_t swap;
189        if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) {
190            int32_t switchGeneration;
191            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
192            {
193                Mutex::Autolock lock(mSwapMutex);
194                if (switchGeneration == mSwitchGeneration) {
195                    swapPacketSource(stream);
196                    sp<AMessage> msg = new AMessage(kWhatSwapped, id());
197                    msg->setInt32("stream", stream);
198                    msg->setInt32("switchGeneration", switchGeneration);
199                    msg->post();
200                }
201            }
202        } else {
203            size_t seq = strm.mCurDiscontinuitySeq;
204            int64_t offsetTimeUs;
205            if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
206                offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
207            } else {
208                offsetTimeUs = 0;
209            }
210
211            seq += 1;
212            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
213                int64_t firstTimeUs;
214                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
215                offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
216                offsetTimeUs += strm.mLastSampleDurationUs;
217            } else {
218                offsetTimeUs += strm.mLastSampleDurationUs;
219            }
220
221            mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
222        }
223    } else if (err == OK) {
224
225        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
226            int64_t timeUs;
227            int32_t discontinuitySeq = 0;
228            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
229            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
230            strm.mCurDiscontinuitySeq = discontinuitySeq;
231
232            int32_t discard = 0;
233            int64_t firstTimeUs;
234            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
235                int64_t durUs; // approximate sample duration
236                if (timeUs > strm.mLastDequeuedTimeUs) {
237                    durUs = timeUs - strm.mLastDequeuedTimeUs;
238                } else {
239                    durUs = strm.mLastDequeuedTimeUs - timeUs;
240                }
241                strm.mLastSampleDurationUs = durUs;
242                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
243            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
244                firstTimeUs = timeUs;
245            } else {
246                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
247                firstTimeUs = timeUs;
248            }
249
250            strm.mLastDequeuedTimeUs = timeUs;
251            if (timeUs >= firstTimeUs) {
252                timeUs -= firstTimeUs;
253            } else {
254                timeUs = 0;
255            }
256            timeUs += mLastSeekTimeUs;
257            if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
258                timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
259            }
260
261            ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs);
262            (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
263            mLastDequeuedTimeUs = timeUs;
264            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
265        } else if (stream == STREAMTYPE_SUBTITLES) {
266            (*accessUnit)->meta()->setInt32(
267                    "trackIndex", mPlaylist->getSelectedIndex());
268            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
269        }
270    } else {
271        ALOGI("[%s] encountered error %d", streamStr, err);
272    }
273
274    return err;
275}
276
277status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
278    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
279    if (!(mStreamMask & stream)) {
280        return UNKNOWN_ERROR;
281    }
282
283    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
284
285    sp<MetaData> meta = packetSource->getFormat();
286
287    if (meta == NULL) {
288        return -EAGAIN;
289    }
290
291    return convertMetaDataToMessage(meta, format);
292}
293
294void LiveSession::connectAsync(
295        const char *url, const KeyedVector<String8, String8> *headers) {
296    sp<AMessage> msg = new AMessage(kWhatConnect, id());
297    msg->setString("url", url);
298
299    if (headers != NULL) {
300        msg->setPointer(
301                "headers",
302                new KeyedVector<String8, String8>(*headers));
303    }
304
305    msg->post();
306}
307
308status_t LiveSession::disconnect() {
309    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
310
311    sp<AMessage> response;
312    status_t err = msg->postAndAwaitResponse(&response);
313
314    return err;
315}
316
317status_t LiveSession::seekTo(int64_t timeUs) {
318    sp<AMessage> msg = new AMessage(kWhatSeek, id());
319    msg->setInt64("timeUs", timeUs);
320
321    sp<AMessage> response;
322    status_t err = msg->postAndAwaitResponse(&response);
323
324    uint32_t replyID;
325    CHECK(response == mSeekReply && 0 != mSeekReplyID);
326    mSeekReply.clear();
327    mSeekReplyID = 0;
328    return err;
329}
330
331void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
332    switch (msg->what()) {
333        case kWhatConnect:
334        {
335            onConnect(msg);
336            break;
337        }
338
339        case kWhatDisconnect:
340        {
341            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
342
343            if (mReconfigurationInProgress) {
344                break;
345            }
346
347            finishDisconnect();
348            break;
349        }
350
351        case kWhatSeek:
352        {
353            CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
354
355            status_t err = onSeek(msg);
356
357            mSeekReply = new AMessage;
358            mSeekReply->setInt32("err", err);
359            break;
360        }
361
362        case kWhatFetcherNotify:
363        {
364            int32_t what;
365            CHECK(msg->findInt32("what", &what));
366
367            switch (what) {
368                case PlaylistFetcher::kWhatStarted:
369                    break;
370                case PlaylistFetcher::kWhatPaused:
371                case PlaylistFetcher::kWhatStopped:
372                {
373                    if (what == PlaylistFetcher::kWhatStopped) {
374                        AString uri;
375                        CHECK(msg->findString("uri", &uri));
376                        if (mFetcherInfos.removeItem(uri) < 0) {
377                            // ignore duplicated kWhatStopped messages.
378                            break;
379                        }
380
381                        if (mSwitchInProgress) {
382                            tryToFinishBandwidthSwitch();
383                        }
384                    }
385
386                    if (mContinuation != NULL) {
387                        CHECK_GT(mContinuationCounter, 0);
388                        if (--mContinuationCounter == 0) {
389                            mContinuation->post();
390
391                            if (mSeekReplyID != 0) {
392                                CHECK(mSeekReply != NULL);
393                                mSeekReply->postReply(mSeekReplyID);
394                            }
395                        }
396                    }
397                    break;
398                }
399
400                case PlaylistFetcher::kWhatDurationUpdate:
401                {
402                    AString uri;
403                    CHECK(msg->findString("uri", &uri));
404
405                    int64_t durationUs;
406                    CHECK(msg->findInt64("durationUs", &durationUs));
407
408                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
409                    info->mDurationUs = durationUs;
410                    break;
411                }
412
413                case PlaylistFetcher::kWhatError:
414                {
415                    status_t err;
416                    CHECK(msg->findInt32("err", &err));
417
418                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
419
420                    if (mInPreparationPhase) {
421                        postPrepared(err);
422                    }
423
424                    cancelBandwidthSwitch();
425
426                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
427
428                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
429
430                    mPacketSources.valueFor(
431                            STREAMTYPE_SUBTITLES)->signalEOS(err);
432
433                    sp<AMessage> notify = mNotify->dup();
434                    notify->setInt32("what", kWhatError);
435                    notify->setInt32("err", err);
436                    notify->post();
437                    break;
438                }
439
440                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
441                {
442                    AString uri;
443                    CHECK(msg->findString("uri", &uri));
444
445                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
446                    info->mIsPrepared = true;
447
448                    if (mInPreparationPhase) {
449                        bool allFetchersPrepared = true;
450                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
451                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
452                                allFetchersPrepared = false;
453                                break;
454                            }
455                        }
456
457                        if (allFetchersPrepared) {
458                            postPrepared(OK);
459                        }
460                    }
461                    break;
462                }
463
464                case PlaylistFetcher::kWhatStartedAt:
465                {
466                    int32_t switchGeneration;
467                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
468
469                    if (switchGeneration != mSwitchGeneration) {
470                        break;
471                    }
472
473                    // Resume fetcher for the original variant; the resumed fetcher should
474                    // continue until the timestamps found in msg, which is stored by the
475                    // new fetcher to indicate where the new variant has started buffering.
476                    for (size_t i = 0; i < mFetcherInfos.size(); i++) {
477                        const FetcherInfo info = mFetcherInfos.valueAt(i);
478                        if (info.mToBeRemoved) {
479                            info.mFetcher->resumeUntilAsync(msg);
480                        }
481                    }
482                    break;
483                }
484
485                default:
486                    TRESPASS();
487            }
488
489            break;
490        }
491
492        case kWhatCheckBandwidth:
493        {
494            int32_t generation;
495            CHECK(msg->findInt32("generation", &generation));
496
497            if (generation != mCheckBandwidthGeneration) {
498                break;
499            }
500
501            onCheckBandwidth();
502            break;
503        }
504
505        case kWhatChangeConfiguration:
506        {
507            onChangeConfiguration(msg);
508            break;
509        }
510
511        case kWhatChangeConfiguration2:
512        {
513            onChangeConfiguration2(msg);
514            break;
515        }
516
517        case kWhatChangeConfiguration3:
518        {
519            onChangeConfiguration3(msg);
520            break;
521        }
522
523        case kWhatFinishDisconnect2:
524        {
525            onFinishDisconnect2();
526            break;
527        }
528
529        case kWhatSwapped:
530        {
531            onSwapped(msg);
532            break;
533        }
534
535        case kWhatCheckSwitchDown:
536        {
537            onCheckSwitchDown();
538            break;
539        }
540
541        case kWhatSwitchDown:
542        {
543            onSwitchDown();
544            break;
545        }
546
547        default:
548            TRESPASS();
549            break;
550    }
551}
552
553// static
554int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
555    if (a->mBandwidth < b->mBandwidth) {
556        return -1;
557    } else if (a->mBandwidth == b->mBandwidth) {
558        return 0;
559    }
560
561    return 1;
562}
563
564// static
565LiveSession::StreamType LiveSession::indexToType(int idx) {
566    CHECK(idx >= 0 && idx < kMaxStreams);
567    return (StreamType)(1 << idx);
568}
569
570void LiveSession::onConnect(const sp<AMessage> &msg) {
571    AString url;
572    CHECK(msg->findString("url", &url));
573
574    KeyedVector<String8, String8> *headers = NULL;
575    if (!msg->findPointer("headers", (void **)&headers)) {
576        mExtraHeaders.clear();
577    } else {
578        mExtraHeaders = *headers;
579
580        delete headers;
581        headers = NULL;
582    }
583
584    // TODO currently we don't know if we are coming here from incognito mode
585    ALOGI("onConnect %s", uriDebugString(url).c_str());
586
587    mMasterURL = url;
588
589    bool dummy;
590    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
591
592    if (mPlaylist == NULL) {
593        ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str());
594
595        postPrepared(ERROR_IO);
596        return;
597    }
598
599    // We trust the content provider to make a reasonable choice of preferred
600    // initial bandwidth by listing it first in the variant playlist.
601    // At startup we really don't have a good estimate on the available
602    // network bandwidth since we haven't tranferred any data yet. Once
603    // we have we can make a better informed choice.
604    size_t initialBandwidth = 0;
605    size_t initialBandwidthIndex = 0;
606
607    if (mPlaylist->isVariantPlaylist()) {
608        for (size_t i = 0; i < mPlaylist->size(); ++i) {
609            BandwidthItem item;
610
611            item.mPlaylistIndex = i;
612
613            sp<AMessage> meta;
614            AString uri;
615            mPlaylist->itemAt(i, &uri, &meta);
616
617            unsigned long bandwidth;
618            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
619
620            if (initialBandwidth == 0) {
621                initialBandwidth = item.mBandwidth;
622            }
623
624            mBandwidthItems.push(item);
625        }
626
627        CHECK_GT(mBandwidthItems.size(), 0u);
628
629        mBandwidthItems.sort(SortByBandwidth);
630
631        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
632            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
633                initialBandwidthIndex = i;
634                break;
635            }
636        }
637    } else {
638        // dummy item.
639        BandwidthItem item;
640        item.mPlaylistIndex = 0;
641        item.mBandwidth = 0;
642        mBandwidthItems.push(item);
643    }
644
645    mPlaylist->pickRandomMediaItems();
646    changeConfiguration(
647            0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
648}
649
650void LiveSession::finishDisconnect() {
651    // No reconfiguration is currently pending, make sure none will trigger
652    // during disconnection either.
653    cancelCheckBandwidthEvent();
654
655    // Protect mPacketSources from a swapPacketSource race condition through disconnect.
656    // (finishDisconnect, onFinishDisconnect2)
657    cancelBandwidthSwitch();
658
659    // cancel switch down monitor
660    mSwitchDownMonitor.clear();
661
662    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
663        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
664    }
665
666    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
667
668    mContinuationCounter = mFetcherInfos.size();
669    mContinuation = msg;
670
671    if (mContinuationCounter == 0) {
672        msg->post();
673    }
674}
675
676void LiveSession::onFinishDisconnect2() {
677    mContinuation.clear();
678
679    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
680    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
681
682    mPacketSources.valueFor(
683            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
684
685    sp<AMessage> response = new AMessage;
686    response->setInt32("err", OK);
687
688    response->postReply(mDisconnectReplyID);
689    mDisconnectReplyID = 0;
690}
691
692sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
693    ssize_t index = mFetcherInfos.indexOfKey(uri);
694
695    if (index >= 0) {
696        return NULL;
697    }
698
699    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
700    notify->setString("uri", uri);
701    notify->setInt32("switchGeneration", mSwitchGeneration);
702
703    FetcherInfo info;
704    info.mFetcher = new PlaylistFetcher(notify, this, uri);
705    info.mDurationUs = -1ll;
706    info.mIsPrepared = false;
707    info.mToBeRemoved = false;
708    looper()->registerHandler(info.mFetcher);
709
710    mFetcherInfos.add(uri, info);
711
712    return info.mFetcher;
713}
714
715/*
716 * Illustration of parameters:
717 *
718 * 0      `range_offset`
719 * +------------+-------------------------------------------------------+--+--+
720 * |            |                                 | next block to fetch |  |  |
721 * |            | `source` handle => `out` buffer |                     |  |  |
722 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
723 * |            |<----------- `range_length` / buffer capacity ----------->|  |
724 * |<------------------------------ file_size ------------------------------->|
725 *
726 * Special parameter values:
727 * - range_length == -1 means entire file
728 * - block_size == 0 means entire range
729 *
730 */
731ssize_t LiveSession::fetchFile(
732        const char *url, sp<ABuffer> *out,
733        int64_t range_offset, int64_t range_length,
734        uint32_t block_size, /* download block size */
735        sp<DataSource> *source, /* to return and reuse source */
736        String8 *actualUrl) {
737    off64_t size;
738    sp<DataSource> temp_source;
739    if (source == NULL) {
740        source = &temp_source;
741    }
742
743    if (*source == NULL) {
744        if (!strncasecmp(url, "file://", 7)) {
745            *source = new FileSource(url + 7);
746        } else if (strncasecmp(url, "http://", 7)
747                && strncasecmp(url, "https://", 8)) {
748            return ERROR_UNSUPPORTED;
749        } else {
750            KeyedVector<String8, String8> headers = mExtraHeaders;
751            if (range_offset > 0 || range_length >= 0) {
752                headers.add(
753                        String8("Range"),
754                        String8(
755                            StringPrintf(
756                                "bytes=%lld-%s",
757                                range_offset,
758                                range_length < 0
759                                    ? "" : StringPrintf("%lld",
760                                            range_offset + range_length - 1).c_str()).c_str()));
761            }
762            status_t err = mHTTPDataSource->connect(url, &headers);
763
764            if (err != OK) {
765                return err;
766            }
767
768            *source = mHTTPDataSource;
769        }
770    }
771
772    status_t getSizeErr = (*source)->getSize(&size);
773    if (getSizeErr != OK) {
774        size = 65536;
775    }
776
777    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
778    if (*out == NULL) {
779        buffer->setRange(0, 0);
780    }
781
782    ssize_t bytesRead = 0;
783    // adjust range_length if only reading partial block
784    if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
785        range_length = buffer->size() + block_size;
786    }
787    for (;;) {
788        // Only resize when we don't know the size.
789        size_t bufferRemaining = buffer->capacity() - buffer->size();
790        if (bufferRemaining == 0 && getSizeErr != OK) {
791            bufferRemaining = 32768;
792
793            ALOGV("increasing download buffer to %zu bytes",
794                 buffer->size() + bufferRemaining);
795
796            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
797            memcpy(copy->data(), buffer->data(), buffer->size());
798            copy->setRange(0, buffer->size());
799
800            buffer = copy;
801        }
802
803        size_t maxBytesToRead = bufferRemaining;
804        if (range_length >= 0) {
805            int64_t bytesLeftInRange = range_length - buffer->size();
806            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
807                maxBytesToRead = bytesLeftInRange;
808
809                if (bytesLeftInRange == 0) {
810                    break;
811                }
812            }
813        }
814
815        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
816        // to help us break out of the loop.
817        ssize_t n = (*source)->readAt(
818                buffer->size(), buffer->data() + buffer->size(),
819                maxBytesToRead);
820
821        if (n < 0) {
822            return n;
823        }
824
825        if (n == 0) {
826            break;
827        }
828
829        buffer->setRange(0, buffer->size() + (size_t)n);
830        bytesRead += n;
831    }
832
833    *out = buffer;
834    if (actualUrl != NULL) {
835        *actualUrl = (*source)->getUri();
836        if (actualUrl->isEmpty()) {
837            *actualUrl = url;
838        }
839    }
840
841    return bytesRead;
842}
843
844sp<M3UParser> LiveSession::fetchPlaylist(
845        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
846    ALOGV("fetchPlaylist '%s'", url);
847
848    *unchanged = false;
849
850    sp<ABuffer> buffer;
851    String8 actualUrl;
852    ssize_t  err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
853
854    if (err <= 0) {
855        return NULL;
856    }
857
858    // MD5 functionality is not available on the simulator, treat all
859    // playlists as changed.
860
861#if defined(HAVE_ANDROID_OS)
862    uint8_t hash[16];
863
864    MD5_CTX m;
865    MD5_Init(&m);
866    MD5_Update(&m, buffer->data(), buffer->size());
867
868    MD5_Final(hash, &m);
869
870    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
871        // playlist unchanged
872        *unchanged = true;
873
874        return NULL;
875    }
876
877    if (curPlaylistHash != NULL) {
878        memcpy(curPlaylistHash, hash, sizeof(hash));
879    }
880#endif
881
882    sp<M3UParser> playlist =
883        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
884
885    if (playlist->initCheck() != OK) {
886        ALOGE("failed to parse .m3u8 playlist");
887
888        return NULL;
889    }
890
891    return playlist;
892}
893
// Returns rand() scaled into the closed interval [0, 1].
static double uniformRand() {
    const double sample = static_cast<double>(rand());
    return sample / RAND_MAX;
}
897
898size_t LiveSession::getBandwidthIndex() {
899    if (mBandwidthItems.size() == 0) {
900        return 0;
901    }
902
903#if 1
904    char value[PROPERTY_VALUE_MAX];
905    ssize_t index = -1;
906    if (property_get("media.httplive.bw-index", value, NULL)) {
907        char *end;
908        index = strtol(value, &end, 10);
909        CHECK(end > value && *end == '\0');
910
911        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
912            index = mBandwidthItems.size() - 1;
913        }
914    }
915
916    if (index < 0) {
917        int32_t bandwidthBps;
918        if (mHTTPDataSource != NULL
919                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
920            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
921        } else {
922            ALOGV("no bandwidth estimate.");
923            return 0;  // Pick the lowest bandwidth stream by default.
924        }
925
926        char value[PROPERTY_VALUE_MAX];
927        if (property_get("media.httplive.max-bw", value, NULL)) {
928            char *end;
929            long maxBw = strtoul(value, &end, 10);
930            if (end > value && *end == '\0') {
931                if (maxBw > 0 && bandwidthBps > maxBw) {
932                    ALOGV("bandwidth capped to %ld bps", maxBw);
933                    bandwidthBps = maxBw;
934                }
935            }
936        }
937
938        // Consider only 80% of the available bandwidth usable.
939        bandwidthBps = (bandwidthBps * 8) / 10;
940
941        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
942
943        index = mBandwidthItems.size() - 1;
944        while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth
945                                > (size_t)bandwidthBps) {
946            --index;
947        }
948    }
949#elif 0
950    // Change bandwidth at random()
951    size_t index = uniformRand() * mBandwidthItems.size();
952#elif 0
953    // There's a 50% chance to stay on the current bandwidth and
954    // a 50% chance to switch to the next higher bandwidth (wrapping around
955    // to lowest)
956    const size_t kMinIndex = 0;
957
958    static ssize_t mCurBandwidthIndex = -1;
959
960    size_t index;
961    if (mCurBandwidthIndex < 0) {
962        index = kMinIndex;
963    } else if (uniformRand() < 0.5) {
964        index = (size_t)mCurBandwidthIndex;
965    } else {
966        index = mCurBandwidthIndex + 1;
967        if (index == mBandwidthItems.size()) {
968            index = kMinIndex;
969        }
970    }
971    mCurBandwidthIndex = index;
972#elif 0
973    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
974
975    size_t index = mBandwidthItems.size() - 1;
976    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
977        --index;
978    }
979#elif 1
980    char value[PROPERTY_VALUE_MAX];
981    size_t index;
982    if (property_get("media.httplive.bw-index", value, NULL)) {
983        char *end;
984        index = strtoul(value, &end, 10);
985        CHECK(end > value && *end == '\0');
986
987        if (index >= mBandwidthItems.size()) {
988            index = mBandwidthItems.size() - 1;
989        }
990    } else {
991        index = 0;
992    }
993#else
994    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
995#endif
996
997    CHECK_GE(index, 0);
998
999    return index;
1000}
1001
1002status_t LiveSession::onSeek(const sp<AMessage> &msg) {
1003    int64_t timeUs;
1004    CHECK(msg->findInt64("timeUs", &timeUs));
1005
1006    if (!mReconfigurationInProgress) {
1007        changeConfiguration(timeUs, getBandwidthIndex());
1008    }
1009
1010    return OK;
1011}
1012
1013status_t LiveSession::getDuration(int64_t *durationUs) const {
1014    int64_t maxDurationUs = 0ll;
1015    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1016        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1017
1018        if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) {
1019            maxDurationUs = fetcherDurationUs;
1020        }
1021    }
1022
1023    *durationUs = maxDurationUs;
1024
1025    return OK;
1026}
1027
1028bool LiveSession::isSeekable() const {
1029    int64_t durationUs;
1030    return getDuration(&durationUs) == OK && durationUs >= 0;
1031}
1032
1033bool LiveSession::hasDynamicDuration() const {
1034    return false;
1035}
1036
1037size_t LiveSession::getTrackCount() const {
1038    if (mPlaylist == NULL) {
1039        return 0;
1040    } else {
1041        return mPlaylist->getTrackCount();
1042    }
1043}
1044
1045sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1046    if (mPlaylist == NULL) {
1047        return NULL;
1048    } else {
1049        return mPlaylist->getTrackInfo(trackIndex);
1050    }
1051}
1052
1053status_t LiveSession::selectTrack(size_t index, bool select) {
1054    status_t err = mPlaylist->selectTrack(index, select);
1055    if (err == OK) {
1056        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id());
1057        msg->setInt32("bandwidthIndex", mCurBandwidthIndex);
1058        msg->setInt32("pickTrack", select);
1059        msg->post();
1060    }
1061    return err;
1062}
1063
1064bool LiveSession::canSwitchUp() {
1065    // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
1066    status_t err = OK;
1067    for (size_t i = 0; i < mPacketSources.size(); ++i) {
1068        sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
1069        int64_t dur = source->getBufferedDurationUs(&err);
1070        if (err == OK && dur > 10000000) {
1071            return true;
1072        }
1073    }
1074    return false;
1075}
1076
1077void LiveSession::changeConfiguration(
1078        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
1079    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
1080    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
1081    cancelBandwidthSwitch();
1082
1083    CHECK(!mReconfigurationInProgress);
1084    mReconfigurationInProgress = true;
1085
1086    mCurBandwidthIndex = bandwidthIndex;
1087
1088    ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
1089          timeUs, bandwidthIndex, pickTrack);
1090
1091    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
1092    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
1093
1094    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1095    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1096
1097    AString URIs[kMaxStreams];
1098    for (size_t i = 0; i < kMaxStreams; ++i) {
1099        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1100            streamMask |= indexToType(i);
1101        }
1102    }
1103
1104    // Step 1, stop and discard fetchers that are no longer needed.
1105    // Pause those that we'll reuse.
1106    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1107        const AString &uri = mFetcherInfos.keyAt(i);
1108
1109        bool discardFetcher = true;
1110
1111        // If we're seeking all current fetchers are discarded.
1112        if (timeUs < 0ll) {
1113            // delay fetcher removal if not picking tracks
1114            discardFetcher = pickTrack;
1115
1116            for (size_t j = 0; j < kMaxStreams; ++j) {
1117                StreamType type = indexToType(j);
1118                if ((streamMask & type) && uri == URIs[j]) {
1119                    resumeMask |= type;
1120                    streamMask &= ~type;
1121                    discardFetcher = false;
1122                }
1123            }
1124        }
1125
1126        if (discardFetcher) {
1127            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1128        } else {
1129            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
1130        }
1131    }
1132
1133    sp<AMessage> msg;
1134    if (timeUs < 0ll) {
1135        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1136        msg = new AMessage(kWhatChangeConfiguration3, id());
1137    } else {
1138        msg = new AMessage(kWhatChangeConfiguration2, id());
1139    }
1140    msg->setInt32("streamMask", streamMask);
1141    msg->setInt32("resumeMask", resumeMask);
1142    msg->setInt32("pickTrack", pickTrack);
1143    msg->setInt64("timeUs", timeUs);
1144    for (size_t i = 0; i < kMaxStreams; ++i) {
1145        if ((streamMask | resumeMask) & indexToType(i)) {
1146            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1147        }
1148    }
1149
1150    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1151    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1152    // fetchers have completed their asynchronous operation, we'll post
1153    // mContinuation, which then is handled below in onChangeConfiguration2.
1154    mContinuationCounter = mFetcherInfos.size();
1155    mContinuation = msg;
1156
1157    if (mContinuationCounter == 0) {
1158        msg->post();
1159
1160        if (mSeekReplyID != 0) {
1161            CHECK(mSeekReply != NULL);
1162            mSeekReply->postReply(mSeekReplyID);
1163        }
1164    }
1165}
1166
1167void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1168    if (!mReconfigurationInProgress) {
1169        int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex;
1170        msg->findInt32("pickTrack", &pickTrack);
1171        msg->findInt32("bandwidthIndex", &bandwidthIndex);
1172        changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack);
1173    } else {
1174        msg->post(1000000ll); // retry in 1 sec
1175    }
1176}
1177
1178void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1179    mContinuation.clear();
1180
1181    // All fetchers are either suspended or have been removed now.
1182
1183    uint32_t streamMask, resumeMask;
1184    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1185    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1186
1187    // currently onChangeConfiguration2 is only called for seeking;
1188    // remove the following CHECK if using it else where.
1189    CHECK_EQ(resumeMask, 0);
1190    streamMask |= resumeMask;
1191
1192    AString URIs[kMaxStreams];
1193    for (size_t i = 0; i < kMaxStreams; ++i) {
1194        if (streamMask & indexToType(i)) {
1195            const AString &uriKey = mStreams[i].uriKey();
1196            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1197            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1198        }
1199    }
1200
1201    // Determine which decoders to shutdown on the player side,
1202    // a decoder has to be shutdown if either
1203    // 1) its streamtype was active before but now longer isn't.
1204    // or
1205    // 2) its streamtype was already active and still is but the URI
1206    //    has changed.
1207    uint32_t changedMask = 0;
1208    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1209        if (((mStreamMask & streamMask & indexToType(i))
1210                && !(URIs[i] == mStreams[i].mUri))
1211                || (mStreamMask & ~streamMask & indexToType(i))) {
1212            changedMask |= indexToType(i);
1213        }
1214    }
1215
1216    if (changedMask == 0) {
1217        // If nothing changed as far as the audio/video decoders
1218        // are concerned we can proceed.
1219        onChangeConfiguration3(msg);
1220        return;
1221    }
1222
1223    // Something changed, inform the player which will shutdown the
1224    // corresponding decoders and will post the reply once that's done.
1225    // Handling the reply will continue executing below in
1226    // onChangeConfiguration3.
1227    sp<AMessage> notify = mNotify->dup();
1228    notify->setInt32("what", kWhatStreamsChanged);
1229    notify->setInt32("changedMask", changedMask);
1230
1231    msg->setWhat(kWhatChangeConfiguration3);
1232    msg->setTarget(id());
1233
1234    notify->setMessage("reply", msg);
1235    notify->post();
1236}
1237
// Final stage of a configuration change.
//
// Reads "streamMask", "resumeMask", "timeUs", "pickTrack" and the per-stream
// URI strings from |msg|, then:
//  * resumes existing fetchers whose URI still serves a stream in resumeMask
//    and marks every other existing fetcher for removal;
//  * creates and starts a new fetcher for every stream left in streamMask;
//  * schedules the next bandwidth check and clears
//    mReconfigurationInProgress.
//
// timeUs >= 0 is handled as a seek (DISCONTINUITY_SEEK is queued). With
// timeUs < 0, pickTrack distinguishes track selection from bandwidth
// adaptation; only the latter sets |switching|.
void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
    mContinuation.clear();
    // All remaining fetchers are still suspended, the player has shutdown
    // any decoders that needed it.

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    // Record the new URI of every stream that will get a new fetcher.
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
        }
    }

    int64_t timeUs;
    int32_t pickTrack;
    bool switching = false;
    CHECK(msg->findInt64("timeUs", &timeUs));
    CHECK(msg->findInt32("pickTrack", &pickTrack));

    if (timeUs < 0ll) {
        // Not seeking: this is a bandwidth switch unless a track is being picked.
        if (!pickTrack) {
            switching = true;
        }
        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
    } else {
        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
    }

    mNewStreamMask = streamMask | resumeMask;

    // Of all existing fetchers:
    // * Resume fetchers that are still needed and assign them original packet sources.
    // * Mark otherwise unneeded fetchers for removal.
    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        const AString &uri = mFetcherInfos.keyAt(i);

        sp<AnotherPacketSource> sources[kMaxStreams];
        for (size_t j = 0; j < kMaxStreams; ++j) {
            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));

                // Subtitles get no entry in the discontinuity queue.
                if (j != kSubtitleIndex) {
                    ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j));
                    sp<AnotherPacketSource> discontinuityQueue;
                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
                    discontinuityQueue->queueDiscontinuity(
                            ATSParser::DISCONTINUITY_NONE,
                            NULL,
                            true);
                }
            }
        }

        FetcherInfo &info = mFetcherInfos.editValueAt(i);
        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
                || sources[kSubtitleIndex] != NULL) {
            info.mFetcher->startAsync(
                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
        } else {
            // This fetcher serves none of the resumed streams.
            info.mToBeRemoved = true;
        }
    }

    // streamMask now only contains the types that need a new fetcher created.

    if (streamMask != 0) {
        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
    }

    // Find out when the original fetchers have buffered up to and start the new fetchers
    // at a later timestamp.
    for (size_t i = 0; i < kMaxStreams; i++) {
        if (!(indexToType(i) & streamMask)) {
            continue;
        }

        AString uri;
        uri = mStreams[i].mUri;

        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
        CHECK(fetcher != NULL);

        int32_t latestSeq = -1;
        int64_t startTimeUs = -1;
        int64_t segmentStartTimeUs = -1ll;
        int32_t discontinuitySeq = -1;
        sp<AnotherPacketSource> sources[kMaxStreams];

        // One fetcher serves every remaining stream that shares this URI.
        // TRICKY: looping from i as earlier streams are already removed from streamMask
        for (size_t j = i; j < kMaxStreams; ++j) {
            if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));

                if (timeUs >= 0) {
                    // Seeking: drop buffered data and announce the seek position.
                    sources[j]->clear();
                    startTimeUs = timeUs;

                    sp<AnotherPacketSource> discontinuityQueue;
                    sp<AMessage> extra = new AMessage;
                    extra->setInt64("timeUs", timeUs);
                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
                    discontinuityQueue->queueDiscontinuity(
                            ATSParser::DISCONTINUITY_SEEK, extra, true);
                } else {
                    int32_t type;
                    int64_t srcSegmentStartTimeUs;
                    sp<AMessage> meta;
                    if (pickTrack) {
                        // selecting
                        meta = sources[j]->getLatestDequeuedMeta();
                    } else {
                        // adapting
                        meta = sources[j]->getLatestEnqueuedMeta();
                    }

                    // Keep the minimum start time, segment start time and
                    // discontinuity sequence over all streams of this fetcher.
                    // Metadata carrying a "discontinuity" entry is ignored.
                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
                        int64_t tmpUs;
                        CHECK(meta->findInt64("timeUs", &tmpUs));
                        if (startTimeUs < 0 || tmpUs < startTimeUs) {
                            startTimeUs = tmpUs;
                        }

                        CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs));
                        if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) {
                            segmentStartTimeUs = tmpUs;
                        }

                        int32_t seq;
                        CHECK(meta->findInt32("discontinuitySeq", &seq));
                        if (discontinuitySeq < 0 || seq < discontinuitySeq) {
                            discontinuitySeq = seq;
                        }
                    }

                    if (pickTrack) {
                        // selecting track, queue discontinuities before content
                        sources[j]->clear();
                        if (j == kSubtitleIndex) {
                            // NOTE(review): this break leaves the inner loop
                            // before the subtitle bit is cleared from
                            // streamMask below - confirm that is intended.
                            break;
                        }
                        sp<AnotherPacketSource> discontinuityQueue;
                        discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
                        discontinuityQueue->queueDiscontinuity(
                                ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
                    } else {
                        // adapting, queue discontinuities after resume
                        // The new fetcher fills the secondary packet source.
                        sources[j] = mPacketSources2.valueFor(indexToType(j));
                        sources[j]->clear();
                        // Streams not present in the current configuration
                        // start with a format change buffer.
                        uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
                        if (extraStreams & indexToType(j)) {
                            sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false));
                        }
                    }
                }

                streamMask &= ~indexToType(j);
            }
        }

        // Without a usable start time fall back to the last seek position.
        fetcher->startAsync(
                sources[kAudioIndex],
                sources[kVideoIndex],
                sources[kSubtitleIndex],
                startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs,
                segmentStartTimeUs,
                discontinuitySeq,
                switching);
    }

    // All fetchers have now been started, the configuration change
    // has completed.

    scheduleCheckBandwidthEvent();

    ALOGV("XXX configuration change completed.");
    mReconfigurationInProgress = false;
    if (switching) {
        mSwitchInProgress = true;
        // NOTE(review): when switching is true (timeUs < 0, !pickTrack) the
        // loop above clears every bit it visits from streamMask, so
        // streamMask looks to be 0 here and mSwapMask is always set to 0.
        // Confirm whether the mask from before the loop was intended.
        mSwapMask = streamMask;
    } else {
        mStreamMask = mNewStreamMask;
    }

    if (mDisconnectReplyID != 0) {
        finishDisconnect();
    }
}
1428
1429void LiveSession::onSwapped(const sp<AMessage> &msg) {
1430    int32_t switchGeneration;
1431    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
1432    if (switchGeneration != mSwitchGeneration) {
1433        return;
1434    }
1435
1436    int32_t stream;
1437    CHECK(msg->findInt32("stream", &stream));
1438    mSwapMask &= ~stream;
1439    if (mSwapMask != 0) {
1440        return;
1441    }
1442
1443    // Check if new variant contains extra streams.
1444    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1445    while (extraStreams) {
1446        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
1447        swapPacketSource(extraStream);
1448        extraStreams &= ~extraStream;
1449    }
1450
1451    tryToFinishBandwidthSwitch();
1452}
1453
1454void LiveSession::onCheckSwitchDown() {
1455    if (mSwitchDownMonitor == NULL) {
1456        return;
1457    }
1458
1459    for (size_t i = 0; i < kMaxStreams; ++i) {
1460        int32_t targetDuration;
1461        sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i));
1462        sp<AMessage> meta = packetSource->getLatestDequeuedMeta();
1463
1464        if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) {
1465            int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs();
1466            int64_t targetDurationUs = targetDuration * 1000000ll;
1467
1468            if (bufferedDurationUs < targetDurationUs / 3) {
1469                (new AMessage(kWhatSwitchDown, id()))->post();
1470                break;
1471            }
1472        }
1473    }
1474
1475    mSwitchDownMonitor->post(1000000ll);
1476}
1477
1478void LiveSession::onSwitchDown() {
1479    if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) {
1480        return;
1481    }
1482
1483    ssize_t bandwidthIndex = getBandwidthIndex();
1484    if (bandwidthIndex < mCurBandwidthIndex) {
1485        changeConfiguration(-1, bandwidthIndex, false);
1486        return;
1487    }
1488
1489    changeConfiguration(-1, mCurBandwidthIndex - 1, false);
1490}
1491
1492// Mark switch done when:
1493//   1. all old buffers are swapped out
1494void LiveSession::tryToFinishBandwidthSwitch() {
1495    if (!mSwitchInProgress) {
1496        return;
1497    }
1498
1499    bool needToRemoveFetchers = false;
1500    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1501        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
1502            needToRemoveFetchers = true;
1503            break;
1504        }
1505    }
1506
1507    if (!needToRemoveFetchers && mSwapMask == 0) {
1508        ALOGI("mSwitchInProgress = false");
1509        mStreamMask = mNewStreamMask;
1510        mSwitchInProgress = false;
1511    }
1512}
1513
1514void LiveSession::scheduleCheckBandwidthEvent() {
1515    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1516    msg->setInt32("generation", mCheckBandwidthGeneration);
1517    msg->post(10000000ll);
1518}
1519
1520void LiveSession::cancelCheckBandwidthEvent() {
1521    ++mCheckBandwidthGeneration;
1522}
1523
1524void LiveSession::cancelBandwidthSwitch() {
1525    Mutex::Autolock lock(mSwapMutex);
1526    mSwitchGeneration++;
1527    mSwitchInProgress = false;
1528    mSwapMask = 0;
1529}
1530
1531bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
1532    if (mReconfigurationInProgress || mSwitchInProgress) {
1533        return false;
1534    }
1535
1536    if (mCurBandwidthIndex < 0) {
1537        return true;
1538    }
1539
1540    if (bandwidthIndex == (size_t)mCurBandwidthIndex) {
1541        return false;
1542    } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) {
1543        return canSwitchUp();
1544    } else {
1545        return true;
1546    }
1547}
1548
1549void LiveSession::onCheckBandwidth() {
1550    size_t bandwidthIndex = getBandwidthIndex();
1551    if (canSwitchBandwidthTo(bandwidthIndex)) {
1552        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1553    } else {
1554        scheduleCheckBandwidthEvent();
1555    }
1556
1557    // Handling the kWhatCheckBandwidth even here does _not_ automatically
1558    // schedule another one on return, only an explicit call to
1559    // scheduleCheckBandwidthEvent will do that.
1560    // This ensures that only one configuration change is ongoing at any
1561    // one time, once that completes it'll schedule another check bandwidth
1562    // event.
1563}
1564
1565void LiveSession::postPrepared(status_t err) {
1566    CHECK(mInPreparationPhase);
1567
1568    sp<AMessage> notify = mNotify->dup();
1569    if (err == OK || err == ERROR_END_OF_STREAM) {
1570        notify->setInt32("what", kWhatPrepared);
1571    } else {
1572        notify->setInt32("what", kWhatPreparationFailed);
1573        notify->setInt32("err", err);
1574    }
1575
1576    notify->post();
1577
1578    mInPreparationPhase = false;
1579
1580    mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id());
1581    mSwitchDownMonitor->post();
1582}
1583
1584}  // namespace android
1585
1586