LiveSession.cpp revision f0d689934e70d3e5b3784265e890377db04c7c1d
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/IMediaHTTPConnection.h>
31#include <media/IMediaHTTPService.h>
32#include <media/stagefright/foundation/hexdump.h>
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/DataSource.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaErrors.h>
39#include <media/stagefright/MediaHTTP.h>
40#include <media/stagefright/MetaData.h>
41#include <media/stagefright/Utils.h>
42
43#include <utils/Mutex.h>
44
45#include <ctype.h>
46#include <inttypes.h>
47#include <openssl/aes.h>
48#include <openssl/md5.h>
49
50namespace android {
51
52LiveSession::LiveSession(
53        const sp<AMessage> &notify, uint32_t flags,
54        const sp<IMediaHTTPService> &httpService)
55    : mNotify(notify),
56      mFlags(flags),
57      mHTTPService(httpService),
58      mInPreparationPhase(true),
59      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
60      mCurBandwidthIndex(-1),
61      mStreamMask(0),
62      mNewStreamMask(0),
63      mSwapMask(0),
64      mCheckBandwidthGeneration(0),
65      mSwitchGeneration(0),
66      mSubtitleGeneration(0),
67      mLastDequeuedTimeUs(0ll),
68      mRealTimeBaseUs(0ll),
69      mReconfigurationInProgress(false),
70      mSwitchInProgress(false),
71      mDisconnectReplyID(0),
72      mSeekReplyID(0),
73      mFirstTimeUsValid(false),
74      mFirstTimeUs(0),
75      mLastSeekTimeUs(0) {
76
77    mStreams[kAudioIndex] = StreamItem("audio");
78    mStreams[kVideoIndex] = StreamItem("video");
79    mStreams[kSubtitleIndex] = StreamItem("subtitles");
80
81    for (size_t i = 0; i < kMaxStreams; ++i) {
82        mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
83        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
84        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
85        mBuffering[i] = false;
86    }
87}
88
89LiveSession::~LiveSession() {
90}
91
92sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
93    ABuffer *discontinuity = new ABuffer(0);
94    discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
95    discontinuity->meta()->setInt32("swapPacketSource", swap);
96    discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
97    discontinuity->meta()->setInt64("timeUs", -1);
98    return discontinuity;
99}
100
101void LiveSession::swapPacketSource(StreamType stream) {
102    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
103    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
104    sp<AnotherPacketSource> tmp = aps;
105    aps = aps2;
106    aps2 = tmp;
107    aps2->clear();
108}
109
110status_t LiveSession::dequeueAccessUnit(
111        StreamType stream, sp<ABuffer> *accessUnit) {
112    if (!(mStreamMask & stream)) {
113        // return -EWOULDBLOCK to avoid halting the decoder
114        // when switching between audio/video and audio only.
115        return -EWOULDBLOCK;
116    }
117
118    status_t finalResult;
119    sp<AnotherPacketSource> discontinuityQueue  = mDiscontinuities.valueFor(stream);
120    if (discontinuityQueue->hasBufferAvailable(&finalResult)) {
121        discontinuityQueue->dequeueAccessUnit(accessUnit);
122        // seeking, track switching
123        sp<AMessage> extra;
124        int64_t timeUs;
125        if ((*accessUnit)->meta()->findMessage("extra", &extra)
126                && extra != NULL
127                && extra->findInt64("timeUs", &timeUs)) {
128            // seeking only
129            mLastSeekTimeUs = timeUs;
130            mDiscontinuityOffsetTimesUs.clear();
131            mDiscontinuityAbsStartTimesUs.clear();
132        }
133        return INFO_DISCONTINUITY;
134    }
135
136    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
137
138    ssize_t idx = typeToIndex(stream);
139    if (!packetSource->hasBufferAvailable(&finalResult)) {
140        if (finalResult == OK) {
141            mBuffering[idx] = true;
142            return -EAGAIN;
143        } else {
144            return finalResult;
145        }
146    }
147
148    if (mBuffering[idx]) {
149        if (mSwitchInProgress
150                || packetSource->isFinished(0)
151                || packetSource->getEstimatedDurationUs() > 10000000ll) {
152            mBuffering[idx] = false;
153        }
154    }
155
156    if (mBuffering[idx]) {
157        return -EAGAIN;
158    }
159
160    // wait for counterpart
161    sp<AnotherPacketSource> otherSource;
162    uint32_t mask = mNewStreamMask & mStreamMask;
163    uint32_t fetchersMask  = 0;
164    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
165        uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask();
166        fetchersMask |= fetcherMask;
167    }
168    mask &= fetchersMask;
169    if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) {
170        otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
171    } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) {
172        otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
173    }
174    if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) {
175        return finalResult == OK ? -EAGAIN : finalResult;
176    }
177
178    status_t err = packetSource->dequeueAccessUnit(accessUnit);
179
180    size_t streamIdx;
181    const char *streamStr;
182    switch (stream) {
183        case STREAMTYPE_AUDIO:
184            streamIdx = kAudioIndex;
185            streamStr = "audio";
186            break;
187        case STREAMTYPE_VIDEO:
188            streamIdx = kVideoIndex;
189            streamStr = "video";
190            break;
191        case STREAMTYPE_SUBTITLES:
192            streamIdx = kSubtitleIndex;
193            streamStr = "subs";
194            break;
195        default:
196            TRESPASS();
197    }
198
199    StreamItem& strm = mStreams[streamIdx];
200    if (err == INFO_DISCONTINUITY) {
201        // adaptive streaming, discontinuities in the playlist
202        int32_t type;
203        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
204
205        sp<AMessage> extra;
206        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
207            extra.clear();
208        }
209
210        ALOGI("[%s] read discontinuity of type %d, extra = %s",
211              streamStr,
212              type,
213              extra == NULL ? "NULL" : extra->debugString().c_str());
214
215        int32_t swap;
216        if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) {
217            int32_t switchGeneration;
218            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
219            {
220                Mutex::Autolock lock(mSwapMutex);
221                if (switchGeneration == mSwitchGeneration) {
222                    swapPacketSource(stream);
223                    sp<AMessage> msg = new AMessage(kWhatSwapped, id());
224                    msg->setInt32("stream", stream);
225                    msg->setInt32("switchGeneration", switchGeneration);
226                    msg->post();
227                }
228            }
229        } else {
230            size_t seq = strm.mCurDiscontinuitySeq;
231            int64_t offsetTimeUs;
232            if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
233                offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
234            } else {
235                offsetTimeUs = 0;
236            }
237
238            seq += 1;
239            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
240                int64_t firstTimeUs;
241                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
242                offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
243                offsetTimeUs += strm.mLastSampleDurationUs;
244            } else {
245                offsetTimeUs += strm.mLastSampleDurationUs;
246            }
247
248            mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
249        }
250    } else if (err == OK) {
251
252        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
253            int64_t timeUs;
254            int32_t discontinuitySeq = 0;
255            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
256            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
257            strm.mCurDiscontinuitySeq = discontinuitySeq;
258
259            int32_t discard = 0;
260            int64_t firstTimeUs;
261            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
262                int64_t durUs; // approximate sample duration
263                if (timeUs > strm.mLastDequeuedTimeUs) {
264                    durUs = timeUs - strm.mLastDequeuedTimeUs;
265                } else {
266                    durUs = strm.mLastDequeuedTimeUs - timeUs;
267                }
268                strm.mLastSampleDurationUs = durUs;
269                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
270            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
271                firstTimeUs = timeUs;
272            } else {
273                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
274                firstTimeUs = timeUs;
275            }
276
277            strm.mLastDequeuedTimeUs = timeUs;
278            if (timeUs >= firstTimeUs) {
279                timeUs -= firstTimeUs;
280            } else {
281                timeUs = 0;
282            }
283            timeUs += mLastSeekTimeUs;
284            if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
285                timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
286            }
287
288            ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs);
289            (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
290            mLastDequeuedTimeUs = timeUs;
291            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
292        } else if (stream == STREAMTYPE_SUBTITLES) {
293            int32_t subtitleGeneration;
294            if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
295                    && subtitleGeneration != mSubtitleGeneration) {
296               return -EAGAIN;
297            };
298            (*accessUnit)->meta()->setInt32(
299                    "trackIndex", mPlaylist->getSelectedIndex());
300            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
301        }
302    } else {
303        ALOGI("[%s] encountered error %d", streamStr, err);
304    }
305
306    return err;
307}
308
309status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
310    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
311    if (!(mStreamMask & stream)) {
312        return UNKNOWN_ERROR;
313    }
314
315    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
316
317    sp<MetaData> meta = packetSource->getFormat();
318
319    if (meta == NULL) {
320        return -EAGAIN;
321    }
322
323    return convertMetaDataToMessage(meta, format);
324}
325
326void LiveSession::connectAsync(
327        const char *url, const KeyedVector<String8, String8> *headers) {
328    sp<AMessage> msg = new AMessage(kWhatConnect, id());
329    msg->setString("url", url);
330
331    if (headers != NULL) {
332        msg->setPointer(
333                "headers",
334                new KeyedVector<String8, String8>(*headers));
335    }
336
337    msg->post();
338}
339
340status_t LiveSession::disconnect() {
341    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
342
343    sp<AMessage> response;
344    status_t err = msg->postAndAwaitResponse(&response);
345
346    return err;
347}
348
349status_t LiveSession::seekTo(int64_t timeUs) {
350    sp<AMessage> msg = new AMessage(kWhatSeek, id());
351    msg->setInt64("timeUs", timeUs);
352
353    sp<AMessage> response;
354    status_t err = msg->postAndAwaitResponse(&response);
355
356    return err;
357}
358
359void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
360    switch (msg->what()) {
361        case kWhatConnect:
362        {
363            onConnect(msg);
364            break;
365        }
366
367        case kWhatDisconnect:
368        {
369            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
370
371            if (mReconfigurationInProgress) {
372                break;
373            }
374
375            finishDisconnect();
376            break;
377        }
378
379        case kWhatSeek:
380        {
381            uint32_t seekReplyID;
382            CHECK(msg->senderAwaitsResponse(&seekReplyID));
383            mSeekReplyID = seekReplyID;
384            mSeekReply = new AMessage;
385
386            status_t err = onSeek(msg);
387
388            if (err != OK) {
389                msg->post(50000);
390            }
391            break;
392        }
393
394        case kWhatFetcherNotify:
395        {
396            int32_t what;
397            CHECK(msg->findInt32("what", &what));
398
399            switch (what) {
400                case PlaylistFetcher::kWhatStarted:
401                    break;
402                case PlaylistFetcher::kWhatPaused:
403                case PlaylistFetcher::kWhatStopped:
404                {
405                    if (what == PlaylistFetcher::kWhatStopped) {
406                        AString uri;
407                        CHECK(msg->findString("uri", &uri));
408                        if (mFetcherInfos.removeItem(uri) < 0) {
409                            // ignore duplicated kWhatStopped messages.
410                            break;
411                        }
412
413                        if (mSwitchInProgress) {
414                            tryToFinishBandwidthSwitch();
415                        }
416                    }
417
418                    if (mContinuation != NULL) {
419                        CHECK_GT(mContinuationCounter, 0);
420                        if (--mContinuationCounter == 0) {
421                            mContinuation->post();
422
423                            if (mSeekReplyID != 0) {
424                                CHECK(mSeekReply != NULL);
425                                mSeekReply->setInt32("err", OK);
426                                mSeekReply->postReply(mSeekReplyID);
427                                mSeekReplyID = 0;
428                                mSeekReply.clear();
429                            }
430                        }
431                    }
432                    break;
433                }
434
435                case PlaylistFetcher::kWhatDurationUpdate:
436                {
437                    AString uri;
438                    CHECK(msg->findString("uri", &uri));
439
440                    int64_t durationUs;
441                    CHECK(msg->findInt64("durationUs", &durationUs));
442
443                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
444                    info->mDurationUs = durationUs;
445                    break;
446                }
447
448                case PlaylistFetcher::kWhatError:
449                {
450                    status_t err;
451                    CHECK(msg->findInt32("err", &err));
452
453                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
454
455                    // handle EOS on subtitle tracks independently
456                    AString uri;
457                    if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
458                        ssize_t i = mFetcherInfos.indexOfKey(uri);
459                        if (i >= 0) {
460                            const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
461                            if (fetcher != NULL) {
462                                uint32_t type = fetcher->getStreamTypeMask();
463                                if (type == STREAMTYPE_SUBTITLES) {
464                                    mPacketSources.valueFor(
465                                            STREAMTYPE_SUBTITLES)->signalEOS(err);;
466                                    break;
467                                }
468                            }
469                        }
470                    }
471
472                    if (mInPreparationPhase) {
473                        postPrepared(err);
474                    }
475
476                    cancelBandwidthSwitch();
477
478                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
479
480                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
481
482                    mPacketSources.valueFor(
483                            STREAMTYPE_SUBTITLES)->signalEOS(err);
484
485                    sp<AMessage> notify = mNotify->dup();
486                    notify->setInt32("what", kWhatError);
487                    notify->setInt32("err", err);
488                    notify->post();
489                    break;
490                }
491
492                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
493                {
494                    AString uri;
495                    CHECK(msg->findString("uri", &uri));
496
497                    if (mFetcherInfos.indexOfKey(uri) < 0) {
498                        ALOGE("couldn't find uri");
499                        break;
500                    }
501                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
502                    info->mIsPrepared = true;
503
504                    if (mInPreparationPhase) {
505                        bool allFetchersPrepared = true;
506                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
507                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
508                                allFetchersPrepared = false;
509                                break;
510                            }
511                        }
512
513                        if (allFetchersPrepared) {
514                            postPrepared(OK);
515                        }
516                    }
517                    break;
518                }
519
520                case PlaylistFetcher::kWhatStartedAt:
521                {
522                    int32_t switchGeneration;
523                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
524
525                    if (switchGeneration != mSwitchGeneration) {
526                        break;
527                    }
528
529                    // Resume fetcher for the original variant; the resumed fetcher should
530                    // continue until the timestamps found in msg, which is stored by the
531                    // new fetcher to indicate where the new variant has started buffering.
532                    for (size_t i = 0; i < mFetcherInfos.size(); i++) {
533                        const FetcherInfo info = mFetcherInfos.valueAt(i);
534                        if (info.mToBeRemoved) {
535                            info.mFetcher->resumeUntilAsync(msg);
536                        }
537                    }
538                    break;
539                }
540
541                default:
542                    TRESPASS();
543            }
544
545            break;
546        }
547
548        case kWhatCheckBandwidth:
549        {
550            int32_t generation;
551            CHECK(msg->findInt32("generation", &generation));
552
553            if (generation != mCheckBandwidthGeneration) {
554                break;
555            }
556
557            onCheckBandwidth(msg);
558            break;
559        }
560
561        case kWhatChangeConfiguration:
562        {
563            onChangeConfiguration(msg);
564            break;
565        }
566
567        case kWhatChangeConfiguration2:
568        {
569            onChangeConfiguration2(msg);
570            break;
571        }
572
573        case kWhatChangeConfiguration3:
574        {
575            onChangeConfiguration3(msg);
576            break;
577        }
578
579        case kWhatFinishDisconnect2:
580        {
581            onFinishDisconnect2();
582            break;
583        }
584
585        case kWhatSwapped:
586        {
587            onSwapped(msg);
588            break;
589        }
590
591        case kWhatCheckSwitchDown:
592        {
593            onCheckSwitchDown();
594            break;
595        }
596
597        case kWhatSwitchDown:
598        {
599            onSwitchDown();
600            break;
601        }
602
603        default:
604            TRESPASS();
605            break;
606    }
607}
608
609// static
610int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
611    if (a->mBandwidth < b->mBandwidth) {
612        return -1;
613    } else if (a->mBandwidth == b->mBandwidth) {
614        return 0;
615    }
616
617    return 1;
618}
619
620// static
621LiveSession::StreamType LiveSession::indexToType(int idx) {
622    CHECK(idx >= 0 && idx < kMaxStreams);
623    return (StreamType)(1 << idx);
624}
625
626// static
627ssize_t LiveSession::typeToIndex(int32_t type) {
628    switch (type) {
629        case STREAMTYPE_AUDIO:
630            return 0;
631        case STREAMTYPE_VIDEO:
632            return 1;
633        case STREAMTYPE_SUBTITLES:
634            return 2;
635        default:
636            return -1;
637    };
638    return -1;
639}
640
641void LiveSession::onConnect(const sp<AMessage> &msg) {
642    AString url;
643    CHECK(msg->findString("url", &url));
644
645    KeyedVector<String8, String8> *headers = NULL;
646    if (!msg->findPointer("headers", (void **)&headers)) {
647        mExtraHeaders.clear();
648    } else {
649        mExtraHeaders = *headers;
650
651        delete headers;
652        headers = NULL;
653    }
654
655    // TODO currently we don't know if we are coming here from incognito mode
656    ALOGI("onConnect %s", uriDebugString(url).c_str());
657
658    mMasterURL = url;
659
660    bool dummy;
661    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
662
663    if (mPlaylist == NULL) {
664        ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str());
665
666        postPrepared(ERROR_IO);
667        return;
668    }
669
670    // We trust the content provider to make a reasonable choice of preferred
671    // initial bandwidth by listing it first in the variant playlist.
672    // At startup we really don't have a good estimate on the available
673    // network bandwidth since we haven't tranferred any data yet. Once
674    // we have we can make a better informed choice.
675    size_t initialBandwidth = 0;
676    size_t initialBandwidthIndex = 0;
677
678    if (mPlaylist->isVariantPlaylist()) {
679        for (size_t i = 0; i < mPlaylist->size(); ++i) {
680            BandwidthItem item;
681
682            item.mPlaylistIndex = i;
683
684            sp<AMessage> meta;
685            AString uri;
686            mPlaylist->itemAt(i, &uri, &meta);
687
688            unsigned long bandwidth;
689            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
690
691            if (initialBandwidth == 0) {
692                initialBandwidth = item.mBandwidth;
693            }
694
695            mBandwidthItems.push(item);
696        }
697
698        CHECK_GT(mBandwidthItems.size(), 0u);
699
700        mBandwidthItems.sort(SortByBandwidth);
701
702        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
703            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
704                initialBandwidthIndex = i;
705                break;
706            }
707        }
708    } else {
709        // dummy item.
710        BandwidthItem item;
711        item.mPlaylistIndex = 0;
712        item.mBandwidth = 0;
713        mBandwidthItems.push(item);
714    }
715
716    mPlaylist->pickRandomMediaItems();
717    changeConfiguration(
718            0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
719}
720
721void LiveSession::finishDisconnect() {
722    // No reconfiguration is currently pending, make sure none will trigger
723    // during disconnection either.
724    cancelCheckBandwidthEvent();
725
726    // Protect mPacketSources from a swapPacketSource race condition through disconnect.
727    // (finishDisconnect, onFinishDisconnect2)
728    cancelBandwidthSwitch();
729
730    // cancel switch down monitor
731    mSwitchDownMonitor.clear();
732
733    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
734        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
735    }
736
737    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
738
739    mContinuationCounter = mFetcherInfos.size();
740    mContinuation = msg;
741
742    if (mContinuationCounter == 0) {
743        msg->post();
744    }
745}
746
747void LiveSession::onFinishDisconnect2() {
748    mContinuation.clear();
749
750    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
751    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
752
753    mPacketSources.valueFor(
754            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
755
756    sp<AMessage> response = new AMessage;
757    response->setInt32("err", OK);
758
759    response->postReply(mDisconnectReplyID);
760    mDisconnectReplyID = 0;
761}
762
763sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
764    ssize_t index = mFetcherInfos.indexOfKey(uri);
765
766    if (index >= 0) {
767        return NULL;
768    }
769
770    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
771    notify->setString("uri", uri);
772    notify->setInt32("switchGeneration", mSwitchGeneration);
773
774    FetcherInfo info;
775    info.mFetcher = new PlaylistFetcher(notify, this, uri, mSubtitleGeneration);
776    info.mDurationUs = -1ll;
777    info.mIsPrepared = false;
778    info.mToBeRemoved = false;
779    looper()->registerHandler(info.mFetcher);
780
781    mFetcherInfos.add(uri, info);
782
783    return info.mFetcher;
784}
785
786/*
787 * Illustration of parameters:
788 *
789 * 0      `range_offset`
790 * +------------+-------------------------------------------------------+--+--+
791 * |            |                                 | next block to fetch |  |  |
792 * |            | `source` handle => `out` buffer |                     |  |  |
793 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
794 * |            |<----------- `range_length` / buffer capacity ----------->|  |
795 * |<------------------------------ file_size ------------------------------->|
796 *
797 * Special parameter values:
798 * - range_length == -1 means entire file
799 * - block_size == 0 means entire range
800 *
801 */
802ssize_t LiveSession::fetchFile(
803        const char *url, sp<ABuffer> *out,
804        int64_t range_offset, int64_t range_length,
805        uint32_t block_size, /* download block size */
806        sp<DataSource> *source, /* to return and reuse source */
807        String8 *actualUrl) {
808    off64_t size;
809    sp<DataSource> temp_source;
810    if (source == NULL) {
811        source = &temp_source;
812    }
813
814    if (*source == NULL) {
815        if (!strncasecmp(url, "file://", 7)) {
816            *source = new FileSource(url + 7);
817        } else if (strncasecmp(url, "http://", 7)
818                && strncasecmp(url, "https://", 8)) {
819            return ERROR_UNSUPPORTED;
820        } else {
821            KeyedVector<String8, String8> headers = mExtraHeaders;
822            if (range_offset > 0 || range_length >= 0) {
823                headers.add(
824                        String8("Range"),
825                        String8(
826                            StringPrintf(
827                                "bytes=%lld-%s",
828                                range_offset,
829                                range_length < 0
830                                    ? "" : StringPrintf("%lld",
831                                            range_offset + range_length - 1).c_str()).c_str()));
832            }
833            status_t err = mHTTPDataSource->connect(url, &headers);
834
835            if (err != OK) {
836                return err;
837            }
838
839            *source = mHTTPDataSource;
840        }
841    }
842
843    status_t getSizeErr = (*source)->getSize(&size);
844    if (getSizeErr != OK) {
845        size = 65536;
846    }
847
848    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
849    if (*out == NULL) {
850        buffer->setRange(0, 0);
851    }
852
853    ssize_t bytesRead = 0;
854    // adjust range_length if only reading partial block
855    if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
856        range_length = buffer->size() + block_size;
857    }
858    for (;;) {
859        // Only resize when we don't know the size.
860        size_t bufferRemaining = buffer->capacity() - buffer->size();
861        if (bufferRemaining == 0 && getSizeErr != OK) {
862            bufferRemaining = 32768;
863
864            ALOGV("increasing download buffer to %zu bytes",
865                 buffer->size() + bufferRemaining);
866
867            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
868            memcpy(copy->data(), buffer->data(), buffer->size());
869            copy->setRange(0, buffer->size());
870
871            buffer = copy;
872        }
873
874        size_t maxBytesToRead = bufferRemaining;
875        if (range_length >= 0) {
876            int64_t bytesLeftInRange = range_length - buffer->size();
877            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
878                maxBytesToRead = bytesLeftInRange;
879
880                if (bytesLeftInRange == 0) {
881                    break;
882                }
883            }
884        }
885
886        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
887        // to help us break out of the loop.
888        ssize_t n = (*source)->readAt(
889                buffer->size(), buffer->data() + buffer->size(),
890                maxBytesToRead);
891
892        if (n < 0) {
893            return n;
894        }
895
896        if (n == 0) {
897            break;
898        }
899
900        buffer->setRange(0, buffer->size() + (size_t)n);
901        bytesRead += n;
902    }
903
904    *out = buffer;
905    if (actualUrl != NULL) {
906        *actualUrl = (*source)->getUri();
907        if (actualUrl->isEmpty()) {
908            *actualUrl = url;
909        }
910    }
911
912    return bytesRead;
913}
914
915sp<M3UParser> LiveSession::fetchPlaylist(
916        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
917    ALOGV("fetchPlaylist '%s'", url);
918
919    *unchanged = false;
920
921    sp<ABuffer> buffer;
922    String8 actualUrl;
923    ssize_t  err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
924
925    if (err <= 0) {
926        return NULL;
927    }
928
929    // MD5 functionality is not available on the simulator, treat all
930    // playlists as changed.
931
932#if defined(HAVE_ANDROID_OS)
933    uint8_t hash[16];
934
935    MD5_CTX m;
936    MD5_Init(&m);
937    MD5_Update(&m, buffer->data(), buffer->size());
938
939    MD5_Final(hash, &m);
940
941    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
942        // playlist unchanged
943        *unchanged = true;
944
945        return NULL;
946    }
947
948    if (curPlaylistHash != NULL) {
949        memcpy(curPlaylistHash, hash, sizeof(hash));
950    }
951#endif
952
953    sp<M3UParser> playlist =
954        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
955
956    if (playlist->initCheck() != OK) {
957        ALOGE("failed to parse .m3u8 playlist");
958
959        return NULL;
960    }
961
962    return playlist;
963}
964
965static double uniformRand() {
966    return (double)rand() / RAND_MAX;
967}
968
969size_t LiveSession::getBandwidthIndex() {
970    if (mBandwidthItems.size() == 0) {
971        return 0;
972    }
973
974#if 1
975    char value[PROPERTY_VALUE_MAX];
976    ssize_t index = -1;
977    if (property_get("media.httplive.bw-index", value, NULL)) {
978        char *end;
979        index = strtol(value, &end, 10);
980        CHECK(end > value && *end == '\0');
981
982        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
983            index = mBandwidthItems.size() - 1;
984        }
985    }
986
987    if (index < 0) {
988        int32_t bandwidthBps;
989        if (mHTTPDataSource != NULL
990                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
991            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
992        } else {
993            ALOGV("no bandwidth estimate.");
994            return 0;  // Pick the lowest bandwidth stream by default.
995        }
996
997        char value[PROPERTY_VALUE_MAX];
998        if (property_get("media.httplive.max-bw", value, NULL)) {
999            char *end;
1000            long maxBw = strtoul(value, &end, 10);
1001            if (end > value && *end == '\0') {
1002                if (maxBw > 0 && bandwidthBps > maxBw) {
1003                    ALOGV("bandwidth capped to %ld bps", maxBw);
1004                    bandwidthBps = maxBw;
1005                }
1006            }
1007        }
1008
1009        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
1010
1011        index = mBandwidthItems.size() - 1;
1012        while (index > 0) {
1013            // consider only 80% of the available bandwidth, but if we are switching up,
1014            // be even more conservative (70%) to avoid overestimating and immediately
1015            // switching back.
1016            size_t adjustedBandwidthBps = bandwidthBps;
1017            if (index > mCurBandwidthIndex) {
1018                adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10;
1019            } else {
1020                adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10;
1021            }
1022            if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) {
1023                break;
1024            }
1025            --index;
1026        }
1027    }
1028#elif 0
1029    // Change bandwidth at random()
1030    size_t index = uniformRand() * mBandwidthItems.size();
1031#elif 0
1032    // There's a 50% chance to stay on the current bandwidth and
1033    // a 50% chance to switch to the next higher bandwidth (wrapping around
1034    // to lowest)
1035    const size_t kMinIndex = 0;
1036
1037    static ssize_t mCurBandwidthIndex = -1;
1038
1039    size_t index;
1040    if (mCurBandwidthIndex < 0) {
1041        index = kMinIndex;
1042    } else if (uniformRand() < 0.5) {
1043        index = (size_t)mCurBandwidthIndex;
1044    } else {
1045        index = mCurBandwidthIndex + 1;
1046        if (index == mBandwidthItems.size()) {
1047            index = kMinIndex;
1048        }
1049    }
1050    mCurBandwidthIndex = index;
1051#elif 0
1052    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1053
1054    size_t index = mBandwidthItems.size() - 1;
1055    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1056        --index;
1057    }
1058#elif 1
1059    char value[PROPERTY_VALUE_MAX];
1060    size_t index;
1061    if (property_get("media.httplive.bw-index", value, NULL)) {
1062        char *end;
1063        index = strtoul(value, &end, 10);
1064        CHECK(end > value && *end == '\0');
1065
1066        if (index >= mBandwidthItems.size()) {
1067            index = mBandwidthItems.size() - 1;
1068        }
1069    } else {
1070        index = 0;
1071    }
1072#else
1073    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
1074#endif
1075
1076    CHECK_GE(index, 0);
1077
1078    return index;
1079}
1080
1081int64_t LiveSession::latestMediaSegmentStartTimeUs() {
1082    sp<AMessage> audioMeta = mPacketSources.valueFor(STREAMTYPE_AUDIO)->getLatestDequeuedMeta();
1083    int64_t minSegmentStartTimeUs = -1, videoSegmentStartTimeUs = -1;
1084    if (audioMeta != NULL) {
1085        audioMeta->findInt64("segmentStartTimeUs", &minSegmentStartTimeUs);
1086    }
1087
1088    sp<AMessage> videoMeta = mPacketSources.valueFor(STREAMTYPE_VIDEO)->getLatestDequeuedMeta();
1089    if (videoMeta != NULL
1090            && videoMeta->findInt64("segmentStartTimeUs", &videoSegmentStartTimeUs)) {
1091        if (minSegmentStartTimeUs < 0 || videoSegmentStartTimeUs < minSegmentStartTimeUs) {
1092            minSegmentStartTimeUs = videoSegmentStartTimeUs;
1093        }
1094
1095    }
1096    return minSegmentStartTimeUs;
1097}
1098
1099status_t LiveSession::onSeek(const sp<AMessage> &msg) {
1100    int64_t timeUs;
1101    CHECK(msg->findInt64("timeUs", &timeUs));
1102
1103    if (!mReconfigurationInProgress) {
1104        changeConfiguration(timeUs, mCurBandwidthIndex);
1105        return OK;
1106    } else {
1107        return -EWOULDBLOCK;
1108    }
1109}
1110
1111status_t LiveSession::getDuration(int64_t *durationUs) const {
1112    int64_t maxDurationUs = -1ll;
1113    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1114        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1115
1116        if (fetcherDurationUs > maxDurationUs) {
1117            maxDurationUs = fetcherDurationUs;
1118        }
1119    }
1120
1121    *durationUs = maxDurationUs;
1122
1123    return OK;
1124}
1125
1126bool LiveSession::isSeekable() const {
1127    int64_t durationUs;
1128    return getDuration(&durationUs) == OK && durationUs >= 0;
1129}
1130
1131bool LiveSession::hasDynamicDuration() const {
1132    return false;
1133}
1134
1135size_t LiveSession::getTrackCount() const {
1136    if (mPlaylist == NULL) {
1137        return 0;
1138    } else {
1139        return mPlaylist->getTrackCount();
1140    }
1141}
1142
1143sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1144    if (mPlaylist == NULL) {
1145        return NULL;
1146    } else {
1147        return mPlaylist->getTrackInfo(trackIndex);
1148    }
1149}
1150
1151status_t LiveSession::selectTrack(size_t index, bool select) {
1152    if (mPlaylist == NULL) {
1153        return INVALID_OPERATION;
1154    }
1155
1156    ++mSubtitleGeneration;
1157    status_t err = mPlaylist->selectTrack(index, select);
1158    if (err == OK) {
1159        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id());
1160        msg->setInt32("bandwidthIndex", mCurBandwidthIndex);
1161        msg->setInt32("pickTrack", select);
1162        msg->post();
1163    }
1164    return err;
1165}
1166
1167ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
1168    if (mPlaylist == NULL) {
1169        return -1;
1170    } else {
1171        return mPlaylist->getSelectedTrack(type);
1172    }
1173}
1174
1175bool LiveSession::canSwitchUp() {
1176    // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
1177    status_t err = OK;
1178    for (size_t i = 0; i < mPacketSources.size(); ++i) {
1179        sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
1180        int64_t dur = source->getBufferedDurationUs(&err);
1181        if (err == OK && dur > 10000000) {
1182            return true;
1183        }
1184    }
1185    return false;
1186}
1187
1188void LiveSession::changeConfiguration(
1189        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
1190    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
1191    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
1192    cancelBandwidthSwitch();
1193
1194    CHECK(!mReconfigurationInProgress);
1195    mReconfigurationInProgress = true;
1196
1197    mCurBandwidthIndex = bandwidthIndex;
1198
1199    ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
1200          timeUs, bandwidthIndex, pickTrack);
1201
1202    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
1203    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
1204
1205    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1206    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1207
1208    AString URIs[kMaxStreams];
1209    for (size_t i = 0; i < kMaxStreams; ++i) {
1210        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1211            streamMask |= indexToType(i);
1212        }
1213    }
1214
1215    // Step 1, stop and discard fetchers that are no longer needed.
1216    // Pause those that we'll reuse.
1217    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1218        const AString &uri = mFetcherInfos.keyAt(i);
1219
1220        bool discardFetcher = true;
1221
1222        // If we're seeking all current fetchers are discarded.
1223        if (timeUs < 0ll) {
1224            // delay fetcher removal if not picking tracks
1225            discardFetcher = pickTrack;
1226
1227            for (size_t j = 0; j < kMaxStreams; ++j) {
1228                StreamType type = indexToType(j);
1229                if ((streamMask & type) && uri == URIs[j]) {
1230                    resumeMask |= type;
1231                    streamMask &= ~type;
1232                    discardFetcher = false;
1233                }
1234            }
1235        }
1236
1237        if (discardFetcher) {
1238            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1239        } else {
1240            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
1241        }
1242    }
1243
1244    sp<AMessage> msg;
1245    if (timeUs < 0ll) {
1246        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1247        msg = new AMessage(kWhatChangeConfiguration3, id());
1248    } else {
1249        msg = new AMessage(kWhatChangeConfiguration2, id());
1250    }
1251    msg->setInt32("streamMask", streamMask);
1252    msg->setInt32("resumeMask", resumeMask);
1253    msg->setInt32("pickTrack", pickTrack);
1254    msg->setInt64("timeUs", timeUs);
1255    for (size_t i = 0; i < kMaxStreams; ++i) {
1256        if ((streamMask | resumeMask) & indexToType(i)) {
1257            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1258        }
1259    }
1260
1261    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1262    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1263    // fetchers have completed their asynchronous operation, we'll post
1264    // mContinuation, which then is handled below in onChangeConfiguration2.
1265    mContinuationCounter = mFetcherInfos.size();
1266    mContinuation = msg;
1267
1268    if (mContinuationCounter == 0) {
1269        msg->post();
1270
1271        if (mSeekReplyID != 0) {
1272            CHECK(mSeekReply != NULL);
1273            mSeekReply->setInt32("err", OK);
1274            mSeekReply->postReply(mSeekReplyID);
1275            mSeekReplyID = 0;
1276            mSeekReply.clear();
1277        }
1278    }
1279}
1280
1281void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1282    if (!mReconfigurationInProgress) {
1283        int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex;
1284        msg->findInt32("pickTrack", &pickTrack);
1285        msg->findInt32("bandwidthIndex", &bandwidthIndex);
1286        changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack);
1287    } else {
1288        msg->post(1000000ll); // retry in 1 sec
1289    }
1290}
1291
1292void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1293    mContinuation.clear();
1294
1295    // All fetchers are either suspended or have been removed now.
1296
1297    uint32_t streamMask, resumeMask;
1298    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1299    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1300
1301    // currently onChangeConfiguration2 is only called for seeking;
1302    // remove the following CHECK if using it else where.
1303    CHECK_EQ(resumeMask, 0);
1304    streamMask |= resumeMask;
1305
1306    AString URIs[kMaxStreams];
1307    for (size_t i = 0; i < kMaxStreams; ++i) {
1308        if (streamMask & indexToType(i)) {
1309            const AString &uriKey = mStreams[i].uriKey();
1310            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1311            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1312        }
1313    }
1314
1315    // Determine which decoders to shutdown on the player side,
1316    // a decoder has to be shutdown if either
1317    // 1) its streamtype was active before but now longer isn't.
1318    // or
1319    // 2) its streamtype was already active and still is but the URI
1320    //    has changed.
1321    uint32_t changedMask = 0;
1322    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1323        if (((mStreamMask & streamMask & indexToType(i))
1324                && !(URIs[i] == mStreams[i].mUri))
1325                || (mStreamMask & ~streamMask & indexToType(i))) {
1326            changedMask |= indexToType(i);
1327        }
1328    }
1329
1330    if (changedMask == 0) {
1331        // If nothing changed as far as the audio/video decoders
1332        // are concerned we can proceed.
1333        onChangeConfiguration3(msg);
1334        return;
1335    }
1336
1337    // Something changed, inform the player which will shutdown the
1338    // corresponding decoders and will post the reply once that's done.
1339    // Handling the reply will continue executing below in
1340    // onChangeConfiguration3.
1341    sp<AMessage> notify = mNotify->dup();
1342    notify->setInt32("what", kWhatStreamsChanged);
1343    notify->setInt32("changedMask", changedMask);
1344
1345    msg->setWhat(kWhatChangeConfiguration3);
1346    msg->setTarget(id());
1347
1348    notify->setMessage("reply", msg);
1349    notify->post();
1350}
1351
1352void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1353    mContinuation.clear();
1354    // All remaining fetchers are still suspended, the player has shutdown
1355    // any decoders that needed it.
1356
1357    uint32_t streamMask, resumeMask;
1358    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1359    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1360
1361    int64_t timeUs;
1362    int32_t pickTrack;
1363    bool switching = false;
1364    CHECK(msg->findInt64("timeUs", &timeUs));
1365    CHECK(msg->findInt32("pickTrack", &pickTrack));
1366
1367    if (timeUs < 0ll) {
1368        if (!pickTrack) {
1369            switching = true;
1370        }
1371        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
1372    } else {
1373        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1374    }
1375
1376    for (size_t i = 0; i < kMaxStreams; ++i) {
1377        if (streamMask & indexToType(i)) {
1378            if (switching) {
1379                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
1380            } else {
1381                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1382            }
1383        }
1384    }
1385
1386    mNewStreamMask = streamMask | resumeMask;
1387    if (switching) {
1388        mSwapMask = mStreamMask & ~resumeMask;
1389    }
1390
1391    // Of all existing fetchers:
1392    // * Resume fetchers that are still needed and assign them original packet sources.
1393    // * Mark otherwise unneeded fetchers for removal.
1394    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1395    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1396        const AString &uri = mFetcherInfos.keyAt(i);
1397
1398        sp<AnotherPacketSource> sources[kMaxStreams];
1399        for (size_t j = 0; j < kMaxStreams; ++j) {
1400            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
1401                sources[j] = mPacketSources.valueFor(indexToType(j));
1402
1403                if (j != kSubtitleIndex) {
1404                    ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j));
1405                    sp<AnotherPacketSource> discontinuityQueue;
1406                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1407                    discontinuityQueue->queueDiscontinuity(
1408                            ATSParser::DISCONTINUITY_NONE,
1409                            NULL,
1410                            true);
1411                }
1412            }
1413        }
1414
1415        FetcherInfo &info = mFetcherInfos.editValueAt(i);
1416        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
1417                || sources[kSubtitleIndex] != NULL) {
1418            info.mFetcher->startAsync(
1419                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
1420        } else {
1421            info.mToBeRemoved = true;
1422        }
1423    }
1424
1425    // streamMask now only contains the types that need a new fetcher created.
1426
1427    if (streamMask != 0) {
1428        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1429    }
1430
1431    // Find out when the original fetchers have buffered up to and start the new fetchers
1432    // at a later timestamp.
1433    for (size_t i = 0; i < kMaxStreams; i++) {
1434        if (!(indexToType(i) & streamMask)) {
1435            continue;
1436        }
1437
1438        AString uri;
1439        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
1440
1441        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1442        CHECK(fetcher != NULL);
1443
1444        int32_t latestSeq = -1;
1445        int64_t startTimeUs = -1;
1446        int64_t segmentStartTimeUs = -1ll;
1447        int32_t discontinuitySeq = -1;
1448        sp<AnotherPacketSource> sources[kMaxStreams];
1449
1450        if (i == kSubtitleIndex) {
1451            segmentStartTimeUs = latestMediaSegmentStartTimeUs();
1452        }
1453
1454        // TRICKY: looping from i as earlier streams are already removed from streamMask
1455        for (size_t j = i; j < kMaxStreams; ++j) {
1456            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
1457            if ((streamMask & indexToType(j)) && uri == streamUri) {
1458                sources[j] = mPacketSources.valueFor(indexToType(j));
1459
1460                if (timeUs >= 0) {
1461                    sources[j]->clear();
1462                    startTimeUs = timeUs;
1463
1464                    sp<AnotherPacketSource> discontinuityQueue;
1465                    sp<AMessage> extra = new AMessage;
1466                    extra->setInt64("timeUs", timeUs);
1467                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1468                    discontinuityQueue->queueDiscontinuity(
1469                            ATSParser::DISCONTINUITY_TIME, extra, true);
1470                } else {
1471                    int32_t type;
1472                    int64_t srcSegmentStartTimeUs;
1473                    sp<AMessage> meta;
1474                    if (pickTrack) {
1475                        // selecting
1476                        meta = sources[j]->getLatestDequeuedMeta();
1477                    } else {
1478                        // adapting
1479                        meta = sources[j]->getLatestEnqueuedMeta();
1480                    }
1481
1482                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
1483                        int64_t tmpUs;
1484                        CHECK(meta->findInt64("timeUs", &tmpUs));
1485                        if (startTimeUs < 0 || tmpUs < startTimeUs) {
1486                            startTimeUs = tmpUs;
1487                        }
1488
1489                        CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs));
1490                        if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) {
1491                            segmentStartTimeUs = tmpUs;
1492                        }
1493
1494                        int32_t seq;
1495                        CHECK(meta->findInt32("discontinuitySeq", &seq));
1496                        if (discontinuitySeq < 0 || seq < discontinuitySeq) {
1497                            discontinuitySeq = seq;
1498                        }
1499                    }
1500
1501                    if (pickTrack) {
1502                        // selecting track, queue discontinuities before content
1503                        sources[j]->clear();
1504                        if (j == kSubtitleIndex) {
1505                            break;
1506                        }
1507                        sp<AnotherPacketSource> discontinuityQueue;
1508                        discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1509                        discontinuityQueue->queueDiscontinuity(
1510                                ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1511                    } else {
1512                        // adapting, queue discontinuities after resume
1513                        sources[j] = mPacketSources2.valueFor(indexToType(j));
1514                        sources[j]->clear();
1515                        uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1516                        if (extraStreams & indexToType(j)) {
1517                            sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false));
1518                        }
1519                    }
1520                }
1521
1522                streamMask &= ~indexToType(j);
1523            }
1524        }
1525
1526        fetcher->startAsync(
1527                sources[kAudioIndex],
1528                sources[kVideoIndex],
1529                sources[kSubtitleIndex],
1530                startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs,
1531                segmentStartTimeUs,
1532                discontinuitySeq,
1533                switching);
1534    }
1535
1536    // All fetchers have now been started, the configuration change
1537    // has completed.
1538
1539    cancelCheckBandwidthEvent();
1540    scheduleCheckBandwidthEvent();
1541
1542    ALOGV("XXX configuration change completed.");
1543    mReconfigurationInProgress = false;
1544    if (switching) {
1545        mSwitchInProgress = true;
1546    } else {
1547        mStreamMask = mNewStreamMask;
1548    }
1549
1550    if (mDisconnectReplyID != 0) {
1551        finishDisconnect();
1552    }
1553}
1554
1555void LiveSession::onSwapped(const sp<AMessage> &msg) {
1556    int32_t switchGeneration;
1557    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
1558    if (switchGeneration != mSwitchGeneration) {
1559        return;
1560    }
1561
1562    int32_t stream;
1563    CHECK(msg->findInt32("stream", &stream));
1564
1565    ssize_t idx = typeToIndex(stream);
1566    CHECK(idx >= 0);
1567    if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
1568        ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str());
1569    }
1570    mStreams[idx].mUri = mStreams[idx].mNewUri;
1571    mStreams[idx].mNewUri.clear();
1572
1573    mSwapMask &= ~stream;
1574    if (mSwapMask != 0) {
1575        return;
1576    }
1577
1578    // Check if new variant contains extra streams.
1579    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1580    while (extraStreams) {
1581        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
1582        swapPacketSource(extraStream);
1583        extraStreams &= ~extraStream;
1584
1585        idx = typeToIndex(extraStream);
1586        CHECK(idx >= 0);
1587        if (mStreams[idx].mNewUri.empty()) {
1588            ALOGW("swapping extra stream type %d %s to empty stream",
1589                    extraStream, mStreams[idx].mUri.c_str());
1590        }
1591        mStreams[idx].mUri = mStreams[idx].mNewUri;
1592        mStreams[idx].mNewUri.clear();
1593    }
1594
1595    tryToFinishBandwidthSwitch();
1596}
1597
1598void LiveSession::onCheckSwitchDown() {
1599    if (mSwitchDownMonitor == NULL) {
1600        return;
1601    }
1602
1603    for (size_t i = 0; i < kMaxStreams; ++i) {
1604        int32_t targetDuration;
1605        sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i));
1606        sp<AMessage> meta = packetSource->getLatestDequeuedMeta();
1607
1608        if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) {
1609            int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs();
1610            int64_t targetDurationUs = targetDuration * 1000000ll;
1611
1612            if (bufferedDurationUs < targetDurationUs / 3) {
1613                (new AMessage(kWhatSwitchDown, id()))->post();
1614                break;
1615            }
1616        }
1617    }
1618
1619    mSwitchDownMonitor->post(1000000ll);
1620}
1621
1622void LiveSession::onSwitchDown() {
1623    if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) {
1624        return;
1625    }
1626
1627    ssize_t bandwidthIndex = getBandwidthIndex();
1628    if (bandwidthIndex < mCurBandwidthIndex) {
1629        changeConfiguration(-1, bandwidthIndex, false);
1630        return;
1631    }
1632
1633    changeConfiguration(-1, mCurBandwidthIndex - 1, false);
1634}
1635
1636// Mark switch done when:
1637//   1. all old buffers are swapped out
1638void LiveSession::tryToFinishBandwidthSwitch() {
1639    if (!mSwitchInProgress) {
1640        return;
1641    }
1642
1643    bool needToRemoveFetchers = false;
1644    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1645        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
1646            needToRemoveFetchers = true;
1647            break;
1648        }
1649    }
1650
1651    if (!needToRemoveFetchers && mSwapMask == 0) {
1652        ALOGI("mSwitchInProgress = false");
1653        mStreamMask = mNewStreamMask;
1654        mSwitchInProgress = false;
1655    }
1656}
1657
1658void LiveSession::scheduleCheckBandwidthEvent() {
1659    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1660    msg->setInt32("generation", mCheckBandwidthGeneration);
1661    msg->post(10000000ll);
1662}
1663
1664void LiveSession::cancelCheckBandwidthEvent() {
1665    ++mCheckBandwidthGeneration;
1666}
1667
1668void LiveSession::cancelBandwidthSwitch() {
1669    Mutex::Autolock lock(mSwapMutex);
1670    mSwitchGeneration++;
1671    mSwitchInProgress = false;
1672    mSwapMask = 0;
1673
1674    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1675        FetcherInfo& info = mFetcherInfos.editValueAt(i);
1676        if (info.mToBeRemoved) {
1677            info.mToBeRemoved = false;
1678        }
1679    }
1680
1681    for (size_t i = 0; i < kMaxStreams; ++i) {
1682        if (!mStreams[i].mNewUri.empty()) {
1683            ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri);
1684            if (j < 0) {
1685                mStreams[i].mNewUri.clear();
1686                continue;
1687            }
1688
1689            const FetcherInfo &info = mFetcherInfos.valueAt(j);
1690            info.mFetcher->stopAsync();
1691            mFetcherInfos.removeItemsAt(j);
1692            mStreams[i].mNewUri.clear();
1693        }
1694    }
1695}
1696
1697bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
1698    if (mReconfigurationInProgress || mSwitchInProgress) {
1699        return false;
1700    }
1701
1702    if (mCurBandwidthIndex < 0) {
1703        return true;
1704    }
1705
1706    if (bandwidthIndex == (size_t)mCurBandwidthIndex) {
1707        return false;
1708    } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) {
1709        return canSwitchUp();
1710    } else {
1711        return true;
1712    }
1713}
1714
1715void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) {
1716    size_t bandwidthIndex = getBandwidthIndex();
1717    if (canSwitchBandwidthTo(bandwidthIndex)) {
1718        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1719    } else {
1720        // Come back and check again 10 seconds later in case there is nothing to do now.
1721        // If we DO change configuration, once that completes it'll schedule a new
1722        // check bandwidth event with an incremented mCheckBandwidthGeneration.
1723        msg->post(10000000ll);
1724    }
1725}
1726
1727void LiveSession::postPrepared(status_t err) {
1728    CHECK(mInPreparationPhase);
1729
1730    sp<AMessage> notify = mNotify->dup();
1731    if (err == OK || err == ERROR_END_OF_STREAM) {
1732        notify->setInt32("what", kWhatPrepared);
1733    } else {
1734        notify->setInt32("what", kWhatPreparationFailed);
1735        notify->setInt32("err", err);
1736    }
1737
1738    notify->post();
1739
1740    mInPreparationPhase = false;
1741
1742    mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id());
1743    mSwitchDownMonitor->post();
1744}
1745
1746}  // namespace android
1747
1748