LiveSession.cpp revision ec5206c99694d263ac099bf2c37f8119f43f74f1
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/IMediaHTTPConnection.h>
31#include <media/IMediaHTTPService.h>
32#include <media/stagefright/foundation/hexdump.h>
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/DataSource.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaErrors.h>
39#include <media/stagefright/MediaHTTP.h>
40#include <media/stagefright/MetaData.h>
41#include <media/stagefright/Utils.h>
42
43#include <utils/Mutex.h>
44
45#include <ctype.h>
46#include <inttypes.h>
47#include <openssl/aes.h>
48#include <openssl/md5.h>
49
50namespace android {
51
52LiveSession::LiveSession(
53        const sp<AMessage> &notify, uint32_t flags,
54        const sp<IMediaHTTPService> &httpService)
55    : mNotify(notify),
56      mFlags(flags),
57      mHTTPService(httpService),
58      mInPreparationPhase(true),
59      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
60      mCurBandwidthIndex(-1),
61      mStreamMask(0),
62      mNewStreamMask(0),
63      mSwapMask(0),
64      mCheckBandwidthGeneration(0),
65      mSwitchGeneration(0),
66      mSubtitleGeneration(0),
67      mLastDequeuedTimeUs(0ll),
68      mRealTimeBaseUs(0ll),
69      mReconfigurationInProgress(false),
70      mSwitchInProgress(false),
71      mDisconnectReplyID(0),
72      mSeekReplyID(0),
73      mFirstTimeUsValid(false),
74      mFirstTimeUs(0),
75      mLastSeekTimeUs(0) {
76
77    mStreams[kAudioIndex] = StreamItem("audio");
78    mStreams[kVideoIndex] = StreamItem("video");
79    mStreams[kSubtitleIndex] = StreamItem("subtitles");
80
81    for (size_t i = 0; i < kMaxStreams; ++i) {
82        mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
83        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
84        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
85        mBuffering[i] = false;
86    }
87}
88
89LiveSession::~LiveSession() {
90}
91
92sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
93    ABuffer *discontinuity = new ABuffer(0);
94    discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
95    discontinuity->meta()->setInt32("swapPacketSource", swap);
96    discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
97    discontinuity->meta()->setInt64("timeUs", -1);
98    return discontinuity;
99}
100
101void LiveSession::swapPacketSource(StreamType stream) {
102    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
103    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
104    sp<AnotherPacketSource> tmp = aps;
105    aps = aps2;
106    aps2 = tmp;
107    aps2->clear();
108}
109
110status_t LiveSession::dequeueAccessUnit(
111        StreamType stream, sp<ABuffer> *accessUnit) {
112    if (!(mStreamMask & stream)) {
113        // return -EWOULDBLOCK to avoid halting the decoder
114        // when switching between audio/video and audio only.
115        return -EWOULDBLOCK;
116    }
117
118    status_t finalResult;
119    sp<AnotherPacketSource> discontinuityQueue  = mDiscontinuities.valueFor(stream);
120    if (discontinuityQueue->hasBufferAvailable(&finalResult)) {
121        discontinuityQueue->dequeueAccessUnit(accessUnit);
122        // seeking, track switching
123        sp<AMessage> extra;
124        int64_t timeUs;
125        if ((*accessUnit)->meta()->findMessage("extra", &extra)
126                && extra != NULL
127                && extra->findInt64("timeUs", &timeUs)) {
128            // seeking only
129            mLastSeekTimeUs = timeUs;
130            mDiscontinuityOffsetTimesUs.clear();
131            mDiscontinuityAbsStartTimesUs.clear();
132        }
133        return INFO_DISCONTINUITY;
134    }
135
136    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
137
138    ssize_t idx = typeToIndex(stream);
139    if (!packetSource->hasBufferAvailable(&finalResult)) {
140        if (finalResult == OK) {
141            mBuffering[idx] = true;
142            return -EAGAIN;
143        } else {
144            return finalResult;
145        }
146    }
147
148    if (mBuffering[idx]) {
149        if (mSwitchInProgress
150                || packetSource->isFinished(0)
151                || packetSource->getEstimatedDurationUs() > 10000000ll) {
152            mBuffering[idx] = false;
153        }
154    }
155
156    if (mBuffering[idx]) {
157        return -EAGAIN;
158    }
159
160    // wait for counterpart
161    sp<AnotherPacketSource> otherSource;
162    uint32_t mask = mNewStreamMask & mStreamMask;
163    uint32_t fetchersMask  = 0;
164    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
165        uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask();
166        fetchersMask |= fetcherMask;
167    }
168    mask &= fetchersMask;
169    if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) {
170        otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
171    } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) {
172        otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
173    }
174    if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) {
175        return finalResult == OK ? -EAGAIN : finalResult;
176    }
177
178    status_t err = packetSource->dequeueAccessUnit(accessUnit);
179
180    size_t streamIdx;
181    const char *streamStr;
182    switch (stream) {
183        case STREAMTYPE_AUDIO:
184            streamIdx = kAudioIndex;
185            streamStr = "audio";
186            break;
187        case STREAMTYPE_VIDEO:
188            streamIdx = kVideoIndex;
189            streamStr = "video";
190            break;
191        case STREAMTYPE_SUBTITLES:
192            streamIdx = kSubtitleIndex;
193            streamStr = "subs";
194            break;
195        default:
196            TRESPASS();
197    }
198
199    StreamItem& strm = mStreams[streamIdx];
200    if (err == INFO_DISCONTINUITY) {
201        // adaptive streaming, discontinuities in the playlist
202        int32_t type;
203        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
204
205        sp<AMessage> extra;
206        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
207            extra.clear();
208        }
209
210        ALOGI("[%s] read discontinuity of type %d, extra = %s",
211              streamStr,
212              type,
213              extra == NULL ? "NULL" : extra->debugString().c_str());
214
215        int32_t swap;
216        if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) {
217            int32_t switchGeneration;
218            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
219            {
220                Mutex::Autolock lock(mSwapMutex);
221                if (switchGeneration == mSwitchGeneration) {
222                    swapPacketSource(stream);
223                    sp<AMessage> msg = new AMessage(kWhatSwapped, id());
224                    msg->setInt32("stream", stream);
225                    msg->setInt32("switchGeneration", switchGeneration);
226                    msg->post();
227                }
228            }
229        } else {
230            size_t seq = strm.mCurDiscontinuitySeq;
231            int64_t offsetTimeUs;
232            if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
233                offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
234            } else {
235                offsetTimeUs = 0;
236            }
237
238            seq += 1;
239            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
240                int64_t firstTimeUs;
241                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
242                offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
243                offsetTimeUs += strm.mLastSampleDurationUs;
244            } else {
245                offsetTimeUs += strm.mLastSampleDurationUs;
246            }
247
248            mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
249        }
250    } else if (err == OK) {
251
252        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
253            int64_t timeUs;
254            int32_t discontinuitySeq = 0;
255            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
256            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
257            strm.mCurDiscontinuitySeq = discontinuitySeq;
258
259            int32_t discard = 0;
260            int64_t firstTimeUs;
261            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
262                int64_t durUs; // approximate sample duration
263                if (timeUs > strm.mLastDequeuedTimeUs) {
264                    durUs = timeUs - strm.mLastDequeuedTimeUs;
265                } else {
266                    durUs = strm.mLastDequeuedTimeUs - timeUs;
267                }
268                strm.mLastSampleDurationUs = durUs;
269                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
270            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
271                firstTimeUs = timeUs;
272            } else {
273                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
274                firstTimeUs = timeUs;
275            }
276
277            strm.mLastDequeuedTimeUs = timeUs;
278            if (timeUs >= firstTimeUs) {
279                timeUs -= firstTimeUs;
280            } else {
281                timeUs = 0;
282            }
283            timeUs += mLastSeekTimeUs;
284            if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
285                timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
286            }
287
288            ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs);
289            (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
290            mLastDequeuedTimeUs = timeUs;
291            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
292        } else if (stream == STREAMTYPE_SUBTITLES) {
293            int32_t subtitleGeneration;
294            if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
295                    && subtitleGeneration != mSubtitleGeneration) {
296               return -EAGAIN;
297            };
298            (*accessUnit)->meta()->setInt32(
299                    "trackIndex", mPlaylist->getSelectedIndex());
300            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
301        }
302    } else {
303        ALOGI("[%s] encountered error %d", streamStr, err);
304    }
305
306    return err;
307}
308
309status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
310    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
311    if (!(mStreamMask & stream)) {
312        return UNKNOWN_ERROR;
313    }
314
315    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
316
317    sp<MetaData> meta = packetSource->getFormat();
318
319    if (meta == NULL) {
320        return -EAGAIN;
321    }
322
323    return convertMetaDataToMessage(meta, format);
324}
325
326void LiveSession::connectAsync(
327        const char *url, const KeyedVector<String8, String8> *headers) {
328    sp<AMessage> msg = new AMessage(kWhatConnect, id());
329    msg->setString("url", url);
330
331    if (headers != NULL) {
332        msg->setPointer(
333                "headers",
334                new KeyedVector<String8, String8>(*headers));
335    }
336
337    msg->post();
338}
339
340status_t LiveSession::disconnect() {
341    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
342
343    sp<AMessage> response;
344    status_t err = msg->postAndAwaitResponse(&response);
345
346    return err;
347}
348
349status_t LiveSession::seekTo(int64_t timeUs) {
350    sp<AMessage> msg = new AMessage(kWhatSeek, id());
351    msg->setInt64("timeUs", timeUs);
352
353    sp<AMessage> response;
354    status_t err = msg->postAndAwaitResponse(&response);
355
356    return err;
357}
358
359void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
360    switch (msg->what()) {
361        case kWhatConnect:
362        {
363            onConnect(msg);
364            break;
365        }
366
367        case kWhatDisconnect:
368        {
369            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
370
371            if (mReconfigurationInProgress) {
372                break;
373            }
374
375            finishDisconnect();
376            break;
377        }
378
379        case kWhatSeek:
380        {
381            uint32_t seekReplyID;
382            CHECK(msg->senderAwaitsResponse(&seekReplyID));
383            mSeekReplyID = seekReplyID;
384            mSeekReply = new AMessage;
385
386            status_t err = onSeek(msg);
387
388            if (err != OK) {
389                msg->post(50000);
390            }
391            break;
392        }
393
394        case kWhatFetcherNotify:
395        {
396            int32_t what;
397            CHECK(msg->findInt32("what", &what));
398
399            switch (what) {
400                case PlaylistFetcher::kWhatStarted:
401                    break;
402                case PlaylistFetcher::kWhatPaused:
403                case PlaylistFetcher::kWhatStopped:
404                {
405                    if (what == PlaylistFetcher::kWhatStopped) {
406                        AString uri;
407                        CHECK(msg->findString("uri", &uri));
408                        if (mFetcherInfos.removeItem(uri) < 0) {
409                            // ignore duplicated kWhatStopped messages.
410                            break;
411                        }
412
413                        if (mSwitchInProgress) {
414                            tryToFinishBandwidthSwitch();
415                        }
416                    }
417
418                    if (mContinuation != NULL) {
419                        CHECK_GT(mContinuationCounter, 0);
420                        if (--mContinuationCounter == 0) {
421                            mContinuation->post();
422
423                            if (mSeekReplyID != 0) {
424                                CHECK(mSeekReply != NULL);
425                                mSeekReply->setInt32("err", OK);
426                                mSeekReply->postReply(mSeekReplyID);
427                                mSeekReplyID = 0;
428                                mSeekReply.clear();
429                            }
430                        }
431                    }
432                    break;
433                }
434
435                case PlaylistFetcher::kWhatDurationUpdate:
436                {
437                    AString uri;
438                    CHECK(msg->findString("uri", &uri));
439
440                    int64_t durationUs;
441                    CHECK(msg->findInt64("durationUs", &durationUs));
442
443                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
444                    info->mDurationUs = durationUs;
445                    break;
446                }
447
448                case PlaylistFetcher::kWhatError:
449                {
450                    status_t err;
451                    CHECK(msg->findInt32("err", &err));
452
453                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
454
455                    // handle EOS on subtitle tracks independently
456                    AString uri;
457                    if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
458                        ssize_t i = mFetcherInfos.indexOfKey(uri);
459                        if (i >= 0) {
460                            const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
461                            if (fetcher != NULL) {
462                                uint32_t type = fetcher->getStreamTypeMask();
463                                if (type == STREAMTYPE_SUBTITLES) {
464                                    mPacketSources.valueFor(
465                                            STREAMTYPE_SUBTITLES)->signalEOS(err);;
466                                    break;
467                                }
468                            }
469                        }
470                    }
471
472                    if (mInPreparationPhase) {
473                        postPrepared(err);
474                    }
475
476                    cancelBandwidthSwitch();
477
478                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
479
480                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
481
482                    mPacketSources.valueFor(
483                            STREAMTYPE_SUBTITLES)->signalEOS(err);
484
485                    sp<AMessage> notify = mNotify->dup();
486                    notify->setInt32("what", kWhatError);
487                    notify->setInt32("err", err);
488                    notify->post();
489                    break;
490                }
491
492                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
493                {
494                    AString uri;
495                    CHECK(msg->findString("uri", &uri));
496
497                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
498                    info->mIsPrepared = true;
499
500                    if (mInPreparationPhase) {
501                        bool allFetchersPrepared = true;
502                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
503                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
504                                allFetchersPrepared = false;
505                                break;
506                            }
507                        }
508
509                        if (allFetchersPrepared) {
510                            postPrepared(OK);
511                        }
512                    }
513                    break;
514                }
515
516                case PlaylistFetcher::kWhatStartedAt:
517                {
518                    int32_t switchGeneration;
519                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
520
521                    if (switchGeneration != mSwitchGeneration) {
522                        break;
523                    }
524
525                    // Resume fetcher for the original variant; the resumed fetcher should
526                    // continue until the timestamps found in msg, which is stored by the
527                    // new fetcher to indicate where the new variant has started buffering.
528                    for (size_t i = 0; i < mFetcherInfos.size(); i++) {
529                        const FetcherInfo info = mFetcherInfos.valueAt(i);
530                        if (info.mToBeRemoved) {
531                            info.mFetcher->resumeUntilAsync(msg);
532                        }
533                    }
534                    break;
535                }
536
537                default:
538                    TRESPASS();
539            }
540
541            break;
542        }
543
544        case kWhatCheckBandwidth:
545        {
546            int32_t generation;
547            CHECK(msg->findInt32("generation", &generation));
548
549            if (generation != mCheckBandwidthGeneration) {
550                break;
551            }
552
553            onCheckBandwidth(msg);
554            break;
555        }
556
557        case kWhatChangeConfiguration:
558        {
559            onChangeConfiguration(msg);
560            break;
561        }
562
563        case kWhatChangeConfiguration2:
564        {
565            onChangeConfiguration2(msg);
566            break;
567        }
568
569        case kWhatChangeConfiguration3:
570        {
571            onChangeConfiguration3(msg);
572            break;
573        }
574
575        case kWhatFinishDisconnect2:
576        {
577            onFinishDisconnect2();
578            break;
579        }
580
581        case kWhatSwapped:
582        {
583            onSwapped(msg);
584            break;
585        }
586
587        case kWhatCheckSwitchDown:
588        {
589            onCheckSwitchDown();
590            break;
591        }
592
593        case kWhatSwitchDown:
594        {
595            onSwitchDown();
596            break;
597        }
598
599        default:
600            TRESPASS();
601            break;
602    }
603}
604
605// static
606int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
607    if (a->mBandwidth < b->mBandwidth) {
608        return -1;
609    } else if (a->mBandwidth == b->mBandwidth) {
610        return 0;
611    }
612
613    return 1;
614}
615
616// static
617LiveSession::StreamType LiveSession::indexToType(int idx) {
618    CHECK(idx >= 0 && idx < kMaxStreams);
619    return (StreamType)(1 << idx);
620}
621
622// static
623ssize_t LiveSession::typeToIndex(int32_t type) {
624    switch (type) {
625        case STREAMTYPE_AUDIO:
626            return 0;
627        case STREAMTYPE_VIDEO:
628            return 1;
629        case STREAMTYPE_SUBTITLES:
630            return 2;
631        default:
632            return -1;
633    };
634    return -1;
635}
636
637void LiveSession::onConnect(const sp<AMessage> &msg) {
638    AString url;
639    CHECK(msg->findString("url", &url));
640
641    KeyedVector<String8, String8> *headers = NULL;
642    if (!msg->findPointer("headers", (void **)&headers)) {
643        mExtraHeaders.clear();
644    } else {
645        mExtraHeaders = *headers;
646
647        delete headers;
648        headers = NULL;
649    }
650
651    // TODO currently we don't know if we are coming here from incognito mode
652    ALOGI("onConnect %s", uriDebugString(url).c_str());
653
654    mMasterURL = url;
655
656    bool dummy;
657    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
658
659    if (mPlaylist == NULL) {
660        ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str());
661
662        postPrepared(ERROR_IO);
663        return;
664    }
665
666    // We trust the content provider to make a reasonable choice of preferred
667    // initial bandwidth by listing it first in the variant playlist.
668    // At startup we really don't have a good estimate on the available
669    // network bandwidth since we haven't tranferred any data yet. Once
670    // we have we can make a better informed choice.
671    size_t initialBandwidth = 0;
672    size_t initialBandwidthIndex = 0;
673
674    if (mPlaylist->isVariantPlaylist()) {
675        for (size_t i = 0; i < mPlaylist->size(); ++i) {
676            BandwidthItem item;
677
678            item.mPlaylistIndex = i;
679
680            sp<AMessage> meta;
681            AString uri;
682            mPlaylist->itemAt(i, &uri, &meta);
683
684            unsigned long bandwidth;
685            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
686
687            if (initialBandwidth == 0) {
688                initialBandwidth = item.mBandwidth;
689            }
690
691            mBandwidthItems.push(item);
692        }
693
694        CHECK_GT(mBandwidthItems.size(), 0u);
695
696        mBandwidthItems.sort(SortByBandwidth);
697
698        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
699            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
700                initialBandwidthIndex = i;
701                break;
702            }
703        }
704    } else {
705        // dummy item.
706        BandwidthItem item;
707        item.mPlaylistIndex = 0;
708        item.mBandwidth = 0;
709        mBandwidthItems.push(item);
710    }
711
712    mPlaylist->pickRandomMediaItems();
713    changeConfiguration(
714            0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
715}
716
717void LiveSession::finishDisconnect() {
718    // No reconfiguration is currently pending, make sure none will trigger
719    // during disconnection either.
720    cancelCheckBandwidthEvent();
721
722    // Protect mPacketSources from a swapPacketSource race condition through disconnect.
723    // (finishDisconnect, onFinishDisconnect2)
724    cancelBandwidthSwitch();
725
726    // cancel switch down monitor
727    mSwitchDownMonitor.clear();
728
729    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
730        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
731    }
732
733    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
734
735    mContinuationCounter = mFetcherInfos.size();
736    mContinuation = msg;
737
738    if (mContinuationCounter == 0) {
739        msg->post();
740    }
741}
742
743void LiveSession::onFinishDisconnect2() {
744    mContinuation.clear();
745
746    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
747    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
748
749    mPacketSources.valueFor(
750            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
751
752    sp<AMessage> response = new AMessage;
753    response->setInt32("err", OK);
754
755    response->postReply(mDisconnectReplyID);
756    mDisconnectReplyID = 0;
757}
758
759sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
760    ssize_t index = mFetcherInfos.indexOfKey(uri);
761
762    if (index >= 0) {
763        return NULL;
764    }
765
766    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
767    notify->setString("uri", uri);
768    notify->setInt32("switchGeneration", mSwitchGeneration);
769
770    FetcherInfo info;
771    info.mFetcher = new PlaylistFetcher(notify, this, uri, mSubtitleGeneration);
772    info.mDurationUs = -1ll;
773    info.mIsPrepared = false;
774    info.mToBeRemoved = false;
775    looper()->registerHandler(info.mFetcher);
776
777    mFetcherInfos.add(uri, info);
778
779    return info.mFetcher;
780}
781
782/*
783 * Illustration of parameters:
784 *
785 * 0      `range_offset`
786 * +------------+-------------------------------------------------------+--+--+
787 * |            |                                 | next block to fetch |  |  |
788 * |            | `source` handle => `out` buffer |                     |  |  |
789 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
790 * |            |<----------- `range_length` / buffer capacity ----------->|  |
791 * |<------------------------------ file_size ------------------------------->|
792 *
793 * Special parameter values:
794 * - range_length == -1 means entire file
795 * - block_size == 0 means entire range
796 *
797 */
798ssize_t LiveSession::fetchFile(
799        const char *url, sp<ABuffer> *out,
800        int64_t range_offset, int64_t range_length,
801        uint32_t block_size, /* download block size */
802        sp<DataSource> *source, /* to return and reuse source */
803        String8 *actualUrl) {
804    off64_t size;
805    sp<DataSource> temp_source;
806    if (source == NULL) {
807        source = &temp_source;
808    }
809
810    if (*source == NULL) {
811        if (!strncasecmp(url, "file://", 7)) {
812            *source = new FileSource(url + 7);
813        } else if (strncasecmp(url, "http://", 7)
814                && strncasecmp(url, "https://", 8)) {
815            return ERROR_UNSUPPORTED;
816        } else {
817            KeyedVector<String8, String8> headers = mExtraHeaders;
818            if (range_offset > 0 || range_length >= 0) {
819                headers.add(
820                        String8("Range"),
821                        String8(
822                            StringPrintf(
823                                "bytes=%lld-%s",
824                                range_offset,
825                                range_length < 0
826                                    ? "" : StringPrintf("%lld",
827                                            range_offset + range_length - 1).c_str()).c_str()));
828            }
829            status_t err = mHTTPDataSource->connect(url, &headers);
830
831            if (err != OK) {
832                return err;
833            }
834
835            *source = mHTTPDataSource;
836        }
837    }
838
839    status_t getSizeErr = (*source)->getSize(&size);
840    if (getSizeErr != OK) {
841        size = 65536;
842    }
843
844    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
845    if (*out == NULL) {
846        buffer->setRange(0, 0);
847    }
848
849    ssize_t bytesRead = 0;
850    // adjust range_length if only reading partial block
851    if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
852        range_length = buffer->size() + block_size;
853    }
854    for (;;) {
855        // Only resize when we don't know the size.
856        size_t bufferRemaining = buffer->capacity() - buffer->size();
857        if (bufferRemaining == 0 && getSizeErr != OK) {
858            bufferRemaining = 32768;
859
860            ALOGV("increasing download buffer to %zu bytes",
861                 buffer->size() + bufferRemaining);
862
863            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
864            memcpy(copy->data(), buffer->data(), buffer->size());
865            copy->setRange(0, buffer->size());
866
867            buffer = copy;
868        }
869
870        size_t maxBytesToRead = bufferRemaining;
871        if (range_length >= 0) {
872            int64_t bytesLeftInRange = range_length - buffer->size();
873            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
874                maxBytesToRead = bytesLeftInRange;
875
876                if (bytesLeftInRange == 0) {
877                    break;
878                }
879            }
880        }
881
882        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
883        // to help us break out of the loop.
884        ssize_t n = (*source)->readAt(
885                buffer->size(), buffer->data() + buffer->size(),
886                maxBytesToRead);
887
888        if (n < 0) {
889            return n;
890        }
891
892        if (n == 0) {
893            break;
894        }
895
896        buffer->setRange(0, buffer->size() + (size_t)n);
897        bytesRead += n;
898    }
899
900    *out = buffer;
901    if (actualUrl != NULL) {
902        *actualUrl = (*source)->getUri();
903        if (actualUrl->isEmpty()) {
904            *actualUrl = url;
905        }
906    }
907
908    return bytesRead;
909}
910
911sp<M3UParser> LiveSession::fetchPlaylist(
912        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
913    ALOGV("fetchPlaylist '%s'", url);
914
915    *unchanged = false;
916
917    sp<ABuffer> buffer;
918    String8 actualUrl;
919    ssize_t  err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
920
921    if (err <= 0) {
922        return NULL;
923    }
924
925    // MD5 functionality is not available on the simulator, treat all
926    // playlists as changed.
927
928#if defined(HAVE_ANDROID_OS)
929    uint8_t hash[16];
930
931    MD5_CTX m;
932    MD5_Init(&m);
933    MD5_Update(&m, buffer->data(), buffer->size());
934
935    MD5_Final(hash, &m);
936
937    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
938        // playlist unchanged
939        *unchanged = true;
940
941        return NULL;
942    }
943
944    if (curPlaylistHash != NULL) {
945        memcpy(curPlaylistHash, hash, sizeof(hash));
946    }
947#endif
948
949    sp<M3UParser> playlist =
950        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
951
952    if (playlist->initCheck() != OK) {
953        ALOGE("failed to parse .m3u8 playlist");
954
955        return NULL;
956    }
957
958    return playlist;
959}
960
961static double uniformRand() {
962    return (double)rand() / RAND_MAX;
963}
964
965size_t LiveSession::getBandwidthIndex() {
966    if (mBandwidthItems.size() == 0) {
967        return 0;
968    }
969
970#if 1
971    char value[PROPERTY_VALUE_MAX];
972    ssize_t index = -1;
973    if (property_get("media.httplive.bw-index", value, NULL)) {
974        char *end;
975        index = strtol(value, &end, 10);
976        CHECK(end > value && *end == '\0');
977
978        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
979            index = mBandwidthItems.size() - 1;
980        }
981    }
982
983    if (index < 0) {
984        int32_t bandwidthBps;
985        if (mHTTPDataSource != NULL
986                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
987            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
988        } else {
989            ALOGV("no bandwidth estimate.");
990            return 0;  // Pick the lowest bandwidth stream by default.
991        }
992
993        char value[PROPERTY_VALUE_MAX];
994        if (property_get("media.httplive.max-bw", value, NULL)) {
995            char *end;
996            long maxBw = strtoul(value, &end, 10);
997            if (end > value && *end == '\0') {
998                if (maxBw > 0 && bandwidthBps > maxBw) {
999                    ALOGV("bandwidth capped to %ld bps", maxBw);
1000                    bandwidthBps = maxBw;
1001                }
1002            }
1003        }
1004
1005        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
1006
1007        index = mBandwidthItems.size() - 1;
1008        while (index > 0) {
1009            // consider only 80% of the available bandwidth, but if we are switching up,
1010            // be even more conservative (70%) to avoid overestimating and immediately
1011            // switching back.
1012            size_t adjustedBandwidthBps = bandwidthBps;
1013            if (index > mCurBandwidthIndex) {
1014                adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10;
1015            } else {
1016                adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10;
1017            }
1018            if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) {
1019                break;
1020            }
1021            --index;
1022        }
1023    }
1024#elif 0
1025    // Change bandwidth at random()
1026    size_t index = uniformRand() * mBandwidthItems.size();
1027#elif 0
1028    // There's a 50% chance to stay on the current bandwidth and
1029    // a 50% chance to switch to the next higher bandwidth (wrapping around
1030    // to lowest)
1031    const size_t kMinIndex = 0;
1032
1033    static ssize_t mCurBandwidthIndex = -1;
1034
1035    size_t index;
1036    if (mCurBandwidthIndex < 0) {
1037        index = kMinIndex;
1038    } else if (uniformRand() < 0.5) {
1039        index = (size_t)mCurBandwidthIndex;
1040    } else {
1041        index = mCurBandwidthIndex + 1;
1042        if (index == mBandwidthItems.size()) {
1043            index = kMinIndex;
1044        }
1045    }
1046    mCurBandwidthIndex = index;
1047#elif 0
1048    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1049
1050    size_t index = mBandwidthItems.size() - 1;
1051    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1052        --index;
1053    }
1054#elif 1
1055    char value[PROPERTY_VALUE_MAX];
1056    size_t index;
1057    if (property_get("media.httplive.bw-index", value, NULL)) {
1058        char *end;
1059        index = strtoul(value, &end, 10);
1060        CHECK(end > value && *end == '\0');
1061
1062        if (index >= mBandwidthItems.size()) {
1063            index = mBandwidthItems.size() - 1;
1064        }
1065    } else {
1066        index = 0;
1067    }
1068#else
1069    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
1070#endif
1071
1072    CHECK_GE(index, 0);
1073
1074    return index;
1075}
1076
1077int64_t LiveSession::latestMediaSegmentStartTimeUs() {
1078    sp<AMessage> audioMeta = mPacketSources.valueFor(STREAMTYPE_AUDIO)->getLatestDequeuedMeta();
1079    int64_t minSegmentStartTimeUs = -1, videoSegmentStartTimeUs = -1;
1080    if (audioMeta != NULL) {
1081        audioMeta->findInt64("segmentStartTimeUs", &minSegmentStartTimeUs);
1082    }
1083
1084    sp<AMessage> videoMeta = mPacketSources.valueFor(STREAMTYPE_VIDEO)->getLatestDequeuedMeta();
1085    if (videoMeta != NULL
1086            && videoMeta->findInt64("segmentStartTimeUs", &videoSegmentStartTimeUs)) {
1087        if (minSegmentStartTimeUs < 0 || videoSegmentStartTimeUs < minSegmentStartTimeUs) {
1088            minSegmentStartTimeUs = videoSegmentStartTimeUs;
1089        }
1090
1091    }
1092    return minSegmentStartTimeUs;
1093}
1094
1095status_t LiveSession::onSeek(const sp<AMessage> &msg) {
1096    int64_t timeUs;
1097    CHECK(msg->findInt64("timeUs", &timeUs));
1098
1099    if (!mReconfigurationInProgress) {
1100        changeConfiguration(timeUs, mCurBandwidthIndex);
1101        return OK;
1102    } else {
1103        return -EWOULDBLOCK;
1104    }
1105}
1106
1107status_t LiveSession::getDuration(int64_t *durationUs) const {
1108    int64_t maxDurationUs = 0ll;
1109    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1110        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1111
1112        if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) {
1113            maxDurationUs = fetcherDurationUs;
1114        }
1115    }
1116
1117    *durationUs = maxDurationUs;
1118
1119    return OK;
1120}
1121
1122bool LiveSession::isSeekable() const {
1123    int64_t durationUs;
1124    return getDuration(&durationUs) == OK && durationUs >= 0;
1125}
1126
1127bool LiveSession::hasDynamicDuration() const {
1128    return false;
1129}
1130
1131size_t LiveSession::getTrackCount() const {
1132    if (mPlaylist == NULL) {
1133        return 0;
1134    } else {
1135        return mPlaylist->getTrackCount();
1136    }
1137}
1138
1139sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1140    if (mPlaylist == NULL) {
1141        return NULL;
1142    } else {
1143        return mPlaylist->getTrackInfo(trackIndex);
1144    }
1145}
1146
1147status_t LiveSession::selectTrack(size_t index, bool select) {
1148    if (mPlaylist == NULL) {
1149        return INVALID_OPERATION;
1150    }
1151
1152    ++mSubtitleGeneration;
1153    status_t err = mPlaylist->selectTrack(index, select);
1154    if (err == OK) {
1155        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id());
1156        msg->setInt32("bandwidthIndex", mCurBandwidthIndex);
1157        msg->setInt32("pickTrack", select);
1158        msg->post();
1159    }
1160    return err;
1161}
1162
1163bool LiveSession::canSwitchUp() {
1164    // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
1165    status_t err = OK;
1166    for (size_t i = 0; i < mPacketSources.size(); ++i) {
1167        sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
1168        int64_t dur = source->getBufferedDurationUs(&err);
1169        if (err == OK && dur > 10000000) {
1170            return true;
1171        }
1172    }
1173    return false;
1174}
1175
1176void LiveSession::changeConfiguration(
1177        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
1178    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
1179    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
1180    cancelBandwidthSwitch();
1181
1182    CHECK(!mReconfigurationInProgress);
1183    mReconfigurationInProgress = true;
1184
1185    mCurBandwidthIndex = bandwidthIndex;
1186
1187    ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
1188          timeUs, bandwidthIndex, pickTrack);
1189
1190    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
1191    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
1192
1193    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1194    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1195
1196    AString URIs[kMaxStreams];
1197    for (size_t i = 0; i < kMaxStreams; ++i) {
1198        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1199            streamMask |= indexToType(i);
1200        }
1201    }
1202
1203    // Step 1, stop and discard fetchers that are no longer needed.
1204    // Pause those that we'll reuse.
1205    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1206        const AString &uri = mFetcherInfos.keyAt(i);
1207
1208        bool discardFetcher = true;
1209
1210        // If we're seeking all current fetchers are discarded.
1211        if (timeUs < 0ll) {
1212            // delay fetcher removal if not picking tracks
1213            discardFetcher = pickTrack;
1214
1215            for (size_t j = 0; j < kMaxStreams; ++j) {
1216                StreamType type = indexToType(j);
1217                if ((streamMask & type) && uri == URIs[j]) {
1218                    resumeMask |= type;
1219                    streamMask &= ~type;
1220                    discardFetcher = false;
1221                }
1222            }
1223        }
1224
1225        if (discardFetcher) {
1226            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1227        } else {
1228            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
1229        }
1230    }
1231
1232    sp<AMessage> msg;
1233    if (timeUs < 0ll) {
1234        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1235        msg = new AMessage(kWhatChangeConfiguration3, id());
1236    } else {
1237        msg = new AMessage(kWhatChangeConfiguration2, id());
1238    }
1239    msg->setInt32("streamMask", streamMask);
1240    msg->setInt32("resumeMask", resumeMask);
1241    msg->setInt32("pickTrack", pickTrack);
1242    msg->setInt64("timeUs", timeUs);
1243    for (size_t i = 0; i < kMaxStreams; ++i) {
1244        if ((streamMask | resumeMask) & indexToType(i)) {
1245            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1246        }
1247    }
1248
1249    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1250    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1251    // fetchers have completed their asynchronous operation, we'll post
1252    // mContinuation, which then is handled below in onChangeConfiguration2.
1253    mContinuationCounter = mFetcherInfos.size();
1254    mContinuation = msg;
1255
1256    if (mContinuationCounter == 0) {
1257        msg->post();
1258
1259        if (mSeekReplyID != 0) {
1260            CHECK(mSeekReply != NULL);
1261            mSeekReply->setInt32("err", OK);
1262            mSeekReply->postReply(mSeekReplyID);
1263            mSeekReplyID = 0;
1264            mSeekReply.clear();
1265        }
1266    }
1267}
1268
1269void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1270    if (!mReconfigurationInProgress) {
1271        int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex;
1272        msg->findInt32("pickTrack", &pickTrack);
1273        msg->findInt32("bandwidthIndex", &bandwidthIndex);
1274        changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack);
1275    } else {
1276        msg->post(1000000ll); // retry in 1 sec
1277    }
1278}
1279
1280void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1281    mContinuation.clear();
1282
1283    // All fetchers are either suspended or have been removed now.
1284
1285    uint32_t streamMask, resumeMask;
1286    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1287    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1288
1289    // currently onChangeConfiguration2 is only called for seeking;
1290    // remove the following CHECK if using it else where.
1291    CHECK_EQ(resumeMask, 0);
1292    streamMask |= resumeMask;
1293
1294    AString URIs[kMaxStreams];
1295    for (size_t i = 0; i < kMaxStreams; ++i) {
1296        if (streamMask & indexToType(i)) {
1297            const AString &uriKey = mStreams[i].uriKey();
1298            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1299            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1300        }
1301    }
1302
1303    // Determine which decoders to shutdown on the player side,
1304    // a decoder has to be shutdown if either
1305    // 1) its streamtype was active before but now longer isn't.
1306    // or
1307    // 2) its streamtype was already active and still is but the URI
1308    //    has changed.
1309    uint32_t changedMask = 0;
1310    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1311        if (((mStreamMask & streamMask & indexToType(i))
1312                && !(URIs[i] == mStreams[i].mUri))
1313                || (mStreamMask & ~streamMask & indexToType(i))) {
1314            changedMask |= indexToType(i);
1315        }
1316    }
1317
1318    if (changedMask == 0) {
1319        // If nothing changed as far as the audio/video decoders
1320        // are concerned we can proceed.
1321        onChangeConfiguration3(msg);
1322        return;
1323    }
1324
1325    // Something changed, inform the player which will shutdown the
1326    // corresponding decoders and will post the reply once that's done.
1327    // Handling the reply will continue executing below in
1328    // onChangeConfiguration3.
1329    sp<AMessage> notify = mNotify->dup();
1330    notify->setInt32("what", kWhatStreamsChanged);
1331    notify->setInt32("changedMask", changedMask);
1332
1333    msg->setWhat(kWhatChangeConfiguration3);
1334    msg->setTarget(id());
1335
1336    notify->setMessage("reply", msg);
1337    notify->post();
1338}
1339
1340void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1341    mContinuation.clear();
1342    // All remaining fetchers are still suspended, the player has shutdown
1343    // any decoders that needed it.
1344
1345    uint32_t streamMask, resumeMask;
1346    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1347    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1348
1349    int64_t timeUs;
1350    int32_t pickTrack;
1351    bool switching = false;
1352    CHECK(msg->findInt64("timeUs", &timeUs));
1353    CHECK(msg->findInt32("pickTrack", &pickTrack));
1354
1355    if (timeUs < 0ll) {
1356        if (!pickTrack) {
1357            switching = true;
1358        }
1359        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
1360    } else {
1361        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1362    }
1363
1364    for (size_t i = 0; i < kMaxStreams; ++i) {
1365        if (streamMask & indexToType(i)) {
1366            if (switching) {
1367                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
1368            } else {
1369                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1370            }
1371        }
1372    }
1373
1374    mNewStreamMask = streamMask | resumeMask;
1375    if (switching) {
1376        mSwapMask = mStreamMask & ~resumeMask;
1377    }
1378
1379    // Of all existing fetchers:
1380    // * Resume fetchers that are still needed and assign them original packet sources.
1381    // * Mark otherwise unneeded fetchers for removal.
1382    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1383    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1384        const AString &uri = mFetcherInfos.keyAt(i);
1385
1386        sp<AnotherPacketSource> sources[kMaxStreams];
1387        for (size_t j = 0; j < kMaxStreams; ++j) {
1388            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
1389                sources[j] = mPacketSources.valueFor(indexToType(j));
1390
1391                if (j != kSubtitleIndex) {
1392                    ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j));
1393                    sp<AnotherPacketSource> discontinuityQueue;
1394                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1395                    discontinuityQueue->queueDiscontinuity(
1396                            ATSParser::DISCONTINUITY_NONE,
1397                            NULL,
1398                            true);
1399                }
1400            }
1401        }
1402
1403        FetcherInfo &info = mFetcherInfos.editValueAt(i);
1404        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
1405                || sources[kSubtitleIndex] != NULL) {
1406            info.mFetcher->startAsync(
1407                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
1408        } else {
1409            info.mToBeRemoved = true;
1410        }
1411    }
1412
1413    // streamMask now only contains the types that need a new fetcher created.
1414
1415    if (streamMask != 0) {
1416        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1417    }
1418
1419    // Find out when the original fetchers have buffered up to and start the new fetchers
1420    // at a later timestamp.
1421    for (size_t i = 0; i < kMaxStreams; i++) {
1422        if (!(indexToType(i) & streamMask)) {
1423            continue;
1424        }
1425
1426        AString uri;
1427        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
1428
1429        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1430        CHECK(fetcher != NULL);
1431
1432        int32_t latestSeq = -1;
1433        int64_t startTimeUs = -1;
1434        int64_t segmentStartTimeUs = -1ll;
1435        int32_t discontinuitySeq = -1;
1436        sp<AnotherPacketSource> sources[kMaxStreams];
1437
1438        if (i == kSubtitleIndex) {
1439            segmentStartTimeUs = latestMediaSegmentStartTimeUs();
1440        }
1441
1442        // TRICKY: looping from i as earlier streams are already removed from streamMask
1443        for (size_t j = i; j < kMaxStreams; ++j) {
1444            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
1445            if ((streamMask & indexToType(j)) && uri == streamUri) {
1446                sources[j] = mPacketSources.valueFor(indexToType(j));
1447
1448                if (timeUs >= 0) {
1449                    sources[j]->clear();
1450                    startTimeUs = timeUs;
1451
1452                    sp<AnotherPacketSource> discontinuityQueue;
1453                    sp<AMessage> extra = new AMessage;
1454                    extra->setInt64("timeUs", timeUs);
1455                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1456                    discontinuityQueue->queueDiscontinuity(
1457                            ATSParser::DISCONTINUITY_SEEK, extra, true);
1458                } else {
1459                    int32_t type;
1460                    int64_t srcSegmentStartTimeUs;
1461                    sp<AMessage> meta;
1462                    if (pickTrack) {
1463                        // selecting
1464                        meta = sources[j]->getLatestDequeuedMeta();
1465                    } else {
1466                        // adapting
1467                        meta = sources[j]->getLatestEnqueuedMeta();
1468                    }
1469
1470                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
1471                        int64_t tmpUs;
1472                        CHECK(meta->findInt64("timeUs", &tmpUs));
1473                        if (startTimeUs < 0 || tmpUs < startTimeUs) {
1474                            startTimeUs = tmpUs;
1475                        }
1476
1477                        CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs));
1478                        if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) {
1479                            segmentStartTimeUs = tmpUs;
1480                        }
1481
1482                        int32_t seq;
1483                        CHECK(meta->findInt32("discontinuitySeq", &seq));
1484                        if (discontinuitySeq < 0 || seq < discontinuitySeq) {
1485                            discontinuitySeq = seq;
1486                        }
1487                    }
1488
1489                    if (pickTrack) {
1490                        // selecting track, queue discontinuities before content
1491                        sources[j]->clear();
1492                        if (j == kSubtitleIndex) {
1493                            break;
1494                        }
1495                        sp<AnotherPacketSource> discontinuityQueue;
1496                        discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1497                        discontinuityQueue->queueDiscontinuity(
1498                                ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1499                    } else {
1500                        // adapting, queue discontinuities after resume
1501                        sources[j] = mPacketSources2.valueFor(indexToType(j));
1502                        sources[j]->clear();
1503                        uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1504                        if (extraStreams & indexToType(j)) {
1505                            sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false));
1506                        }
1507                    }
1508                }
1509
1510                streamMask &= ~indexToType(j);
1511            }
1512        }
1513
1514        fetcher->startAsync(
1515                sources[kAudioIndex],
1516                sources[kVideoIndex],
1517                sources[kSubtitleIndex],
1518                startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs,
1519                segmentStartTimeUs,
1520                discontinuitySeq,
1521                switching);
1522    }
1523
1524    // All fetchers have now been started, the configuration change
1525    // has completed.
1526
1527    cancelCheckBandwidthEvent();
1528    scheduleCheckBandwidthEvent();
1529
1530    ALOGV("XXX configuration change completed.");
1531    mReconfigurationInProgress = false;
1532    if (switching) {
1533        mSwitchInProgress = true;
1534    } else {
1535        mStreamMask = mNewStreamMask;
1536    }
1537
1538    if (mDisconnectReplyID != 0) {
1539        finishDisconnect();
1540    }
1541}
1542
1543void LiveSession::onSwapped(const sp<AMessage> &msg) {
1544    int32_t switchGeneration;
1545    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
1546    if (switchGeneration != mSwitchGeneration) {
1547        return;
1548    }
1549
1550    int32_t stream;
1551    CHECK(msg->findInt32("stream", &stream));
1552
1553    ssize_t idx = typeToIndex(stream);
1554    CHECK(idx >= 0);
1555    if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
1556        ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str());
1557    }
1558    mStreams[idx].mUri = mStreams[idx].mNewUri;
1559    mStreams[idx].mNewUri.clear();
1560
1561    mSwapMask &= ~stream;
1562    if (mSwapMask != 0) {
1563        return;
1564    }
1565
1566    // Check if new variant contains extra streams.
1567    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1568    while (extraStreams) {
1569        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
1570        swapPacketSource(extraStream);
1571        extraStreams &= ~extraStream;
1572
1573        idx = typeToIndex(extraStream);
1574        CHECK(idx >= 0);
1575        if (mStreams[idx].mNewUri.empty()) {
1576            ALOGW("swapping extra stream type %d %s to empty stream",
1577                    extraStream, mStreams[idx].mUri.c_str());
1578        }
1579        mStreams[idx].mUri = mStreams[idx].mNewUri;
1580        mStreams[idx].mNewUri.clear();
1581    }
1582
1583    tryToFinishBandwidthSwitch();
1584}
1585
1586void LiveSession::onCheckSwitchDown() {
1587    if (mSwitchDownMonitor == NULL) {
1588        return;
1589    }
1590
1591    for (size_t i = 0; i < kMaxStreams; ++i) {
1592        int32_t targetDuration;
1593        sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i));
1594        sp<AMessage> meta = packetSource->getLatestDequeuedMeta();
1595
1596        if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) {
1597            int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs();
1598            int64_t targetDurationUs = targetDuration * 1000000ll;
1599
1600            if (bufferedDurationUs < targetDurationUs / 3) {
1601                (new AMessage(kWhatSwitchDown, id()))->post();
1602                break;
1603            }
1604        }
1605    }
1606
1607    mSwitchDownMonitor->post(1000000ll);
1608}
1609
1610void LiveSession::onSwitchDown() {
1611    if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) {
1612        return;
1613    }
1614
1615    ssize_t bandwidthIndex = getBandwidthIndex();
1616    if (bandwidthIndex < mCurBandwidthIndex) {
1617        changeConfiguration(-1, bandwidthIndex, false);
1618        return;
1619    }
1620
1621    changeConfiguration(-1, mCurBandwidthIndex - 1, false);
1622}
1623
1624// Mark switch done when:
1625//   1. all old buffers are swapped out
1626void LiveSession::tryToFinishBandwidthSwitch() {
1627    if (!mSwitchInProgress) {
1628        return;
1629    }
1630
1631    bool needToRemoveFetchers = false;
1632    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1633        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
1634            needToRemoveFetchers = true;
1635            break;
1636        }
1637    }
1638
1639    if (!needToRemoveFetchers && mSwapMask == 0) {
1640        ALOGI("mSwitchInProgress = false");
1641        mStreamMask = mNewStreamMask;
1642        mSwitchInProgress = false;
1643    }
1644}
1645
1646void LiveSession::scheduleCheckBandwidthEvent() {
1647    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1648    msg->setInt32("generation", mCheckBandwidthGeneration);
1649    msg->post(10000000ll);
1650}
1651
1652void LiveSession::cancelCheckBandwidthEvent() {
1653    ++mCheckBandwidthGeneration;
1654}
1655
1656void LiveSession::cancelBandwidthSwitch() {
1657    Mutex::Autolock lock(mSwapMutex);
1658    mSwitchGeneration++;
1659    mSwitchInProgress = false;
1660    mSwapMask = 0;
1661
1662    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1663        FetcherInfo& info = mFetcherInfos.editValueAt(i);
1664        if (info.mToBeRemoved) {
1665            info.mToBeRemoved = false;
1666        }
1667    }
1668
1669    for (size_t i = 0; i < kMaxStreams; ++i) {
1670        if (!mStreams[i].mNewUri.empty()) {
1671            ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri);
1672            if (j < 0) {
1673                mStreams[i].mNewUri.clear();
1674                continue;
1675            }
1676
1677            const FetcherInfo &info = mFetcherInfos.valueAt(j);
1678            info.mFetcher->stopAsync();
1679            mFetcherInfos.removeItemsAt(j);
1680            mStreams[i].mNewUri.clear();
1681        }
1682    }
1683}
1684
1685bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
1686    if (mReconfigurationInProgress || mSwitchInProgress) {
1687        return false;
1688    }
1689
1690    if (mCurBandwidthIndex < 0) {
1691        return true;
1692    }
1693
1694    if (bandwidthIndex == (size_t)mCurBandwidthIndex) {
1695        return false;
1696    } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) {
1697        return canSwitchUp();
1698    } else {
1699        return true;
1700    }
1701}
1702
1703void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) {
1704    size_t bandwidthIndex = getBandwidthIndex();
1705    if (canSwitchBandwidthTo(bandwidthIndex)) {
1706        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1707    } else {
1708        // Come back and check again 10 seconds later in case there is nothing to do now.
1709        // If we DO change configuration, once that completes it'll schedule a new
1710        // check bandwidth event with an incremented mCheckBandwidthGeneration.
1711        msg->post(10000000ll);
1712    }
1713}
1714
1715void LiveSession::postPrepared(status_t err) {
1716    CHECK(mInPreparationPhase);
1717
1718    sp<AMessage> notify = mNotify->dup();
1719    if (err == OK || err == ERROR_END_OF_STREAM) {
1720        notify->setInt32("what", kWhatPrepared);
1721    } else {
1722        notify->setInt32("what", kWhatPreparationFailed);
1723        notify->setInt32("err", err);
1724    }
1725
1726    notify->post();
1727
1728    mInPreparationPhase = false;
1729
1730    mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id());
1731    mSwitchDownMonitor->post();
1732}
1733
1734}  // namespace android
1735
1736