LiveSession.cpp revision 15f8ecfa23b650b3efa8fe841d2be6bd0c9523fb
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/IMediaHTTPConnection.h>
31#include <media/IMediaHTTPService.h>
32#include <media/stagefright/foundation/hexdump.h>
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/DataSource.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaErrors.h>
39#include <media/stagefright/MediaHTTP.h>
40#include <media/stagefright/MetaData.h>
41#include <media/stagefright/Utils.h>
42
43#include <utils/Mutex.h>
44
45#include <ctype.h>
46#include <inttypes.h>
47#include <openssl/aes.h>
48#include <openssl/md5.h>
49
50namespace android {
51
52LiveSession::LiveSession(
53        const sp<AMessage> &notify, uint32_t flags,
54        const sp<IMediaHTTPService> &httpService)
55    : mNotify(notify),
56      mFlags(flags),
57      mHTTPService(httpService),
58      mInPreparationPhase(true),
59      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
60      mCurBandwidthIndex(-1),
61      mStreamMask(0),
62      mNewStreamMask(0),
63      mSwapMask(0),
64      mCheckBandwidthGeneration(0),
65      mSwitchGeneration(0),
66      mLastDequeuedTimeUs(0ll),
67      mRealTimeBaseUs(0ll),
68      mReconfigurationInProgress(false),
69      mSwitchInProgress(false),
70      mDisconnectReplyID(0),
71      mSeekReplyID(0),
72      mFirstTimeUsValid(false),
73      mFirstTimeUs(0),
74      mLastSeekTimeUs(0) {
75
76    mStreams[kAudioIndex] = StreamItem("audio");
77    mStreams[kVideoIndex] = StreamItem("video");
78    mStreams[kSubtitleIndex] = StreamItem("subtitles");
79
80    for (size_t i = 0; i < kMaxStreams; ++i) {
81        mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
82        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
83        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
84        mBuffering[i] = false;
85    }
86}
87
88LiveSession::~LiveSession() {
89}
90
91sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
92    ABuffer *discontinuity = new ABuffer(0);
93    discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
94    discontinuity->meta()->setInt32("swapPacketSource", swap);
95    discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
96    discontinuity->meta()->setInt64("timeUs", -1);
97    return discontinuity;
98}
99
100void LiveSession::swapPacketSource(StreamType stream) {
101    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
102    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
103    sp<AnotherPacketSource> tmp = aps;
104    aps = aps2;
105    aps2 = tmp;
106    aps2->clear();
107}
108
109status_t LiveSession::dequeueAccessUnit(
110        StreamType stream, sp<ABuffer> *accessUnit) {
111    if (!(mStreamMask & stream)) {
112        // return -EWOULDBLOCK to avoid halting the decoder
113        // when switching between audio/video and audio only.
114        return -EWOULDBLOCK;
115    }
116
117    status_t finalResult;
118    sp<AnotherPacketSource> discontinuityQueue  = mDiscontinuities.valueFor(stream);
119    if (discontinuityQueue->hasBufferAvailable(&finalResult)) {
120        discontinuityQueue->dequeueAccessUnit(accessUnit);
121        // seeking, track switching
122        sp<AMessage> extra;
123        int64_t timeUs;
124        if ((*accessUnit)->meta()->findMessage("extra", &extra)
125                && extra != NULL
126                && extra->findInt64("timeUs", &timeUs)) {
127            // seeking only
128            mLastSeekTimeUs = timeUs;
129            mDiscontinuityOffsetTimesUs.clear();
130            mDiscontinuityAbsStartTimesUs.clear();
131        }
132        return INFO_DISCONTINUITY;
133    }
134
135    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
136
137    ssize_t idx = typeToIndex(stream);
138    if (!packetSource->hasBufferAvailable(&finalResult)) {
139        if (finalResult == OK) {
140            mBuffering[idx] = true;
141            return -EAGAIN;
142        } else {
143            return finalResult;
144        }
145    }
146
147    if (mBuffering[idx]) {
148        if (mSwitchInProgress
149                || packetSource->isFinished(0)
150                || packetSource->getEstimatedDurationUs() > 10000000ll) {
151            mBuffering[idx] = false;
152        }
153    }
154
155    if (mBuffering[idx]) {
156        return -EAGAIN;
157    }
158
159    // wait for counterpart
160    sp<AnotherPacketSource> otherSource;
161    uint32_t mask = mNewStreamMask & mStreamMask;
162    uint32_t fetchersMask  = 0;
163    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
164        uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask();
165        fetchersMask |= fetcherMask;
166    }
167    mask &= fetchersMask;
168    if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) {
169        otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
170    } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) {
171        otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
172    }
173    if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) {
174        return finalResult == OK ? -EAGAIN : finalResult;
175    }
176
177    status_t err = packetSource->dequeueAccessUnit(accessUnit);
178
179    size_t streamIdx;
180    const char *streamStr;
181    switch (stream) {
182        case STREAMTYPE_AUDIO:
183            streamIdx = kAudioIndex;
184            streamStr = "audio";
185            break;
186        case STREAMTYPE_VIDEO:
187            streamIdx = kVideoIndex;
188            streamStr = "video";
189            break;
190        case STREAMTYPE_SUBTITLES:
191            streamIdx = kSubtitleIndex;
192            streamStr = "subs";
193            break;
194        default:
195            TRESPASS();
196    }
197
198    StreamItem& strm = mStreams[streamIdx];
199    if (err == INFO_DISCONTINUITY) {
200        // adaptive streaming, discontinuities in the playlist
201        int32_t type;
202        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
203
204        sp<AMessage> extra;
205        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
206            extra.clear();
207        }
208
209        ALOGI("[%s] read discontinuity of type %d, extra = %s",
210              streamStr,
211              type,
212              extra == NULL ? "NULL" : extra->debugString().c_str());
213
214        int32_t swap;
215        if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) {
216            int32_t switchGeneration;
217            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
218            {
219                Mutex::Autolock lock(mSwapMutex);
220                if (switchGeneration == mSwitchGeneration) {
221                    swapPacketSource(stream);
222                    sp<AMessage> msg = new AMessage(kWhatSwapped, id());
223                    msg->setInt32("stream", stream);
224                    msg->setInt32("switchGeneration", switchGeneration);
225                    msg->post();
226                }
227            }
228        } else {
229            size_t seq = strm.mCurDiscontinuitySeq;
230            int64_t offsetTimeUs;
231            if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
232                offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
233            } else {
234                offsetTimeUs = 0;
235            }
236
237            seq += 1;
238            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
239                int64_t firstTimeUs;
240                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
241                offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
242                offsetTimeUs += strm.mLastSampleDurationUs;
243            } else {
244                offsetTimeUs += strm.mLastSampleDurationUs;
245            }
246
247            mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
248        }
249    } else if (err == OK) {
250
251        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
252            int64_t timeUs;
253            int32_t discontinuitySeq = 0;
254            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
255            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
256            strm.mCurDiscontinuitySeq = discontinuitySeq;
257
258            int32_t discard = 0;
259            int64_t firstTimeUs;
260            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
261                int64_t durUs; // approximate sample duration
262                if (timeUs > strm.mLastDequeuedTimeUs) {
263                    durUs = timeUs - strm.mLastDequeuedTimeUs;
264                } else {
265                    durUs = strm.mLastDequeuedTimeUs - timeUs;
266                }
267                strm.mLastSampleDurationUs = durUs;
268                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
269            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
270                firstTimeUs = timeUs;
271            } else {
272                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
273                firstTimeUs = timeUs;
274            }
275
276            strm.mLastDequeuedTimeUs = timeUs;
277            if (timeUs >= firstTimeUs) {
278                timeUs -= firstTimeUs;
279            } else {
280                timeUs = 0;
281            }
282            timeUs += mLastSeekTimeUs;
283            if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
284                timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
285            }
286
287            ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs);
288            (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
289            mLastDequeuedTimeUs = timeUs;
290            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
291        } else if (stream == STREAMTYPE_SUBTITLES) {
292            (*accessUnit)->meta()->setInt32(
293                    "trackIndex", mPlaylist->getSelectedIndex());
294            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
295        }
296    } else {
297        ALOGI("[%s] encountered error %d", streamStr, err);
298    }
299
300    return err;
301}
302
303status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
304    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
305    if (!(mStreamMask & stream)) {
306        return UNKNOWN_ERROR;
307    }
308
309    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
310
311    sp<MetaData> meta = packetSource->getFormat();
312
313    if (meta == NULL) {
314        return -EAGAIN;
315    }
316
317    return convertMetaDataToMessage(meta, format);
318}
319
320void LiveSession::connectAsync(
321        const char *url, const KeyedVector<String8, String8> *headers) {
322    sp<AMessage> msg = new AMessage(kWhatConnect, id());
323    msg->setString("url", url);
324
325    if (headers != NULL) {
326        msg->setPointer(
327                "headers",
328                new KeyedVector<String8, String8>(*headers));
329    }
330
331    msg->post();
332}
333
334status_t LiveSession::disconnect() {
335    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
336
337    sp<AMessage> response;
338    status_t err = msg->postAndAwaitResponse(&response);
339
340    return err;
341}
342
343status_t LiveSession::seekTo(int64_t timeUs) {
344    sp<AMessage> msg = new AMessage(kWhatSeek, id());
345    msg->setInt64("timeUs", timeUs);
346
347    sp<AMessage> response;
348    status_t err = msg->postAndAwaitResponse(&response);
349
350    uint32_t replyID;
351    CHECK(response == mSeekReply && 0 != mSeekReplyID);
352    mSeekReply.clear();
353    mSeekReplyID = 0;
354    return err;
355}
356
357void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
358    switch (msg->what()) {
359        case kWhatConnect:
360        {
361            onConnect(msg);
362            break;
363        }
364
365        case kWhatDisconnect:
366        {
367            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
368
369            if (mReconfigurationInProgress) {
370                break;
371            }
372
373            finishDisconnect();
374            break;
375        }
376
377        case kWhatSeek:
378        {
379            CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
380
381            status_t err = onSeek(msg);
382
383            mSeekReply = new AMessage;
384            mSeekReply->setInt32("err", err);
385            break;
386        }
387
388        case kWhatFetcherNotify:
389        {
390            int32_t what;
391            CHECK(msg->findInt32("what", &what));
392
393            switch (what) {
394                case PlaylistFetcher::kWhatStarted:
395                    break;
396                case PlaylistFetcher::kWhatPaused:
397                case PlaylistFetcher::kWhatStopped:
398                {
399                    if (what == PlaylistFetcher::kWhatStopped) {
400                        AString uri;
401                        CHECK(msg->findString("uri", &uri));
402                        if (mFetcherInfos.removeItem(uri) < 0) {
403                            // ignore duplicated kWhatStopped messages.
404                            break;
405                        }
406
407                        if (mSwitchInProgress) {
408                            tryToFinishBandwidthSwitch();
409                        }
410                    }
411
412                    if (mContinuation != NULL) {
413                        CHECK_GT(mContinuationCounter, 0);
414                        if (--mContinuationCounter == 0) {
415                            mContinuation->post();
416
417                            if (mSeekReplyID != 0) {
418                                CHECK(mSeekReply != NULL);
419                                mSeekReply->postReply(mSeekReplyID);
420                            }
421                        }
422                    }
423                    break;
424                }
425
426                case PlaylistFetcher::kWhatDurationUpdate:
427                {
428                    AString uri;
429                    CHECK(msg->findString("uri", &uri));
430
431                    int64_t durationUs;
432                    CHECK(msg->findInt64("durationUs", &durationUs));
433
434                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
435                    info->mDurationUs = durationUs;
436                    break;
437                }
438
439                case PlaylistFetcher::kWhatError:
440                {
441                    status_t err;
442                    CHECK(msg->findInt32("err", &err));
443
444                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
445
446                    // handle EOS on subtitle tracks independently
447                    AString uri;
448                    if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
449                        ssize_t i = mFetcherInfos.indexOfKey(uri);
450                        if (i >= 0) {
451                            const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
452                            if (fetcher != NULL) {
453                                uint32_t type = fetcher->getStreamTypeMask();
454                                if (type == STREAMTYPE_SUBTITLES) {
455                                    mPacketSources.valueFor(
456                                            STREAMTYPE_SUBTITLES)->signalEOS(err);;
457                                    break;
458                                }
459                            }
460                        }
461                    }
462
463                    if (mInPreparationPhase) {
464                        postPrepared(err);
465                    }
466
467                    cancelBandwidthSwitch();
468
469                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
470
471                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
472
473                    mPacketSources.valueFor(
474                            STREAMTYPE_SUBTITLES)->signalEOS(err);
475
476                    sp<AMessage> notify = mNotify->dup();
477                    notify->setInt32("what", kWhatError);
478                    notify->setInt32("err", err);
479                    notify->post();
480                    break;
481                }
482
483                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
484                {
485                    AString uri;
486                    CHECK(msg->findString("uri", &uri));
487
488                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
489                    info->mIsPrepared = true;
490
491                    if (mInPreparationPhase) {
492                        bool allFetchersPrepared = true;
493                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
494                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
495                                allFetchersPrepared = false;
496                                break;
497                            }
498                        }
499
500                        if (allFetchersPrepared) {
501                            postPrepared(OK);
502                        }
503                    }
504                    break;
505                }
506
507                case PlaylistFetcher::kWhatStartedAt:
508                {
509                    int32_t switchGeneration;
510                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
511
512                    if (switchGeneration != mSwitchGeneration) {
513                        break;
514                    }
515
516                    // Resume fetcher for the original variant; the resumed fetcher should
517                    // continue until the timestamps found in msg, which is stored by the
518                    // new fetcher to indicate where the new variant has started buffering.
519                    for (size_t i = 0; i < mFetcherInfos.size(); i++) {
520                        const FetcherInfo info = mFetcherInfos.valueAt(i);
521                        if (info.mToBeRemoved) {
522                            info.mFetcher->resumeUntilAsync(msg);
523                        }
524                    }
525                    break;
526                }
527
528                default:
529                    TRESPASS();
530            }
531
532            break;
533        }
534
535        case kWhatCheckBandwidth:
536        {
537            int32_t generation;
538            CHECK(msg->findInt32("generation", &generation));
539
540            if (generation != mCheckBandwidthGeneration) {
541                break;
542            }
543
544            onCheckBandwidth(msg);
545            break;
546        }
547
548        case kWhatChangeConfiguration:
549        {
550            onChangeConfiguration(msg);
551            break;
552        }
553
554        case kWhatChangeConfiguration2:
555        {
556            onChangeConfiguration2(msg);
557            break;
558        }
559
560        case kWhatChangeConfiguration3:
561        {
562            onChangeConfiguration3(msg);
563            break;
564        }
565
566        case kWhatFinishDisconnect2:
567        {
568            onFinishDisconnect2();
569            break;
570        }
571
572        case kWhatSwapped:
573        {
574            onSwapped(msg);
575            break;
576        }
577
578        case kWhatCheckSwitchDown:
579        {
580            onCheckSwitchDown();
581            break;
582        }
583
584        case kWhatSwitchDown:
585        {
586            onSwitchDown();
587            break;
588        }
589
590        default:
591            TRESPASS();
592            break;
593    }
594}
595
596// static
597int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
598    if (a->mBandwidth < b->mBandwidth) {
599        return -1;
600    } else if (a->mBandwidth == b->mBandwidth) {
601        return 0;
602    }
603
604    return 1;
605}
606
607// static
608LiveSession::StreamType LiveSession::indexToType(int idx) {
609    CHECK(idx >= 0 && idx < kMaxStreams);
610    return (StreamType)(1 << idx);
611}
612
613// static
614ssize_t LiveSession::typeToIndex(int32_t type) {
615    switch (type) {
616        case STREAMTYPE_AUDIO:
617            return 0;
618        case STREAMTYPE_VIDEO:
619            return 1;
620        case STREAMTYPE_SUBTITLES:
621            return 2;
622        default:
623            return -1;
624    };
625    return -1;
626}
627
628void LiveSession::onConnect(const sp<AMessage> &msg) {
629    AString url;
630    CHECK(msg->findString("url", &url));
631
632    KeyedVector<String8, String8> *headers = NULL;
633    if (!msg->findPointer("headers", (void **)&headers)) {
634        mExtraHeaders.clear();
635    } else {
636        mExtraHeaders = *headers;
637
638        delete headers;
639        headers = NULL;
640    }
641
642    // TODO currently we don't know if we are coming here from incognito mode
643    ALOGI("onConnect %s", uriDebugString(url).c_str());
644
645    mMasterURL = url;
646
647    bool dummy;
648    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
649
650    if (mPlaylist == NULL) {
651        ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str());
652
653        postPrepared(ERROR_IO);
654        return;
655    }
656
657    // We trust the content provider to make a reasonable choice of preferred
658    // initial bandwidth by listing it first in the variant playlist.
659    // At startup we really don't have a good estimate on the available
660    // network bandwidth since we haven't tranferred any data yet. Once
661    // we have we can make a better informed choice.
662    size_t initialBandwidth = 0;
663    size_t initialBandwidthIndex = 0;
664
665    if (mPlaylist->isVariantPlaylist()) {
666        for (size_t i = 0; i < mPlaylist->size(); ++i) {
667            BandwidthItem item;
668
669            item.mPlaylistIndex = i;
670
671            sp<AMessage> meta;
672            AString uri;
673            mPlaylist->itemAt(i, &uri, &meta);
674
675            unsigned long bandwidth;
676            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
677
678            if (initialBandwidth == 0) {
679                initialBandwidth = item.mBandwidth;
680            }
681
682            mBandwidthItems.push(item);
683        }
684
685        CHECK_GT(mBandwidthItems.size(), 0u);
686
687        mBandwidthItems.sort(SortByBandwidth);
688
689        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
690            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
691                initialBandwidthIndex = i;
692                break;
693            }
694        }
695    } else {
696        // dummy item.
697        BandwidthItem item;
698        item.mPlaylistIndex = 0;
699        item.mBandwidth = 0;
700        mBandwidthItems.push(item);
701    }
702
703    mPlaylist->pickRandomMediaItems();
704    changeConfiguration(
705            0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
706}
707
708void LiveSession::finishDisconnect() {
709    // No reconfiguration is currently pending, make sure none will trigger
710    // during disconnection either.
711    cancelCheckBandwidthEvent();
712
713    // Protect mPacketSources from a swapPacketSource race condition through disconnect.
714    // (finishDisconnect, onFinishDisconnect2)
715    cancelBandwidthSwitch();
716
717    // cancel switch down monitor
718    mSwitchDownMonitor.clear();
719
720    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
721        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
722    }
723
724    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
725
726    mContinuationCounter = mFetcherInfos.size();
727    mContinuation = msg;
728
729    if (mContinuationCounter == 0) {
730        msg->post();
731    }
732}
733
734void LiveSession::onFinishDisconnect2() {
735    mContinuation.clear();
736
737    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
738    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
739
740    mPacketSources.valueFor(
741            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
742
743    sp<AMessage> response = new AMessage;
744    response->setInt32("err", OK);
745
746    response->postReply(mDisconnectReplyID);
747    mDisconnectReplyID = 0;
748}
749
750sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
751    ssize_t index = mFetcherInfos.indexOfKey(uri);
752
753    if (index >= 0) {
754        return NULL;
755    }
756
757    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
758    notify->setString("uri", uri);
759    notify->setInt32("switchGeneration", mSwitchGeneration);
760
761    FetcherInfo info;
762    info.mFetcher = new PlaylistFetcher(notify, this, uri);
763    info.mDurationUs = -1ll;
764    info.mIsPrepared = false;
765    info.mToBeRemoved = false;
766    looper()->registerHandler(info.mFetcher);
767
768    mFetcherInfos.add(uri, info);
769
770    return info.mFetcher;
771}
772
773/*
774 * Illustration of parameters:
775 *
776 * 0      `range_offset`
777 * +------------+-------------------------------------------------------+--+--+
778 * |            |                                 | next block to fetch |  |  |
779 * |            | `source` handle => `out` buffer |                     |  |  |
780 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
781 * |            |<----------- `range_length` / buffer capacity ----------->|  |
782 * |<------------------------------ file_size ------------------------------->|
783 *
784 * Special parameter values:
785 * - range_length == -1 means entire file
786 * - block_size == 0 means entire range
787 *
788 */
789ssize_t LiveSession::fetchFile(
790        const char *url, sp<ABuffer> *out,
791        int64_t range_offset, int64_t range_length,
792        uint32_t block_size, /* download block size */
793        sp<DataSource> *source, /* to return and reuse source */
794        String8 *actualUrl) {
795    off64_t size;
796    sp<DataSource> temp_source;
797    if (source == NULL) {
798        source = &temp_source;
799    }
800
801    if (*source == NULL) {
802        if (!strncasecmp(url, "file://", 7)) {
803            *source = new FileSource(url + 7);
804        } else if (strncasecmp(url, "http://", 7)
805                && strncasecmp(url, "https://", 8)) {
806            return ERROR_UNSUPPORTED;
807        } else {
808            KeyedVector<String8, String8> headers = mExtraHeaders;
809            if (range_offset > 0 || range_length >= 0) {
810                headers.add(
811                        String8("Range"),
812                        String8(
813                            StringPrintf(
814                                "bytes=%lld-%s",
815                                range_offset,
816                                range_length < 0
817                                    ? "" : StringPrintf("%lld",
818                                            range_offset + range_length - 1).c_str()).c_str()));
819            }
820            status_t err = mHTTPDataSource->connect(url, &headers);
821
822            if (err != OK) {
823                return err;
824            }
825
826            *source = mHTTPDataSource;
827        }
828    }
829
830    status_t getSizeErr = (*source)->getSize(&size);
831    if (getSizeErr != OK) {
832        size = 65536;
833    }
834
835    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
836    if (*out == NULL) {
837        buffer->setRange(0, 0);
838    }
839
840    ssize_t bytesRead = 0;
841    // adjust range_length if only reading partial block
842    if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
843        range_length = buffer->size() + block_size;
844    }
845    for (;;) {
846        // Only resize when we don't know the size.
847        size_t bufferRemaining = buffer->capacity() - buffer->size();
848        if (bufferRemaining == 0 && getSizeErr != OK) {
849            bufferRemaining = 32768;
850
851            ALOGV("increasing download buffer to %zu bytes",
852                 buffer->size() + bufferRemaining);
853
854            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
855            memcpy(copy->data(), buffer->data(), buffer->size());
856            copy->setRange(0, buffer->size());
857
858            buffer = copy;
859        }
860
861        size_t maxBytesToRead = bufferRemaining;
862        if (range_length >= 0) {
863            int64_t bytesLeftInRange = range_length - buffer->size();
864            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
865                maxBytesToRead = bytesLeftInRange;
866
867                if (bytesLeftInRange == 0) {
868                    break;
869                }
870            }
871        }
872
873        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
874        // to help us break out of the loop.
875        ssize_t n = (*source)->readAt(
876                buffer->size(), buffer->data() + buffer->size(),
877                maxBytesToRead);
878
879        if (n < 0) {
880            return n;
881        }
882
883        if (n == 0) {
884            break;
885        }
886
887        buffer->setRange(0, buffer->size() + (size_t)n);
888        bytesRead += n;
889    }
890
891    *out = buffer;
892    if (actualUrl != NULL) {
893        *actualUrl = (*source)->getUri();
894        if (actualUrl->isEmpty()) {
895            *actualUrl = url;
896        }
897    }
898
899    return bytesRead;
900}
901
902sp<M3UParser> LiveSession::fetchPlaylist(
903        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
904    ALOGV("fetchPlaylist '%s'", url);
905
906    *unchanged = false;
907
908    sp<ABuffer> buffer;
909    String8 actualUrl;
910    ssize_t  err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
911
912    if (err <= 0) {
913        return NULL;
914    }
915
916    // MD5 functionality is not available on the simulator, treat all
917    // playlists as changed.
918
919#if defined(HAVE_ANDROID_OS)
920    uint8_t hash[16];
921
922    MD5_CTX m;
923    MD5_Init(&m);
924    MD5_Update(&m, buffer->data(), buffer->size());
925
926    MD5_Final(hash, &m);
927
928    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
929        // playlist unchanged
930        *unchanged = true;
931
932        return NULL;
933    }
934
935    if (curPlaylistHash != NULL) {
936        memcpy(curPlaylistHash, hash, sizeof(hash));
937    }
938#endif
939
940    sp<M3UParser> playlist =
941        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
942
943    if (playlist->initCheck() != OK) {
944        ALOGE("failed to parse .m3u8 playlist");
945
946        return NULL;
947    }
948
949    return playlist;
950}
951
952static double uniformRand() {
953    return (double)rand() / RAND_MAX;
954}
955
956size_t LiveSession::getBandwidthIndex() {
957    if (mBandwidthItems.size() == 0) {
958        return 0;
959    }
960
961#if 1
962    char value[PROPERTY_VALUE_MAX];
963    ssize_t index = -1;
964    if (property_get("media.httplive.bw-index", value, NULL)) {
965        char *end;
966        index = strtol(value, &end, 10);
967        CHECK(end > value && *end == '\0');
968
969        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
970            index = mBandwidthItems.size() - 1;
971        }
972    }
973
974    if (index < 0) {
975        int32_t bandwidthBps;
976        if (mHTTPDataSource != NULL
977                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
978            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
979        } else {
980            ALOGV("no bandwidth estimate.");
981            return 0;  // Pick the lowest bandwidth stream by default.
982        }
983
984        char value[PROPERTY_VALUE_MAX];
985        if (property_get("media.httplive.max-bw", value, NULL)) {
986            char *end;
987            long maxBw = strtoul(value, &end, 10);
988            if (end > value && *end == '\0') {
989                if (maxBw > 0 && bandwidthBps > maxBw) {
990                    ALOGV("bandwidth capped to %ld bps", maxBw);
991                    bandwidthBps = maxBw;
992                }
993            }
994        }
995
996        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
997
998        index = mBandwidthItems.size() - 1;
999        while (index > 0) {
1000            // consider only 80% of the available bandwidth, but if we are switching up,
1001            // be even more conservative (70%) to avoid overestimating and immediately
1002            // switching back.
1003            size_t adjustedBandwidthBps = bandwidthBps;
1004            if (index > mCurBandwidthIndex) {
1005                adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10;
1006            } else {
1007                adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10;
1008            }
1009            if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) {
1010                break;
1011            }
1012            --index;
1013        }
1014    }
1015#elif 0
1016    // Change bandwidth at random()
1017    size_t index = uniformRand() * mBandwidthItems.size();
1018#elif 0
1019    // There's a 50% chance to stay on the current bandwidth and
1020    // a 50% chance to switch to the next higher bandwidth (wrapping around
1021    // to lowest)
1022    const size_t kMinIndex = 0;
1023
1024    static ssize_t mCurBandwidthIndex = -1;
1025
1026    size_t index;
1027    if (mCurBandwidthIndex < 0) {
1028        index = kMinIndex;
1029    } else if (uniformRand() < 0.5) {
1030        index = (size_t)mCurBandwidthIndex;
1031    } else {
1032        index = mCurBandwidthIndex + 1;
1033        if (index == mBandwidthItems.size()) {
1034            index = kMinIndex;
1035        }
1036    }
1037    mCurBandwidthIndex = index;
1038#elif 0
1039    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1040
1041    size_t index = mBandwidthItems.size() - 1;
1042    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1043        --index;
1044    }
1045#elif 1
1046    char value[PROPERTY_VALUE_MAX];
1047    size_t index;
1048    if (property_get("media.httplive.bw-index", value, NULL)) {
1049        char *end;
1050        index = strtoul(value, &end, 10);
1051        CHECK(end > value && *end == '\0');
1052
1053        if (index >= mBandwidthItems.size()) {
1054            index = mBandwidthItems.size() - 1;
1055        }
1056    } else {
1057        index = 0;
1058    }
1059#else
1060    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
1061#endif
1062
1063    CHECK_GE(index, 0);
1064
1065    return index;
1066}
1067
1068status_t LiveSession::onSeek(const sp<AMessage> &msg) {
1069    int64_t timeUs;
1070    CHECK(msg->findInt64("timeUs", &timeUs));
1071
1072    if (!mReconfigurationInProgress) {
1073        changeConfiguration(timeUs, getBandwidthIndex());
1074    }
1075
1076    return OK;
1077}
1078
1079status_t LiveSession::getDuration(int64_t *durationUs) const {
1080    int64_t maxDurationUs = 0ll;
1081    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1082        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1083
1084        if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) {
1085            maxDurationUs = fetcherDurationUs;
1086        }
1087    }
1088
1089    *durationUs = maxDurationUs;
1090
1091    return OK;
1092}
1093
1094bool LiveSession::isSeekable() const {
1095    int64_t durationUs;
1096    return getDuration(&durationUs) == OK && durationUs >= 0;
1097}
1098
1099bool LiveSession::hasDynamicDuration() const {
1100    return false;
1101}
1102
1103size_t LiveSession::getTrackCount() const {
1104    if (mPlaylist == NULL) {
1105        return 0;
1106    } else {
1107        return mPlaylist->getTrackCount();
1108    }
1109}
1110
1111sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1112    if (mPlaylist == NULL) {
1113        return NULL;
1114    } else {
1115        return mPlaylist->getTrackInfo(trackIndex);
1116    }
1117}
1118
1119status_t LiveSession::selectTrack(size_t index, bool select) {
1120    status_t err = mPlaylist->selectTrack(index, select);
1121    if (err == OK) {
1122        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id());
1123        msg->setInt32("bandwidthIndex", mCurBandwidthIndex);
1124        msg->setInt32("pickTrack", select);
1125        msg->post();
1126    }
1127    return err;
1128}
1129
1130bool LiveSession::canSwitchUp() {
1131    // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
1132    status_t err = OK;
1133    for (size_t i = 0; i < mPacketSources.size(); ++i) {
1134        sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
1135        int64_t dur = source->getBufferedDurationUs(&err);
1136        if (err == OK && dur > 10000000) {
1137            return true;
1138        }
1139    }
1140    return false;
1141}
1142
1143void LiveSession::changeConfiguration(
1144        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
1145    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
1146    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
1147    cancelBandwidthSwitch();
1148
1149    CHECK(!mReconfigurationInProgress);
1150    mReconfigurationInProgress = true;
1151
1152    mCurBandwidthIndex = bandwidthIndex;
1153
1154    ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
1155          timeUs, bandwidthIndex, pickTrack);
1156
1157    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
1158    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
1159
1160    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1161    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1162
1163    AString URIs[kMaxStreams];
1164    for (size_t i = 0; i < kMaxStreams; ++i) {
1165        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1166            streamMask |= indexToType(i);
1167        }
1168    }
1169
1170    // Step 1, stop and discard fetchers that are no longer needed.
1171    // Pause those that we'll reuse.
1172    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1173        const AString &uri = mFetcherInfos.keyAt(i);
1174
1175        bool discardFetcher = true;
1176
1177        // If we're seeking all current fetchers are discarded.
1178        if (timeUs < 0ll) {
1179            // delay fetcher removal if not picking tracks
1180            discardFetcher = pickTrack;
1181
1182            for (size_t j = 0; j < kMaxStreams; ++j) {
1183                StreamType type = indexToType(j);
1184                if ((streamMask & type) && uri == URIs[j]) {
1185                    resumeMask |= type;
1186                    streamMask &= ~type;
1187                    discardFetcher = false;
1188                }
1189            }
1190        }
1191
1192        if (discardFetcher) {
1193            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1194        } else {
1195            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
1196        }
1197    }
1198
1199    sp<AMessage> msg;
1200    if (timeUs < 0ll) {
1201        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1202        msg = new AMessage(kWhatChangeConfiguration3, id());
1203    } else {
1204        msg = new AMessage(kWhatChangeConfiguration2, id());
1205    }
1206    msg->setInt32("streamMask", streamMask);
1207    msg->setInt32("resumeMask", resumeMask);
1208    msg->setInt32("pickTrack", pickTrack);
1209    msg->setInt64("timeUs", timeUs);
1210    for (size_t i = 0; i < kMaxStreams; ++i) {
1211        if ((streamMask | resumeMask) & indexToType(i)) {
1212            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1213        }
1214    }
1215
1216    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1217    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1218    // fetchers have completed their asynchronous operation, we'll post
1219    // mContinuation, which then is handled below in onChangeConfiguration2.
1220    mContinuationCounter = mFetcherInfos.size();
1221    mContinuation = msg;
1222
1223    if (mContinuationCounter == 0) {
1224        msg->post();
1225
1226        if (mSeekReplyID != 0) {
1227            CHECK(mSeekReply != NULL);
1228            mSeekReply->postReply(mSeekReplyID);
1229        }
1230    }
1231}
1232
1233void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1234    if (!mReconfigurationInProgress) {
1235        int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex;
1236        msg->findInt32("pickTrack", &pickTrack);
1237        msg->findInt32("bandwidthIndex", &bandwidthIndex);
1238        changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack);
1239    } else {
1240        msg->post(1000000ll); // retry in 1 sec
1241    }
1242}
1243
1244void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1245    mContinuation.clear();
1246
1247    // All fetchers are either suspended or have been removed now.
1248
1249    uint32_t streamMask, resumeMask;
1250    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1251    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1252
1253    // currently onChangeConfiguration2 is only called for seeking;
1254    // remove the following CHECK if using it else where.
1255    CHECK_EQ(resumeMask, 0);
1256    streamMask |= resumeMask;
1257
1258    AString URIs[kMaxStreams];
1259    for (size_t i = 0; i < kMaxStreams; ++i) {
1260        if (streamMask & indexToType(i)) {
1261            const AString &uriKey = mStreams[i].uriKey();
1262            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1263            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1264        }
1265    }
1266
1267    // Determine which decoders to shutdown on the player side,
1268    // a decoder has to be shutdown if either
1269    // 1) its streamtype was active before but now longer isn't.
1270    // or
1271    // 2) its streamtype was already active and still is but the URI
1272    //    has changed.
1273    uint32_t changedMask = 0;
1274    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1275        if (((mStreamMask & streamMask & indexToType(i))
1276                && !(URIs[i] == mStreams[i].mUri))
1277                || (mStreamMask & ~streamMask & indexToType(i))) {
1278            changedMask |= indexToType(i);
1279        }
1280    }
1281
1282    if (changedMask == 0) {
1283        // If nothing changed as far as the audio/video decoders
1284        // are concerned we can proceed.
1285        onChangeConfiguration3(msg);
1286        return;
1287    }
1288
1289    // Something changed, inform the player which will shutdown the
1290    // corresponding decoders and will post the reply once that's done.
1291    // Handling the reply will continue executing below in
1292    // onChangeConfiguration3.
1293    sp<AMessage> notify = mNotify->dup();
1294    notify->setInt32("what", kWhatStreamsChanged);
1295    notify->setInt32("changedMask", changedMask);
1296
1297    msg->setWhat(kWhatChangeConfiguration3);
1298    msg->setTarget(id());
1299
1300    notify->setMessage("reply", msg);
1301    notify->post();
1302}
1303
1304void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1305    mContinuation.clear();
1306    // All remaining fetchers are still suspended, the player has shutdown
1307    // any decoders that needed it.
1308
1309    uint32_t streamMask, resumeMask;
1310    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1311    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1312
1313    int64_t timeUs;
1314    int32_t pickTrack;
1315    bool switching = false;
1316    CHECK(msg->findInt64("timeUs", &timeUs));
1317    CHECK(msg->findInt32("pickTrack", &pickTrack));
1318
1319    if (timeUs < 0ll) {
1320        if (!pickTrack) {
1321            switching = true;
1322        }
1323        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
1324    } else {
1325        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1326    }
1327
1328    for (size_t i = 0; i < kMaxStreams; ++i) {
1329        if (streamMask & indexToType(i)) {
1330            if (switching) {
1331                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
1332            } else {
1333                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1334            }
1335        }
1336    }
1337
1338    mNewStreamMask = streamMask | resumeMask;
1339    if (switching) {
1340        mSwapMask = mStreamMask & ~resumeMask;
1341    }
1342
1343    // Of all existing fetchers:
1344    // * Resume fetchers that are still needed and assign them original packet sources.
1345    // * Mark otherwise unneeded fetchers for removal.
1346    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1347    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1348        const AString &uri = mFetcherInfos.keyAt(i);
1349
1350        sp<AnotherPacketSource> sources[kMaxStreams];
1351        for (size_t j = 0; j < kMaxStreams; ++j) {
1352            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
1353                sources[j] = mPacketSources.valueFor(indexToType(j));
1354
1355                if (j != kSubtitleIndex) {
1356                    ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j));
1357                    sp<AnotherPacketSource> discontinuityQueue;
1358                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1359                    discontinuityQueue->queueDiscontinuity(
1360                            ATSParser::DISCONTINUITY_NONE,
1361                            NULL,
1362                            true);
1363                }
1364            }
1365        }
1366
1367        FetcherInfo &info = mFetcherInfos.editValueAt(i);
1368        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
1369                || sources[kSubtitleIndex] != NULL) {
1370            info.mFetcher->startAsync(
1371                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
1372        } else {
1373            info.mToBeRemoved = true;
1374        }
1375    }
1376
1377    // streamMask now only contains the types that need a new fetcher created.
1378
1379    if (streamMask != 0) {
1380        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1381    }
1382
1383    // Find out when the original fetchers have buffered up to and start the new fetchers
1384    // at a later timestamp.
1385    for (size_t i = 0; i < kMaxStreams; i++) {
1386        if (!(indexToType(i) & streamMask)) {
1387            continue;
1388        }
1389
1390        AString uri;
1391        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
1392
1393        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1394        CHECK(fetcher != NULL);
1395
1396        int32_t latestSeq = -1;
1397        int64_t startTimeUs = -1;
1398        int64_t segmentStartTimeUs = -1ll;
1399        int32_t discontinuitySeq = -1;
1400        sp<AnotherPacketSource> sources[kMaxStreams];
1401
1402        // TRICKY: looping from i as earlier streams are already removed from streamMask
1403        for (size_t j = i; j < kMaxStreams; ++j) {
1404            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
1405            if ((streamMask & indexToType(j)) && uri == streamUri) {
1406                sources[j] = mPacketSources.valueFor(indexToType(j));
1407
1408                if (timeUs >= 0) {
1409                    sources[j]->clear();
1410                    startTimeUs = timeUs;
1411
1412                    sp<AnotherPacketSource> discontinuityQueue;
1413                    sp<AMessage> extra = new AMessage;
1414                    extra->setInt64("timeUs", timeUs);
1415                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1416                    discontinuityQueue->queueDiscontinuity(
1417                            ATSParser::DISCONTINUITY_SEEK, extra, true);
1418                } else {
1419                    int32_t type;
1420                    int64_t srcSegmentStartTimeUs;
1421                    sp<AMessage> meta;
1422                    if (pickTrack) {
1423                        // selecting
1424                        meta = sources[j]->getLatestDequeuedMeta();
1425                    } else {
1426                        // adapting
1427                        meta = sources[j]->getLatestEnqueuedMeta();
1428                    }
1429
1430                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
1431                        int64_t tmpUs;
1432                        CHECK(meta->findInt64("timeUs", &tmpUs));
1433                        if (startTimeUs < 0 || tmpUs < startTimeUs) {
1434                            startTimeUs = tmpUs;
1435                        }
1436
1437                        CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs));
1438                        if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) {
1439                            segmentStartTimeUs = tmpUs;
1440                        }
1441
1442                        int32_t seq;
1443                        CHECK(meta->findInt32("discontinuitySeq", &seq));
1444                        if (discontinuitySeq < 0 || seq < discontinuitySeq) {
1445                            discontinuitySeq = seq;
1446                        }
1447                    }
1448
1449                    if (pickTrack) {
1450                        // selecting track, queue discontinuities before content
1451                        sources[j]->clear();
1452                        if (j == kSubtitleIndex) {
1453                            break;
1454                        }
1455                        sp<AnotherPacketSource> discontinuityQueue;
1456                        discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1457                        discontinuityQueue->queueDiscontinuity(
1458                                ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1459                    } else {
1460                        // adapting, queue discontinuities after resume
1461                        sources[j] = mPacketSources2.valueFor(indexToType(j));
1462                        sources[j]->clear();
1463                        uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1464                        if (extraStreams & indexToType(j)) {
1465                            sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false));
1466                        }
1467                    }
1468                }
1469
1470                streamMask &= ~indexToType(j);
1471            }
1472        }
1473
1474        fetcher->startAsync(
1475                sources[kAudioIndex],
1476                sources[kVideoIndex],
1477                sources[kSubtitleIndex],
1478                startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs,
1479                segmentStartTimeUs,
1480                discontinuitySeq,
1481                switching);
1482    }
1483
1484    // All fetchers have now been started, the configuration change
1485    // has completed.
1486
1487    cancelCheckBandwidthEvent();
1488    scheduleCheckBandwidthEvent();
1489
1490    ALOGV("XXX configuration change completed.");
1491    mReconfigurationInProgress = false;
1492    if (switching) {
1493        mSwitchInProgress = true;
1494    } else {
1495        mStreamMask = mNewStreamMask;
1496    }
1497
1498    if (mDisconnectReplyID != 0) {
1499        finishDisconnect();
1500    }
1501}
1502
1503void LiveSession::onSwapped(const sp<AMessage> &msg) {
1504    int32_t switchGeneration;
1505    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
1506    if (switchGeneration != mSwitchGeneration) {
1507        return;
1508    }
1509
1510    int32_t stream;
1511    CHECK(msg->findInt32("stream", &stream));
1512
1513    ssize_t idx = typeToIndex(stream);
1514    CHECK(idx >= 0);
1515    if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
1516        ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str());
1517    }
1518    mStreams[idx].mUri = mStreams[idx].mNewUri;
1519    mStreams[idx].mNewUri.clear();
1520
1521    mSwapMask &= ~stream;
1522    if (mSwapMask != 0) {
1523        return;
1524    }
1525
1526    // Check if new variant contains extra streams.
1527    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1528    while (extraStreams) {
1529        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
1530        swapPacketSource(extraStream);
1531        extraStreams &= ~extraStream;
1532
1533        idx = typeToIndex(extraStream);
1534        CHECK(idx >= 0);
1535        if (mStreams[idx].mNewUri.empty()) {
1536            ALOGW("swapping extra stream type %d %s to empty stream",
1537                    extraStream, mStreams[idx].mUri.c_str());
1538        }
1539        mStreams[idx].mUri = mStreams[idx].mNewUri;
1540        mStreams[idx].mNewUri.clear();
1541    }
1542
1543    tryToFinishBandwidthSwitch();
1544}
1545
1546void LiveSession::onCheckSwitchDown() {
1547    if (mSwitchDownMonitor == NULL) {
1548        return;
1549    }
1550
1551    for (size_t i = 0; i < kMaxStreams; ++i) {
1552        int32_t targetDuration;
1553        sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i));
1554        sp<AMessage> meta = packetSource->getLatestDequeuedMeta();
1555
1556        if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) {
1557            int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs();
1558            int64_t targetDurationUs = targetDuration * 1000000ll;
1559
1560            if (bufferedDurationUs < targetDurationUs / 3) {
1561                (new AMessage(kWhatSwitchDown, id()))->post();
1562                break;
1563            }
1564        }
1565    }
1566
1567    mSwitchDownMonitor->post(1000000ll);
1568}
1569
1570void LiveSession::onSwitchDown() {
1571    if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) {
1572        return;
1573    }
1574
1575    ssize_t bandwidthIndex = getBandwidthIndex();
1576    if (bandwidthIndex < mCurBandwidthIndex) {
1577        changeConfiguration(-1, bandwidthIndex, false);
1578        return;
1579    }
1580
1581    changeConfiguration(-1, mCurBandwidthIndex - 1, false);
1582}
1583
1584// Mark switch done when:
1585//   1. all old buffers are swapped out
1586void LiveSession::tryToFinishBandwidthSwitch() {
1587    if (!mSwitchInProgress) {
1588        return;
1589    }
1590
1591    bool needToRemoveFetchers = false;
1592    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1593        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
1594            needToRemoveFetchers = true;
1595            break;
1596        }
1597    }
1598
1599    if (!needToRemoveFetchers && mSwapMask == 0) {
1600        ALOGI("mSwitchInProgress = false");
1601        mStreamMask = mNewStreamMask;
1602        mSwitchInProgress = false;
1603    }
1604}
1605
1606void LiveSession::scheduleCheckBandwidthEvent() {
1607    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1608    msg->setInt32("generation", mCheckBandwidthGeneration);
1609    msg->post(10000000ll);
1610}
1611
1612void LiveSession::cancelCheckBandwidthEvent() {
1613    ++mCheckBandwidthGeneration;
1614}
1615
1616void LiveSession::cancelBandwidthSwitch() {
1617    Mutex::Autolock lock(mSwapMutex);
1618    mSwitchGeneration++;
1619    mSwitchInProgress = false;
1620    mSwapMask = 0;
1621
1622    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1623        FetcherInfo& info = mFetcherInfos.editValueAt(i);
1624        if (info.mToBeRemoved) {
1625            info.mToBeRemoved = false;
1626        }
1627    }
1628
1629    for (size_t i = 0; i < kMaxStreams; ++i) {
1630        if (!mStreams[i].mNewUri.empty()) {
1631            ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri);
1632            if (j < 0) {
1633                mStreams[i].mNewUri.clear();
1634                continue;
1635            }
1636
1637            const FetcherInfo &info = mFetcherInfos.valueAt(j);
1638            info.mFetcher->stopAsync();
1639            mFetcherInfos.removeItemsAt(j);
1640            mStreams[i].mNewUri.clear();
1641        }
1642    }
1643}
1644
1645bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
1646    if (mReconfigurationInProgress || mSwitchInProgress) {
1647        return false;
1648    }
1649
1650    if (mCurBandwidthIndex < 0) {
1651        return true;
1652    }
1653
1654    if (bandwidthIndex == (size_t)mCurBandwidthIndex) {
1655        return false;
1656    } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) {
1657        return canSwitchUp();
1658    } else {
1659        return true;
1660    }
1661}
1662
1663void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) {
1664    size_t bandwidthIndex = getBandwidthIndex();
1665    if (canSwitchBandwidthTo(bandwidthIndex)) {
1666        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1667    } else {
1668        // Come back and check again 10 seconds later in case there is nothing to do now.
1669        // If we DO change configuration, once that completes it'll schedule a new
1670        // check bandwidth event with an incremented mCheckBandwidthGeneration.
1671        msg->post(10000000ll);
1672    }
1673}
1674
1675void LiveSession::postPrepared(status_t err) {
1676    CHECK(mInPreparationPhase);
1677
1678    sp<AMessage> notify = mNotify->dup();
1679    if (err == OK || err == ERROR_END_OF_STREAM) {
1680        notify->setInt32("what", kWhatPrepared);
1681    } else {
1682        notify->setInt32("what", kWhatPreparationFailed);
1683        notify->setInt32("err", err);
1684    }
1685
1686    notify->post();
1687
1688    mInPreparationPhase = false;
1689
1690    mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id());
1691    mSwitchDownMonitor->post();
1692}
1693
1694}  // namespace android
1695
1696