LiveSession.cpp revision f4a48dfa8570d6a4708a868b8b15d1236f7ca54b
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/IMediaHTTPConnection.h>
31#include <media/IMediaHTTPService.h>
32#include <media/stagefright/foundation/hexdump.h>
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/DataSource.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaErrors.h>
39#include <media/stagefright/MediaHTTP.h>
40#include <media/stagefright/MetaData.h>
41#include <media/stagefright/Utils.h>
42
43#include <utils/Mutex.h>
44
45#include <ctype.h>
46#include <inttypes.h>
47#include <openssl/aes.h>
48#include <openssl/md5.h>
49
50namespace android {
51
52LiveSession::LiveSession(
53        const sp<AMessage> &notify, uint32_t flags,
54        const sp<IMediaHTTPService> &httpService)
55    : mNotify(notify),
56      mFlags(flags),
57      mHTTPService(httpService),
58      mInPreparationPhase(true),
59      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
60      mCurBandwidthIndex(-1),
61      mStreamMask(0),
62      mNewStreamMask(0),
63      mSwapMask(0),
64      mCheckBandwidthGeneration(0),
65      mSwitchGeneration(0),
66      mLastDequeuedTimeUs(0ll),
67      mRealTimeBaseUs(0ll),
68      mReconfigurationInProgress(false),
69      mSwitchInProgress(false),
70      mDisconnectReplyID(0),
71      mSeekReplyID(0),
72      mFirstTimeUsValid(false),
73      mFirstTimeUs(0),
74      mLastSeekTimeUs(0) {
75
76    mStreams[kAudioIndex] = StreamItem("audio");
77    mStreams[kVideoIndex] = StreamItem("video");
78    mStreams[kSubtitleIndex] = StreamItem("subtitles");
79
80    for (size_t i = 0; i < kMaxStreams; ++i) {
81        mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
82        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
83        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
84        mBuffering[i] = false;
85    }
86}
87
88LiveSession::~LiveSession() {
89}
90
91sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
92    ABuffer *discontinuity = new ABuffer(0);
93    discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
94    discontinuity->meta()->setInt32("swapPacketSource", swap);
95    discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
96    discontinuity->meta()->setInt64("timeUs", -1);
97    return discontinuity;
98}
99
100void LiveSession::swapPacketSource(StreamType stream) {
101    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
102    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
103    sp<AnotherPacketSource> tmp = aps;
104    aps = aps2;
105    aps2 = tmp;
106    aps2->clear();
107}
108
109status_t LiveSession::dequeueAccessUnit(
110        StreamType stream, sp<ABuffer> *accessUnit) {
111    if (!(mStreamMask & stream)) {
112        // return -EWOULDBLOCK to avoid halting the decoder
113        // when switching between audio/video and audio only.
114        return -EWOULDBLOCK;
115    }
116
117    status_t finalResult;
118    sp<AnotherPacketSource> discontinuityQueue  = mDiscontinuities.valueFor(stream);
119    if (discontinuityQueue->hasBufferAvailable(&finalResult)) {
120        discontinuityQueue->dequeueAccessUnit(accessUnit);
121        // seeking, track switching
122        sp<AMessage> extra;
123        int64_t timeUs;
124        if ((*accessUnit)->meta()->findMessage("extra", &extra)
125                && extra != NULL
126                && extra->findInt64("timeUs", &timeUs)) {
127            // seeking only
128            mLastSeekTimeUs = timeUs;
129            mDiscontinuityOffsetTimesUs.clear();
130            mDiscontinuityAbsStartTimesUs.clear();
131        }
132        return INFO_DISCONTINUITY;
133    }
134
135    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
136
137    ssize_t idx = typeToIndex(stream);
138    if (!packetSource->hasBufferAvailable(&finalResult)) {
139        if (finalResult == OK) {
140            mBuffering[idx] = true;
141            return -EAGAIN;
142        } else {
143            return finalResult;
144        }
145    }
146
147    if (mBuffering[idx]) {
148        if (mSwitchInProgress
149                || packetSource->isFinished(0)
150                || packetSource->getEstimatedDurationUs() > 10000000ll) {
151            mBuffering[idx] = false;
152        }
153    }
154
155    if (mBuffering[idx]) {
156        return -EAGAIN;
157    }
158
159    // wait for counterpart
160    sp<AnotherPacketSource> otherSource;
161    if (stream == STREAMTYPE_AUDIO && (mStreamMask & STREAMTYPE_VIDEO)) {
162        otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
163    } else if (stream == STREAMTYPE_VIDEO && (mStreamMask & STREAMTYPE_AUDIO)) {
164        otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
165    }
166    if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) {
167        return finalResult == OK ? -EAGAIN : finalResult;
168    }
169
170    status_t err = packetSource->dequeueAccessUnit(accessUnit);
171
172    size_t streamIdx;
173    const char *streamStr;
174    switch (stream) {
175        case STREAMTYPE_AUDIO:
176            streamIdx = kAudioIndex;
177            streamStr = "audio";
178            break;
179        case STREAMTYPE_VIDEO:
180            streamIdx = kVideoIndex;
181            streamStr = "video";
182            break;
183        case STREAMTYPE_SUBTITLES:
184            streamIdx = kSubtitleIndex;
185            streamStr = "subs";
186            break;
187        default:
188            TRESPASS();
189    }
190
191    StreamItem& strm = mStreams[streamIdx];
192    if (err == INFO_DISCONTINUITY) {
193        // adaptive streaming, discontinuities in the playlist
194        int32_t type;
195        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
196
197        sp<AMessage> extra;
198        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
199            extra.clear();
200        }
201
202        ALOGI("[%s] read discontinuity of type %d, extra = %s",
203              streamStr,
204              type,
205              extra == NULL ? "NULL" : extra->debugString().c_str());
206
207        int32_t swap;
208        if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) {
209            int32_t switchGeneration;
210            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
211            {
212                Mutex::Autolock lock(mSwapMutex);
213                if (switchGeneration == mSwitchGeneration) {
214                    swapPacketSource(stream);
215                    sp<AMessage> msg = new AMessage(kWhatSwapped, id());
216                    msg->setInt32("stream", stream);
217                    msg->setInt32("switchGeneration", switchGeneration);
218                    msg->post();
219                }
220            }
221        } else {
222            size_t seq = strm.mCurDiscontinuitySeq;
223            int64_t offsetTimeUs;
224            if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
225                offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
226            } else {
227                offsetTimeUs = 0;
228            }
229
230            seq += 1;
231            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
232                int64_t firstTimeUs;
233                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
234                offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
235                offsetTimeUs += strm.mLastSampleDurationUs;
236            } else {
237                offsetTimeUs += strm.mLastSampleDurationUs;
238            }
239
240            mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
241        }
242    } else if (err == OK) {
243
244        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
245            int64_t timeUs;
246            int32_t discontinuitySeq = 0;
247            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
248            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
249            strm.mCurDiscontinuitySeq = discontinuitySeq;
250
251            int32_t discard = 0;
252            int64_t firstTimeUs;
253            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
254                int64_t durUs; // approximate sample duration
255                if (timeUs > strm.mLastDequeuedTimeUs) {
256                    durUs = timeUs - strm.mLastDequeuedTimeUs;
257                } else {
258                    durUs = strm.mLastDequeuedTimeUs - timeUs;
259                }
260                strm.mLastSampleDurationUs = durUs;
261                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
262            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
263                firstTimeUs = timeUs;
264            } else {
265                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
266                firstTimeUs = timeUs;
267            }
268
269            strm.mLastDequeuedTimeUs = timeUs;
270            if (timeUs >= firstTimeUs) {
271                timeUs -= firstTimeUs;
272            } else {
273                timeUs = 0;
274            }
275            timeUs += mLastSeekTimeUs;
276            if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
277                timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
278            }
279
280            ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs);
281            (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
282            mLastDequeuedTimeUs = timeUs;
283            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
284        } else if (stream == STREAMTYPE_SUBTITLES) {
285            (*accessUnit)->meta()->setInt32(
286                    "trackIndex", mPlaylist->getSelectedIndex());
287            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
288        }
289    } else {
290        ALOGI("[%s] encountered error %d", streamStr, err);
291    }
292
293    return err;
294}
295
296status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
297    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
298    if (!(mStreamMask & stream)) {
299        return UNKNOWN_ERROR;
300    }
301
302    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
303
304    sp<MetaData> meta = packetSource->getFormat();
305
306    if (meta == NULL) {
307        return -EAGAIN;
308    }
309
310    return convertMetaDataToMessage(meta, format);
311}
312
313void LiveSession::connectAsync(
314        const char *url, const KeyedVector<String8, String8> *headers) {
315    sp<AMessage> msg = new AMessage(kWhatConnect, id());
316    msg->setString("url", url);
317
318    if (headers != NULL) {
319        msg->setPointer(
320                "headers",
321                new KeyedVector<String8, String8>(*headers));
322    }
323
324    msg->post();
325}
326
327status_t LiveSession::disconnect() {
328    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
329
330    sp<AMessage> response;
331    status_t err = msg->postAndAwaitResponse(&response);
332
333    return err;
334}
335
336status_t LiveSession::seekTo(int64_t timeUs) {
337    sp<AMessage> msg = new AMessage(kWhatSeek, id());
338    msg->setInt64("timeUs", timeUs);
339
340    sp<AMessage> response;
341    status_t err = msg->postAndAwaitResponse(&response);
342
343    uint32_t replyID;
344    CHECK(response == mSeekReply && 0 != mSeekReplyID);
345    mSeekReply.clear();
346    mSeekReplyID = 0;
347    return err;
348}
349
350void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
351    switch (msg->what()) {
352        case kWhatConnect:
353        {
354            onConnect(msg);
355            break;
356        }
357
358        case kWhatDisconnect:
359        {
360            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
361
362            if (mReconfigurationInProgress) {
363                break;
364            }
365
366            finishDisconnect();
367            break;
368        }
369
370        case kWhatSeek:
371        {
372            CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
373
374            status_t err = onSeek(msg);
375
376            mSeekReply = new AMessage;
377            mSeekReply->setInt32("err", err);
378            break;
379        }
380
381        case kWhatFetcherNotify:
382        {
383            int32_t what;
384            CHECK(msg->findInt32("what", &what));
385
386            switch (what) {
387                case PlaylistFetcher::kWhatStarted:
388                    break;
389                case PlaylistFetcher::kWhatPaused:
390                case PlaylistFetcher::kWhatStopped:
391                {
392                    if (what == PlaylistFetcher::kWhatStopped) {
393                        AString uri;
394                        CHECK(msg->findString("uri", &uri));
395                        if (mFetcherInfos.removeItem(uri) < 0) {
396                            // ignore duplicated kWhatStopped messages.
397                            break;
398                        }
399
400                        if (mSwitchInProgress) {
401                            tryToFinishBandwidthSwitch();
402                        }
403                    }
404
405                    if (mContinuation != NULL) {
406                        CHECK_GT(mContinuationCounter, 0);
407                        if (--mContinuationCounter == 0) {
408                            mContinuation->post();
409
410                            if (mSeekReplyID != 0) {
411                                CHECK(mSeekReply != NULL);
412                                mSeekReply->postReply(mSeekReplyID);
413                            }
414                        }
415                    }
416                    break;
417                }
418
419                case PlaylistFetcher::kWhatDurationUpdate:
420                {
421                    AString uri;
422                    CHECK(msg->findString("uri", &uri));
423
424                    int64_t durationUs;
425                    CHECK(msg->findInt64("durationUs", &durationUs));
426
427                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
428                    info->mDurationUs = durationUs;
429                    break;
430                }
431
432                case PlaylistFetcher::kWhatError:
433                {
434                    status_t err;
435                    CHECK(msg->findInt32("err", &err));
436
437                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
438
439                    if (mInPreparationPhase) {
440                        postPrepared(err);
441                    }
442
443                    cancelBandwidthSwitch();
444
445                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
446
447                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
448
449                    mPacketSources.valueFor(
450                            STREAMTYPE_SUBTITLES)->signalEOS(err);
451
452                    sp<AMessage> notify = mNotify->dup();
453                    notify->setInt32("what", kWhatError);
454                    notify->setInt32("err", err);
455                    notify->post();
456                    break;
457                }
458
459                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
460                {
461                    AString uri;
462                    CHECK(msg->findString("uri", &uri));
463
464                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
465                    info->mIsPrepared = true;
466
467                    if (mInPreparationPhase) {
468                        bool allFetchersPrepared = true;
469                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
470                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
471                                allFetchersPrepared = false;
472                                break;
473                            }
474                        }
475
476                        if (allFetchersPrepared) {
477                            postPrepared(OK);
478                        }
479                    }
480                    break;
481                }
482
483                case PlaylistFetcher::kWhatStartedAt:
484                {
485                    int32_t switchGeneration;
486                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
487
488                    if (switchGeneration != mSwitchGeneration) {
489                        break;
490                    }
491
492                    // Resume fetcher for the original variant; the resumed fetcher should
493                    // continue until the timestamps found in msg, which is stored by the
494                    // new fetcher to indicate where the new variant has started buffering.
495                    for (size_t i = 0; i < mFetcherInfos.size(); i++) {
496                        const FetcherInfo info = mFetcherInfos.valueAt(i);
497                        if (info.mToBeRemoved) {
498                            info.mFetcher->resumeUntilAsync(msg);
499                        }
500                    }
501                    break;
502                }
503
504                default:
505                    TRESPASS();
506            }
507
508            break;
509        }
510
511        case kWhatCheckBandwidth:
512        {
513            int32_t generation;
514            CHECK(msg->findInt32("generation", &generation));
515
516            if (generation != mCheckBandwidthGeneration) {
517                break;
518            }
519
520            onCheckBandwidth(msg);
521            break;
522        }
523
524        case kWhatChangeConfiguration:
525        {
526            onChangeConfiguration(msg);
527            break;
528        }
529
530        case kWhatChangeConfiguration2:
531        {
532            onChangeConfiguration2(msg);
533            break;
534        }
535
536        case kWhatChangeConfiguration3:
537        {
538            onChangeConfiguration3(msg);
539            break;
540        }
541
542        case kWhatFinishDisconnect2:
543        {
544            onFinishDisconnect2();
545            break;
546        }
547
548        case kWhatSwapped:
549        {
550            onSwapped(msg);
551            break;
552        }
553
554        case kWhatCheckSwitchDown:
555        {
556            onCheckSwitchDown();
557            break;
558        }
559
560        case kWhatSwitchDown:
561        {
562            onSwitchDown();
563            break;
564        }
565
566        default:
567            TRESPASS();
568            break;
569    }
570}
571
572// static
573int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
574    if (a->mBandwidth < b->mBandwidth) {
575        return -1;
576    } else if (a->mBandwidth == b->mBandwidth) {
577        return 0;
578    }
579
580    return 1;
581}
582
583// static
584LiveSession::StreamType LiveSession::indexToType(int idx) {
585    CHECK(idx >= 0 && idx < kMaxStreams);
586    return (StreamType)(1 << idx);
587}
588
589// static
590ssize_t LiveSession::typeToIndex(int32_t type) {
591    switch (type) {
592        case STREAMTYPE_AUDIO:
593            return 0;
594        case STREAMTYPE_VIDEO:
595            return 1;
596        case STREAMTYPE_SUBTITLES:
597            return 2;
598        default:
599            return -1;
600    };
601    return -1;
602}
603
604void LiveSession::onConnect(const sp<AMessage> &msg) {
605    AString url;
606    CHECK(msg->findString("url", &url));
607
608    KeyedVector<String8, String8> *headers = NULL;
609    if (!msg->findPointer("headers", (void **)&headers)) {
610        mExtraHeaders.clear();
611    } else {
612        mExtraHeaders = *headers;
613
614        delete headers;
615        headers = NULL;
616    }
617
618    // TODO currently we don't know if we are coming here from incognito mode
619    ALOGI("onConnect %s", uriDebugString(url).c_str());
620
621    mMasterURL = url;
622
623    bool dummy;
624    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
625
626    if (mPlaylist == NULL) {
627        ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str());
628
629        postPrepared(ERROR_IO);
630        return;
631    }
632
633    // We trust the content provider to make a reasonable choice of preferred
634    // initial bandwidth by listing it first in the variant playlist.
635    // At startup we really don't have a good estimate on the available
636    // network bandwidth since we haven't tranferred any data yet. Once
637    // we have we can make a better informed choice.
638    size_t initialBandwidth = 0;
639    size_t initialBandwidthIndex = 0;
640
641    if (mPlaylist->isVariantPlaylist()) {
642        for (size_t i = 0; i < mPlaylist->size(); ++i) {
643            BandwidthItem item;
644
645            item.mPlaylistIndex = i;
646
647            sp<AMessage> meta;
648            AString uri;
649            mPlaylist->itemAt(i, &uri, &meta);
650
651            unsigned long bandwidth;
652            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
653
654            if (initialBandwidth == 0) {
655                initialBandwidth = item.mBandwidth;
656            }
657
658            mBandwidthItems.push(item);
659        }
660
661        CHECK_GT(mBandwidthItems.size(), 0u);
662
663        mBandwidthItems.sort(SortByBandwidth);
664
665        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
666            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
667                initialBandwidthIndex = i;
668                break;
669            }
670        }
671    } else {
672        // dummy item.
673        BandwidthItem item;
674        item.mPlaylistIndex = 0;
675        item.mBandwidth = 0;
676        mBandwidthItems.push(item);
677    }
678
679    mPlaylist->pickRandomMediaItems();
680    changeConfiguration(
681            0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
682}
683
684void LiveSession::finishDisconnect() {
685    // No reconfiguration is currently pending, make sure none will trigger
686    // during disconnection either.
687    cancelCheckBandwidthEvent();
688
689    // Protect mPacketSources from a swapPacketSource race condition through disconnect.
690    // (finishDisconnect, onFinishDisconnect2)
691    cancelBandwidthSwitch();
692
693    // cancel switch down monitor
694    mSwitchDownMonitor.clear();
695
696    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
697        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
698    }
699
700    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
701
702    mContinuationCounter = mFetcherInfos.size();
703    mContinuation = msg;
704
705    if (mContinuationCounter == 0) {
706        msg->post();
707    }
708}
709
710void LiveSession::onFinishDisconnect2() {
711    mContinuation.clear();
712
713    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
714    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
715
716    mPacketSources.valueFor(
717            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
718
719    sp<AMessage> response = new AMessage;
720    response->setInt32("err", OK);
721
722    response->postReply(mDisconnectReplyID);
723    mDisconnectReplyID = 0;
724}
725
726sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
727    ssize_t index = mFetcherInfos.indexOfKey(uri);
728
729    if (index >= 0) {
730        return NULL;
731    }
732
733    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
734    notify->setString("uri", uri);
735    notify->setInt32("switchGeneration", mSwitchGeneration);
736
737    FetcherInfo info;
738    info.mFetcher = new PlaylistFetcher(notify, this, uri);
739    info.mDurationUs = -1ll;
740    info.mIsPrepared = false;
741    info.mToBeRemoved = false;
742    looper()->registerHandler(info.mFetcher);
743
744    mFetcherInfos.add(uri, info);
745
746    return info.mFetcher;
747}
748
749/*
750 * Illustration of parameters:
751 *
752 * 0      `range_offset`
753 * +------------+-------------------------------------------------------+--+--+
754 * |            |                                 | next block to fetch |  |  |
755 * |            | `source` handle => `out` buffer |                     |  |  |
756 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
757 * |            |<----------- `range_length` / buffer capacity ----------->|  |
758 * |<------------------------------ file_size ------------------------------->|
759 *
760 * Special parameter values:
761 * - range_length == -1 means entire file
762 * - block_size == 0 means entire range
763 *
764 */
765ssize_t LiveSession::fetchFile(
766        const char *url, sp<ABuffer> *out,
767        int64_t range_offset, int64_t range_length,
768        uint32_t block_size, /* download block size */
769        sp<DataSource> *source, /* to return and reuse source */
770        String8 *actualUrl) {
771    off64_t size;
772    sp<DataSource> temp_source;
773    if (source == NULL) {
774        source = &temp_source;
775    }
776
777    if (*source == NULL) {
778        if (!strncasecmp(url, "file://", 7)) {
779            *source = new FileSource(url + 7);
780        } else if (strncasecmp(url, "http://", 7)
781                && strncasecmp(url, "https://", 8)) {
782            return ERROR_UNSUPPORTED;
783        } else {
784            KeyedVector<String8, String8> headers = mExtraHeaders;
785            if (range_offset > 0 || range_length >= 0) {
786                headers.add(
787                        String8("Range"),
788                        String8(
789                            StringPrintf(
790                                "bytes=%lld-%s",
791                                range_offset,
792                                range_length < 0
793                                    ? "" : StringPrintf("%lld",
794                                            range_offset + range_length - 1).c_str()).c_str()));
795            }
796            status_t err = mHTTPDataSource->connect(url, &headers);
797
798            if (err != OK) {
799                return err;
800            }
801
802            *source = mHTTPDataSource;
803        }
804    }
805
806    status_t getSizeErr = (*source)->getSize(&size);
807    if (getSizeErr != OK) {
808        size = 65536;
809    }
810
811    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
812    if (*out == NULL) {
813        buffer->setRange(0, 0);
814    }
815
816    ssize_t bytesRead = 0;
817    // adjust range_length if only reading partial block
818    if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
819        range_length = buffer->size() + block_size;
820    }
821    for (;;) {
822        // Only resize when we don't know the size.
823        size_t bufferRemaining = buffer->capacity() - buffer->size();
824        if (bufferRemaining == 0 && getSizeErr != OK) {
825            bufferRemaining = 32768;
826
827            ALOGV("increasing download buffer to %zu bytes",
828                 buffer->size() + bufferRemaining);
829
830            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
831            memcpy(copy->data(), buffer->data(), buffer->size());
832            copy->setRange(0, buffer->size());
833
834            buffer = copy;
835        }
836
837        size_t maxBytesToRead = bufferRemaining;
838        if (range_length >= 0) {
839            int64_t bytesLeftInRange = range_length - buffer->size();
840            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
841                maxBytesToRead = bytesLeftInRange;
842
843                if (bytesLeftInRange == 0) {
844                    break;
845                }
846            }
847        }
848
849        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
850        // to help us break out of the loop.
851        ssize_t n = (*source)->readAt(
852                buffer->size(), buffer->data() + buffer->size(),
853                maxBytesToRead);
854
855        if (n < 0) {
856            return n;
857        }
858
859        if (n == 0) {
860            break;
861        }
862
863        buffer->setRange(0, buffer->size() + (size_t)n);
864        bytesRead += n;
865    }
866
867    *out = buffer;
868    if (actualUrl != NULL) {
869        *actualUrl = (*source)->getUri();
870        if (actualUrl->isEmpty()) {
871            *actualUrl = url;
872        }
873    }
874
875    return bytesRead;
876}
877
878sp<M3UParser> LiveSession::fetchPlaylist(
879        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
880    ALOGV("fetchPlaylist '%s'", url);
881
882    *unchanged = false;
883
884    sp<ABuffer> buffer;
885    String8 actualUrl;
886    ssize_t  err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
887
888    if (err <= 0) {
889        return NULL;
890    }
891
892    // MD5 functionality is not available on the simulator, treat all
893    // playlists as changed.
894
895#if defined(HAVE_ANDROID_OS)
896    uint8_t hash[16];
897
898    MD5_CTX m;
899    MD5_Init(&m);
900    MD5_Update(&m, buffer->data(), buffer->size());
901
902    MD5_Final(hash, &m);
903
904    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
905        // playlist unchanged
906        *unchanged = true;
907
908        return NULL;
909    }
910
911    if (curPlaylistHash != NULL) {
912        memcpy(curPlaylistHash, hash, sizeof(hash));
913    }
914#endif
915
916    sp<M3UParser> playlist =
917        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
918
919    if (playlist->initCheck() != OK) {
920        ALOGE("failed to parse .m3u8 playlist");
921
922        return NULL;
923    }
924
925    return playlist;
926}
927
// Returns rand() scaled by RAND_MAX, i.e. a value in [0, 1].
static double uniformRand() {
    const double sample = rand();
    return sample / RAND_MAX;
}
931
932size_t LiveSession::getBandwidthIndex() {
933    if (mBandwidthItems.size() == 0) {
934        return 0;
935    }
936
937#if 1
938    char value[PROPERTY_VALUE_MAX];
939    ssize_t index = -1;
940    if (property_get("media.httplive.bw-index", value, NULL)) {
941        char *end;
942        index = strtol(value, &end, 10);
943        CHECK(end > value && *end == '\0');
944
945        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
946            index = mBandwidthItems.size() - 1;
947        }
948    }
949
950    if (index < 0) {
951        int32_t bandwidthBps;
952        if (mHTTPDataSource != NULL
953                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
954            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
955        } else {
956            ALOGV("no bandwidth estimate.");
957            return 0;  // Pick the lowest bandwidth stream by default.
958        }
959
960        char value[PROPERTY_VALUE_MAX];
961        if (property_get("media.httplive.max-bw", value, NULL)) {
962            char *end;
963            long maxBw = strtoul(value, &end, 10);
964            if (end > value && *end == '\0') {
965                if (maxBw > 0 && bandwidthBps > maxBw) {
966                    ALOGV("bandwidth capped to %ld bps", maxBw);
967                    bandwidthBps = maxBw;
968                }
969            }
970        }
971
972        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
973
974        index = mBandwidthItems.size() - 1;
975        while (index > 0) {
976            // consider only 80% of the available bandwidth, but if we are switching up,
977            // be even more conservative (70%) to avoid overestimating and immediately
978            // switching back.
979            size_t adjustedBandwidthBps = bandwidthBps;
980            if (index > mCurBandwidthIndex) {
981                adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10;
982            } else {
983                adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10;
984            }
985            if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) {
986                break;
987            }
988            --index;
989        }
990    }
991#elif 0
992    // Change bandwidth at random()
993    size_t index = uniformRand() * mBandwidthItems.size();
994#elif 0
995    // There's a 50% chance to stay on the current bandwidth and
996    // a 50% chance to switch to the next higher bandwidth (wrapping around
997    // to lowest)
998    const size_t kMinIndex = 0;
999
1000    static ssize_t mCurBandwidthIndex = -1;
1001
1002    size_t index;
1003    if (mCurBandwidthIndex < 0) {
1004        index = kMinIndex;
1005    } else if (uniformRand() < 0.5) {
1006        index = (size_t)mCurBandwidthIndex;
1007    } else {
1008        index = mCurBandwidthIndex + 1;
1009        if (index == mBandwidthItems.size()) {
1010            index = kMinIndex;
1011        }
1012    }
1013    mCurBandwidthIndex = index;
1014#elif 0
1015    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1016
1017    size_t index = mBandwidthItems.size() - 1;
1018    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1019        --index;
1020    }
1021#elif 1
1022    char value[PROPERTY_VALUE_MAX];
1023    size_t index;
1024    if (property_get("media.httplive.bw-index", value, NULL)) {
1025        char *end;
1026        index = strtoul(value, &end, 10);
1027        CHECK(end > value && *end == '\0');
1028
1029        if (index >= mBandwidthItems.size()) {
1030            index = mBandwidthItems.size() - 1;
1031        }
1032    } else {
1033        index = 0;
1034    }
1035#else
1036    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
1037#endif
1038
1039    CHECK_GE(index, 0);
1040
1041    return index;
1042}
1043
1044status_t LiveSession::onSeek(const sp<AMessage> &msg) {
1045    int64_t timeUs;
1046    CHECK(msg->findInt64("timeUs", &timeUs));
1047
1048    if (!mReconfigurationInProgress) {
1049        changeConfiguration(timeUs, getBandwidthIndex());
1050    }
1051
1052    return OK;
1053}
1054
1055status_t LiveSession::getDuration(int64_t *durationUs) const {
1056    int64_t maxDurationUs = 0ll;
1057    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1058        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1059
1060        if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) {
1061            maxDurationUs = fetcherDurationUs;
1062        }
1063    }
1064
1065    *durationUs = maxDurationUs;
1066
1067    return OK;
1068}
1069
1070bool LiveSession::isSeekable() const {
1071    int64_t durationUs;
1072    return getDuration(&durationUs) == OK && durationUs >= 0;
1073}
1074
1075bool LiveSession::hasDynamicDuration() const {
1076    return false;
1077}
1078
1079size_t LiveSession::getTrackCount() const {
1080    if (mPlaylist == NULL) {
1081        return 0;
1082    } else {
1083        return mPlaylist->getTrackCount();
1084    }
1085}
1086
1087sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1088    if (mPlaylist == NULL) {
1089        return NULL;
1090    } else {
1091        return mPlaylist->getTrackInfo(trackIndex);
1092    }
1093}
1094
1095status_t LiveSession::selectTrack(size_t index, bool select) {
1096    status_t err = mPlaylist->selectTrack(index, select);
1097    if (err == OK) {
1098        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id());
1099        msg->setInt32("bandwidthIndex", mCurBandwidthIndex);
1100        msg->setInt32("pickTrack", select);
1101        msg->post();
1102    }
1103    return err;
1104}
1105
1106bool LiveSession::canSwitchUp() {
1107    // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
1108    status_t err = OK;
1109    for (size_t i = 0; i < mPacketSources.size(); ++i) {
1110        sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
1111        int64_t dur = source->getBufferedDurationUs(&err);
1112        if (err == OK && dur > 10000000) {
1113            return true;
1114        }
1115    }
1116    return false;
1117}
1118
1119void LiveSession::changeConfiguration(
1120        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
1121    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
1122    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
1123    cancelBandwidthSwitch();
1124
1125    CHECK(!mReconfigurationInProgress);
1126    mReconfigurationInProgress = true;
1127
1128    mCurBandwidthIndex = bandwidthIndex;
1129
1130    ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
1131          timeUs, bandwidthIndex, pickTrack);
1132
1133    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
1134    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
1135
1136    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1137    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1138
1139    AString URIs[kMaxStreams];
1140    for (size_t i = 0; i < kMaxStreams; ++i) {
1141        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1142            streamMask |= indexToType(i);
1143        }
1144    }
1145
1146    // Step 1, stop and discard fetchers that are no longer needed.
1147    // Pause those that we'll reuse.
1148    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1149        const AString &uri = mFetcherInfos.keyAt(i);
1150
1151        bool discardFetcher = true;
1152
1153        // If we're seeking all current fetchers are discarded.
1154        if (timeUs < 0ll) {
1155            // delay fetcher removal if not picking tracks
1156            discardFetcher = pickTrack;
1157
1158            for (size_t j = 0; j < kMaxStreams; ++j) {
1159                StreamType type = indexToType(j);
1160                if ((streamMask & type) && uri == URIs[j]) {
1161                    resumeMask |= type;
1162                    streamMask &= ~type;
1163                    discardFetcher = false;
1164                }
1165            }
1166        }
1167
1168        if (discardFetcher) {
1169            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1170        } else {
1171            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
1172        }
1173    }
1174
1175    sp<AMessage> msg;
1176    if (timeUs < 0ll) {
1177        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1178        msg = new AMessage(kWhatChangeConfiguration3, id());
1179    } else {
1180        msg = new AMessage(kWhatChangeConfiguration2, id());
1181    }
1182    msg->setInt32("streamMask", streamMask);
1183    msg->setInt32("resumeMask", resumeMask);
1184    msg->setInt32("pickTrack", pickTrack);
1185    msg->setInt64("timeUs", timeUs);
1186    for (size_t i = 0; i < kMaxStreams; ++i) {
1187        if ((streamMask | resumeMask) & indexToType(i)) {
1188            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1189        }
1190    }
1191
1192    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1193    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1194    // fetchers have completed their asynchronous operation, we'll post
1195    // mContinuation, which then is handled below in onChangeConfiguration2.
1196    mContinuationCounter = mFetcherInfos.size();
1197    mContinuation = msg;
1198
1199    if (mContinuationCounter == 0) {
1200        msg->post();
1201
1202        if (mSeekReplyID != 0) {
1203            CHECK(mSeekReply != NULL);
1204            mSeekReply->postReply(mSeekReplyID);
1205        }
1206    }
1207}
1208
1209void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1210    if (!mReconfigurationInProgress) {
1211        int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex;
1212        msg->findInt32("pickTrack", &pickTrack);
1213        msg->findInt32("bandwidthIndex", &bandwidthIndex);
1214        changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack);
1215    } else {
1216        msg->post(1000000ll); // retry in 1 sec
1217    }
1218}
1219
1220void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1221    mContinuation.clear();
1222
1223    // All fetchers are either suspended or have been removed now.
1224
1225    uint32_t streamMask, resumeMask;
1226    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1227    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1228
1229    // currently onChangeConfiguration2 is only called for seeking;
1230    // remove the following CHECK if using it else where.
1231    CHECK_EQ(resumeMask, 0);
1232    streamMask |= resumeMask;
1233
1234    AString URIs[kMaxStreams];
1235    for (size_t i = 0; i < kMaxStreams; ++i) {
1236        if (streamMask & indexToType(i)) {
1237            const AString &uriKey = mStreams[i].uriKey();
1238            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1239            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1240        }
1241    }
1242
1243    // Determine which decoders to shutdown on the player side,
1244    // a decoder has to be shutdown if either
1245    // 1) its streamtype was active before but now longer isn't.
1246    // or
1247    // 2) its streamtype was already active and still is but the URI
1248    //    has changed.
1249    uint32_t changedMask = 0;
1250    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1251        if (((mStreamMask & streamMask & indexToType(i))
1252                && !(URIs[i] == mStreams[i].mUri))
1253                || (mStreamMask & ~streamMask & indexToType(i))) {
1254            changedMask |= indexToType(i);
1255        }
1256    }
1257
1258    if (changedMask == 0) {
1259        // If nothing changed as far as the audio/video decoders
1260        // are concerned we can proceed.
1261        onChangeConfiguration3(msg);
1262        return;
1263    }
1264
1265    // Something changed, inform the player which will shutdown the
1266    // corresponding decoders and will post the reply once that's done.
1267    // Handling the reply will continue executing below in
1268    // onChangeConfiguration3.
1269    sp<AMessage> notify = mNotify->dup();
1270    notify->setInt32("what", kWhatStreamsChanged);
1271    notify->setInt32("changedMask", changedMask);
1272
1273    msg->setWhat(kWhatChangeConfiguration3);
1274    msg->setTarget(id());
1275
1276    notify->setMessage("reply", msg);
1277    notify->post();
1278}
1279
1280void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1281    mContinuation.clear();
1282    // All remaining fetchers are still suspended, the player has shutdown
1283    // any decoders that needed it.
1284
1285    uint32_t streamMask, resumeMask;
1286    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1287    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1288
1289    int64_t timeUs;
1290    int32_t pickTrack;
1291    bool switching = false;
1292    CHECK(msg->findInt64("timeUs", &timeUs));
1293    CHECK(msg->findInt32("pickTrack", &pickTrack));
1294
1295    if (timeUs < 0ll) {
1296        if (!pickTrack) {
1297            switching = true;
1298        }
1299        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
1300    } else {
1301        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1302    }
1303
1304    for (size_t i = 0; i < kMaxStreams; ++i) {
1305        if (streamMask & indexToType(i)) {
1306            if (switching) {
1307                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
1308            } else {
1309                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1310            }
1311        }
1312    }
1313
1314    mNewStreamMask = streamMask | resumeMask;
1315    if (switching) {
1316        mSwapMask = mStreamMask & ~resumeMask;
1317    }
1318
1319    // Of all existing fetchers:
1320    // * Resume fetchers that are still needed and assign them original packet sources.
1321    // * Mark otherwise unneeded fetchers for removal.
1322    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1323    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1324        const AString &uri = mFetcherInfos.keyAt(i);
1325
1326        sp<AnotherPacketSource> sources[kMaxStreams];
1327        for (size_t j = 0; j < kMaxStreams; ++j) {
1328            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
1329                sources[j] = mPacketSources.valueFor(indexToType(j));
1330
1331                if (j != kSubtitleIndex) {
1332                    ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j));
1333                    sp<AnotherPacketSource> discontinuityQueue;
1334                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1335                    discontinuityQueue->queueDiscontinuity(
1336                            ATSParser::DISCONTINUITY_NONE,
1337                            NULL,
1338                            true);
1339                }
1340            }
1341        }
1342
1343        FetcherInfo &info = mFetcherInfos.editValueAt(i);
1344        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
1345                || sources[kSubtitleIndex] != NULL) {
1346            info.mFetcher->startAsync(
1347                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
1348        } else {
1349            info.mToBeRemoved = true;
1350        }
1351    }
1352
1353    // streamMask now only contains the types that need a new fetcher created.
1354
1355    if (streamMask != 0) {
1356        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1357    }
1358
1359    // Find out when the original fetchers have buffered up to and start the new fetchers
1360    // at a later timestamp.
1361    for (size_t i = 0; i < kMaxStreams; i++) {
1362        if (!(indexToType(i) & streamMask)) {
1363            continue;
1364        }
1365
1366        AString uri;
1367        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
1368
1369        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1370        CHECK(fetcher != NULL);
1371
1372        int32_t latestSeq = -1;
1373        int64_t startTimeUs = -1;
1374        int64_t segmentStartTimeUs = -1ll;
1375        int32_t discontinuitySeq = -1;
1376        sp<AnotherPacketSource> sources[kMaxStreams];
1377
1378        // TRICKY: looping from i as earlier streams are already removed from streamMask
1379        for (size_t j = i; j < kMaxStreams; ++j) {
1380            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
1381            if ((streamMask & indexToType(j)) && uri == streamUri) {
1382                sources[j] = mPacketSources.valueFor(indexToType(j));
1383
1384                if (timeUs >= 0) {
1385                    sources[j]->clear();
1386                    startTimeUs = timeUs;
1387
1388                    sp<AnotherPacketSource> discontinuityQueue;
1389                    sp<AMessage> extra = new AMessage;
1390                    extra->setInt64("timeUs", timeUs);
1391                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1392                    discontinuityQueue->queueDiscontinuity(
1393                            ATSParser::DISCONTINUITY_SEEK, extra, true);
1394                } else {
1395                    int32_t type;
1396                    int64_t srcSegmentStartTimeUs;
1397                    sp<AMessage> meta;
1398                    if (pickTrack) {
1399                        // selecting
1400                        meta = sources[j]->getLatestDequeuedMeta();
1401                    } else {
1402                        // adapting
1403                        meta = sources[j]->getLatestEnqueuedMeta();
1404                    }
1405
1406                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
1407                        int64_t tmpUs;
1408                        CHECK(meta->findInt64("timeUs", &tmpUs));
1409                        if (startTimeUs < 0 || tmpUs < startTimeUs) {
1410                            startTimeUs = tmpUs;
1411                        }
1412
1413                        CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs));
1414                        if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) {
1415                            segmentStartTimeUs = tmpUs;
1416                        }
1417
1418                        int32_t seq;
1419                        CHECK(meta->findInt32("discontinuitySeq", &seq));
1420                        if (discontinuitySeq < 0 || seq < discontinuitySeq) {
1421                            discontinuitySeq = seq;
1422                        }
1423                    }
1424
1425                    if (pickTrack) {
1426                        // selecting track, queue discontinuities before content
1427                        sources[j]->clear();
1428                        if (j == kSubtitleIndex) {
1429                            break;
1430                        }
1431                        sp<AnotherPacketSource> discontinuityQueue;
1432                        discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1433                        discontinuityQueue->queueDiscontinuity(
1434                                ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1435                    } else {
1436                        // adapting, queue discontinuities after resume
1437                        sources[j] = mPacketSources2.valueFor(indexToType(j));
1438                        sources[j]->clear();
1439                        uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1440                        if (extraStreams & indexToType(j)) {
1441                            sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false));
1442                        }
1443                    }
1444                }
1445
1446                streamMask &= ~indexToType(j);
1447            }
1448        }
1449
1450        fetcher->startAsync(
1451                sources[kAudioIndex],
1452                sources[kVideoIndex],
1453                sources[kSubtitleIndex],
1454                startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs,
1455                segmentStartTimeUs,
1456                discontinuitySeq,
1457                switching);
1458    }
1459
1460    // All fetchers have now been started, the configuration change
1461    // has completed.
1462
1463    cancelCheckBandwidthEvent();
1464    scheduleCheckBandwidthEvent();
1465
1466    ALOGV("XXX configuration change completed.");
1467    mReconfigurationInProgress = false;
1468    if (switching) {
1469        mSwitchInProgress = true;
1470    } else {
1471        mStreamMask = mNewStreamMask;
1472    }
1473
1474    if (mDisconnectReplyID != 0) {
1475        finishDisconnect();
1476    }
1477}
1478
1479void LiveSession::onSwapped(const sp<AMessage> &msg) {
1480    int32_t switchGeneration;
1481    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
1482    if (switchGeneration != mSwitchGeneration) {
1483        return;
1484    }
1485
1486    int32_t stream;
1487    CHECK(msg->findInt32("stream", &stream));
1488
1489    ssize_t idx = typeToIndex(stream);
1490    CHECK(idx >= 0);
1491    if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
1492        ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str());
1493    }
1494    mStreams[idx].mUri = mStreams[idx].mNewUri;
1495    mStreams[idx].mNewUri.clear();
1496
1497    mSwapMask &= ~stream;
1498    if (mSwapMask != 0) {
1499        return;
1500    }
1501
1502    // Check if new variant contains extra streams.
1503    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1504    while (extraStreams) {
1505        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
1506        swapPacketSource(extraStream);
1507        extraStreams &= ~extraStream;
1508
1509        idx = typeToIndex(extraStream);
1510        CHECK(idx >= 0);
1511        if (mStreams[idx].mNewUri.empty()) {
1512            ALOGW("swapping extra stream type %d %s to empty stream",
1513                    extraStream, mStreams[idx].mUri.c_str());
1514        }
1515        mStreams[idx].mUri = mStreams[idx].mNewUri;
1516        mStreams[idx].mNewUri.clear();
1517    }
1518
1519    tryToFinishBandwidthSwitch();
1520}
1521
1522void LiveSession::onCheckSwitchDown() {
1523    if (mSwitchDownMonitor == NULL) {
1524        return;
1525    }
1526
1527    for (size_t i = 0; i < kMaxStreams; ++i) {
1528        int32_t targetDuration;
1529        sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i));
1530        sp<AMessage> meta = packetSource->getLatestDequeuedMeta();
1531
1532        if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) {
1533            int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs();
1534            int64_t targetDurationUs = targetDuration * 1000000ll;
1535
1536            if (bufferedDurationUs < targetDurationUs / 3) {
1537                (new AMessage(kWhatSwitchDown, id()))->post();
1538                break;
1539            }
1540        }
1541    }
1542
1543    mSwitchDownMonitor->post(1000000ll);
1544}
1545
1546void LiveSession::onSwitchDown() {
1547    if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) {
1548        return;
1549    }
1550
1551    ssize_t bandwidthIndex = getBandwidthIndex();
1552    if (bandwidthIndex < mCurBandwidthIndex) {
1553        changeConfiguration(-1, bandwidthIndex, false);
1554        return;
1555    }
1556
1557    changeConfiguration(-1, mCurBandwidthIndex - 1, false);
1558}
1559
1560// Mark switch done when:
1561//   1. all old buffers are swapped out
1562void LiveSession::tryToFinishBandwidthSwitch() {
1563    if (!mSwitchInProgress) {
1564        return;
1565    }
1566
1567    bool needToRemoveFetchers = false;
1568    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1569        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
1570            needToRemoveFetchers = true;
1571            break;
1572        }
1573    }
1574
1575    if (!needToRemoveFetchers && mSwapMask == 0) {
1576        ALOGI("mSwitchInProgress = false");
1577        mStreamMask = mNewStreamMask;
1578        mSwitchInProgress = false;
1579    }
1580}
1581
1582void LiveSession::scheduleCheckBandwidthEvent() {
1583    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1584    msg->setInt32("generation", mCheckBandwidthGeneration);
1585    msg->post(10000000ll);
1586}
1587
1588void LiveSession::cancelCheckBandwidthEvent() {
1589    ++mCheckBandwidthGeneration;
1590}
1591
1592void LiveSession::cancelBandwidthSwitch() {
1593    Mutex::Autolock lock(mSwapMutex);
1594    mSwitchGeneration++;
1595    mSwitchInProgress = false;
1596    mSwapMask = 0;
1597
1598    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1599        FetcherInfo& info = mFetcherInfos.editValueAt(i);
1600        if (info.mToBeRemoved) {
1601            info.mToBeRemoved = false;
1602        }
1603    }
1604
1605    for (size_t i = 0; i < kMaxStreams; ++i) {
1606        if (!mStreams[i].mNewUri.empty()) {
1607            ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri);
1608            if (j < 0) {
1609                mStreams[i].mNewUri.clear();
1610                continue;
1611            }
1612
1613            const FetcherInfo &info = mFetcherInfos.valueAt(j);
1614            info.mFetcher->stopAsync();
1615            mFetcherInfos.removeItemsAt(j);
1616            mStreams[i].mNewUri.clear();
1617        }
1618    }
1619}
1620
1621bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
1622    if (mReconfigurationInProgress || mSwitchInProgress) {
1623        return false;
1624    }
1625
1626    if (mCurBandwidthIndex < 0) {
1627        return true;
1628    }
1629
1630    if (bandwidthIndex == (size_t)mCurBandwidthIndex) {
1631        return false;
1632    } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) {
1633        return canSwitchUp();
1634    } else {
1635        return true;
1636    }
1637}
1638
1639void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) {
1640    size_t bandwidthIndex = getBandwidthIndex();
1641    if (canSwitchBandwidthTo(bandwidthIndex)) {
1642        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1643    } else {
1644        // Come back and check again 10 seconds later in case there is nothing to do now.
1645        // If we DO change configuration, once that completes it'll schedule a new
1646        // check bandwidth event with an incremented mCheckBandwidthGeneration.
1647        msg->post(10000000ll);
1648    }
1649}
1650
1651void LiveSession::postPrepared(status_t err) {
1652    CHECK(mInPreparationPhase);
1653
1654    sp<AMessage> notify = mNotify->dup();
1655    if (err == OK || err == ERROR_END_OF_STREAM) {
1656        notify->setInt32("what", kWhatPrepared);
1657    } else {
1658        notify->setInt32("what", kWhatPreparationFailed);
1659        notify->setInt32("err", err);
1660    }
1661
1662    notify->post();
1663
1664    mInPreparationPhase = false;
1665
1666    mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id());
1667    mSwitchDownMonitor->post();
1668}
1669
1670}  // namespace android
1671
1672