LiveSession.cpp revision 98d53011c390ab0c3cb8d5d9e47251876174d5d4
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/IMediaHTTPConnection.h>
31#include <media/IMediaHTTPService.h>
32#include <media/stagefright/foundation/hexdump.h>
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/DataSource.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaErrors.h>
39#include <media/stagefright/MediaHTTP.h>
40#include <media/stagefright/MetaData.h>
41#include <media/stagefright/Utils.h>
42
43#include <utils/Mutex.h>
44
45#include <ctype.h>
46#include <inttypes.h>
47#include <openssl/aes.h>
48#include <openssl/md5.h>
49
50namespace android {
51
52// static
53// Number of recently-read bytes to use for bandwidth estimation
54const size_t LiveSession::kBandwidthHistoryBytes = 200 * 1024;
55// High water mark to start up switch or report prepared)
56const int64_t LiveSession::kHighWaterMark = 8000000ll;
57const int64_t LiveSession::kMidWaterMark = 5000000ll;
58const int64_t LiveSession::kLowWaterMark = 3000000ll;
59
60LiveSession::LiveSession(
61        const sp<AMessage> &notify, uint32_t flags,
62        const sp<IMediaHTTPService> &httpService)
63    : mNotify(notify),
64      mFlags(flags),
65      mHTTPService(httpService),
66      mInPreparationPhase(true),
67      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
68      mCurBandwidthIndex(-1),
69      mStreamMask(0),
70      mNewStreamMask(0),
71      mSwapMask(0),
72      mCheckBandwidthGeneration(0),
73      mSwitchGeneration(0),
74      mSubtitleGeneration(0),
75      mLastDequeuedTimeUs(0ll),
76      mRealTimeBaseUs(0ll),
77      mReconfigurationInProgress(false),
78      mSwitchInProgress(false),
79      mDisconnectReplyID(0),
80      mSeekReplyID(0),
81      mFirstTimeUsValid(false),
82      mFirstTimeUs(0),
83      mLastSeekTimeUs(0),
84      mPollBufferingGeneration(0) {
85
86    mStreams[kAudioIndex] = StreamItem("audio");
87    mStreams[kVideoIndex] = StreamItem("video");
88    mStreams[kSubtitleIndex] = StreamItem("subtitles");
89
90    for (size_t i = 0; i < kMaxStreams; ++i) {
91        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
92        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
93        mBuffering[i] = false;
94    }
95
96    size_t numHistoryItems = kBandwidthHistoryBytes /
97            PlaylistFetcher::kDownloadBlockSize + 1;
98    if (numHistoryItems < 5) {
99        numHistoryItems = 5;
100    }
101    mHTTPDataSource->setBandwidthHistorySize(numHistoryItems);
102}
103
104LiveSession::~LiveSession() {
105    if (mFetcherLooper != NULL) {
106        mFetcherLooper->stop();
107    }
108}
109
110sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
111    ABuffer *discontinuity = new ABuffer(0);
112    discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
113    discontinuity->meta()->setInt32("swapPacketSource", swap);
114    discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
115    discontinuity->meta()->setInt64("timeUs", -1);
116    return discontinuity;
117}
118
119void LiveSession::swapPacketSource(StreamType stream) {
120    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
121    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
122    sp<AnotherPacketSource> tmp = aps;
123    aps = aps2;
124    aps2 = tmp;
125    aps2->clear();
126}
127
128status_t LiveSession::dequeueAccessUnit(
129        StreamType stream, sp<ABuffer> *accessUnit) {
130    if (!(mStreamMask & stream)) {
131        // return -EWOULDBLOCK to avoid halting the decoder
132        // when switching between audio/video and audio only.
133        return -EWOULDBLOCK;
134    }
135
136    status_t finalResult = OK;
137    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
138
139    ssize_t idx = typeToIndex(stream);
140    if (!packetSource->hasBufferAvailable(&finalResult)) {
141        if (finalResult == OK) {
142            mBuffering[idx] = true;
143            return -EAGAIN;
144        } else {
145            return finalResult;
146        }
147    }
148
149    int32_t targetDuration = 0;
150    sp<AMessage> meta = packetSource->getLatestEnqueuedMeta();
151    if (meta != NULL) {
152        meta->findInt32("targetDuration", &targetDuration);
153    }
154
155    int64_t targetDurationUs = targetDuration * 1000000ll;
156    if (targetDurationUs == 0 ||
157            targetDurationUs > PlaylistFetcher::kMinBufferedDurationUs) {
158        // Fetchers limit buffering to
159        // min(3 * targetDuration, kMinBufferedDurationUs)
160        targetDurationUs = PlaylistFetcher::kMinBufferedDurationUs;
161    }
162
163    if (mBuffering[idx]) {
164        if (mSwitchInProgress
165                || packetSource->isFinished(0)
166                || packetSource->hasBufferAvailable(&finalResult)) {
167            mBuffering[idx] = false;
168        }
169    }
170
171    if (mBuffering[idx]) {
172        return -EAGAIN;
173    }
174
175    // wait for counterpart
176    sp<AnotherPacketSource> otherSource;
177    uint32_t mask = mNewStreamMask & mStreamMask;
178    uint32_t fetchersMask  = 0;
179    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
180        uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask();
181        fetchersMask |= fetcherMask;
182    }
183    mask &= fetchersMask;
184    if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) {
185        otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
186    } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) {
187        otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
188    }
189    if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) {
190        return finalResult == OK ? -EAGAIN : finalResult;
191    }
192
193    status_t err = packetSource->dequeueAccessUnit(accessUnit);
194
195    size_t streamIdx;
196    const char *streamStr;
197    switch (stream) {
198        case STREAMTYPE_AUDIO:
199            streamIdx = kAudioIndex;
200            streamStr = "audio";
201            break;
202        case STREAMTYPE_VIDEO:
203            streamIdx = kVideoIndex;
204            streamStr = "video";
205            break;
206        case STREAMTYPE_SUBTITLES:
207            streamIdx = kSubtitleIndex;
208            streamStr = "subs";
209            break;
210        default:
211            TRESPASS();
212    }
213
214    StreamItem& strm = mStreams[streamIdx];
215    if (err == INFO_DISCONTINUITY) {
216        // adaptive streaming, discontinuities in the playlist
217        int32_t type;
218        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
219
220        sp<AMessage> extra;
221        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
222            extra.clear();
223        }
224
225        ALOGI("[%s] read discontinuity of type %d, extra = %s",
226              streamStr,
227              type,
228              extra == NULL ? "NULL" : extra->debugString().c_str());
229
230        int32_t swap;
231        if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) {
232            int32_t switchGeneration;
233            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
234            {
235                Mutex::Autolock lock(mSwapMutex);
236                if (switchGeneration == mSwitchGeneration) {
237                    swapPacketSource(stream);
238                    sp<AMessage> msg = new AMessage(kWhatSwapped, this);
239                    msg->setInt32("stream", stream);
240                    msg->setInt32("switchGeneration", switchGeneration);
241                    msg->post();
242                }
243            }
244        } else {
245            size_t seq = strm.mCurDiscontinuitySeq;
246            int64_t offsetTimeUs;
247            if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
248                offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
249            } else {
250                offsetTimeUs = 0;
251            }
252
253            seq += 1;
254            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
255                int64_t firstTimeUs;
256                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
257                offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
258                offsetTimeUs += strm.mLastSampleDurationUs;
259            } else {
260                offsetTimeUs += strm.mLastSampleDurationUs;
261            }
262
263            mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
264        }
265    } else if (err == OK) {
266
267        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
268            int64_t timeUs;
269            int32_t discontinuitySeq = 0;
270            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
271            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
272            strm.mCurDiscontinuitySeq = discontinuitySeq;
273
274            int32_t discard = 0;
275            int64_t firstTimeUs;
276            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
277                int64_t durUs; // approximate sample duration
278                if (timeUs > strm.mLastDequeuedTimeUs) {
279                    durUs = timeUs - strm.mLastDequeuedTimeUs;
280                } else {
281                    durUs = strm.mLastDequeuedTimeUs - timeUs;
282                }
283                strm.mLastSampleDurationUs = durUs;
284                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
285            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
286                firstTimeUs = timeUs;
287            } else {
288                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
289                firstTimeUs = timeUs;
290            }
291
292            strm.mLastDequeuedTimeUs = timeUs;
293            if (timeUs >= firstTimeUs) {
294                timeUs -= firstTimeUs;
295            } else {
296                timeUs = 0;
297            }
298            timeUs += mLastSeekTimeUs;
299            if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
300                timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
301            }
302
303            ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs);
304            (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
305            mLastDequeuedTimeUs = timeUs;
306            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
307        } else if (stream == STREAMTYPE_SUBTITLES) {
308            int32_t subtitleGeneration;
309            if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
310                    && subtitleGeneration != mSubtitleGeneration) {
311               return -EAGAIN;
312            };
313            (*accessUnit)->meta()->setInt32(
314                    "trackIndex", mPlaylist->getSelectedIndex());
315            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
316        }
317    } else {
318        ALOGI("[%s] encountered error %d", streamStr, err);
319    }
320
321    return err;
322}
323
324status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
325    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
326    if (!(mStreamMask & stream)) {
327        return UNKNOWN_ERROR;
328    }
329
330    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
331
332    sp<MetaData> meta = packetSource->getFormat();
333
334    if (meta == NULL) {
335        return -EAGAIN;
336    }
337
338    return convertMetaDataToMessage(meta, format);
339}
340
341void LiveSession::connectAsync(
342        const char *url, const KeyedVector<String8, String8> *headers) {
343    sp<AMessage> msg = new AMessage(kWhatConnect, this);
344    msg->setString("url", url);
345
346    if (headers != NULL) {
347        msg->setPointer(
348                "headers",
349                new KeyedVector<String8, String8>(*headers));
350    }
351
352    msg->post();
353}
354
355status_t LiveSession::disconnect() {
356    sp<AMessage> msg = new AMessage(kWhatDisconnect, this);
357
358    sp<AMessage> response;
359    status_t err = msg->postAndAwaitResponse(&response);
360
361    return err;
362}
363
364status_t LiveSession::seekTo(int64_t timeUs) {
365    sp<AMessage> msg = new AMessage(kWhatSeek, this);
366    msg->setInt64("timeUs", timeUs);
367
368    sp<AMessage> response;
369    status_t err = msg->postAndAwaitResponse(&response);
370
371    return err;
372}
373
374void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
375    switch (msg->what()) {
376        case kWhatConnect:
377        {
378            onConnect(msg);
379            break;
380        }
381
382        case kWhatDisconnect:
383        {
384            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
385
386            if (mReconfigurationInProgress) {
387                break;
388            }
389
390            finishDisconnect();
391            break;
392        }
393
394        case kWhatSeek:
395        {
396            sp<AReplyToken> seekReplyID;
397            CHECK(msg->senderAwaitsResponse(&seekReplyID));
398            mSeekReplyID = seekReplyID;
399            mSeekReply = new AMessage;
400
401            status_t err = onSeek(msg);
402
403            if (err != OK) {
404                msg->post(50000);
405            }
406            break;
407        }
408
409        case kWhatFetcherNotify:
410        {
411            int32_t what;
412            CHECK(msg->findInt32("what", &what));
413
414            switch (what) {
415                case PlaylistFetcher::kWhatStarted:
416                    break;
417                case PlaylistFetcher::kWhatPaused:
418                case PlaylistFetcher::kWhatStopped:
419                {
420                    if (what == PlaylistFetcher::kWhatStopped) {
421                        AString uri;
422                        CHECK(msg->findString("uri", &uri));
423                        ssize_t index = mFetcherInfos.indexOfKey(uri);
424                        if (index < 0) {
425                            // ignore duplicated kWhatStopped messages.
426                            break;
427                        }
428
429                        mFetcherLooper->unregisterHandler(
430                                mFetcherInfos[index].mFetcher->id());
431                        mFetcherInfos.removeItemsAt(index);
432
433                        if (mSwitchInProgress) {
434                            tryToFinishBandwidthSwitch();
435                        }
436                    }
437
438                    if (mContinuation != NULL) {
439                        CHECK_GT(mContinuationCounter, 0);
440                        if (--mContinuationCounter == 0) {
441                            mContinuation->post();
442                        }
443                    }
444                    break;
445                }
446
447                case PlaylistFetcher::kWhatDurationUpdate:
448                {
449                    AString uri;
450                    CHECK(msg->findString("uri", &uri));
451
452                    int64_t durationUs;
453                    CHECK(msg->findInt64("durationUs", &durationUs));
454
455                    ssize_t index = mFetcherInfos.indexOfKey(uri);
456                    if (index >= 0) {
457                        FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
458                        info->mDurationUs = durationUs;
459                    }
460                    break;
461                }
462
463                case PlaylistFetcher::kWhatError:
464                {
465                    status_t err;
466                    CHECK(msg->findInt32("err", &err));
467
468                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
469
470                    // handle EOS on subtitle tracks independently
471                    AString uri;
472                    if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
473                        ssize_t i = mFetcherInfos.indexOfKey(uri);
474                        if (i >= 0) {
475                            const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
476                            if (fetcher != NULL) {
477                                uint32_t type = fetcher->getStreamTypeMask();
478                                if (type == STREAMTYPE_SUBTITLES) {
479                                    mPacketSources.valueFor(
480                                            STREAMTYPE_SUBTITLES)->signalEOS(err);;
481                                    break;
482                                }
483                            }
484                        }
485                    }
486
487                    if (mInPreparationPhase) {
488                        postPrepared(err);
489                    }
490
491                    cancelBandwidthSwitch();
492
493                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
494
495                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
496
497                    mPacketSources.valueFor(
498                            STREAMTYPE_SUBTITLES)->signalEOS(err);
499
500                    sp<AMessage> notify = mNotify->dup();
501                    notify->setInt32("what", kWhatError);
502                    notify->setInt32("err", err);
503                    notify->post();
504                    break;
505                }
506
507                case PlaylistFetcher::kWhatStartedAt:
508                {
509                    int32_t switchGeneration;
510                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
511
512                    if (switchGeneration != mSwitchGeneration) {
513                        break;
514                    }
515
516                    // Resume fetcher for the original variant; the resumed fetcher should
517                    // continue until the timestamps found in msg, which is stored by the
518                    // new fetcher to indicate where the new variant has started buffering.
519                    for (size_t i = 0; i < mFetcherInfos.size(); i++) {
520                        const FetcherInfo info = mFetcherInfos.valueAt(i);
521                        if (info.mToBeRemoved) {
522                            info.mFetcher->resumeUntilAsync(msg);
523                        }
524                    }
525                    break;
526                }
527
528                default:
529                    TRESPASS();
530            }
531
532            break;
533        }
534
535        case kWhatChangeConfiguration:
536        {
537            onChangeConfiguration(msg);
538            break;
539        }
540
541        case kWhatChangeConfiguration2:
542        {
543            onChangeConfiguration2(msg);
544            break;
545        }
546
547        case kWhatChangeConfiguration3:
548        {
549            onChangeConfiguration3(msg);
550            break;
551        }
552
553        case kWhatFinishDisconnect2:
554        {
555            onFinishDisconnect2();
556            break;
557        }
558
559        case kWhatSwapped:
560        {
561            onSwapped(msg);
562            break;
563        }
564
565        case kWhatPollBuffering:
566        {
567            int32_t generation;
568            CHECK(msg->findInt32("generation", &generation));
569            if (generation == mPollBufferingGeneration) {
570                onPollBuffering();
571            }
572            break;
573        }
574
575        default:
576            TRESPASS();
577            break;
578    }
579}
580
581// static
582int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
583    if (a->mBandwidth < b->mBandwidth) {
584        return -1;
585    } else if (a->mBandwidth == b->mBandwidth) {
586        return 0;
587    }
588
589    return 1;
590}
591
592// static
593LiveSession::StreamType LiveSession::indexToType(int idx) {
594    CHECK(idx >= 0 && idx < kMaxStreams);
595    return (StreamType)(1 << idx);
596}
597
598// static
599ssize_t LiveSession::typeToIndex(int32_t type) {
600    switch (type) {
601        case STREAMTYPE_AUDIO:
602            return 0;
603        case STREAMTYPE_VIDEO:
604            return 1;
605        case STREAMTYPE_SUBTITLES:
606            return 2;
607        default:
608            return -1;
609    };
610    return -1;
611}
612
613void LiveSession::onConnect(const sp<AMessage> &msg) {
614    AString url;
615    CHECK(msg->findString("url", &url));
616
617    KeyedVector<String8, String8> *headers = NULL;
618    if (!msg->findPointer("headers", (void **)&headers)) {
619        mExtraHeaders.clear();
620    } else {
621        mExtraHeaders = *headers;
622
623        delete headers;
624        headers = NULL;
625    }
626
627    // TODO currently we don't know if we are coming here from incognito mode
628    ALOGI("onConnect %s", uriDebugString(url).c_str());
629
630    mMasterURL = url;
631
632    bool dummy;
633    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
634
635    if (mPlaylist == NULL) {
636        ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str());
637
638        postPrepared(ERROR_IO);
639        return;
640    }
641
642    // create looper for fetchers
643    if (mFetcherLooper == NULL) {
644        mFetcherLooper = new ALooper();
645
646        mFetcherLooper->setName("Fetcher");
647        mFetcherLooper->start(false, false);
648    }
649
650    // We trust the content provider to make a reasonable choice of preferred
651    // initial bandwidth by listing it first in the variant playlist.
652    // At startup we really don't have a good estimate on the available
653    // network bandwidth since we haven't tranferred any data yet. Once
654    // we have we can make a better informed choice.
655    size_t initialBandwidth = 0;
656    size_t initialBandwidthIndex = 0;
657
658    if (mPlaylist->isVariantPlaylist()) {
659        for (size_t i = 0; i < mPlaylist->size(); ++i) {
660            BandwidthItem item;
661
662            item.mPlaylistIndex = i;
663
664            sp<AMessage> meta;
665            AString uri;
666            mPlaylist->itemAt(i, &uri, &meta);
667
668            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
669
670            if (initialBandwidth == 0) {
671                initialBandwidth = item.mBandwidth;
672            }
673
674            mBandwidthItems.push(item);
675        }
676
677        CHECK_GT(mBandwidthItems.size(), 0u);
678
679        mBandwidthItems.sort(SortByBandwidth);
680
681        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
682            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
683                initialBandwidthIndex = i;
684                break;
685            }
686        }
687    } else {
688        // dummy item.
689        BandwidthItem item;
690        item.mPlaylistIndex = 0;
691        item.mBandwidth = 0;
692        mBandwidthItems.push(item);
693    }
694
695    mPlaylist->pickRandomMediaItems();
696    changeConfiguration(
697            0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
698
699    schedulePollBuffering();
700}
701
702void LiveSession::finishDisconnect() {
703    // No reconfiguration is currently pending, make sure none will trigger
704    // during disconnection either.
705
706    // Protect mPacketSources from a swapPacketSource race condition through disconnect.
707    // (finishDisconnect, onFinishDisconnect2)
708    cancelBandwidthSwitch();
709
710    // cancel buffer polling
711    cancelPollBuffering();
712
713    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
714        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
715    }
716
717    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, this);
718
719    mContinuationCounter = mFetcherInfos.size();
720    mContinuation = msg;
721
722    if (mContinuationCounter == 0) {
723        msg->post();
724    }
725}
726
727void LiveSession::onFinishDisconnect2() {
728    mContinuation.clear();
729
730    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
731    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
732
733    mPacketSources.valueFor(
734            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
735
736    sp<AMessage> response = new AMessage;
737    response->setInt32("err", OK);
738
739    response->postReply(mDisconnectReplyID);
740    mDisconnectReplyID = 0;
741}
742
743sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
744    ssize_t index = mFetcherInfos.indexOfKey(uri);
745
746    if (index >= 0) {
747        return NULL;
748    }
749
750    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this);
751    notify->setString("uri", uri);
752    notify->setInt32("switchGeneration", mSwitchGeneration);
753
754    FetcherInfo info;
755    info.mFetcher = new PlaylistFetcher(notify, this, uri, mSubtitleGeneration);
756    info.mDurationUs = -1ll;
757    info.mIsPrepared = false;
758    info.mToBeRemoved = false;
759    mFetcherLooper->registerHandler(info.mFetcher);
760
761    mFetcherInfos.add(uri, info);
762
763    return info.mFetcher;
764}
765
766/*
767 * Illustration of parameters:
768 *
769 * 0      `range_offset`
770 * +------------+-------------------------------------------------------+--+--+
771 * |            |                                 | next block to fetch |  |  |
772 * |            | `source` handle => `out` buffer |                     |  |  |
773 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
774 * |            |<----------- `range_length` / buffer capacity ----------->|  |
775 * |<------------------------------ file_size ------------------------------->|
776 *
777 * Special parameter values:
778 * - range_length == -1 means entire file
779 * - block_size == 0 means entire range
780 *
781 */
782ssize_t LiveSession::fetchFile(
783        const char *url, sp<ABuffer> *out,
784        int64_t range_offset, int64_t range_length,
785        uint32_t block_size, /* download block size */
786        sp<DataSource> *source, /* to return and reuse source */
787        String8 *actualUrl) {
788    off64_t size;
789    sp<DataSource> temp_source;
790    if (source == NULL) {
791        source = &temp_source;
792    }
793
794    if (*source == NULL) {
795        if (!strncasecmp(url, "file://", 7)) {
796            *source = new FileSource(url + 7);
797        } else if (strncasecmp(url, "http://", 7)
798                && strncasecmp(url, "https://", 8)) {
799            return ERROR_UNSUPPORTED;
800        } else {
801            KeyedVector<String8, String8> headers = mExtraHeaders;
802            if (range_offset > 0 || range_length >= 0) {
803                headers.add(
804                        String8("Range"),
805                        String8(
806                            AStringPrintf(
807                                "bytes=%lld-%s",
808                                range_offset,
809                                range_length < 0
810                                    ? "" : AStringPrintf("%lld",
811                                            range_offset + range_length - 1).c_str()).c_str()));
812            }
813            status_t err = mHTTPDataSource->connect(url, &headers);
814
815            if (err != OK) {
816                return err;
817            }
818
819            *source = mHTTPDataSource;
820        }
821    }
822
823    status_t getSizeErr = (*source)->getSize(&size);
824    if (getSizeErr != OK) {
825        size = 65536;
826    }
827
828    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
829    if (*out == NULL) {
830        buffer->setRange(0, 0);
831    }
832
833    ssize_t bytesRead = 0;
834    // adjust range_length if only reading partial block
835    if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
836        range_length = buffer->size() + block_size;
837    }
838    for (;;) {
839        // Only resize when we don't know the size.
840        size_t bufferRemaining = buffer->capacity() - buffer->size();
841        if (bufferRemaining == 0 && getSizeErr != OK) {
842            size_t bufferIncrement = buffer->size() / 2;
843            if (bufferIncrement < 32768) {
844                bufferIncrement = 32768;
845            }
846            bufferRemaining = bufferIncrement;
847
848            ALOGV("increasing download buffer to %zu bytes",
849                 buffer->size() + bufferRemaining);
850
851            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
852            memcpy(copy->data(), buffer->data(), buffer->size());
853            copy->setRange(0, buffer->size());
854
855            buffer = copy;
856        }
857
858        size_t maxBytesToRead = bufferRemaining;
859        if (range_length >= 0) {
860            int64_t bytesLeftInRange = range_length - buffer->size();
861            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
862                maxBytesToRead = bytesLeftInRange;
863
864                if (bytesLeftInRange == 0) {
865                    break;
866                }
867            }
868        }
869
870        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
871        // to help us break out of the loop.
872        ssize_t n = (*source)->readAt(
873                buffer->size(), buffer->data() + buffer->size(),
874                maxBytesToRead);
875
876        if (n < 0) {
877            return n;
878        }
879
880        if (n == 0) {
881            break;
882        }
883
884        buffer->setRange(0, buffer->size() + (size_t)n);
885        bytesRead += n;
886    }
887
888    *out = buffer;
889    if (actualUrl != NULL) {
890        *actualUrl = (*source)->getUri();
891        if (actualUrl->isEmpty()) {
892            *actualUrl = url;
893        }
894    }
895
896    return bytesRead;
897}
898
899sp<M3UParser> LiveSession::fetchPlaylist(
900        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
901    ALOGV("fetchPlaylist '%s'", url);
902
903    *unchanged = false;
904
905    sp<ABuffer> buffer;
906    String8 actualUrl;
907    ssize_t  err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
908
909    if (err <= 0) {
910        return NULL;
911    }
912
913    // MD5 functionality is not available on the simulator, treat all
914    // playlists as changed.
915
916#if defined(HAVE_ANDROID_OS)
917    uint8_t hash[16];
918
919    MD5_CTX m;
920    MD5_Init(&m);
921    MD5_Update(&m, buffer->data(), buffer->size());
922
923    MD5_Final(hash, &m);
924
925    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
926        // playlist unchanged
927        *unchanged = true;
928
929        return NULL;
930    }
931
932    if (curPlaylistHash != NULL) {
933        memcpy(curPlaylistHash, hash, sizeof(hash));
934    }
935#endif
936
937    sp<M3UParser> playlist =
938        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
939
940    if (playlist->initCheck() != OK) {
941        ALOGE("failed to parse .m3u8 playlist");
942
943        return NULL;
944    }
945
946    return playlist;
947}
948
949#if 0
950static double uniformRand() {
951    return (double)rand() / RAND_MAX;
952}
953#endif
954
955size_t LiveSession::getBandwidthIndex() {
956    if (mBandwidthItems.size() == 0) {
957        return 0;
958    }
959
960#if 1
961    char value[PROPERTY_VALUE_MAX];
962    ssize_t index = -1;
963    if (property_get("media.httplive.bw-index", value, NULL)) {
964        char *end;
965        index = strtol(value, &end, 10);
966        CHECK(end > value && *end == '\0');
967
968        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
969            index = mBandwidthItems.size() - 1;
970        }
971    }
972
973    if (index < 0) {
974        int32_t bandwidthBps;
975        if (mHTTPDataSource != NULL
976                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
977            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
978        } else {
979            ALOGV("no bandwidth estimate.");
980            return 0;  // Pick the lowest bandwidth stream by default.
981        }
982
983        char value[PROPERTY_VALUE_MAX];
984        if (property_get("media.httplive.max-bw", value, NULL)) {
985            char *end;
986            long maxBw = strtoul(value, &end, 10);
987            if (end > value && *end == '\0') {
988                if (maxBw > 0 && bandwidthBps > maxBw) {
989                    ALOGV("bandwidth capped to %ld bps", maxBw);
990                    bandwidthBps = maxBw;
991                }
992            }
993        }
994
995        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
996
997        index = mBandwidthItems.size() - 1;
998        while (index > 0) {
999            // consider only 80% of the available bandwidth, but if we are switching up,
1000            // be even more conservative (70%) to avoid overestimating and immediately
1001            // switching back.
1002            size_t adjustedBandwidthBps = bandwidthBps;
1003            if (index > mCurBandwidthIndex) {
1004                adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10;
1005            } else {
1006                adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10;
1007            }
1008            if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) {
1009                break;
1010            }
1011            --index;
1012        }
1013    }
1014#elif 0
1015    // Change bandwidth at random()
1016    size_t index = uniformRand() * mBandwidthItems.size();
1017#elif 0
1018    // There's a 50% chance to stay on the current bandwidth and
1019    // a 50% chance to switch to the next higher bandwidth (wrapping around
1020    // to lowest)
1021    const size_t kMinIndex = 0;
1022
1023    static ssize_t mCurBandwidthIndex = -1;
1024
1025    size_t index;
1026    if (mCurBandwidthIndex < 0) {
1027        index = kMinIndex;
1028    } else if (uniformRand() < 0.5) {
1029        index = (size_t)mCurBandwidthIndex;
1030    } else {
1031        index = mCurBandwidthIndex + 1;
1032        if (index == mBandwidthItems.size()) {
1033            index = kMinIndex;
1034        }
1035    }
1036    mCurBandwidthIndex = index;
1037#elif 0
1038    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1039
1040    size_t index = mBandwidthItems.size() - 1;
1041    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1042        --index;
1043    }
1044#elif 1
1045    char value[PROPERTY_VALUE_MAX];
1046    size_t index;
1047    if (property_get("media.httplive.bw-index", value, NULL)) {
1048        char *end;
1049        index = strtoul(value, &end, 10);
1050        CHECK(end > value && *end == '\0');
1051
1052        if (index >= mBandwidthItems.size()) {
1053            index = mBandwidthItems.size() - 1;
1054        }
1055    } else {
1056        index = 0;
1057    }
1058#else
1059    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
1060#endif
1061
1062    CHECK_GE(index, 0);
1063
1064    return index;
1065}
1066
1067int64_t LiveSession::latestMediaSegmentStartTimeUs() {
1068    sp<AMessage> audioMeta = mPacketSources.valueFor(STREAMTYPE_AUDIO)->getLatestDequeuedMeta();
1069    int64_t minSegmentStartTimeUs = -1, videoSegmentStartTimeUs = -1;
1070    if (audioMeta != NULL) {
1071        audioMeta->findInt64("segmentStartTimeUs", &minSegmentStartTimeUs);
1072    }
1073
1074    sp<AMessage> videoMeta = mPacketSources.valueFor(STREAMTYPE_VIDEO)->getLatestDequeuedMeta();
1075    if (videoMeta != NULL
1076            && videoMeta->findInt64("segmentStartTimeUs", &videoSegmentStartTimeUs)) {
1077        if (minSegmentStartTimeUs < 0 || videoSegmentStartTimeUs < minSegmentStartTimeUs) {
1078            minSegmentStartTimeUs = videoSegmentStartTimeUs;
1079        }
1080
1081    }
1082    return minSegmentStartTimeUs;
1083}
1084
1085status_t LiveSession::onSeek(const sp<AMessage> &msg) {
1086    int64_t timeUs;
1087    CHECK(msg->findInt64("timeUs", &timeUs));
1088
1089    if (!mReconfigurationInProgress) {
1090        changeConfiguration(timeUs, mCurBandwidthIndex);
1091        return OK;
1092    } else {
1093        return -EWOULDBLOCK;
1094    }
1095}
1096
1097status_t LiveSession::getDuration(int64_t *durationUs) const {
1098    int64_t maxDurationUs = -1ll;
1099    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1100        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1101
1102        if (fetcherDurationUs > maxDurationUs) {
1103            maxDurationUs = fetcherDurationUs;
1104        }
1105    }
1106
1107    *durationUs = maxDurationUs;
1108
1109    return OK;
1110}
1111
1112bool LiveSession::isSeekable() const {
1113    int64_t durationUs;
1114    return getDuration(&durationUs) == OK && durationUs >= 0;
1115}
1116
1117bool LiveSession::hasDynamicDuration() const {
1118    return false;
1119}
1120
1121size_t LiveSession::getTrackCount() const {
1122    if (mPlaylist == NULL) {
1123        return 0;
1124    } else {
1125        return mPlaylist->getTrackCount();
1126    }
1127}
1128
1129sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1130    if (mPlaylist == NULL) {
1131        return NULL;
1132    } else {
1133        return mPlaylist->getTrackInfo(trackIndex);
1134    }
1135}
1136
1137status_t LiveSession::selectTrack(size_t index, bool select) {
1138    if (mPlaylist == NULL) {
1139        return INVALID_OPERATION;
1140    }
1141
1142    ++mSubtitleGeneration;
1143    status_t err = mPlaylist->selectTrack(index, select);
1144    if (err == OK) {
1145        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this);
1146        msg->setInt32("bandwidthIndex", mCurBandwidthIndex);
1147        msg->setInt32("pickTrack", select);
1148        msg->post();
1149    }
1150    return err;
1151}
1152
1153ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
1154    if (mPlaylist == NULL) {
1155        return -1;
1156    } else {
1157        return mPlaylist->getSelectedTrack(type);
1158    }
1159}
1160
1161void LiveSession::changeConfiguration(
1162        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
1163    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
1164    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
1165    cancelBandwidthSwitch();
1166
1167    CHECK(!mReconfigurationInProgress);
1168    mReconfigurationInProgress = true;
1169
1170    mCurBandwidthIndex = bandwidthIndex;
1171
1172    ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
1173          timeUs, bandwidthIndex, pickTrack);
1174
1175    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
1176    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
1177
1178    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1179    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1180
1181    AString URIs[kMaxStreams];
1182    for (size_t i = 0; i < kMaxStreams; ++i) {
1183        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1184            streamMask |= indexToType(i);
1185        }
1186    }
1187
1188    // Step 1, stop and discard fetchers that are no longer needed.
1189    // Pause those that we'll reuse.
1190    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1191        const AString &uri = mFetcherInfos.keyAt(i);
1192
1193        bool discardFetcher = true;
1194
1195        // If we're seeking all current fetchers are discarded.
1196        if (timeUs < 0ll) {
1197            // delay fetcher removal if not picking tracks
1198            discardFetcher = pickTrack;
1199
1200            for (size_t j = 0; j < kMaxStreams; ++j) {
1201                StreamType type = indexToType(j);
1202                if ((streamMask & type) && uri == URIs[j]) {
1203                    resumeMask |= type;
1204                    streamMask &= ~type;
1205                    discardFetcher = false;
1206                }
1207            }
1208        }
1209
1210        if (discardFetcher) {
1211            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1212        } else {
1213            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
1214        }
1215    }
1216
1217    sp<AMessage> msg;
1218    if (timeUs < 0ll) {
1219        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1220        msg = new AMessage(kWhatChangeConfiguration3, this);
1221    } else {
1222        msg = new AMessage(kWhatChangeConfiguration2, this);
1223    }
1224    msg->setInt32("streamMask", streamMask);
1225    msg->setInt32("resumeMask", resumeMask);
1226    msg->setInt32("pickTrack", pickTrack);
1227    msg->setInt64("timeUs", timeUs);
1228    for (size_t i = 0; i < kMaxStreams; ++i) {
1229        if ((streamMask | resumeMask) & indexToType(i)) {
1230            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1231        }
1232    }
1233
1234    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1235    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1236    // fetchers have completed their asynchronous operation, we'll post
1237    // mContinuation, which then is handled below in onChangeConfiguration2.
1238    mContinuationCounter = mFetcherInfos.size();
1239    mContinuation = msg;
1240
1241    if (mContinuationCounter == 0) {
1242        msg->post();
1243    }
1244}
1245
1246void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1247    if (!mReconfigurationInProgress) {
1248        int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex;
1249        msg->findInt32("pickTrack", &pickTrack);
1250        msg->findInt32("bandwidthIndex", &bandwidthIndex);
1251        changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack);
1252    } else {
1253        msg->post(1000000ll); // retry in 1 sec
1254    }
1255}
1256
1257void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1258    mContinuation.clear();
1259
1260    // All fetchers are either suspended or have been removed now.
1261
1262    // If we're seeking, clear all packet sources before we report
1263    // seek complete, to prevent decoder from pulling stale data.
1264    int64_t timeUs;
1265    CHECK(msg->findInt64("timeUs", &timeUs));
1266
1267    if (timeUs >= 0) {
1268        mLastSeekTimeUs = timeUs;
1269
1270        for (size_t i = 0; i < mPacketSources.size(); i++) {
1271            mPacketSources.editValueAt(i)->clear();
1272        }
1273
1274        mDiscontinuityOffsetTimesUs.clear();
1275        mDiscontinuityAbsStartTimesUs.clear();
1276
1277        if (mSeekReplyID != 0) {
1278            CHECK(mSeekReply != NULL);
1279            mSeekReply->setInt32("err", OK);
1280            mSeekReply->postReply(mSeekReplyID);
1281            mSeekReplyID = 0;
1282            mSeekReply.clear();
1283        }
1284    }
1285
1286    uint32_t streamMask, resumeMask;
1287    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1288    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1289
1290    // currently onChangeConfiguration2 is only called for seeking;
1291    // remove the following CHECK if using it else where.
1292    CHECK_EQ(resumeMask, 0);
1293    streamMask |= resumeMask;
1294
1295    AString URIs[kMaxStreams];
1296    for (size_t i = 0; i < kMaxStreams; ++i) {
1297        if (streamMask & indexToType(i)) {
1298            const AString &uriKey = mStreams[i].uriKey();
1299            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1300            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1301        }
1302    }
1303
1304    // Determine which decoders to shutdown on the player side,
1305    // a decoder has to be shutdown if either
1306    // 1) its streamtype was active before but now longer isn't.
1307    // or
1308    // 2) its streamtype was already active and still is but the URI
1309    //    has changed.
1310    uint32_t changedMask = 0;
1311    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1312        if (((mStreamMask & streamMask & indexToType(i))
1313                && !(URIs[i] == mStreams[i].mUri))
1314                || (mStreamMask & ~streamMask & indexToType(i))) {
1315            changedMask |= indexToType(i);
1316        }
1317    }
1318
1319    if (changedMask == 0) {
1320        // If nothing changed as far as the audio/video decoders
1321        // are concerned we can proceed.
1322        onChangeConfiguration3(msg);
1323        return;
1324    }
1325
1326    // Something changed, inform the player which will shutdown the
1327    // corresponding decoders and will post the reply once that's done.
1328    // Handling the reply will continue executing below in
1329    // onChangeConfiguration3.
1330    sp<AMessage> notify = mNotify->dup();
1331    notify->setInt32("what", kWhatStreamsChanged);
1332    notify->setInt32("changedMask", changedMask);
1333
1334    msg->setWhat(kWhatChangeConfiguration3);
1335    msg->setTarget(this);
1336
1337    notify->setMessage("reply", msg);
1338    notify->post();
1339}
1340
1341void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1342    mContinuation.clear();
1343    // All remaining fetchers are still suspended, the player has shutdown
1344    // any decoders that needed it.
1345
1346    uint32_t streamMask, resumeMask;
1347    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1348    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1349
1350    int64_t timeUs;
1351    int32_t pickTrack;
1352    bool switching = false;
1353    CHECK(msg->findInt64("timeUs", &timeUs));
1354    CHECK(msg->findInt32("pickTrack", &pickTrack));
1355
1356    if (timeUs < 0ll) {
1357        if (!pickTrack) {
1358            switching = true;
1359        }
1360        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
1361    } else {
1362        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1363    }
1364
1365    for (size_t i = 0; i < kMaxStreams; ++i) {
1366        if (streamMask & indexToType(i)) {
1367            if (switching) {
1368                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
1369            } else {
1370                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1371            }
1372        }
1373    }
1374
1375    mNewStreamMask = streamMask | resumeMask;
1376    if (switching) {
1377        mSwapMask = mStreamMask & ~resumeMask;
1378    }
1379
1380    // Of all existing fetchers:
1381    // * Resume fetchers that are still needed and assign them original packet sources.
1382    // * Mark otherwise unneeded fetchers for removal.
1383    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1384    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1385        const AString &uri = mFetcherInfos.keyAt(i);
1386
1387        sp<AnotherPacketSource> sources[kMaxStreams];
1388        for (size_t j = 0; j < kMaxStreams; ++j) {
1389            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
1390                sources[j] = mPacketSources.valueFor(indexToType(j));
1391            }
1392        }
1393        FetcherInfo &info = mFetcherInfos.editValueAt(i);
1394        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
1395                || sources[kSubtitleIndex] != NULL) {
1396            info.mFetcher->startAsync(
1397                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
1398        } else {
1399            info.mToBeRemoved = true;
1400        }
1401    }
1402
1403    // streamMask now only contains the types that need a new fetcher created.
1404
1405    if (streamMask != 0) {
1406        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1407    }
1408
1409    // Find out when the original fetchers have buffered up to and start the new fetchers
1410    // at a later timestamp.
1411    for (size_t i = 0; i < kMaxStreams; i++) {
1412        if (!(indexToType(i) & streamMask)) {
1413            continue;
1414        }
1415
1416        AString uri;
1417        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
1418
1419        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1420        CHECK(fetcher != NULL);
1421
1422        int64_t startTimeUs = -1;
1423        int64_t segmentStartTimeUs = -1ll;
1424        int32_t discontinuitySeq = -1;
1425        sp<AnotherPacketSource> sources[kMaxStreams];
1426
1427        if (i == kSubtitleIndex) {
1428            segmentStartTimeUs = latestMediaSegmentStartTimeUs();
1429        }
1430
1431        // TRICKY: looping from i as earlier streams are already removed from streamMask
1432        for (size_t j = i; j < kMaxStreams; ++j) {
1433            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
1434            if ((streamMask & indexToType(j)) && uri == streamUri) {
1435                sources[j] = mPacketSources.valueFor(indexToType(j));
1436
1437                if (timeUs >= 0) {
1438                    startTimeUs = timeUs;
1439                } else {
1440                    int32_t type;
1441                    sp<AMessage> meta;
1442                    if (pickTrack) {
1443                        // selecting
1444                        meta = sources[j]->getLatestDequeuedMeta();
1445                    } else {
1446                        // adapting
1447                        meta = sources[j]->getLatestEnqueuedMeta();
1448                    }
1449
1450                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
1451                        int64_t tmpUs;
1452                        int64_t tmpSegmentUs;
1453
1454                        CHECK(meta->findInt64("timeUs", &tmpUs));
1455                        CHECK(meta->findInt64("segmentStartTimeUs", &tmpSegmentUs));
1456                        if (startTimeUs < 0 || tmpSegmentUs < segmentStartTimeUs) {
1457                            startTimeUs = tmpUs;
1458                            segmentStartTimeUs = tmpSegmentUs;
1459                        } else if (tmpSegmentUs == segmentStartTimeUs && tmpUs < startTimeUs) {
1460                            startTimeUs = tmpUs;
1461                        }
1462
1463                        int32_t seq;
1464                        CHECK(meta->findInt32("discontinuitySeq", &seq));
1465                        if (discontinuitySeq < 0 || seq < discontinuitySeq) {
1466                            discontinuitySeq = seq;
1467                        }
1468                    }
1469
1470                    if (pickTrack) {
1471                        // selecting track, queue discontinuities before content
1472                        sources[j]->clear();
1473                        if (j == kSubtitleIndex) {
1474                            break;
1475                        }
1476
1477                        ALOGV("stream[%d]: queue format change", j);
1478
1479                        sources[j]->queueDiscontinuity(
1480                                ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1481                    } else {
1482                        // adapting, queue discontinuities after resume
1483                        sources[j] = mPacketSources2.valueFor(indexToType(j));
1484                        sources[j]->clear();
1485                        uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1486                        if (extraStreams & indexToType(j)) {
1487                            sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false));
1488                        }
1489                    }
1490                }
1491
1492                streamMask &= ~indexToType(j);
1493            }
1494        }
1495
1496        fetcher->startAsync(
1497                sources[kAudioIndex],
1498                sources[kVideoIndex],
1499                sources[kSubtitleIndex],
1500                startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs,
1501                segmentStartTimeUs,
1502                discontinuitySeq,
1503                switching);
1504    }
1505
1506    // All fetchers have now been started, the configuration change
1507    // has completed.
1508
1509    ALOGV("XXX configuration change completed.");
1510    mReconfigurationInProgress = false;
1511    if (switching) {
1512        mSwitchInProgress = true;
1513    } else {
1514        mStreamMask = mNewStreamMask;
1515    }
1516
1517    if (mDisconnectReplyID != 0) {
1518        finishDisconnect();
1519    }
1520}
1521
1522void LiveSession::onSwapped(const sp<AMessage> &msg) {
1523    int32_t switchGeneration;
1524    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
1525    if (switchGeneration != mSwitchGeneration) {
1526        return;
1527    }
1528
1529    int32_t stream;
1530    CHECK(msg->findInt32("stream", &stream));
1531
1532    ssize_t idx = typeToIndex(stream);
1533    CHECK(idx >= 0);
1534    if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
1535        ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str());
1536    }
1537    mStreams[idx].mUri = mStreams[idx].mNewUri;
1538    mStreams[idx].mNewUri.clear();
1539
1540    mSwapMask &= ~stream;
1541    if (mSwapMask != 0) {
1542        return;
1543    }
1544
1545    // Check if new variant contains extra streams.
1546    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1547    while (extraStreams) {
1548        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
1549        swapPacketSource(extraStream);
1550        extraStreams &= ~extraStream;
1551
1552        idx = typeToIndex(extraStream);
1553        CHECK(idx >= 0);
1554        if (mStreams[idx].mNewUri.empty()) {
1555            ALOGW("swapping extra stream type %d %s to empty stream",
1556                    extraStream, mStreams[idx].mUri.c_str());
1557        }
1558        mStreams[idx].mUri = mStreams[idx].mNewUri;
1559        mStreams[idx].mNewUri.clear();
1560    }
1561
1562    tryToFinishBandwidthSwitch();
1563}
1564
1565void LiveSession::schedulePollBuffering() {
1566    sp<AMessage> msg = new AMessage(kWhatPollBuffering, this);
1567    msg->setInt32("generation", mPollBufferingGeneration);
1568    msg->post(1000000ll);
1569}
1570
1571void LiveSession::cancelPollBuffering() {
1572    ++mPollBufferingGeneration;
1573}
1574
1575void LiveSession::onPollBuffering() {
1576    ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
1577            "mInPreparationPhase %d, mStreamMask 0x%x",
1578        mSwitchInProgress, mReconfigurationInProgress,
1579        mInPreparationPhase, mStreamMask);
1580
1581    bool low, mid, high;
1582    if (checkBuffering(low, mid, high)) {
1583        if (mInPreparationPhase && mid) {
1584            postPrepared(OK);
1585        }
1586
1587        // don't switch before we report prepared
1588        if (!mInPreparationPhase && (low || high)) {
1589            switchBandwidthIfNeeded(high);
1590        }
1591    }
1592
1593    schedulePollBuffering();
1594}
1595
1596// Mark switch done when:
1597//   1. all old buffers are swapped out
1598void LiveSession::tryToFinishBandwidthSwitch() {
1599    if (!mSwitchInProgress) {
1600        return;
1601    }
1602
1603    bool needToRemoveFetchers = false;
1604    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1605        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
1606            needToRemoveFetchers = true;
1607            break;
1608        }
1609    }
1610
1611    if (!needToRemoveFetchers && mSwapMask == 0) {
1612        ALOGI("mSwitchInProgress = false");
1613        mStreamMask = mNewStreamMask;
1614        mSwitchInProgress = false;
1615    }
1616}
1617
1618void LiveSession::cancelBandwidthSwitch() {
1619    Mutex::Autolock lock(mSwapMutex);
1620    mSwitchGeneration++;
1621    mSwitchInProgress = false;
1622    mSwapMask = 0;
1623
1624    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1625        FetcherInfo& info = mFetcherInfos.editValueAt(i);
1626        if (info.mToBeRemoved) {
1627            info.mToBeRemoved = false;
1628        }
1629    }
1630
1631    for (size_t i = 0; i < kMaxStreams; ++i) {
1632        if (!mStreams[i].mNewUri.empty()) {
1633            ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri);
1634            if (j < 0) {
1635                mStreams[i].mNewUri.clear();
1636                continue;
1637            }
1638
1639            const FetcherInfo &info = mFetcherInfos.valueAt(j);
1640            info.mFetcher->stopAsync();
1641            mFetcherInfos.removeItemsAt(j);
1642            mStreams[i].mNewUri.clear();
1643        }
1644    }
1645}
1646
1647bool LiveSession::checkBuffering(bool &low, bool &mid, bool &high) {
1648    low = mid = high = false;
1649
1650    if (mSwitchInProgress || mReconfigurationInProgress) {
1651        ALOGV("Switch/Reconfig in progress, defer buffer polling");
1652        return false;
1653    }
1654
1655    // TODO: Fine tune low/high mark.
1656    //       We also need to pause playback if buffering is too low.
1657    //       Currently during underflow, we depend on decoder to starve
1658    //       to pause, but A/V could have different buffering left,
1659    //       they're not paused together.
1660    // TODO: Report buffering level to NuPlayer for BUFFERING_UPDATE
1661
1662    // Switch down if any of the fetchers are below low mark;
1663    // Switch up   if all of the fetchers are over high mark.
1664    size_t activeCount, lowCount, midCount, highCount;
1665    activeCount = lowCount = midCount = highCount = 0;
1666    for (size_t i = 0; i < mPacketSources.size(); ++i) {
1667        // we don't check subtitles for buffering level
1668        if (!(mStreamMask & mPacketSources.keyAt(i)
1669                & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) {
1670            continue;
1671        }
1672        // ignore streams that never had any packet queued.
1673        // (it's possible that the variant only has audio or video)
1674        sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
1675        if (meta == NULL) {
1676            continue;
1677        }
1678
1679        ++activeCount;
1680        int64_t bufferedDurationUs =
1681                mPacketSources[i]->getEstimatedDurationUs();
1682        ALOGV("source[%d]: buffered %lld us", i, bufferedDurationUs);
1683        if (bufferedDurationUs < kLowWaterMark) {
1684            ++lowCount;
1685            break;
1686        } else if (bufferedDurationUs > kHighWaterMark) {
1687            ++midCount;
1688            ++highCount;
1689        } else if (bufferedDurationUs > kMidWaterMark) {
1690            ++midCount;
1691        }
1692    }
1693
1694    if (activeCount > 0) {
1695        high = (highCount == activeCount);
1696        mid = (midCount == activeCount);
1697        low = (lowCount > 0);
1698        return true;
1699    }
1700
1701    return false;
1702}
1703
1704void LiveSession::switchBandwidthIfNeeded(bool canSwitchUp) {
1705    ssize_t bandwidthIndex = getBandwidthIndex();
1706
1707    if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex)
1708            || (!canSwitchUp && bandwidthIndex < mCurBandwidthIndex)) {
1709        changeConfiguration(-1, bandwidthIndex, false);
1710    }
1711}
1712
1713void LiveSession::postPrepared(status_t err) {
1714    CHECK(mInPreparationPhase);
1715
1716    sp<AMessage> notify = mNotify->dup();
1717    if (err == OK || err == ERROR_END_OF_STREAM) {
1718        notify->setInt32("what", kWhatPrepared);
1719    } else {
1720        notify->setInt32("what", kWhatPreparationFailed);
1721        notify->setInt32("err", err);
1722    }
1723
1724    notify->post();
1725
1726    mInPreparationPhase = false;
1727}
1728
1729
1730}  // namespace android
1731
1732