LiveSession.cpp revision 800599cdd50737de1cde483a34b39923750b0658
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/IMediaHTTPConnection.h>
31#include <media/IMediaHTTPService.h>
32#include <media/stagefright/foundation/hexdump.h>
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/DataSource.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaErrors.h>
39#include <media/stagefright/MediaHTTP.h>
40#include <media/stagefright/MetaData.h>
41#include <media/stagefright/Utils.h>
42
43#include <utils/Mutex.h>
44
45#include <ctype.h>
46#include <inttypes.h>
47#include <openssl/aes.h>
48#include <openssl/md5.h>
49
50namespace android {
51
52LiveSession::LiveSession(
53        const sp<AMessage> &notify, uint32_t flags,
54        const sp<IMediaHTTPService> &httpService)
55    : mNotify(notify),
56      mFlags(flags),
57      mHTTPService(httpService),
58      mInPreparationPhase(true),
59      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
60      mCurBandwidthIndex(-1),
61      mStreamMask(0),
62      mNewStreamMask(0),
63      mSwapMask(0),
64      mCheckBandwidthGeneration(0),
65      mSwitchGeneration(0),
66      mLastDequeuedTimeUs(0ll),
67      mRealTimeBaseUs(0ll),
68      mReconfigurationInProgress(false),
69      mSwitchInProgress(false),
70      mDisconnectReplyID(0),
71      mSeekReplyID(0),
72      mFirstTimeUsValid(false),
73      mFirstTimeUs(0),
74      mLastSeekTimeUs(0) {
75
76    mStreams[kAudioIndex] = StreamItem("audio");
77    mStreams[kVideoIndex] = StreamItem("video");
78    mStreams[kSubtitleIndex] = StreamItem("subtitles");
79
80    for (size_t i = 0; i < kMaxStreams; ++i) {
81        mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
82        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
83        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
84        mBuffering[i] = false;
85    }
86}
87
88LiveSession::~LiveSession() {
89}
90
91sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
92    ABuffer *discontinuity = new ABuffer(0);
93    discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
94    discontinuity->meta()->setInt32("swapPacketSource", swap);
95    discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
96    discontinuity->meta()->setInt64("timeUs", -1);
97    return discontinuity;
98}
99
100void LiveSession::swapPacketSource(StreamType stream) {
101    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
102    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
103    sp<AnotherPacketSource> tmp = aps;
104    aps = aps2;
105    aps2 = tmp;
106    aps2->clear();
107}
108
109status_t LiveSession::dequeueAccessUnit(
110        StreamType stream, sp<ABuffer> *accessUnit) {
111    if (!(mStreamMask & stream)) {
112        // return -EWOULDBLOCK to avoid halting the decoder
113        // when switching between audio/video and audio only.
114        return -EWOULDBLOCK;
115    }
116
117    status_t finalResult;
118    sp<AnotherPacketSource> discontinuityQueue  = mDiscontinuities.valueFor(stream);
119    if (discontinuityQueue->hasBufferAvailable(&finalResult)) {
120        discontinuityQueue->dequeueAccessUnit(accessUnit);
121        // seeking, track switching
122        sp<AMessage> extra;
123        int64_t timeUs;
124        if ((*accessUnit)->meta()->findMessage("extra", &extra)
125                && extra != NULL
126                && extra->findInt64("timeUs", &timeUs)) {
127            // seeking only
128            mLastSeekTimeUs = timeUs;
129            mDiscontinuityOffsetTimesUs.clear();
130            mDiscontinuityAbsStartTimesUs.clear();
131        }
132        return INFO_DISCONTINUITY;
133    }
134
135    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
136
137    ssize_t idx = typeToIndex(stream);
138    if (!packetSource->hasBufferAvailable(&finalResult)) {
139        if (finalResult == OK) {
140            mBuffering[idx] = true;
141            return -EAGAIN;
142        } else {
143            return finalResult;
144        }
145    }
146
147    if (mBuffering[idx]) {
148        if (mSwitchInProgress
149                || packetSource->isFinished(0)
150                || packetSource->getEstimatedDurationUs() > 10000000ll) {
151            mBuffering[idx] = false;
152        }
153    }
154
155    if (mBuffering[idx]) {
156        return -EAGAIN;
157    }
158
159    // wait for counterpart
160    sp<AnotherPacketSource> otherSource;
161    uint32_t mask = mNewStreamMask & mStreamMask;
162    uint32_t fetchersMask  = 0;
163    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
164        uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask();
165        fetchersMask |= fetcherMask;
166    }
167    mask &= fetchersMask;
168    if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) {
169        otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
170    } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) {
171        otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
172    }
173    if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) {
174        return finalResult == OK ? -EAGAIN : finalResult;
175    }
176
177    status_t err = packetSource->dequeueAccessUnit(accessUnit);
178
179    size_t streamIdx;
180    const char *streamStr;
181    switch (stream) {
182        case STREAMTYPE_AUDIO:
183            streamIdx = kAudioIndex;
184            streamStr = "audio";
185            break;
186        case STREAMTYPE_VIDEO:
187            streamIdx = kVideoIndex;
188            streamStr = "video";
189            break;
190        case STREAMTYPE_SUBTITLES:
191            streamIdx = kSubtitleIndex;
192            streamStr = "subs";
193            break;
194        default:
195            TRESPASS();
196    }
197
198    StreamItem& strm = mStreams[streamIdx];
199    if (err == INFO_DISCONTINUITY) {
200        // adaptive streaming, discontinuities in the playlist
201        int32_t type;
202        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
203
204        sp<AMessage> extra;
205        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
206            extra.clear();
207        }
208
209        ALOGI("[%s] read discontinuity of type %d, extra = %s",
210              streamStr,
211              type,
212              extra == NULL ? "NULL" : extra->debugString().c_str());
213
214        int32_t swap;
215        if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) {
216            int32_t switchGeneration;
217            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
218            {
219                Mutex::Autolock lock(mSwapMutex);
220                if (switchGeneration == mSwitchGeneration) {
221                    swapPacketSource(stream);
222                    sp<AMessage> msg = new AMessage(kWhatSwapped, id());
223                    msg->setInt32("stream", stream);
224                    msg->setInt32("switchGeneration", switchGeneration);
225                    msg->post();
226                }
227            }
228        } else {
229            size_t seq = strm.mCurDiscontinuitySeq;
230            int64_t offsetTimeUs;
231            if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
232                offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
233            } else {
234                offsetTimeUs = 0;
235            }
236
237            seq += 1;
238            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
239                int64_t firstTimeUs;
240                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
241                offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
242                offsetTimeUs += strm.mLastSampleDurationUs;
243            } else {
244                offsetTimeUs += strm.mLastSampleDurationUs;
245            }
246
247            mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
248        }
249    } else if (err == OK) {
250
251        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
252            int64_t timeUs;
253            int32_t discontinuitySeq = 0;
254            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
255            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
256            strm.mCurDiscontinuitySeq = discontinuitySeq;
257
258            int32_t discard = 0;
259            int64_t firstTimeUs;
260            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
261                int64_t durUs; // approximate sample duration
262                if (timeUs > strm.mLastDequeuedTimeUs) {
263                    durUs = timeUs - strm.mLastDequeuedTimeUs;
264                } else {
265                    durUs = strm.mLastDequeuedTimeUs - timeUs;
266                }
267                strm.mLastSampleDurationUs = durUs;
268                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
269            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
270                firstTimeUs = timeUs;
271            } else {
272                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
273                firstTimeUs = timeUs;
274            }
275
276            strm.mLastDequeuedTimeUs = timeUs;
277            if (timeUs >= firstTimeUs) {
278                timeUs -= firstTimeUs;
279            } else {
280                timeUs = 0;
281            }
282            timeUs += mLastSeekTimeUs;
283            if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
284                timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
285            }
286
287            ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs);
288            (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
289            mLastDequeuedTimeUs = timeUs;
290            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
291        } else if (stream == STREAMTYPE_SUBTITLES) {
292            (*accessUnit)->meta()->setInt32(
293                    "trackIndex", mPlaylist->getSelectedIndex());
294            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
295        }
296    } else {
297        ALOGI("[%s] encountered error %d", streamStr, err);
298    }
299
300    return err;
301}
302
303status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
304    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
305    if (!(mStreamMask & stream)) {
306        return UNKNOWN_ERROR;
307    }
308
309    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
310
311    sp<MetaData> meta = packetSource->getFormat();
312
313    if (meta == NULL) {
314        return -EAGAIN;
315    }
316
317    return convertMetaDataToMessage(meta, format);
318}
319
320void LiveSession::connectAsync(
321        const char *url, const KeyedVector<String8, String8> *headers) {
322    sp<AMessage> msg = new AMessage(kWhatConnect, id());
323    msg->setString("url", url);
324
325    if (headers != NULL) {
326        msg->setPointer(
327                "headers",
328                new KeyedVector<String8, String8>(*headers));
329    }
330
331    msg->post();
332}
333
334status_t LiveSession::disconnect() {
335    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
336
337    sp<AMessage> response;
338    status_t err = msg->postAndAwaitResponse(&response);
339
340    return err;
341}
342
343status_t LiveSession::seekTo(int64_t timeUs) {
344    sp<AMessage> msg = new AMessage(kWhatSeek, id());
345    msg->setInt64("timeUs", timeUs);
346
347    sp<AMessage> response;
348    status_t err = msg->postAndAwaitResponse(&response);
349
350    return err;
351}
352
353void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
354    switch (msg->what()) {
355        case kWhatConnect:
356        {
357            onConnect(msg);
358            break;
359        }
360
361        case kWhatDisconnect:
362        {
363            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
364
365            if (mReconfigurationInProgress) {
366                break;
367            }
368
369            finishDisconnect();
370            break;
371        }
372
373        case kWhatSeek:
374        {
375            uint32_t seekReplyID;
376            CHECK(msg->senderAwaitsResponse(&seekReplyID));
377            mSeekReplyID = seekReplyID;
378            mSeekReply = new AMessage;
379
380            status_t err = onSeek(msg);
381
382            if (err != OK) {
383                msg->post(50000);
384            }
385            break;
386        }
387
388        case kWhatFetcherNotify:
389        {
390            int32_t what;
391            CHECK(msg->findInt32("what", &what));
392
393            switch (what) {
394                case PlaylistFetcher::kWhatStarted:
395                    break;
396                case PlaylistFetcher::kWhatPaused:
397                case PlaylistFetcher::kWhatStopped:
398                {
399                    if (what == PlaylistFetcher::kWhatStopped) {
400                        AString uri;
401                        CHECK(msg->findString("uri", &uri));
402                        if (mFetcherInfos.removeItem(uri) < 0) {
403                            // ignore duplicated kWhatStopped messages.
404                            break;
405                        }
406
407                        if (mSwitchInProgress) {
408                            tryToFinishBandwidthSwitch();
409                        }
410                    }
411
412                    if (mContinuation != NULL) {
413                        CHECK_GT(mContinuationCounter, 0);
414                        if (--mContinuationCounter == 0) {
415                            mContinuation->post();
416
417                            if (mSeekReplyID != 0) {
418                                CHECK(mSeekReply != NULL);
419                                mSeekReply->setInt32("err", OK);
420                                mSeekReply->postReply(mSeekReplyID);
421                                mSeekReplyID = 0;
422                                mSeekReply.clear();
423                            }
424                        }
425                    }
426                    break;
427                }
428
429                case PlaylistFetcher::kWhatDurationUpdate:
430                {
431                    AString uri;
432                    CHECK(msg->findString("uri", &uri));
433
434                    int64_t durationUs;
435                    CHECK(msg->findInt64("durationUs", &durationUs));
436
437                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
438                    info->mDurationUs = durationUs;
439                    break;
440                }
441
442                case PlaylistFetcher::kWhatError:
443                {
444                    status_t err;
445                    CHECK(msg->findInt32("err", &err));
446
447                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
448
449                    // handle EOS on subtitle tracks independently
450                    AString uri;
451                    if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
452                        ssize_t i = mFetcherInfos.indexOfKey(uri);
453                        if (i >= 0) {
454                            const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
455                            if (fetcher != NULL) {
456                                uint32_t type = fetcher->getStreamTypeMask();
457                                if (type == STREAMTYPE_SUBTITLES) {
458                                    mPacketSources.valueFor(
459                                            STREAMTYPE_SUBTITLES)->signalEOS(err);;
460                                    break;
461                                }
462                            }
463                        }
464                    }
465
466                    if (mInPreparationPhase) {
467                        postPrepared(err);
468                    }
469
470                    cancelBandwidthSwitch();
471
472                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
473
474                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
475
476                    mPacketSources.valueFor(
477                            STREAMTYPE_SUBTITLES)->signalEOS(err);
478
479                    sp<AMessage> notify = mNotify->dup();
480                    notify->setInt32("what", kWhatError);
481                    notify->setInt32("err", err);
482                    notify->post();
483                    break;
484                }
485
486                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
487                {
488                    AString uri;
489                    CHECK(msg->findString("uri", &uri));
490
491                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
492                    info->mIsPrepared = true;
493
494                    if (mInPreparationPhase) {
495                        bool allFetchersPrepared = true;
496                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
497                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
498                                allFetchersPrepared = false;
499                                break;
500                            }
501                        }
502
503                        if (allFetchersPrepared) {
504                            postPrepared(OK);
505                        }
506                    }
507                    break;
508                }
509
510                case PlaylistFetcher::kWhatStartedAt:
511                {
512                    int32_t switchGeneration;
513                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
514
515                    if (switchGeneration != mSwitchGeneration) {
516                        break;
517                    }
518
519                    // Resume fetcher for the original variant; the resumed fetcher should
520                    // continue until the timestamps found in msg, which is stored by the
521                    // new fetcher to indicate where the new variant has started buffering.
522                    for (size_t i = 0; i < mFetcherInfos.size(); i++) {
523                        const FetcherInfo info = mFetcherInfos.valueAt(i);
524                        if (info.mToBeRemoved) {
525                            info.mFetcher->resumeUntilAsync(msg);
526                        }
527                    }
528                    break;
529                }
530
531                default:
532                    TRESPASS();
533            }
534
535            break;
536        }
537
538        case kWhatCheckBandwidth:
539        {
540            int32_t generation;
541            CHECK(msg->findInt32("generation", &generation));
542
543            if (generation != mCheckBandwidthGeneration) {
544                break;
545            }
546
547            onCheckBandwidth(msg);
548            break;
549        }
550
551        case kWhatChangeConfiguration:
552        {
553            onChangeConfiguration(msg);
554            break;
555        }
556
557        case kWhatChangeConfiguration2:
558        {
559            onChangeConfiguration2(msg);
560            break;
561        }
562
563        case kWhatChangeConfiguration3:
564        {
565            onChangeConfiguration3(msg);
566            break;
567        }
568
569        case kWhatFinishDisconnect2:
570        {
571            onFinishDisconnect2();
572            break;
573        }
574
575        case kWhatSwapped:
576        {
577            onSwapped(msg);
578            break;
579        }
580
581        case kWhatCheckSwitchDown:
582        {
583            onCheckSwitchDown();
584            break;
585        }
586
587        case kWhatSwitchDown:
588        {
589            onSwitchDown();
590            break;
591        }
592
593        default:
594            TRESPASS();
595            break;
596    }
597}
598
599// static
600int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
601    if (a->mBandwidth < b->mBandwidth) {
602        return -1;
603    } else if (a->mBandwidth == b->mBandwidth) {
604        return 0;
605    }
606
607    return 1;
608}
609
610// static
611LiveSession::StreamType LiveSession::indexToType(int idx) {
612    CHECK(idx >= 0 && idx < kMaxStreams);
613    return (StreamType)(1 << idx);
614}
615
616// static
617ssize_t LiveSession::typeToIndex(int32_t type) {
618    switch (type) {
619        case STREAMTYPE_AUDIO:
620            return 0;
621        case STREAMTYPE_VIDEO:
622            return 1;
623        case STREAMTYPE_SUBTITLES:
624            return 2;
625        default:
626            return -1;
627    };
628    return -1;
629}
630
631void LiveSession::onConnect(const sp<AMessage> &msg) {
632    AString url;
633    CHECK(msg->findString("url", &url));
634
635    KeyedVector<String8, String8> *headers = NULL;
636    if (!msg->findPointer("headers", (void **)&headers)) {
637        mExtraHeaders.clear();
638    } else {
639        mExtraHeaders = *headers;
640
641        delete headers;
642        headers = NULL;
643    }
644
645    // TODO currently we don't know if we are coming here from incognito mode
646    ALOGI("onConnect %s", uriDebugString(url).c_str());
647
648    mMasterURL = url;
649
650    bool dummy;
651    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
652
653    if (mPlaylist == NULL) {
654        ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str());
655
656        postPrepared(ERROR_IO);
657        return;
658    }
659
660    // We trust the content provider to make a reasonable choice of preferred
661    // initial bandwidth by listing it first in the variant playlist.
662    // At startup we really don't have a good estimate on the available
663    // network bandwidth since we haven't tranferred any data yet. Once
664    // we have we can make a better informed choice.
665    size_t initialBandwidth = 0;
666    size_t initialBandwidthIndex = 0;
667
668    if (mPlaylist->isVariantPlaylist()) {
669        for (size_t i = 0; i < mPlaylist->size(); ++i) {
670            BandwidthItem item;
671
672            item.mPlaylistIndex = i;
673
674            sp<AMessage> meta;
675            AString uri;
676            mPlaylist->itemAt(i, &uri, &meta);
677
678            unsigned long bandwidth;
679            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
680
681            if (initialBandwidth == 0) {
682                initialBandwidth = item.mBandwidth;
683            }
684
685            mBandwidthItems.push(item);
686        }
687
688        CHECK_GT(mBandwidthItems.size(), 0u);
689
690        mBandwidthItems.sort(SortByBandwidth);
691
692        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
693            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
694                initialBandwidthIndex = i;
695                break;
696            }
697        }
698    } else {
699        // dummy item.
700        BandwidthItem item;
701        item.mPlaylistIndex = 0;
702        item.mBandwidth = 0;
703        mBandwidthItems.push(item);
704    }
705
706    mPlaylist->pickRandomMediaItems();
707    changeConfiguration(
708            0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
709}
710
711void LiveSession::finishDisconnect() {
712    // No reconfiguration is currently pending, make sure none will trigger
713    // during disconnection either.
714    cancelCheckBandwidthEvent();
715
716    // Protect mPacketSources from a swapPacketSource race condition through disconnect.
717    // (finishDisconnect, onFinishDisconnect2)
718    cancelBandwidthSwitch();
719
720    // cancel switch down monitor
721    mSwitchDownMonitor.clear();
722
723    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
724        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
725    }
726
727    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
728
729    mContinuationCounter = mFetcherInfos.size();
730    mContinuation = msg;
731
732    if (mContinuationCounter == 0) {
733        msg->post();
734    }
735}
736
737void LiveSession::onFinishDisconnect2() {
738    mContinuation.clear();
739
740    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
741    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
742
743    mPacketSources.valueFor(
744            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
745
746    sp<AMessage> response = new AMessage;
747    response->setInt32("err", OK);
748
749    response->postReply(mDisconnectReplyID);
750    mDisconnectReplyID = 0;
751}
752
753sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
754    ssize_t index = mFetcherInfos.indexOfKey(uri);
755
756    if (index >= 0) {
757        return NULL;
758    }
759
760    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
761    notify->setString("uri", uri);
762    notify->setInt32("switchGeneration", mSwitchGeneration);
763
764    FetcherInfo info;
765    info.mFetcher = new PlaylistFetcher(notify, this, uri);
766    info.mDurationUs = -1ll;
767    info.mIsPrepared = false;
768    info.mToBeRemoved = false;
769    looper()->registerHandler(info.mFetcher);
770
771    mFetcherInfos.add(uri, info);
772
773    return info.mFetcher;
774}
775
776/*
777 * Illustration of parameters:
778 *
779 * 0      `range_offset`
780 * +------------+-------------------------------------------------------+--+--+
781 * |            |                                 | next block to fetch |  |  |
782 * |            | `source` handle => `out` buffer |                     |  |  |
783 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
784 * |            |<----------- `range_length` / buffer capacity ----------->|  |
785 * |<------------------------------ file_size ------------------------------->|
786 *
787 * Special parameter values:
788 * - range_length == -1 means entire file
789 * - block_size == 0 means entire range
790 *
791 */
792ssize_t LiveSession::fetchFile(
793        const char *url, sp<ABuffer> *out,
794        int64_t range_offset, int64_t range_length,
795        uint32_t block_size, /* download block size */
796        sp<DataSource> *source, /* to return and reuse source */
797        String8 *actualUrl) {
798    off64_t size;
799    sp<DataSource> temp_source;
800    if (source == NULL) {
801        source = &temp_source;
802    }
803
804    if (*source == NULL) {
805        if (!strncasecmp(url, "file://", 7)) {
806            *source = new FileSource(url + 7);
807        } else if (strncasecmp(url, "http://", 7)
808                && strncasecmp(url, "https://", 8)) {
809            return ERROR_UNSUPPORTED;
810        } else {
811            KeyedVector<String8, String8> headers = mExtraHeaders;
812            if (range_offset > 0 || range_length >= 0) {
813                headers.add(
814                        String8("Range"),
815                        String8(
816                            StringPrintf(
817                                "bytes=%lld-%s",
818                                range_offset,
819                                range_length < 0
820                                    ? "" : StringPrintf("%lld",
821                                            range_offset + range_length - 1).c_str()).c_str()));
822            }
823            status_t err = mHTTPDataSource->connect(url, &headers);
824
825            if (err != OK) {
826                return err;
827            }
828
829            *source = mHTTPDataSource;
830        }
831    }
832
833    status_t getSizeErr = (*source)->getSize(&size);
834    if (getSizeErr != OK) {
835        size = 65536;
836    }
837
838    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
839    if (*out == NULL) {
840        buffer->setRange(0, 0);
841    }
842
843    ssize_t bytesRead = 0;
844    // adjust range_length if only reading partial block
845    if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
846        range_length = buffer->size() + block_size;
847    }
848    for (;;) {
849        // Only resize when we don't know the size.
850        size_t bufferRemaining = buffer->capacity() - buffer->size();
851        if (bufferRemaining == 0 && getSizeErr != OK) {
852            bufferRemaining = 32768;
853
854            ALOGV("increasing download buffer to %zu bytes",
855                 buffer->size() + bufferRemaining);
856
857            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
858            memcpy(copy->data(), buffer->data(), buffer->size());
859            copy->setRange(0, buffer->size());
860
861            buffer = copy;
862        }
863
864        size_t maxBytesToRead = bufferRemaining;
865        if (range_length >= 0) {
866            int64_t bytesLeftInRange = range_length - buffer->size();
867            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
868                maxBytesToRead = bytesLeftInRange;
869
870                if (bytesLeftInRange == 0) {
871                    break;
872                }
873            }
874        }
875
876        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
877        // to help us break out of the loop.
878        ssize_t n = (*source)->readAt(
879                buffer->size(), buffer->data() + buffer->size(),
880                maxBytesToRead);
881
882        if (n < 0) {
883            return n;
884        }
885
886        if (n == 0) {
887            break;
888        }
889
890        buffer->setRange(0, buffer->size() + (size_t)n);
891        bytesRead += n;
892    }
893
894    *out = buffer;
895    if (actualUrl != NULL) {
896        *actualUrl = (*source)->getUri();
897        if (actualUrl->isEmpty()) {
898            *actualUrl = url;
899        }
900    }
901
902    return bytesRead;
903}
904
905sp<M3UParser> LiveSession::fetchPlaylist(
906        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
907    ALOGV("fetchPlaylist '%s'", url);
908
909    *unchanged = false;
910
911    sp<ABuffer> buffer;
912    String8 actualUrl;
913    ssize_t  err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
914
915    if (err <= 0) {
916        return NULL;
917    }
918
919    // MD5 functionality is not available on the simulator, treat all
920    // playlists as changed.
921
922#if defined(HAVE_ANDROID_OS)
923    uint8_t hash[16];
924
925    MD5_CTX m;
926    MD5_Init(&m);
927    MD5_Update(&m, buffer->data(), buffer->size());
928
929    MD5_Final(hash, &m);
930
931    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
932        // playlist unchanged
933        *unchanged = true;
934
935        return NULL;
936    }
937
938    if (curPlaylistHash != NULL) {
939        memcpy(curPlaylistHash, hash, sizeof(hash));
940    }
941#endif
942
943    sp<M3UParser> playlist =
944        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
945
946    if (playlist->initCheck() != OK) {
947        ALOGE("failed to parse .m3u8 playlist");
948
949        return NULL;
950    }
951
952    return playlist;
953}
954
955static double uniformRand() {
956    return (double)rand() / RAND_MAX;
957}
958
959size_t LiveSession::getBandwidthIndex() {
960    if (mBandwidthItems.size() == 0) {
961        return 0;
962    }
963
964#if 1
965    char value[PROPERTY_VALUE_MAX];
966    ssize_t index = -1;
967    if (property_get("media.httplive.bw-index", value, NULL)) {
968        char *end;
969        index = strtol(value, &end, 10);
970        CHECK(end > value && *end == '\0');
971
972        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
973            index = mBandwidthItems.size() - 1;
974        }
975    }
976
977    if (index < 0) {
978        int32_t bandwidthBps;
979        if (mHTTPDataSource != NULL
980                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
981            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
982        } else {
983            ALOGV("no bandwidth estimate.");
984            return 0;  // Pick the lowest bandwidth stream by default.
985        }
986
987        char value[PROPERTY_VALUE_MAX];
988        if (property_get("media.httplive.max-bw", value, NULL)) {
989            char *end;
990            long maxBw = strtoul(value, &end, 10);
991            if (end > value && *end == '\0') {
992                if (maxBw > 0 && bandwidthBps > maxBw) {
993                    ALOGV("bandwidth capped to %ld bps", maxBw);
994                    bandwidthBps = maxBw;
995                }
996            }
997        }
998
999        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
1000
1001        index = mBandwidthItems.size() - 1;
1002        while (index > 0) {
1003            // consider only 80% of the available bandwidth, but if we are switching up,
1004            // be even more conservative (70%) to avoid overestimating and immediately
1005            // switching back.
1006            size_t adjustedBandwidthBps = bandwidthBps;
1007            if (index > mCurBandwidthIndex) {
1008                adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10;
1009            } else {
1010                adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10;
1011            }
1012            if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) {
1013                break;
1014            }
1015            --index;
1016        }
1017    }
1018#elif 0
1019    // Change bandwidth at random()
1020    size_t index = uniformRand() * mBandwidthItems.size();
1021#elif 0
1022    // There's a 50% chance to stay on the current bandwidth and
1023    // a 50% chance to switch to the next higher bandwidth (wrapping around
1024    // to lowest)
1025    const size_t kMinIndex = 0;
1026
1027    static ssize_t mCurBandwidthIndex = -1;
1028
1029    size_t index;
1030    if (mCurBandwidthIndex < 0) {
1031        index = kMinIndex;
1032    } else if (uniformRand() < 0.5) {
1033        index = (size_t)mCurBandwidthIndex;
1034    } else {
1035        index = mCurBandwidthIndex + 1;
1036        if (index == mBandwidthItems.size()) {
1037            index = kMinIndex;
1038        }
1039    }
1040    mCurBandwidthIndex = index;
1041#elif 0
1042    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1043
1044    size_t index = mBandwidthItems.size() - 1;
1045    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1046        --index;
1047    }
1048#elif 1
1049    char value[PROPERTY_VALUE_MAX];
1050    size_t index;
1051    if (property_get("media.httplive.bw-index", value, NULL)) {
1052        char *end;
1053        index = strtoul(value, &end, 10);
1054        CHECK(end > value && *end == '\0');
1055
1056        if (index >= mBandwidthItems.size()) {
1057            index = mBandwidthItems.size() - 1;
1058        }
1059    } else {
1060        index = 0;
1061    }
1062#else
1063    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
1064#endif
1065
1066    CHECK_GE(index, 0);
1067
1068    return index;
1069}
1070
1071status_t LiveSession::onSeek(const sp<AMessage> &msg) {
1072    int64_t timeUs;
1073    CHECK(msg->findInt64("timeUs", &timeUs));
1074
1075    if (!mReconfigurationInProgress) {
1076        changeConfiguration(timeUs, mCurBandwidthIndex);
1077        return OK;
1078    } else {
1079        return -EWOULDBLOCK;
1080    }
1081}
1082
1083status_t LiveSession::getDuration(int64_t *durationUs) const {
1084    int64_t maxDurationUs = 0ll;
1085    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1086        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1087
1088        if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) {
1089            maxDurationUs = fetcherDurationUs;
1090        }
1091    }
1092
1093    *durationUs = maxDurationUs;
1094
1095    return OK;
1096}
1097
1098bool LiveSession::isSeekable() const {
1099    int64_t durationUs;
1100    return getDuration(&durationUs) == OK && durationUs >= 0;
1101}
1102
1103bool LiveSession::hasDynamicDuration() const {
1104    return false;
1105}
1106
1107size_t LiveSession::getTrackCount() const {
1108    if (mPlaylist == NULL) {
1109        return 0;
1110    } else {
1111        return mPlaylist->getTrackCount();
1112    }
1113}
1114
1115sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1116    if (mPlaylist == NULL) {
1117        return NULL;
1118    } else {
1119        return mPlaylist->getTrackInfo(trackIndex);
1120    }
1121}
1122
1123status_t LiveSession::selectTrack(size_t index, bool select) {
1124    status_t err = mPlaylist->selectTrack(index, select);
1125    if (err == OK) {
1126        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id());
1127        msg->setInt32("bandwidthIndex", mCurBandwidthIndex);
1128        msg->setInt32("pickTrack", select);
1129        msg->post();
1130    }
1131    return err;
1132}
1133
1134bool LiveSession::canSwitchUp() {
1135    // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
1136    status_t err = OK;
1137    for (size_t i = 0; i < mPacketSources.size(); ++i) {
1138        sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
1139        int64_t dur = source->getBufferedDurationUs(&err);
1140        if (err == OK && dur > 10000000) {
1141            return true;
1142        }
1143    }
1144    return false;
1145}
1146
1147void LiveSession::changeConfiguration(
1148        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
1149    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
1150    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
1151    cancelBandwidthSwitch();
1152
1153    CHECK(!mReconfigurationInProgress);
1154    mReconfigurationInProgress = true;
1155
1156    mCurBandwidthIndex = bandwidthIndex;
1157
1158    ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
1159          timeUs, bandwidthIndex, pickTrack);
1160
1161    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
1162    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
1163
1164    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1165    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1166
1167    AString URIs[kMaxStreams];
1168    for (size_t i = 0; i < kMaxStreams; ++i) {
1169        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1170            streamMask |= indexToType(i);
1171        }
1172    }
1173
1174    // Step 1, stop and discard fetchers that are no longer needed.
1175    // Pause those that we'll reuse.
1176    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1177        const AString &uri = mFetcherInfos.keyAt(i);
1178
1179        bool discardFetcher = true;
1180
1181        // If we're seeking all current fetchers are discarded.
1182        if (timeUs < 0ll) {
1183            // delay fetcher removal if not picking tracks
1184            discardFetcher = pickTrack;
1185
1186            for (size_t j = 0; j < kMaxStreams; ++j) {
1187                StreamType type = indexToType(j);
1188                if ((streamMask & type) && uri == URIs[j]) {
1189                    resumeMask |= type;
1190                    streamMask &= ~type;
1191                    discardFetcher = false;
1192                }
1193            }
1194        }
1195
1196        if (discardFetcher) {
1197            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1198        } else {
1199            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
1200        }
1201    }
1202
1203    sp<AMessage> msg;
1204    if (timeUs < 0ll) {
1205        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1206        msg = new AMessage(kWhatChangeConfiguration3, id());
1207    } else {
1208        msg = new AMessage(kWhatChangeConfiguration2, id());
1209    }
1210    msg->setInt32("streamMask", streamMask);
1211    msg->setInt32("resumeMask", resumeMask);
1212    msg->setInt32("pickTrack", pickTrack);
1213    msg->setInt64("timeUs", timeUs);
1214    for (size_t i = 0; i < kMaxStreams; ++i) {
1215        if ((streamMask | resumeMask) & indexToType(i)) {
1216            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1217        }
1218    }
1219
1220    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1221    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1222    // fetchers have completed their asynchronous operation, we'll post
1223    // mContinuation, which then is handled below in onChangeConfiguration2.
1224    mContinuationCounter = mFetcherInfos.size();
1225    mContinuation = msg;
1226
1227    if (mContinuationCounter == 0) {
1228        msg->post();
1229
1230        if (mSeekReplyID != 0) {
1231            CHECK(mSeekReply != NULL);
1232            mSeekReply->setInt32("err", OK);
1233            mSeekReply->postReply(mSeekReplyID);
1234            mSeekReplyID = 0;
1235            mSeekReply.clear();
1236        }
1237    }
1238}
1239
1240void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1241    if (!mReconfigurationInProgress) {
1242        int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex;
1243        msg->findInt32("pickTrack", &pickTrack);
1244        msg->findInt32("bandwidthIndex", &bandwidthIndex);
1245        changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack);
1246    } else {
1247        msg->post(1000000ll); // retry in 1 sec
1248    }
1249}
1250
1251void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1252    mContinuation.clear();
1253
1254    // All fetchers are either suspended or have been removed now.
1255
1256    uint32_t streamMask, resumeMask;
1257    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1258    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1259
1260    // currently onChangeConfiguration2 is only called for seeking;
1261    // remove the following CHECK if using it else where.
1262    CHECK_EQ(resumeMask, 0);
1263    streamMask |= resumeMask;
1264
1265    AString URIs[kMaxStreams];
1266    for (size_t i = 0; i < kMaxStreams; ++i) {
1267        if (streamMask & indexToType(i)) {
1268            const AString &uriKey = mStreams[i].uriKey();
1269            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1270            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1271        }
1272    }
1273
1274    // Determine which decoders to shutdown on the player side,
1275    // a decoder has to be shutdown if either
1276    // 1) its streamtype was active before but now longer isn't.
1277    // or
1278    // 2) its streamtype was already active and still is but the URI
1279    //    has changed.
1280    uint32_t changedMask = 0;
1281    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1282        if (((mStreamMask & streamMask & indexToType(i))
1283                && !(URIs[i] == mStreams[i].mUri))
1284                || (mStreamMask & ~streamMask & indexToType(i))) {
1285            changedMask |= indexToType(i);
1286        }
1287    }
1288
1289    if (changedMask == 0) {
1290        // If nothing changed as far as the audio/video decoders
1291        // are concerned we can proceed.
1292        onChangeConfiguration3(msg);
1293        return;
1294    }
1295
1296    // Something changed, inform the player which will shutdown the
1297    // corresponding decoders and will post the reply once that's done.
1298    // Handling the reply will continue executing below in
1299    // onChangeConfiguration3.
1300    sp<AMessage> notify = mNotify->dup();
1301    notify->setInt32("what", kWhatStreamsChanged);
1302    notify->setInt32("changedMask", changedMask);
1303
1304    msg->setWhat(kWhatChangeConfiguration3);
1305    msg->setTarget(id());
1306
1307    notify->setMessage("reply", msg);
1308    notify->post();
1309}
1310
1311void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1312    mContinuation.clear();
1313    // All remaining fetchers are still suspended, the player has shutdown
1314    // any decoders that needed it.
1315
1316    uint32_t streamMask, resumeMask;
1317    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1318    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1319
1320    int64_t timeUs;
1321    int32_t pickTrack;
1322    bool switching = false;
1323    CHECK(msg->findInt64("timeUs", &timeUs));
1324    CHECK(msg->findInt32("pickTrack", &pickTrack));
1325
1326    if (timeUs < 0ll) {
1327        if (!pickTrack) {
1328            switching = true;
1329        }
1330        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
1331    } else {
1332        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1333    }
1334
1335    for (size_t i = 0; i < kMaxStreams; ++i) {
1336        if (streamMask & indexToType(i)) {
1337            if (switching) {
1338                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
1339            } else {
1340                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1341            }
1342        }
1343    }
1344
1345    mNewStreamMask = streamMask | resumeMask;
1346    if (switching) {
1347        mSwapMask = mStreamMask & ~resumeMask;
1348    }
1349
1350    // Of all existing fetchers:
1351    // * Resume fetchers that are still needed and assign them original packet sources.
1352    // * Mark otherwise unneeded fetchers for removal.
1353    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1354    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1355        const AString &uri = mFetcherInfos.keyAt(i);
1356
1357        sp<AnotherPacketSource> sources[kMaxStreams];
1358        for (size_t j = 0; j < kMaxStreams; ++j) {
1359            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
1360                sources[j] = mPacketSources.valueFor(indexToType(j));
1361
1362                if (j != kSubtitleIndex) {
1363                    ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j));
1364                    sp<AnotherPacketSource> discontinuityQueue;
1365                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1366                    discontinuityQueue->queueDiscontinuity(
1367                            ATSParser::DISCONTINUITY_NONE,
1368                            NULL,
1369                            true);
1370                }
1371            }
1372        }
1373
1374        FetcherInfo &info = mFetcherInfos.editValueAt(i);
1375        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
1376                || sources[kSubtitleIndex] != NULL) {
1377            info.mFetcher->startAsync(
1378                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
1379        } else {
1380            info.mToBeRemoved = true;
1381        }
1382    }
1383
1384    // streamMask now only contains the types that need a new fetcher created.
1385
1386    if (streamMask != 0) {
1387        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1388    }
1389
1390    // Find out when the original fetchers have buffered up to and start the new fetchers
1391    // at a later timestamp.
1392    for (size_t i = 0; i < kMaxStreams; i++) {
1393        if (!(indexToType(i) & streamMask)) {
1394            continue;
1395        }
1396
1397        AString uri;
1398        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
1399
1400        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1401        CHECK(fetcher != NULL);
1402
1403        int32_t latestSeq = -1;
1404        int64_t startTimeUs = -1;
1405        int64_t segmentStartTimeUs = -1ll;
1406        int32_t discontinuitySeq = -1;
1407        sp<AnotherPacketSource> sources[kMaxStreams];
1408
1409        // TRICKY: looping from i as earlier streams are already removed from streamMask
1410        for (size_t j = i; j < kMaxStreams; ++j) {
1411            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
1412            if ((streamMask & indexToType(j)) && uri == streamUri) {
1413                sources[j] = mPacketSources.valueFor(indexToType(j));
1414
1415                if (timeUs >= 0) {
1416                    sources[j]->clear();
1417                    startTimeUs = timeUs;
1418
1419                    sp<AnotherPacketSource> discontinuityQueue;
1420                    sp<AMessage> extra = new AMessage;
1421                    extra->setInt64("timeUs", timeUs);
1422                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1423                    discontinuityQueue->queueDiscontinuity(
1424                            ATSParser::DISCONTINUITY_SEEK, extra, true);
1425                } else {
1426                    int32_t type;
1427                    int64_t srcSegmentStartTimeUs;
1428                    sp<AMessage> meta;
1429                    if (pickTrack) {
1430                        // selecting
1431                        meta = sources[j]->getLatestDequeuedMeta();
1432                    } else {
1433                        // adapting
1434                        meta = sources[j]->getLatestEnqueuedMeta();
1435                    }
1436
1437                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
1438                        int64_t tmpUs;
1439                        CHECK(meta->findInt64("timeUs", &tmpUs));
1440                        if (startTimeUs < 0 || tmpUs < startTimeUs) {
1441                            startTimeUs = tmpUs;
1442                        }
1443
1444                        CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs));
1445                        if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) {
1446                            segmentStartTimeUs = tmpUs;
1447                        }
1448
1449                        int32_t seq;
1450                        CHECK(meta->findInt32("discontinuitySeq", &seq));
1451                        if (discontinuitySeq < 0 || seq < discontinuitySeq) {
1452                            discontinuitySeq = seq;
1453                        }
1454                    }
1455
1456                    if (pickTrack) {
1457                        // selecting track, queue discontinuities before content
1458                        sources[j]->clear();
1459                        if (j == kSubtitleIndex) {
1460                            break;
1461                        }
1462                        sp<AnotherPacketSource> discontinuityQueue;
1463                        discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1464                        discontinuityQueue->queueDiscontinuity(
1465                                ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1466                    } else {
1467                        // adapting, queue discontinuities after resume
1468                        sources[j] = mPacketSources2.valueFor(indexToType(j));
1469                        sources[j]->clear();
1470                        uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1471                        if (extraStreams & indexToType(j)) {
1472                            sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false));
1473                        }
1474                    }
1475                }
1476
1477                streamMask &= ~indexToType(j);
1478            }
1479        }
1480
1481        fetcher->startAsync(
1482                sources[kAudioIndex],
1483                sources[kVideoIndex],
1484                sources[kSubtitleIndex],
1485                startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs,
1486                segmentStartTimeUs,
1487                discontinuitySeq,
1488                switching);
1489    }
1490
1491    // All fetchers have now been started, the configuration change
1492    // has completed.
1493
1494    cancelCheckBandwidthEvent();
1495    scheduleCheckBandwidthEvent();
1496
1497    ALOGV("XXX configuration change completed.");
1498    mReconfigurationInProgress = false;
1499    if (switching) {
1500        mSwitchInProgress = true;
1501    } else {
1502        mStreamMask = mNewStreamMask;
1503    }
1504
1505    if (mDisconnectReplyID != 0) {
1506        finishDisconnect();
1507    }
1508}
1509
1510void LiveSession::onSwapped(const sp<AMessage> &msg) {
1511    int32_t switchGeneration;
1512    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
1513    if (switchGeneration != mSwitchGeneration) {
1514        return;
1515    }
1516
1517    int32_t stream;
1518    CHECK(msg->findInt32("stream", &stream));
1519
1520    ssize_t idx = typeToIndex(stream);
1521    CHECK(idx >= 0);
1522    if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
1523        ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str());
1524    }
1525    mStreams[idx].mUri = mStreams[idx].mNewUri;
1526    mStreams[idx].mNewUri.clear();
1527
1528    mSwapMask &= ~stream;
1529    if (mSwapMask != 0) {
1530        return;
1531    }
1532
1533    // Check if new variant contains extra streams.
1534    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1535    while (extraStreams) {
1536        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
1537        swapPacketSource(extraStream);
1538        extraStreams &= ~extraStream;
1539
1540        idx = typeToIndex(extraStream);
1541        CHECK(idx >= 0);
1542        if (mStreams[idx].mNewUri.empty()) {
1543            ALOGW("swapping extra stream type %d %s to empty stream",
1544                    extraStream, mStreams[idx].mUri.c_str());
1545        }
1546        mStreams[idx].mUri = mStreams[idx].mNewUri;
1547        mStreams[idx].mNewUri.clear();
1548    }
1549
1550    tryToFinishBandwidthSwitch();
1551}
1552
1553void LiveSession::onCheckSwitchDown() {
1554    if (mSwitchDownMonitor == NULL) {
1555        return;
1556    }
1557
1558    for (size_t i = 0; i < kMaxStreams; ++i) {
1559        int32_t targetDuration;
1560        sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i));
1561        sp<AMessage> meta = packetSource->getLatestDequeuedMeta();
1562
1563        if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) {
1564            int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs();
1565            int64_t targetDurationUs = targetDuration * 1000000ll;
1566
1567            if (bufferedDurationUs < targetDurationUs / 3) {
1568                (new AMessage(kWhatSwitchDown, id()))->post();
1569                break;
1570            }
1571        }
1572    }
1573
1574    mSwitchDownMonitor->post(1000000ll);
1575}
1576
1577void LiveSession::onSwitchDown() {
1578    if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) {
1579        return;
1580    }
1581
1582    ssize_t bandwidthIndex = getBandwidthIndex();
1583    if (bandwidthIndex < mCurBandwidthIndex) {
1584        changeConfiguration(-1, bandwidthIndex, false);
1585        return;
1586    }
1587
1588    changeConfiguration(-1, mCurBandwidthIndex - 1, false);
1589}
1590
1591// Mark switch done when:
1592//   1. all old buffers are swapped out
1593void LiveSession::tryToFinishBandwidthSwitch() {
1594    if (!mSwitchInProgress) {
1595        return;
1596    }
1597
1598    bool needToRemoveFetchers = false;
1599    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1600        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
1601            needToRemoveFetchers = true;
1602            break;
1603        }
1604    }
1605
1606    if (!needToRemoveFetchers && mSwapMask == 0) {
1607        ALOGI("mSwitchInProgress = false");
1608        mStreamMask = mNewStreamMask;
1609        mSwitchInProgress = false;
1610    }
1611}
1612
1613void LiveSession::scheduleCheckBandwidthEvent() {
1614    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1615    msg->setInt32("generation", mCheckBandwidthGeneration);
1616    msg->post(10000000ll);
1617}
1618
1619void LiveSession::cancelCheckBandwidthEvent() {
1620    ++mCheckBandwidthGeneration;
1621}
1622
1623void LiveSession::cancelBandwidthSwitch() {
1624    Mutex::Autolock lock(mSwapMutex);
1625    mSwitchGeneration++;
1626    mSwitchInProgress = false;
1627    mSwapMask = 0;
1628
1629    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1630        FetcherInfo& info = mFetcherInfos.editValueAt(i);
1631        if (info.mToBeRemoved) {
1632            info.mToBeRemoved = false;
1633        }
1634    }
1635
1636    for (size_t i = 0; i < kMaxStreams; ++i) {
1637        if (!mStreams[i].mNewUri.empty()) {
1638            ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri);
1639            if (j < 0) {
1640                mStreams[i].mNewUri.clear();
1641                continue;
1642            }
1643
1644            const FetcherInfo &info = mFetcherInfos.valueAt(j);
1645            info.mFetcher->stopAsync();
1646            mFetcherInfos.removeItemsAt(j);
1647            mStreams[i].mNewUri.clear();
1648        }
1649    }
1650}
1651
1652bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
1653    if (mReconfigurationInProgress || mSwitchInProgress) {
1654        return false;
1655    }
1656
1657    if (mCurBandwidthIndex < 0) {
1658        return true;
1659    }
1660
1661    if (bandwidthIndex == (size_t)mCurBandwidthIndex) {
1662        return false;
1663    } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) {
1664        return canSwitchUp();
1665    } else {
1666        return true;
1667    }
1668}
1669
1670void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) {
1671    size_t bandwidthIndex = getBandwidthIndex();
1672    if (canSwitchBandwidthTo(bandwidthIndex)) {
1673        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1674    } else {
1675        // Come back and check again 10 seconds later in case there is nothing to do now.
1676        // If we DO change configuration, once that completes it'll schedule a new
1677        // check bandwidth event with an incremented mCheckBandwidthGeneration.
1678        msg->post(10000000ll);
1679    }
1680}
1681
1682void LiveSession::postPrepared(status_t err) {
1683    CHECK(mInPreparationPhase);
1684
1685    sp<AMessage> notify = mNotify->dup();
1686    if (err == OK || err == ERROR_END_OF_STREAM) {
1687        notify->setInt32("what", kWhatPrepared);
1688    } else {
1689        notify->setInt32("what", kWhatPreparationFailed);
1690        notify->setInt32("err", err);
1691    }
1692
1693    notify->post();
1694
1695    mInPreparationPhase = false;
1696
1697    mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id());
1698    mSwitchDownMonitor->post();
1699}
1700
1701}  // namespace android
1702
1703