LiveSession.cpp revision 4604458dfe57b0e91a464aefafea50ae7b9876c1
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/IMediaHTTPConnection.h>
31#include <media/IMediaHTTPService.h>
32#include <media/stagefright/foundation/hexdump.h>
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/DataSource.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaErrors.h>
39#include <media/stagefright/MediaHTTP.h>
40#include <media/stagefright/MetaData.h>
41#include <media/stagefright/Utils.h>
42
43#include <utils/Mutex.h>
44
45#include <ctype.h>
46#include <inttypes.h>
47#include <openssl/aes.h>
48#include <openssl/md5.h>
49
50namespace android {
51
52// static
53// Number of recently-read bytes to use for bandwidth estimation
54const size_t LiveSession::kBandwidthHistoryBytes = 200 * 1024;
55// High water mark to start up switch or report prepared)
56const int64_t LiveSession::kHighWaterMark = 8000000ll;
57const int64_t LiveSession::kMidWaterMark = 5000000ll;
58const int64_t LiveSession::kLowWaterMark = 3000000ll;
59
60LiveSession::LiveSession(
61        const sp<AMessage> &notify, uint32_t flags,
62        const sp<IMediaHTTPService> &httpService)
63    : mNotify(notify),
64      mFlags(flags),
65      mHTTPService(httpService),
66      mInPreparationPhase(true),
67      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
68      mCurBandwidthIndex(-1),
69      mStreamMask(0),
70      mNewStreamMask(0),
71      mSwapMask(0),
72      mCheckBandwidthGeneration(0),
73      mSwitchGeneration(0),
74      mSubtitleGeneration(0),
75      mLastDequeuedTimeUs(0ll),
76      mRealTimeBaseUs(0ll),
77      mReconfigurationInProgress(false),
78      mSwitchInProgress(false),
79      mFirstTimeUsValid(false),
80      mFirstTimeUs(0),
81      mLastSeekTimeUs(0),
82      mPollBufferingGeneration(0) {
83
84    mStreams[kAudioIndex] = StreamItem("audio");
85    mStreams[kVideoIndex] = StreamItem("video");
86    mStreams[kSubtitleIndex] = StreamItem("subtitles");
87
88    for (size_t i = 0; i < kMaxStreams; ++i) {
89        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
90        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
91    }
92
93    size_t numHistoryItems = kBandwidthHistoryBytes /
94            PlaylistFetcher::kDownloadBlockSize + 1;
95    if (numHistoryItems < 5) {
96        numHistoryItems = 5;
97    }
98    mHTTPDataSource->setBandwidthHistorySize(numHistoryItems);
99}
100
101LiveSession::~LiveSession() {
102    if (mFetcherLooper != NULL) {
103        mFetcherLooper->stop();
104    }
105}
106
107sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
108    ABuffer *discontinuity = new ABuffer(0);
109    discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
110    discontinuity->meta()->setInt32("swapPacketSource", swap);
111    discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
112    discontinuity->meta()->setInt64("timeUs", -1);
113    return discontinuity;
114}
115
116void LiveSession::swapPacketSource(StreamType stream) {
117    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
118    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
119    sp<AnotherPacketSource> tmp = aps;
120    aps = aps2;
121    aps2 = tmp;
122    aps2->clear();
123}
124
125status_t LiveSession::dequeueAccessUnit(
126        StreamType stream, sp<ABuffer> *accessUnit) {
127    if (!(mStreamMask & stream)) {
128        // return -EWOULDBLOCK to avoid halting the decoder
129        // when switching between audio/video and audio only.
130        return -EWOULDBLOCK;
131    }
132
133    status_t finalResult = OK;
134    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
135
136    ssize_t idx = typeToIndex(stream);
137    if (!packetSource->hasBufferAvailable(&finalResult)) {
138        if (finalResult == OK) {
139            return -EAGAIN;
140        } else {
141            return finalResult;
142        }
143    }
144
145    // Do not let client pull data if we don't have format yet.
146    // We might only have a format discontinuity queued without actual data.
147    // When NuPlayerDecoder dequeues the format discontinuity, it will
148    // immediately try to getFormat. If we return NULL, NuPlayerDecoder
149    // thinks it can do seamless change, so will not shutdown decoder.
150    // When the actual format arrives, it can't handle it and get stuck.
151    // TODO: We need a method to check if the packet source has any
152    //       data packets available, dequeuing should only start then.
153    sp<MetaData> format = packetSource->getFormat();
154    if (format == NULL) {
155        return -EAGAIN;
156    }
157    int32_t targetDuration = 0;
158    sp<AMessage> meta = packetSource->getLatestEnqueuedMeta();
159    if (meta != NULL) {
160        meta->findInt32("targetDuration", &targetDuration);
161    }
162
163    int64_t targetDurationUs = targetDuration * 1000000ll;
164    if (targetDurationUs == 0 ||
165            targetDurationUs > PlaylistFetcher::kMinBufferedDurationUs) {
166        // Fetchers limit buffering to
167        // min(3 * targetDuration, kMinBufferedDurationUs)
168        targetDurationUs = PlaylistFetcher::kMinBufferedDurationUs;
169    }
170
171    // wait for counterpart
172    sp<AnotherPacketSource> otherSource;
173    uint32_t mask = mNewStreamMask & mStreamMask;
174    uint32_t fetchersMask  = 0;
175    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
176        uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask();
177        fetchersMask |= fetcherMask;
178    }
179    mask &= fetchersMask;
180    if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) {
181        otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
182    } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) {
183        otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
184    }
185    if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) {
186        return finalResult == OK ? -EAGAIN : finalResult;
187    }
188
189    status_t err = packetSource->dequeueAccessUnit(accessUnit);
190
191    size_t streamIdx;
192    const char *streamStr;
193    switch (stream) {
194        case STREAMTYPE_AUDIO:
195            streamIdx = kAudioIndex;
196            streamStr = "audio";
197            break;
198        case STREAMTYPE_VIDEO:
199            streamIdx = kVideoIndex;
200            streamStr = "video";
201            break;
202        case STREAMTYPE_SUBTITLES:
203            streamIdx = kSubtitleIndex;
204            streamStr = "subs";
205            break;
206        default:
207            TRESPASS();
208    }
209
210    StreamItem& strm = mStreams[streamIdx];
211    if (err == INFO_DISCONTINUITY) {
212        // adaptive streaming, discontinuities in the playlist
213        int32_t type;
214        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
215
216        sp<AMessage> extra;
217        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
218            extra.clear();
219        }
220
221        ALOGI("[%s] read discontinuity of type %d, extra = %s",
222              streamStr,
223              type,
224              extra == NULL ? "NULL" : extra->debugString().c_str());
225
226        int32_t swap;
227        if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) {
228            int32_t switchGeneration;
229            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
230            {
231                Mutex::Autolock lock(mSwapMutex);
232                if (switchGeneration == mSwitchGeneration) {
233                    swapPacketSource(stream);
234                    sp<AMessage> msg = new AMessage(kWhatSwapped, this);
235                    msg->setInt32("stream", stream);
236                    msg->setInt32("switchGeneration", switchGeneration);
237                    msg->post();
238                }
239            }
240        } else {
241            size_t seq = strm.mCurDiscontinuitySeq;
242            int64_t offsetTimeUs;
243            if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
244                offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
245            } else {
246                offsetTimeUs = 0;
247            }
248
249            seq += 1;
250            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
251                int64_t firstTimeUs;
252                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
253                offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
254                offsetTimeUs += strm.mLastSampleDurationUs;
255            } else {
256                offsetTimeUs += strm.mLastSampleDurationUs;
257            }
258
259            mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
260        }
261    } else if (err == OK) {
262
263        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
264            int64_t timeUs;
265            int32_t discontinuitySeq = 0;
266            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
267            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
268            strm.mCurDiscontinuitySeq = discontinuitySeq;
269
270            int32_t discard = 0;
271            int64_t firstTimeUs;
272            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
273                int64_t durUs; // approximate sample duration
274                if (timeUs > strm.mLastDequeuedTimeUs) {
275                    durUs = timeUs - strm.mLastDequeuedTimeUs;
276                } else {
277                    durUs = strm.mLastDequeuedTimeUs - timeUs;
278                }
279                strm.mLastSampleDurationUs = durUs;
280                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
281            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
282                firstTimeUs = timeUs;
283            } else {
284                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
285                firstTimeUs = timeUs;
286            }
287
288            strm.mLastDequeuedTimeUs = timeUs;
289            if (timeUs >= firstTimeUs) {
290                timeUs -= firstTimeUs;
291            } else {
292                timeUs = 0;
293            }
294            timeUs += mLastSeekTimeUs;
295            if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
296                timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
297            }
298
299            ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs);
300            (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
301            mLastDequeuedTimeUs = timeUs;
302            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
303        } else if (stream == STREAMTYPE_SUBTITLES) {
304            int32_t subtitleGeneration;
305            if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
306                    && subtitleGeneration != mSubtitleGeneration) {
307               return -EAGAIN;
308            };
309            (*accessUnit)->meta()->setInt32(
310                    "trackIndex", mPlaylist->getSelectedIndex());
311            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
312        }
313    } else {
314        ALOGI("[%s] encountered error %d", streamStr, err);
315    }
316
317    return err;
318}
319
320status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
321    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
322    if (!(mStreamMask & stream)) {
323        return UNKNOWN_ERROR;
324    }
325
326    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
327
328    sp<MetaData> meta = packetSource->getFormat();
329
330    if (meta == NULL) {
331        return -EAGAIN;
332    }
333
334    return convertMetaDataToMessage(meta, format);
335}
336
337void LiveSession::connectAsync(
338        const char *url, const KeyedVector<String8, String8> *headers) {
339    sp<AMessage> msg = new AMessage(kWhatConnect, this);
340    msg->setString("url", url);
341
342    if (headers != NULL) {
343        msg->setPointer(
344                "headers",
345                new KeyedVector<String8, String8>(*headers));
346    }
347
348    msg->post();
349}
350
351status_t LiveSession::disconnect() {
352    sp<AMessage> msg = new AMessage(kWhatDisconnect, this);
353
354    sp<AMessage> response;
355    status_t err = msg->postAndAwaitResponse(&response);
356
357    return err;
358}
359
360status_t LiveSession::seekTo(int64_t timeUs) {
361    sp<AMessage> msg = new AMessage(kWhatSeek, this);
362    msg->setInt64("timeUs", timeUs);
363
364    sp<AMessage> response;
365    status_t err = msg->postAndAwaitResponse(&response);
366
367    return err;
368}
369
370void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
371    switch (msg->what()) {
372        case kWhatConnect:
373        {
374            onConnect(msg);
375            break;
376        }
377
378        case kWhatDisconnect:
379        {
380            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
381
382            if (mReconfigurationInProgress) {
383                break;
384            }
385
386            finishDisconnect();
387            break;
388        }
389
390        case kWhatSeek:
391        {
392            sp<AReplyToken> seekReplyID;
393            CHECK(msg->senderAwaitsResponse(&seekReplyID));
394            mSeekReplyID = seekReplyID;
395            mSeekReply = new AMessage;
396
397            status_t err = onSeek(msg);
398
399            if (err != OK) {
400                msg->post(50000);
401            }
402            break;
403        }
404
405        case kWhatFetcherNotify:
406        {
407            int32_t what;
408            CHECK(msg->findInt32("what", &what));
409
410            switch (what) {
411                case PlaylistFetcher::kWhatStarted:
412                    break;
413                case PlaylistFetcher::kWhatPaused:
414                case PlaylistFetcher::kWhatStopped:
415                {
416                    if (what == PlaylistFetcher::kWhatStopped) {
417                        AString uri;
418                        CHECK(msg->findString("uri", &uri));
419                        ssize_t index = mFetcherInfos.indexOfKey(uri);
420                        if (index < 0) {
421                            // ignore duplicated kWhatStopped messages.
422                            break;
423                        }
424
425                        mFetcherLooper->unregisterHandler(
426                                mFetcherInfos[index].mFetcher->id());
427                        mFetcherInfos.removeItemsAt(index);
428
429                        if (mSwitchInProgress) {
430                            tryToFinishBandwidthSwitch();
431                        }
432                    }
433
434                    if (mContinuation != NULL) {
435                        CHECK_GT(mContinuationCounter, 0);
436                        if (--mContinuationCounter == 0) {
437                            mContinuation->post();
438                        }
439                    }
440                    break;
441                }
442
443                case PlaylistFetcher::kWhatDurationUpdate:
444                {
445                    AString uri;
446                    CHECK(msg->findString("uri", &uri));
447
448                    int64_t durationUs;
449                    CHECK(msg->findInt64("durationUs", &durationUs));
450
451                    ssize_t index = mFetcherInfos.indexOfKey(uri);
452                    if (index >= 0) {
453                        FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
454                        info->mDurationUs = durationUs;
455                    }
456                    break;
457                }
458
459                case PlaylistFetcher::kWhatError:
460                {
461                    status_t err;
462                    CHECK(msg->findInt32("err", &err));
463
464                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
465
466                    // handle EOS on subtitle tracks independently
467                    AString uri;
468                    if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
469                        ssize_t i = mFetcherInfos.indexOfKey(uri);
470                        if (i >= 0) {
471                            const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
472                            if (fetcher != NULL) {
473                                uint32_t type = fetcher->getStreamTypeMask();
474                                if (type == STREAMTYPE_SUBTITLES) {
475                                    mPacketSources.valueFor(
476                                            STREAMTYPE_SUBTITLES)->signalEOS(err);;
477                                    break;
478                                }
479                            }
480                        }
481                    }
482
483                    if (mInPreparationPhase) {
484                        postPrepared(err);
485                    }
486
487                    cancelBandwidthSwitch();
488
489                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
490
491                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
492
493                    mPacketSources.valueFor(
494                            STREAMTYPE_SUBTITLES)->signalEOS(err);
495
496                    sp<AMessage> notify = mNotify->dup();
497                    notify->setInt32("what", kWhatError);
498                    notify->setInt32("err", err);
499                    notify->post();
500                    break;
501                }
502
503                case PlaylistFetcher::kWhatStartedAt:
504                {
505                    int32_t switchGeneration;
506                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
507
508                    if (switchGeneration != mSwitchGeneration) {
509                        break;
510                    }
511
512                    // Resume fetcher for the original variant; the resumed fetcher should
513                    // continue until the timestamps found in msg, which is stored by the
514                    // new fetcher to indicate where the new variant has started buffering.
515                    for (size_t i = 0; i < mFetcherInfos.size(); i++) {
516                        const FetcherInfo info = mFetcherInfos.valueAt(i);
517                        if (info.mToBeRemoved) {
518                            info.mFetcher->resumeUntilAsync(msg);
519                        }
520                    }
521                    break;
522                }
523
524                default:
525                    TRESPASS();
526            }
527
528            break;
529        }
530
531        case kWhatChangeConfiguration:
532        {
533            onChangeConfiguration(msg);
534            break;
535        }
536
537        case kWhatChangeConfiguration2:
538        {
539            onChangeConfiguration2(msg);
540            break;
541        }
542
543        case kWhatChangeConfiguration3:
544        {
545            onChangeConfiguration3(msg);
546            break;
547        }
548
549        case kWhatFinishDisconnect2:
550        {
551            onFinishDisconnect2();
552            break;
553        }
554
555        case kWhatSwapped:
556        {
557            onSwapped(msg);
558            break;
559        }
560
561        case kWhatPollBuffering:
562        {
563            int32_t generation;
564            CHECK(msg->findInt32("generation", &generation));
565            if (generation == mPollBufferingGeneration) {
566                onPollBuffering();
567            }
568            break;
569        }
570
571        default:
572            TRESPASS();
573            break;
574    }
575}
576
577// static
578int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
579    if (a->mBandwidth < b->mBandwidth) {
580        return -1;
581    } else if (a->mBandwidth == b->mBandwidth) {
582        return 0;
583    }
584
585    return 1;
586}
587
588// static
589LiveSession::StreamType LiveSession::indexToType(int idx) {
590    CHECK(idx >= 0 && idx < kMaxStreams);
591    return (StreamType)(1 << idx);
592}
593
594// static
595ssize_t LiveSession::typeToIndex(int32_t type) {
596    switch (type) {
597        case STREAMTYPE_AUDIO:
598            return 0;
599        case STREAMTYPE_VIDEO:
600            return 1;
601        case STREAMTYPE_SUBTITLES:
602            return 2;
603        default:
604            return -1;
605    };
606    return -1;
607}
608
609void LiveSession::onConnect(const sp<AMessage> &msg) {
610    AString url;
611    CHECK(msg->findString("url", &url));
612
613    KeyedVector<String8, String8> *headers = NULL;
614    if (!msg->findPointer("headers", (void **)&headers)) {
615        mExtraHeaders.clear();
616    } else {
617        mExtraHeaders = *headers;
618
619        delete headers;
620        headers = NULL;
621    }
622
623    // TODO currently we don't know if we are coming here from incognito mode
624    ALOGI("onConnect %s", uriDebugString(url).c_str());
625
626    mMasterURL = url;
627
628    bool dummy;
629    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
630
631    if (mPlaylist == NULL) {
632        ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str());
633
634        postPrepared(ERROR_IO);
635        return;
636    }
637
638    // create looper for fetchers
639    if (mFetcherLooper == NULL) {
640        mFetcherLooper = new ALooper();
641
642        mFetcherLooper->setName("Fetcher");
643        mFetcherLooper->start(false, false);
644    }
645
646    // We trust the content provider to make a reasonable choice of preferred
647    // initial bandwidth by listing it first in the variant playlist.
648    // At startup we really don't have a good estimate on the available
649    // network bandwidth since we haven't tranferred any data yet. Once
650    // we have we can make a better informed choice.
651    size_t initialBandwidth = 0;
652    size_t initialBandwidthIndex = 0;
653
654    if (mPlaylist->isVariantPlaylist()) {
655        for (size_t i = 0; i < mPlaylist->size(); ++i) {
656            BandwidthItem item;
657
658            item.mPlaylistIndex = i;
659
660            sp<AMessage> meta;
661            AString uri;
662            mPlaylist->itemAt(i, &uri, &meta);
663
664            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
665
666            if (initialBandwidth == 0) {
667                initialBandwidth = item.mBandwidth;
668            }
669
670            mBandwidthItems.push(item);
671        }
672
673        CHECK_GT(mBandwidthItems.size(), 0u);
674
675        mBandwidthItems.sort(SortByBandwidth);
676
677        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
678            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
679                initialBandwidthIndex = i;
680                break;
681            }
682        }
683    } else {
684        // dummy item.
685        BandwidthItem item;
686        item.mPlaylistIndex = 0;
687        item.mBandwidth = 0;
688        mBandwidthItems.push(item);
689    }
690
691    mPlaylist->pickRandomMediaItems();
692    changeConfiguration(
693            0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
694
695    schedulePollBuffering();
696}
697
698void LiveSession::finishDisconnect() {
699    // No reconfiguration is currently pending, make sure none will trigger
700    // during disconnection either.
701
702    // Protect mPacketSources from a swapPacketSource race condition through disconnect.
703    // (finishDisconnect, onFinishDisconnect2)
704    cancelBandwidthSwitch();
705
706    // cancel buffer polling
707    cancelPollBuffering();
708
709    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
710        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
711    }
712
713    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, this);
714
715    mContinuationCounter = mFetcherInfos.size();
716    mContinuation = msg;
717
718    if (mContinuationCounter == 0) {
719        msg->post();
720    }
721}
722
723void LiveSession::onFinishDisconnect2() {
724    mContinuation.clear();
725
726    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
727    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
728
729    mPacketSources.valueFor(
730            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
731
732    sp<AMessage> response = new AMessage;
733    response->setInt32("err", OK);
734
735    response->postReply(mDisconnectReplyID);
736    mDisconnectReplyID.clear();
737}
738
739sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
740    ssize_t index = mFetcherInfos.indexOfKey(uri);
741
742    if (index >= 0) {
743        return NULL;
744    }
745
746    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this);
747    notify->setString("uri", uri);
748    notify->setInt32("switchGeneration", mSwitchGeneration);
749
750    FetcherInfo info;
751    info.mFetcher = new PlaylistFetcher(notify, this, uri, mSubtitleGeneration);
752    info.mDurationUs = -1ll;
753    info.mIsPrepared = false;
754    info.mToBeRemoved = false;
755    mFetcherLooper->registerHandler(info.mFetcher);
756
757    mFetcherInfos.add(uri, info);
758
759    return info.mFetcher;
760}
761
762/*
763 * Illustration of parameters:
764 *
765 * 0      `range_offset`
766 * +------------+-------------------------------------------------------+--+--+
767 * |            |                                 | next block to fetch |  |  |
768 * |            | `source` handle => `out` buffer |                     |  |  |
769 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
770 * |            |<----------- `range_length` / buffer capacity ----------->|  |
771 * |<------------------------------ file_size ------------------------------->|
772 *
773 * Special parameter values:
774 * - range_length == -1 means entire file
775 * - block_size == 0 means entire range
776 *
777 */
778ssize_t LiveSession::fetchFile(
779        const char *url, sp<ABuffer> *out,
780        int64_t range_offset, int64_t range_length,
781        uint32_t block_size, /* download block size */
782        sp<DataSource> *source, /* to return and reuse source */
783        String8 *actualUrl) {
784    off64_t size;
785    sp<DataSource> temp_source;
786    if (source == NULL) {
787        source = &temp_source;
788    }
789
790    if (*source == NULL) {
791        if (!strncasecmp(url, "file://", 7)) {
792            *source = new FileSource(url + 7);
793        } else if (strncasecmp(url, "http://", 7)
794                && strncasecmp(url, "https://", 8)) {
795            return ERROR_UNSUPPORTED;
796        } else {
797            KeyedVector<String8, String8> headers = mExtraHeaders;
798            if (range_offset > 0 || range_length >= 0) {
799                headers.add(
800                        String8("Range"),
801                        String8(
802                            AStringPrintf(
803                                "bytes=%lld-%s",
804                                range_offset,
805                                range_length < 0
806                                    ? "" : AStringPrintf("%lld",
807                                            range_offset + range_length - 1).c_str()).c_str()));
808            }
809            status_t err = mHTTPDataSource->connect(url, &headers);
810
811            if (err != OK) {
812                return err;
813            }
814
815            *source = mHTTPDataSource;
816        }
817    }
818
819    status_t getSizeErr = (*source)->getSize(&size);
820    if (getSizeErr != OK) {
821        size = 65536;
822    }
823
824    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
825    if (*out == NULL) {
826        buffer->setRange(0, 0);
827    }
828
829    ssize_t bytesRead = 0;
830    // adjust range_length if only reading partial block
831    if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
832        range_length = buffer->size() + block_size;
833    }
834    for (;;) {
835        // Only resize when we don't know the size.
836        size_t bufferRemaining = buffer->capacity() - buffer->size();
837        if (bufferRemaining == 0 && getSizeErr != OK) {
838            size_t bufferIncrement = buffer->size() / 2;
839            if (bufferIncrement < 32768) {
840                bufferIncrement = 32768;
841            }
842            bufferRemaining = bufferIncrement;
843
844            ALOGV("increasing download buffer to %zu bytes",
845                 buffer->size() + bufferRemaining);
846
847            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
848            memcpy(copy->data(), buffer->data(), buffer->size());
849            copy->setRange(0, buffer->size());
850
851            buffer = copy;
852        }
853
854        size_t maxBytesToRead = bufferRemaining;
855        if (range_length >= 0) {
856            int64_t bytesLeftInRange = range_length - buffer->size();
857            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
858                maxBytesToRead = bytesLeftInRange;
859
860                if (bytesLeftInRange == 0) {
861                    break;
862                }
863            }
864        }
865
866        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
867        // to help us break out of the loop.
868        ssize_t n = (*source)->readAt(
869                buffer->size(), buffer->data() + buffer->size(),
870                maxBytesToRead);
871
872        if (n < 0) {
873            return n;
874        }
875
876        if (n == 0) {
877            break;
878        }
879
880        buffer->setRange(0, buffer->size() + (size_t)n);
881        bytesRead += n;
882    }
883
884    *out = buffer;
885    if (actualUrl != NULL) {
886        *actualUrl = (*source)->getUri();
887        if (actualUrl->isEmpty()) {
888            *actualUrl = url;
889        }
890    }
891
892    return bytesRead;
893}
894
895sp<M3UParser> LiveSession::fetchPlaylist(
896        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
897    ALOGV("fetchPlaylist '%s'", url);
898
899    *unchanged = false;
900
901    sp<ABuffer> buffer;
902    String8 actualUrl;
903    ssize_t  err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
904
905    if (err <= 0) {
906        return NULL;
907    }
908
909    // MD5 functionality is not available on the simulator, treat all
910    // playlists as changed.
911
912#if defined(HAVE_ANDROID_OS)
913    uint8_t hash[16];
914
915    MD5_CTX m;
916    MD5_Init(&m);
917    MD5_Update(&m, buffer->data(), buffer->size());
918
919    MD5_Final(hash, &m);
920
921    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
922        // playlist unchanged
923        *unchanged = true;
924
925        return NULL;
926    }
927
928    if (curPlaylistHash != NULL) {
929        memcpy(curPlaylistHash, hash, sizeof(hash));
930    }
931#endif
932
933    sp<M3UParser> playlist =
934        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
935
936    if (playlist->initCheck() != OK) {
937        ALOGE("failed to parse .m3u8 playlist");
938
939        return NULL;
940    }
941
942    return playlist;
943}
944
#if 0
// Disabled helper for the experimental bandwidth pickers in getBandwidthIndex():
// scales rand() by RAND_MAX to produce a double.
static double uniformRand() {
    return static_cast<double>(rand()) / RAND_MAX;
}
#endif
950
951size_t LiveSession::getBandwidthIndex() {
952    if (mBandwidthItems.size() == 0) {
953        return 0;
954    }
955
956#if 1
957    char value[PROPERTY_VALUE_MAX];
958    ssize_t index = -1;
959    if (property_get("media.httplive.bw-index", value, NULL)) {
960        char *end;
961        index = strtol(value, &end, 10);
962        CHECK(end > value && *end == '\0');
963
964        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
965            index = mBandwidthItems.size() - 1;
966        }
967    }
968
969    if (index < 0) {
970        int32_t bandwidthBps;
971        if (mHTTPDataSource != NULL
972                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
973            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
974        } else {
975            ALOGV("no bandwidth estimate.");
976            return 0;  // Pick the lowest bandwidth stream by default.
977        }
978
979        char value[PROPERTY_VALUE_MAX];
980        if (property_get("media.httplive.max-bw", value, NULL)) {
981            char *end;
982            long maxBw = strtoul(value, &end, 10);
983            if (end > value && *end == '\0') {
984                if (maxBw > 0 && bandwidthBps > maxBw) {
985                    ALOGV("bandwidth capped to %ld bps", maxBw);
986                    bandwidthBps = maxBw;
987                }
988            }
989        }
990
991        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
992
993        index = mBandwidthItems.size() - 1;
994        while (index > 0) {
995            // consider only 80% of the available bandwidth, but if we are switching up,
996            // be even more conservative (70%) to avoid overestimating and immediately
997            // switching back.
998            size_t adjustedBandwidthBps = bandwidthBps;
999            if (index > mCurBandwidthIndex) {
1000                adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10;
1001            } else {
1002                adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10;
1003            }
1004            if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) {
1005                break;
1006            }
1007            --index;
1008        }
1009    }
1010#elif 0
1011    // Change bandwidth at random()
1012    size_t index = uniformRand() * mBandwidthItems.size();
1013#elif 0
1014    // There's a 50% chance to stay on the current bandwidth and
1015    // a 50% chance to switch to the next higher bandwidth (wrapping around
1016    // to lowest)
1017    const size_t kMinIndex = 0;
1018
1019    static ssize_t mCurBandwidthIndex = -1;
1020
1021    size_t index;
1022    if (mCurBandwidthIndex < 0) {
1023        index = kMinIndex;
1024    } else if (uniformRand() < 0.5) {
1025        index = (size_t)mCurBandwidthIndex;
1026    } else {
1027        index = mCurBandwidthIndex + 1;
1028        if (index == mBandwidthItems.size()) {
1029            index = kMinIndex;
1030        }
1031    }
1032    mCurBandwidthIndex = index;
1033#elif 0
1034    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1035
1036    size_t index = mBandwidthItems.size() - 1;
1037    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1038        --index;
1039    }
1040#elif 1
1041    char value[PROPERTY_VALUE_MAX];
1042    size_t index;
1043    if (property_get("media.httplive.bw-index", value, NULL)) {
1044        char *end;
1045        index = strtoul(value, &end, 10);
1046        CHECK(end > value && *end == '\0');
1047
1048        if (index >= mBandwidthItems.size()) {
1049            index = mBandwidthItems.size() - 1;
1050        }
1051    } else {
1052        index = 0;
1053    }
1054#else
1055    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
1056#endif
1057
1058    CHECK_GE(index, 0);
1059
1060    return index;
1061}
1062
1063int64_t LiveSession::latestMediaSegmentStartTimeUs() {
1064    sp<AMessage> audioMeta = mPacketSources.valueFor(STREAMTYPE_AUDIO)->getLatestDequeuedMeta();
1065    int64_t minSegmentStartTimeUs = -1, videoSegmentStartTimeUs = -1;
1066    if (audioMeta != NULL) {
1067        audioMeta->findInt64("segmentStartTimeUs", &minSegmentStartTimeUs);
1068    }
1069
1070    sp<AMessage> videoMeta = mPacketSources.valueFor(STREAMTYPE_VIDEO)->getLatestDequeuedMeta();
1071    if (videoMeta != NULL
1072            && videoMeta->findInt64("segmentStartTimeUs", &videoSegmentStartTimeUs)) {
1073        if (minSegmentStartTimeUs < 0 || videoSegmentStartTimeUs < minSegmentStartTimeUs) {
1074            minSegmentStartTimeUs = videoSegmentStartTimeUs;
1075        }
1076
1077    }
1078    return minSegmentStartTimeUs;
1079}
1080
1081status_t LiveSession::onSeek(const sp<AMessage> &msg) {
1082    int64_t timeUs;
1083    CHECK(msg->findInt64("timeUs", &timeUs));
1084
1085    if (!mReconfigurationInProgress) {
1086        changeConfiguration(timeUs, mCurBandwidthIndex);
1087        return OK;
1088    } else {
1089        return -EWOULDBLOCK;
1090    }
1091}
1092
1093status_t LiveSession::getDuration(int64_t *durationUs) const {
1094    int64_t maxDurationUs = -1ll;
1095    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1096        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1097
1098        if (fetcherDurationUs > maxDurationUs) {
1099            maxDurationUs = fetcherDurationUs;
1100        }
1101    }
1102
1103    *durationUs = maxDurationUs;
1104
1105    return OK;
1106}
1107
1108bool LiveSession::isSeekable() const {
1109    int64_t durationUs;
1110    return getDuration(&durationUs) == OK && durationUs >= 0;
1111}
1112
1113bool LiveSession::hasDynamicDuration() const {
1114    return false;
1115}
1116
1117size_t LiveSession::getTrackCount() const {
1118    if (mPlaylist == NULL) {
1119        return 0;
1120    } else {
1121        return mPlaylist->getTrackCount();
1122    }
1123}
1124
1125sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1126    if (mPlaylist == NULL) {
1127        return NULL;
1128    } else {
1129        return mPlaylist->getTrackInfo(trackIndex);
1130    }
1131}
1132
1133status_t LiveSession::selectTrack(size_t index, bool select) {
1134    if (mPlaylist == NULL) {
1135        return INVALID_OPERATION;
1136    }
1137
1138    ++mSubtitleGeneration;
1139    status_t err = mPlaylist->selectTrack(index, select);
1140    if (err == OK) {
1141        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this);
1142        msg->setInt32("bandwidthIndex", mCurBandwidthIndex);
1143        msg->setInt32("pickTrack", select);
1144        msg->post();
1145    }
1146    return err;
1147}
1148
1149ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
1150    if (mPlaylist == NULL) {
1151        return -1;
1152    } else {
1153        return mPlaylist->getSelectedTrack(type);
1154    }
1155}
1156
1157void LiveSession::changeConfiguration(
1158        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
1159    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
1160    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
1161    cancelBandwidthSwitch();
1162
1163    CHECK(!mReconfigurationInProgress);
1164    mReconfigurationInProgress = true;
1165
1166    mCurBandwidthIndex = bandwidthIndex;
1167
1168    ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
1169          timeUs, bandwidthIndex, pickTrack);
1170
1171    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
1172    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
1173
1174    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1175    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1176
1177    AString URIs[kMaxStreams];
1178    for (size_t i = 0; i < kMaxStreams; ++i) {
1179        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1180            streamMask |= indexToType(i);
1181        }
1182    }
1183
1184    // Step 1, stop and discard fetchers that are no longer needed.
1185    // Pause those that we'll reuse.
1186    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1187        const AString &uri = mFetcherInfos.keyAt(i);
1188
1189        bool discardFetcher = true;
1190
1191        if (timeUs < 0ll) {
1192            // delay fetcher removal if not picking tracks
1193            discardFetcher = pickTrack;
1194
1195        }
1196
1197        for (size_t j = 0; j < kMaxStreams; ++j) {
1198            StreamType type = indexToType(j);
1199            if ((streamMask & type) && uri == URIs[j]) {
1200                resumeMask |= type;
1201                streamMask &= ~type;
1202                discardFetcher = false;
1203            }
1204        }
1205
1206        if (discardFetcher) {
1207            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1208        } else {
1209            // if we're seeking, pause immediately (no need to finish the segment)
1210            bool immediate = (timeUs >= 0ll);
1211            mFetcherInfos.valueAt(i).mFetcher->pauseAsync(immediate);
1212        }
1213    }
1214
1215    sp<AMessage> msg;
1216    if (timeUs < 0ll) {
1217        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1218        msg = new AMessage(kWhatChangeConfiguration3, this);
1219    } else {
1220        msg = new AMessage(kWhatChangeConfiguration2, this);
1221    }
1222    msg->setInt32("streamMask", streamMask);
1223    msg->setInt32("resumeMask", resumeMask);
1224    msg->setInt32("pickTrack", pickTrack);
1225    msg->setInt64("timeUs", timeUs);
1226    for (size_t i = 0; i < kMaxStreams; ++i) {
1227        if ((streamMask | resumeMask) & indexToType(i)) {
1228            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1229        }
1230    }
1231
1232    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1233    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1234    // fetchers have completed their asynchronous operation, we'll post
1235    // mContinuation, which then is handled below in onChangeConfiguration2.
1236    mContinuationCounter = mFetcherInfos.size();
1237    mContinuation = msg;
1238
1239    if (mContinuationCounter == 0) {
1240        msg->post();
1241    }
1242}
1243
1244void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1245    if (!mReconfigurationInProgress) {
1246        int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex;
1247        msg->findInt32("pickTrack", &pickTrack);
1248        msg->findInt32("bandwidthIndex", &bandwidthIndex);
1249        changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack);
1250    } else {
1251        msg->post(1000000ll); // retry in 1 sec
1252    }
1253}
1254
1255void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1256    mContinuation.clear();
1257
1258    // All fetchers are either suspended or have been removed now.
1259
1260    // If we're seeking, clear all packet sources before we report
1261    // seek complete, to prevent decoder from pulling stale data.
1262    int64_t timeUs;
1263    CHECK(msg->findInt64("timeUs", &timeUs));
1264
1265    if (timeUs >= 0) {
1266        mLastSeekTimeUs = timeUs;
1267
1268        for (size_t i = 0; i < mPacketSources.size(); i++) {
1269            mPacketSources.editValueAt(i)->clear();
1270        }
1271
1272        mDiscontinuityOffsetTimesUs.clear();
1273        mDiscontinuityAbsStartTimesUs.clear();
1274
1275        if (mSeekReplyID != NULL) {
1276            CHECK(mSeekReply != NULL);
1277            mSeekReply->setInt32("err", OK);
1278            mSeekReply->postReply(mSeekReplyID);
1279            mSeekReplyID.clear();
1280            mSeekReply.clear();
1281        }
1282    }
1283
1284    uint32_t streamMask, resumeMask;
1285    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1286    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1287
1288    streamMask |= resumeMask;
1289
1290    AString URIs[kMaxStreams];
1291    for (size_t i = 0; i < kMaxStreams; ++i) {
1292        if (streamMask & indexToType(i)) {
1293            const AString &uriKey = mStreams[i].uriKey();
1294            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1295            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1296        }
1297    }
1298
1299    uint32_t changedMask = 0;
1300    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1301        // stream URI could change even if onChangeConfiguration2 is only
1302        // used for seek. Seek could happen during a bw switch, in this
1303        // case bw switch will be cancelled, but the seekTo position will
1304        // fetch from the new URI.
1305        if ((mStreamMask & streamMask & indexToType(i))
1306                && !mStreams[i].mUri.empty()
1307                && !(URIs[i] == mStreams[i].mUri)) {
1308            ALOGV("stream %zu changed: oldURI %s, newURI %s", i,
1309                    mStreams[i].mUri.c_str(), URIs[i].c_str());
1310            sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i));
1311            source->queueDiscontinuity(
1312                    ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1313        }
1314        // Determine which decoders to shutdown on the player side,
1315        // a decoder has to be shutdown if its streamtype was active
1316        // before but now longer isn't.
1317        if ((mStreamMask & ~streamMask & indexToType(i))) {
1318            changedMask |= indexToType(i);
1319        }
1320    }
1321
1322    if (changedMask == 0) {
1323        // If nothing changed as far as the audio/video decoders
1324        // are concerned we can proceed.
1325        onChangeConfiguration3(msg);
1326        return;
1327    }
1328
1329    // Something changed, inform the player which will shutdown the
1330    // corresponding decoders and will post the reply once that's done.
1331    // Handling the reply will continue executing below in
1332    // onChangeConfiguration3.
1333    sp<AMessage> notify = mNotify->dup();
1334    notify->setInt32("what", kWhatStreamsChanged);
1335    notify->setInt32("changedMask", changedMask);
1336
1337    msg->setWhat(kWhatChangeConfiguration3);
1338    msg->setTarget(this);
1339
1340    notify->setMessage("reply", msg);
1341    notify->post();
1342}
1343
1344void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1345    mContinuation.clear();
1346    // All remaining fetchers are still suspended, the player has shutdown
1347    // any decoders that needed it.
1348
1349    uint32_t streamMask, resumeMask;
1350    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1351    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1352
1353    int64_t timeUs;
1354    int32_t pickTrack;
1355    bool switching = false;
1356    CHECK(msg->findInt64("timeUs", &timeUs));
1357    CHECK(msg->findInt32("pickTrack", &pickTrack));
1358
1359    if (timeUs < 0ll) {
1360        if (!pickTrack) {
1361            switching = true;
1362        }
1363        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
1364    } else {
1365        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1366    }
1367
1368    for (size_t i = 0; i < kMaxStreams; ++i) {
1369        if (streamMask & indexToType(i)) {
1370            if (switching) {
1371                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
1372            } else {
1373                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1374            }
1375        }
1376    }
1377
1378    mNewStreamMask = streamMask | resumeMask;
1379    if (switching) {
1380        mSwapMask = mStreamMask & ~resumeMask;
1381    }
1382
1383    // Of all existing fetchers:
1384    // * Resume fetchers that are still needed and assign them original packet sources.
1385    // * Mark otherwise unneeded fetchers for removal.
1386    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1387    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1388        const AString &uri = mFetcherInfos.keyAt(i);
1389
1390        sp<AnotherPacketSource> sources[kMaxStreams];
1391        for (size_t j = 0; j < kMaxStreams; ++j) {
1392            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
1393                sources[j] = mPacketSources.valueFor(indexToType(j));
1394            }
1395        }
1396        FetcherInfo &info = mFetcherInfos.editValueAt(i);
1397        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
1398                || sources[kSubtitleIndex] != NULL) {
1399            info.mFetcher->startAsync(
1400                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex], timeUs);
1401        } else {
1402            info.mToBeRemoved = true;
1403        }
1404    }
1405
1406    // streamMask now only contains the types that need a new fetcher created.
1407
1408    if (streamMask != 0) {
1409        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1410    }
1411
1412    // Find out when the original fetchers have buffered up to and start the new fetchers
1413    // at a later timestamp.
1414    for (size_t i = 0; i < kMaxStreams; i++) {
1415        if (!(indexToType(i) & streamMask)) {
1416            continue;
1417        }
1418
1419        AString uri;
1420        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
1421
1422        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1423        CHECK(fetcher != NULL);
1424
1425        int64_t startTimeUs = -1;
1426        int64_t segmentStartTimeUs = -1ll;
1427        int32_t discontinuitySeq = -1;
1428        sp<AnotherPacketSource> sources[kMaxStreams];
1429
1430        if (i == kSubtitleIndex) {
1431            segmentStartTimeUs = latestMediaSegmentStartTimeUs();
1432        }
1433
1434        // TRICKY: looping from i as earlier streams are already removed from streamMask
1435        for (size_t j = i; j < kMaxStreams; ++j) {
1436            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
1437            if ((streamMask & indexToType(j)) && uri == streamUri) {
1438                sources[j] = mPacketSources.valueFor(indexToType(j));
1439
1440                if (timeUs >= 0) {
1441                    startTimeUs = timeUs;
1442                } else {
1443                    int32_t type;
1444                    sp<AMessage> meta;
1445                    if (pickTrack) {
1446                        // selecting
1447                        meta = sources[j]->getLatestDequeuedMeta();
1448                    } else {
1449                        // adapting
1450                        meta = sources[j]->getLatestEnqueuedMeta();
1451                    }
1452
1453                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
1454                        int64_t tmpUs;
1455                        int64_t tmpSegmentUs;
1456
1457                        CHECK(meta->findInt64("timeUs", &tmpUs));
1458                        CHECK(meta->findInt64("segmentStartTimeUs", &tmpSegmentUs));
1459                        if (startTimeUs < 0 || tmpSegmentUs < segmentStartTimeUs) {
1460                            startTimeUs = tmpUs;
1461                            segmentStartTimeUs = tmpSegmentUs;
1462                        } else if (tmpSegmentUs == segmentStartTimeUs && tmpUs < startTimeUs) {
1463                            startTimeUs = tmpUs;
1464                        }
1465
1466                        int32_t seq;
1467                        CHECK(meta->findInt32("discontinuitySeq", &seq));
1468                        if (discontinuitySeq < 0 || seq < discontinuitySeq) {
1469                            discontinuitySeq = seq;
1470                        }
1471                    }
1472
1473                    if (pickTrack) {
1474                        // selecting track, queue discontinuities before content
1475                        sources[j]->clear();
1476                        if (j == kSubtitleIndex) {
1477                            break;
1478                        }
1479
1480                        ALOGV("stream[%zu]: queue format change", j);
1481
1482                        sources[j]->queueDiscontinuity(
1483                                ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1484                    } else {
1485                        // adapting, queue discontinuities after resume
1486                        sources[j] = mPacketSources2.valueFor(indexToType(j));
1487                        sources[j]->clear();
1488                        uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1489                        if (extraStreams & indexToType(j)) {
1490                            sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false));
1491                        }
1492                    }
1493                }
1494
1495                streamMask &= ~indexToType(j);
1496            }
1497        }
1498
1499        fetcher->startAsync(
1500                sources[kAudioIndex],
1501                sources[kVideoIndex],
1502                sources[kSubtitleIndex],
1503                startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs,
1504                segmentStartTimeUs,
1505                discontinuitySeq,
1506                switching);
1507    }
1508
1509    // All fetchers have now been started, the configuration change
1510    // has completed.
1511
1512    ALOGV("XXX configuration change completed.");
1513    mReconfigurationInProgress = false;
1514    if (switching) {
1515        mSwitchInProgress = true;
1516    } else {
1517        mStreamMask = mNewStreamMask;
1518    }
1519
1520    if (mDisconnectReplyID != NULL) {
1521        finishDisconnect();
1522    }
1523}
1524
1525void LiveSession::onSwapped(const sp<AMessage> &msg) {
1526    int32_t switchGeneration;
1527    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
1528    if (switchGeneration != mSwitchGeneration) {
1529        return;
1530    }
1531
1532    int32_t stream;
1533    CHECK(msg->findInt32("stream", &stream));
1534
1535    ssize_t idx = typeToIndex(stream);
1536    CHECK(idx >= 0);
1537    if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
1538        ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str());
1539    }
1540    mStreams[idx].mUri = mStreams[idx].mNewUri;
1541    mStreams[idx].mNewUri.clear();
1542
1543    mSwapMask &= ~stream;
1544    if (mSwapMask != 0) {
1545        return;
1546    }
1547
1548    // Check if new variant contains extra streams.
1549    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1550    while (extraStreams) {
1551        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
1552        swapPacketSource(extraStream);
1553        extraStreams &= ~extraStream;
1554
1555        idx = typeToIndex(extraStream);
1556        CHECK(idx >= 0);
1557        if (mStreams[idx].mNewUri.empty()) {
1558            ALOGW("swapping extra stream type %d %s to empty stream",
1559                    extraStream, mStreams[idx].mUri.c_str());
1560        }
1561        mStreams[idx].mUri = mStreams[idx].mNewUri;
1562        mStreams[idx].mNewUri.clear();
1563    }
1564
1565    tryToFinishBandwidthSwitch();
1566}
1567
1568void LiveSession::schedulePollBuffering() {
1569    sp<AMessage> msg = new AMessage(kWhatPollBuffering, this);
1570    msg->setInt32("generation", mPollBufferingGeneration);
1571    msg->post(1000000ll);
1572}
1573
1574void LiveSession::cancelPollBuffering() {
1575    ++mPollBufferingGeneration;
1576}
1577
1578void LiveSession::onPollBuffering() {
1579    ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
1580            "mInPreparationPhase %d, mStreamMask 0x%x",
1581        mSwitchInProgress, mReconfigurationInProgress,
1582        mInPreparationPhase, mStreamMask);
1583
1584    bool low, mid, high;
1585    if (checkBuffering(low, mid, high)) {
1586        if (mInPreparationPhase && mid) {
1587            postPrepared(OK);
1588        }
1589
1590        // don't switch before we report prepared
1591        if (!mInPreparationPhase && (low || high)) {
1592            switchBandwidthIfNeeded(high);
1593        }
1594    }
1595
1596    schedulePollBuffering();
1597}
1598
1599// Mark switch done when:
1600//   1. all old buffers are swapped out
1601void LiveSession::tryToFinishBandwidthSwitch() {
1602    if (!mSwitchInProgress) {
1603        return;
1604    }
1605
1606    bool needToRemoveFetchers = false;
1607    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1608        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
1609            needToRemoveFetchers = true;
1610            break;
1611        }
1612    }
1613
1614    if (!needToRemoveFetchers && mSwapMask == 0) {
1615        ALOGI("mSwitchInProgress = false");
1616        mStreamMask = mNewStreamMask;
1617        mSwitchInProgress = false;
1618    }
1619}
1620
1621void LiveSession::cancelBandwidthSwitch() {
1622    Mutex::Autolock lock(mSwapMutex);
1623    mSwitchGeneration++;
1624    mSwitchInProgress = false;
1625    mSwapMask = 0;
1626
1627    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1628        FetcherInfo& info = mFetcherInfos.editValueAt(i);
1629        if (info.mToBeRemoved) {
1630            info.mToBeRemoved = false;
1631        }
1632    }
1633
1634    for (size_t i = 0; i < kMaxStreams; ++i) {
1635        if (!mStreams[i].mNewUri.empty()) {
1636            ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri);
1637            if (j < 0) {
1638                mStreams[i].mNewUri.clear();
1639                continue;
1640            }
1641
1642            const FetcherInfo &info = mFetcherInfos.valueAt(j);
1643            info.mFetcher->stopAsync();
1644            mFetcherInfos.removeItemsAt(j);
1645            mStreams[i].mNewUri.clear();
1646        }
1647    }
1648}
1649
1650bool LiveSession::checkBuffering(bool &low, bool &mid, bool &high) {
1651    low = mid = high = false;
1652
1653    if (mSwitchInProgress || mReconfigurationInProgress) {
1654        ALOGV("Switch/Reconfig in progress, defer buffer polling");
1655        return false;
1656    }
1657
1658    // TODO: Fine tune low/high mark.
1659    //       We also need to pause playback if buffering is too low.
1660    //       Currently during underflow, we depend on decoder to starve
1661    //       to pause, but A/V could have different buffering left,
1662    //       they're not paused together.
1663    // TODO: Report buffering level to NuPlayer for BUFFERING_UPDATE
1664
1665    // Switch down if any of the fetchers are below low mark;
1666    // Switch up   if all of the fetchers are over high mark.
1667    size_t activeCount, lowCount, midCount, highCount;
1668    activeCount = lowCount = midCount = highCount = 0;
1669    for (size_t i = 0; i < mPacketSources.size(); ++i) {
1670        // we don't check subtitles for buffering level
1671        if (!(mStreamMask & mPacketSources.keyAt(i)
1672                & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) {
1673            continue;
1674        }
1675        // ignore streams that never had any packet queued.
1676        // (it's possible that the variant only has audio or video)
1677        sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
1678        if (meta == NULL) {
1679            continue;
1680        }
1681
1682        ++activeCount;
1683        int64_t bufferedDurationUs =
1684                mPacketSources[i]->getEstimatedDurationUs();
1685        ALOGV("source[%zu]: buffered %lld us", i, (long long)bufferedDurationUs);
1686        if (bufferedDurationUs < kLowWaterMark) {
1687            ++lowCount;
1688            break;
1689        } else if (bufferedDurationUs > kHighWaterMark) {
1690            ++midCount;
1691            ++highCount;
1692        } else if (bufferedDurationUs > kMidWaterMark) {
1693            ++midCount;
1694        }
1695    }
1696
1697    if (activeCount > 0) {
1698        high = (highCount == activeCount);
1699        mid = (midCount == activeCount);
1700        low = (lowCount > 0);
1701        return true;
1702    }
1703
1704    return false;
1705}
1706
1707void LiveSession::switchBandwidthIfNeeded(bool canSwitchUp) {
1708    ssize_t bandwidthIndex = getBandwidthIndex();
1709
1710    if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex)
1711            || (!canSwitchUp && bandwidthIndex < mCurBandwidthIndex)) {
1712        changeConfiguration(-1, bandwidthIndex, false);
1713    }
1714}
1715
1716void LiveSession::postPrepared(status_t err) {
1717    CHECK(mInPreparationPhase);
1718
1719    sp<AMessage> notify = mNotify->dup();
1720    if (err == OK || err == ERROR_END_OF_STREAM) {
1721        notify->setInt32("what", kWhatPrepared);
1722    } else {
1723        notify->setInt32("what", kWhatPreparationFailed);
1724        notify->setInt32("err", err);
1725    }
1726
1727    notify->post();
1728
1729    mInPreparationPhase = false;
1730}
1731
1732
1733}  // namespace android
1734
1735