LiveSession.cpp revision 6300cbe99899da0103c910ba6a35c785261ce433
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/IMediaHTTPConnection.h>
31#include <media/IMediaHTTPService.h>
32#include <media/stagefright/foundation/hexdump.h>
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/DataSource.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaErrors.h>
39#include <media/stagefright/MediaHTTP.h>
40#include <media/stagefright/MetaData.h>
41#include <media/stagefright/Utils.h>
42
43#include <utils/Mutex.h>
44
45#include <ctype.h>
46#include <inttypes.h>
47#include <openssl/aes.h>
48#include <openssl/md5.h>
49
50namespace android {
51
52LiveSession::LiveSession(
53        const sp<AMessage> &notify, uint32_t flags,
54        const sp<IMediaHTTPService> &httpService)
55    : mNotify(notify),
56      mFlags(flags),
57      mHTTPService(httpService),
58      mInPreparationPhase(true),
59      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
60      mCurBandwidthIndex(-1),
61      mStreamMask(0),
62      mNewStreamMask(0),
63      mSwapMask(0),
64      mCheckBandwidthGeneration(0),
65      mSwitchGeneration(0),
66      mSubtitleGeneration(0),
67      mLastDequeuedTimeUs(0ll),
68      mRealTimeBaseUs(0ll),
69      mReconfigurationInProgress(false),
70      mSwitchInProgress(false),
71      mDisconnectReplyID(0),
72      mSeekReplyID(0),
73      mFirstTimeUsValid(false),
74      mFirstTimeUs(0),
75      mLastSeekTimeUs(0) {
76
77    mStreams[kAudioIndex] = StreamItem("audio");
78    mStreams[kVideoIndex] = StreamItem("video");
79    mStreams[kSubtitleIndex] = StreamItem("subtitles");
80
81    for (size_t i = 0; i < kMaxStreams; ++i) {
82        mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
83        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
84        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
85        mBuffering[i] = false;
86    }
87}
88
89LiveSession::~LiveSession() {
90}
91
92sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
93    ABuffer *discontinuity = new ABuffer(0);
94    discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
95    discontinuity->meta()->setInt32("swapPacketSource", swap);
96    discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
97    discontinuity->meta()->setInt64("timeUs", -1);
98    return discontinuity;
99}
100
101void LiveSession::swapPacketSource(StreamType stream) {
102    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
103    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
104    sp<AnotherPacketSource> tmp = aps;
105    aps = aps2;
106    aps2 = tmp;
107    aps2->clear();
108}
109
110status_t LiveSession::dequeueAccessUnit(
111        StreamType stream, sp<ABuffer> *accessUnit) {
112    if (!(mStreamMask & stream)) {
113        // return -EWOULDBLOCK to avoid halting the decoder
114        // when switching between audio/video and audio only.
115        return -EWOULDBLOCK;
116    }
117
118    status_t finalResult;
119    sp<AnotherPacketSource> discontinuityQueue  = mDiscontinuities.valueFor(stream);
120    if (discontinuityQueue->hasBufferAvailable(&finalResult)) {
121        discontinuityQueue->dequeueAccessUnit(accessUnit);
122        // seeking, track switching
123        sp<AMessage> extra;
124        int64_t timeUs;
125        if ((*accessUnit)->meta()->findMessage("extra", &extra)
126                && extra != NULL
127                && extra->findInt64("timeUs", &timeUs)) {
128            // seeking only
129            mLastSeekTimeUs = timeUs;
130            mDiscontinuityOffsetTimesUs.clear();
131            mDiscontinuityAbsStartTimesUs.clear();
132        }
133        return INFO_DISCONTINUITY;
134    }
135
136    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
137
138    ssize_t idx = typeToIndex(stream);
139    if (!packetSource->hasBufferAvailable(&finalResult)) {
140        if (finalResult == OK) {
141            mBuffering[idx] = true;
142            return -EAGAIN;
143        } else {
144            return finalResult;
145        }
146    }
147
148    if (mBuffering[idx]) {
149        if (mSwitchInProgress
150                || packetSource->isFinished(0)
151                || packetSource->getEstimatedDurationUs() > 10000000ll) {
152            mBuffering[idx] = false;
153        }
154    }
155
156    if (mBuffering[idx]) {
157        return -EAGAIN;
158    }
159
160    // wait for counterpart
161    sp<AnotherPacketSource> otherSource;
162    uint32_t mask = mNewStreamMask & mStreamMask;
163    uint32_t fetchersMask  = 0;
164    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
165        uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask();
166        fetchersMask |= fetcherMask;
167    }
168    mask &= fetchersMask;
169    if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) {
170        otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
171    } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) {
172        otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
173    }
174    if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) {
175        return finalResult == OK ? -EAGAIN : finalResult;
176    }
177
178    status_t err = packetSource->dequeueAccessUnit(accessUnit);
179
180    size_t streamIdx;
181    const char *streamStr;
182    switch (stream) {
183        case STREAMTYPE_AUDIO:
184            streamIdx = kAudioIndex;
185            streamStr = "audio";
186            break;
187        case STREAMTYPE_VIDEO:
188            streamIdx = kVideoIndex;
189            streamStr = "video";
190            break;
191        case STREAMTYPE_SUBTITLES:
192            streamIdx = kSubtitleIndex;
193            streamStr = "subs";
194            break;
195        default:
196            TRESPASS();
197    }
198
199    StreamItem& strm = mStreams[streamIdx];
200    if (err == INFO_DISCONTINUITY) {
201        // adaptive streaming, discontinuities in the playlist
202        int32_t type;
203        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
204
205        sp<AMessage> extra;
206        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
207            extra.clear();
208        }
209
210        ALOGI("[%s] read discontinuity of type %d, extra = %s",
211              streamStr,
212              type,
213              extra == NULL ? "NULL" : extra->debugString().c_str());
214
215        int32_t swap;
216        if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) {
217            int32_t switchGeneration;
218            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
219            {
220                Mutex::Autolock lock(mSwapMutex);
221                if (switchGeneration == mSwitchGeneration) {
222                    swapPacketSource(stream);
223                    sp<AMessage> msg = new AMessage(kWhatSwapped, id());
224                    msg->setInt32("stream", stream);
225                    msg->setInt32("switchGeneration", switchGeneration);
226                    msg->post();
227                }
228            }
229        } else {
230            size_t seq = strm.mCurDiscontinuitySeq;
231            int64_t offsetTimeUs;
232            if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
233                offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
234            } else {
235                offsetTimeUs = 0;
236            }
237
238            seq += 1;
239            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
240                int64_t firstTimeUs;
241                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
242                offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
243                offsetTimeUs += strm.mLastSampleDurationUs;
244            } else {
245                offsetTimeUs += strm.mLastSampleDurationUs;
246            }
247
248            mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
249        }
250    } else if (err == OK) {
251
252        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
253            int64_t timeUs;
254            int32_t discontinuitySeq = 0;
255            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
256            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
257            strm.mCurDiscontinuitySeq = discontinuitySeq;
258
259            int32_t discard = 0;
260            int64_t firstTimeUs;
261            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
262                int64_t durUs; // approximate sample duration
263                if (timeUs > strm.mLastDequeuedTimeUs) {
264                    durUs = timeUs - strm.mLastDequeuedTimeUs;
265                } else {
266                    durUs = strm.mLastDequeuedTimeUs - timeUs;
267                }
268                strm.mLastSampleDurationUs = durUs;
269                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
270            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
271                firstTimeUs = timeUs;
272            } else {
273                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
274                firstTimeUs = timeUs;
275            }
276
277            strm.mLastDequeuedTimeUs = timeUs;
278            if (timeUs >= firstTimeUs) {
279                timeUs -= firstTimeUs;
280            } else {
281                timeUs = 0;
282            }
283            timeUs += mLastSeekTimeUs;
284            if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
285                timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
286            }
287
288            ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs);
289            (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
290            mLastDequeuedTimeUs = timeUs;
291            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
292        } else if (stream == STREAMTYPE_SUBTITLES) {
293            int32_t subtitleGeneration;
294            if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
295                    && subtitleGeneration != mSubtitleGeneration) {
296               return -EAGAIN;
297            };
298            (*accessUnit)->meta()->setInt32(
299                    "trackIndex", mPlaylist->getSelectedIndex());
300            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
301        }
302    } else {
303        ALOGI("[%s] encountered error %d", streamStr, err);
304    }
305
306    return err;
307}
308
309status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
310    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
311    if (!(mStreamMask & stream)) {
312        return UNKNOWN_ERROR;
313    }
314
315    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
316
317    sp<MetaData> meta = packetSource->getFormat();
318
319    if (meta == NULL) {
320        return -EAGAIN;
321    }
322
323    return convertMetaDataToMessage(meta, format);
324}
325
326void LiveSession::connectAsync(
327        const char *url, const KeyedVector<String8, String8> *headers) {
328    sp<AMessage> msg = new AMessage(kWhatConnect, id());
329    msg->setString("url", url);
330
331    if (headers != NULL) {
332        msg->setPointer(
333                "headers",
334                new KeyedVector<String8, String8>(*headers));
335    }
336
337    msg->post();
338}
339
340status_t LiveSession::disconnect() {
341    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
342
343    sp<AMessage> response;
344    status_t err = msg->postAndAwaitResponse(&response);
345
346    return err;
347}
348
349status_t LiveSession::seekTo(int64_t timeUs) {
350    sp<AMessage> msg = new AMessage(kWhatSeek, id());
351    msg->setInt64("timeUs", timeUs);
352
353    sp<AMessage> response;
354    status_t err = msg->postAndAwaitResponse(&response);
355
356    return err;
357}
358
359void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
360    switch (msg->what()) {
361        case kWhatConnect:
362        {
363            onConnect(msg);
364            break;
365        }
366
367        case kWhatDisconnect:
368        {
369            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
370
371            if (mReconfigurationInProgress) {
372                break;
373            }
374
375            finishDisconnect();
376            break;
377        }
378
379        case kWhatSeek:
380        {
381            uint32_t seekReplyID;
382            CHECK(msg->senderAwaitsResponse(&seekReplyID));
383            mSeekReplyID = seekReplyID;
384            mSeekReply = new AMessage;
385
386            status_t err = onSeek(msg);
387
388            if (err != OK) {
389                msg->post(50000);
390            }
391            break;
392        }
393
394        case kWhatFetcherNotify:
395        {
396            int32_t what;
397            CHECK(msg->findInt32("what", &what));
398
399            switch (what) {
400                case PlaylistFetcher::kWhatStarted:
401                    break;
402                case PlaylistFetcher::kWhatPaused:
403                case PlaylistFetcher::kWhatStopped:
404                {
405                    if (what == PlaylistFetcher::kWhatStopped) {
406                        AString uri;
407                        CHECK(msg->findString("uri", &uri));
408                        if (mFetcherInfos.removeItem(uri) < 0) {
409                            // ignore duplicated kWhatStopped messages.
410                            break;
411                        }
412
413                        if (mSwitchInProgress) {
414                            tryToFinishBandwidthSwitch();
415                        }
416                    }
417
418                    if (mContinuation != NULL) {
419                        CHECK_GT(mContinuationCounter, 0);
420                        if (--mContinuationCounter == 0) {
421                            mContinuation->post();
422
423                            if (mSeekReplyID != 0) {
424                                CHECK(mSeekReply != NULL);
425                                mSeekReply->setInt32("err", OK);
426                                mSeekReply->postReply(mSeekReplyID);
427                                mSeekReplyID = 0;
428                                mSeekReply.clear();
429                            }
430                        }
431                    }
432                    break;
433                }
434
435                case PlaylistFetcher::kWhatDurationUpdate:
436                {
437                    AString uri;
438                    CHECK(msg->findString("uri", &uri));
439
440                    int64_t durationUs;
441                    CHECK(msg->findInt64("durationUs", &durationUs));
442
443                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
444                    info->mDurationUs = durationUs;
445                    break;
446                }
447
448                case PlaylistFetcher::kWhatError:
449                {
450                    status_t err;
451                    CHECK(msg->findInt32("err", &err));
452
453                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
454
455                    // handle EOS on subtitle tracks independently
456                    AString uri;
457                    if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
458                        ssize_t i = mFetcherInfos.indexOfKey(uri);
459                        if (i >= 0) {
460                            const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
461                            if (fetcher != NULL) {
462                                uint32_t type = fetcher->getStreamTypeMask();
463                                if (type == STREAMTYPE_SUBTITLES) {
464                                    mPacketSources.valueFor(
465                                            STREAMTYPE_SUBTITLES)->signalEOS(err);;
466                                    break;
467                                }
468                            }
469                        }
470                    }
471
472                    if (mInPreparationPhase) {
473                        postPrepared(err);
474                    }
475
476                    cancelBandwidthSwitch();
477
478                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
479
480                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
481
482                    mPacketSources.valueFor(
483                            STREAMTYPE_SUBTITLES)->signalEOS(err);
484
485                    sp<AMessage> notify = mNotify->dup();
486                    notify->setInt32("what", kWhatError);
487                    notify->setInt32("err", err);
488                    notify->post();
489                    break;
490                }
491
492                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
493                {
494                    AString uri;
495                    CHECK(msg->findString("uri", &uri));
496
497                    if (mFetcherInfos.indexOfKey(uri) < 0) {
498                        ALOGE("couldn't find uri");
499                        break;
500                    }
501                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
502                    info->mIsPrepared = true;
503
504                    if (mInPreparationPhase) {
505                        bool allFetchersPrepared = true;
506                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
507                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
508                                allFetchersPrepared = false;
509                                break;
510                            }
511                        }
512
513                        if (allFetchersPrepared) {
514                            postPrepared(OK);
515                        }
516                    }
517                    break;
518                }
519
520                case PlaylistFetcher::kWhatStartedAt:
521                {
522                    int32_t switchGeneration;
523                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
524
525                    if (switchGeneration != mSwitchGeneration) {
526                        break;
527                    }
528
529                    // Resume fetcher for the original variant; the resumed fetcher should
530                    // continue until the timestamps found in msg, which is stored by the
531                    // new fetcher to indicate where the new variant has started buffering.
532                    for (size_t i = 0; i < mFetcherInfos.size(); i++) {
533                        const FetcherInfo info = mFetcherInfos.valueAt(i);
534                        if (info.mToBeRemoved) {
535                            info.mFetcher->resumeUntilAsync(msg);
536                        }
537                    }
538                    break;
539                }
540
541                default:
542                    TRESPASS();
543            }
544
545            break;
546        }
547
548        case kWhatCheckBandwidth:
549        {
550            int32_t generation;
551            CHECK(msg->findInt32("generation", &generation));
552
553            if (generation != mCheckBandwidthGeneration) {
554                break;
555            }
556
557            onCheckBandwidth(msg);
558            break;
559        }
560
561        case kWhatChangeConfiguration:
562        {
563            onChangeConfiguration(msg);
564            break;
565        }
566
567        case kWhatChangeConfiguration2:
568        {
569            onChangeConfiguration2(msg);
570            break;
571        }
572
573        case kWhatChangeConfiguration3:
574        {
575            onChangeConfiguration3(msg);
576            break;
577        }
578
579        case kWhatFinishDisconnect2:
580        {
581            onFinishDisconnect2();
582            break;
583        }
584
585        case kWhatSwapped:
586        {
587            onSwapped(msg);
588            break;
589        }
590
591        case kWhatCheckSwitchDown:
592        {
593            onCheckSwitchDown();
594            break;
595        }
596
597        case kWhatSwitchDown:
598        {
599            onSwitchDown();
600            break;
601        }
602
603        default:
604            TRESPASS();
605            break;
606    }
607}
608
609// static
610int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
611    if (a->mBandwidth < b->mBandwidth) {
612        return -1;
613    } else if (a->mBandwidth == b->mBandwidth) {
614        return 0;
615    }
616
617    return 1;
618}
619
620// static
621LiveSession::StreamType LiveSession::indexToType(int idx) {
622    CHECK(idx >= 0 && idx < kMaxStreams);
623    return (StreamType)(1 << idx);
624}
625
626// static
627ssize_t LiveSession::typeToIndex(int32_t type) {
628    switch (type) {
629        case STREAMTYPE_AUDIO:
630            return 0;
631        case STREAMTYPE_VIDEO:
632            return 1;
633        case STREAMTYPE_SUBTITLES:
634            return 2;
635        default:
636            return -1;
637    };
638    return -1;
639}
640
641void LiveSession::onConnect(const sp<AMessage> &msg) {
642    AString url;
643    CHECK(msg->findString("url", &url));
644
645    KeyedVector<String8, String8> *headers = NULL;
646    if (!msg->findPointer("headers", (void **)&headers)) {
647        mExtraHeaders.clear();
648    } else {
649        mExtraHeaders = *headers;
650
651        delete headers;
652        headers = NULL;
653    }
654
655    // TODO currently we don't know if we are coming here from incognito mode
656    ALOGI("onConnect %s", uriDebugString(url).c_str());
657
658    mMasterURL = url;
659
660    bool dummy;
661    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
662
663    if (mPlaylist == NULL) {
664        ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str());
665
666        postPrepared(ERROR_IO);
667        return;
668    }
669
670    // We trust the content provider to make a reasonable choice of preferred
671    // initial bandwidth by listing it first in the variant playlist.
672    // At startup we really don't have a good estimate on the available
673    // network bandwidth since we haven't tranferred any data yet. Once
674    // we have we can make a better informed choice.
675    size_t initialBandwidth = 0;
676    size_t initialBandwidthIndex = 0;
677
678    if (mPlaylist->isVariantPlaylist()) {
679        for (size_t i = 0; i < mPlaylist->size(); ++i) {
680            BandwidthItem item;
681
682            item.mPlaylistIndex = i;
683
684            sp<AMessage> meta;
685            AString uri;
686            mPlaylist->itemAt(i, &uri, &meta);
687
688            unsigned long bandwidth;
689            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
690
691            if (initialBandwidth == 0) {
692                initialBandwidth = item.mBandwidth;
693            }
694
695            mBandwidthItems.push(item);
696        }
697
698        CHECK_GT(mBandwidthItems.size(), 0u);
699
700        mBandwidthItems.sort(SortByBandwidth);
701
702        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
703            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
704                initialBandwidthIndex = i;
705                break;
706            }
707        }
708    } else {
709        // dummy item.
710        BandwidthItem item;
711        item.mPlaylistIndex = 0;
712        item.mBandwidth = 0;
713        mBandwidthItems.push(item);
714    }
715
716    mPlaylist->pickRandomMediaItems();
717    changeConfiguration(
718            0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
719}
720
721void LiveSession::finishDisconnect() {
722    // No reconfiguration is currently pending, make sure none will trigger
723    // during disconnection either.
724    cancelCheckBandwidthEvent();
725
726    // Protect mPacketSources from a swapPacketSource race condition through disconnect.
727    // (finishDisconnect, onFinishDisconnect2)
728    cancelBandwidthSwitch();
729
730    // cancel switch down monitor
731    mSwitchDownMonitor.clear();
732
733    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
734        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
735    }
736
737    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
738
739    mContinuationCounter = mFetcherInfos.size();
740    mContinuation = msg;
741
742    if (mContinuationCounter == 0) {
743        msg->post();
744    }
745}
746
747void LiveSession::onFinishDisconnect2() {
748    mContinuation.clear();
749
750    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
751    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
752
753    mPacketSources.valueFor(
754            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
755
756    sp<AMessage> response = new AMessage;
757    response->setInt32("err", OK);
758
759    response->postReply(mDisconnectReplyID);
760    mDisconnectReplyID = 0;
761}
762
763sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
764    ssize_t index = mFetcherInfos.indexOfKey(uri);
765
766    if (index >= 0) {
767        return NULL;
768    }
769
770    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
771    notify->setString("uri", uri);
772    notify->setInt32("switchGeneration", mSwitchGeneration);
773
774    FetcherInfo info;
775    info.mFetcher = new PlaylistFetcher(notify, this, uri, mSubtitleGeneration);
776    info.mDurationUs = -1ll;
777    info.mIsPrepared = false;
778    info.mToBeRemoved = false;
779    looper()->registerHandler(info.mFetcher);
780
781    mFetcherInfos.add(uri, info);
782
783    return info.mFetcher;
784}
785
786/*
787 * Illustration of parameters:
788 *
789 * 0      `range_offset`
790 * +------------+-------------------------------------------------------+--+--+
791 * |            |                                 | next block to fetch |  |  |
792 * |            | `source` handle => `out` buffer |                     |  |  |
793 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
794 * |            |<----------- `range_length` / buffer capacity ----------->|  |
795 * |<------------------------------ file_size ------------------------------->|
796 *
797 * Special parameter values:
798 * - range_length == -1 means entire file
799 * - block_size == 0 means entire range
800 *
801 */
802ssize_t LiveSession::fetchFile(
803        const char *url, sp<ABuffer> *out,
804        int64_t range_offset, int64_t range_length,
805        uint32_t block_size, /* download block size */
806        sp<DataSource> *source, /* to return and reuse source */
807        String8 *actualUrl) {
808    off64_t size;
809    sp<DataSource> temp_source;
810    if (source == NULL) {
811        source = &temp_source;
812    }
813
814    if (*source == NULL) {
815        if (!strncasecmp(url, "file://", 7)) {
816            *source = new FileSource(url + 7);
817        } else if (strncasecmp(url, "http://", 7)
818                && strncasecmp(url, "https://", 8)) {
819            return ERROR_UNSUPPORTED;
820        } else {
821            KeyedVector<String8, String8> headers = mExtraHeaders;
822            if (range_offset > 0 || range_length >= 0) {
823                headers.add(
824                        String8("Range"),
825                        String8(
826                            StringPrintf(
827                                "bytes=%lld-%s",
828                                range_offset,
829                                range_length < 0
830                                    ? "" : StringPrintf("%lld",
831                                            range_offset + range_length - 1).c_str()).c_str()));
832            }
833            status_t err = mHTTPDataSource->connect(url, &headers);
834
835            if (err != OK) {
836                return err;
837            }
838
839            *source = mHTTPDataSource;
840        }
841    }
842
843    status_t getSizeErr = (*source)->getSize(&size);
844    if (getSizeErr != OK) {
845        size = 65536;
846    }
847
848    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
849    if (*out == NULL) {
850        buffer->setRange(0, 0);
851    }
852
853    ssize_t bytesRead = 0;
854    // adjust range_length if only reading partial block
855    if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
856        range_length = buffer->size() + block_size;
857    }
858    for (;;) {
859        // Only resize when we don't know the size.
860        size_t bufferRemaining = buffer->capacity() - buffer->size();
861        if (bufferRemaining == 0 && getSizeErr != OK) {
862            bufferRemaining = 32768;
863
864            ALOGV("increasing download buffer to %zu bytes",
865                 buffer->size() + bufferRemaining);
866
867            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
868            memcpy(copy->data(), buffer->data(), buffer->size());
869            copy->setRange(0, buffer->size());
870
871            buffer = copy;
872        }
873
874        size_t maxBytesToRead = bufferRemaining;
875        if (range_length >= 0) {
876            int64_t bytesLeftInRange = range_length - buffer->size();
877            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
878                maxBytesToRead = bytesLeftInRange;
879
880                if (bytesLeftInRange == 0) {
881                    break;
882                }
883            }
884        }
885
886        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
887        // to help us break out of the loop.
888        ssize_t n = (*source)->readAt(
889                buffer->size(), buffer->data() + buffer->size(),
890                maxBytesToRead);
891
892        if (n < 0) {
893            return n;
894        }
895
896        if (n == 0) {
897            break;
898        }
899
900        buffer->setRange(0, buffer->size() + (size_t)n);
901        bytesRead += n;
902    }
903
904    *out = buffer;
905    if (actualUrl != NULL) {
906        *actualUrl = (*source)->getUri();
907        if (actualUrl->isEmpty()) {
908            *actualUrl = url;
909        }
910    }
911
912    return bytesRead;
913}
914
915sp<M3UParser> LiveSession::fetchPlaylist(
916        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
917    ALOGV("fetchPlaylist '%s'", url);
918
919    *unchanged = false;
920
921    sp<ABuffer> buffer;
922    String8 actualUrl;
923    ssize_t  err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
924
925    if (err <= 0) {
926        return NULL;
927    }
928
929    // MD5 functionality is not available on the simulator, treat all
930    // playlists as changed.
931
932#if defined(HAVE_ANDROID_OS)
933    uint8_t hash[16];
934
935    MD5_CTX m;
936    MD5_Init(&m);
937    MD5_Update(&m, buffer->data(), buffer->size());
938
939    MD5_Final(hash, &m);
940
941    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
942        // playlist unchanged
943        *unchanged = true;
944
945        return NULL;
946    }
947
948    if (curPlaylistHash != NULL) {
949        memcpy(curPlaylistHash, hash, sizeof(hash));
950    }
951#endif
952
953    sp<M3UParser> playlist =
954        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
955
956    if (playlist->initCheck() != OK) {
957        ALOGE("failed to parse .m3u8 playlist");
958
959        return NULL;
960    }
961
962    return playlist;
963}
964
// Returns a pseudo-random value in the half-open interval [0, 1).
//
// The divisor is RAND_MAX + 1 rather than RAND_MAX so that the result is
// strictly below 1; scaling it by a container size and truncating can then
// never produce an index equal to that size.
static double uniformRand() {
    return (double)rand() / ((double)RAND_MAX + 1.0);
}
968
969size_t LiveSession::getBandwidthIndex() {
970    if (mBandwidthItems.size() == 0) {
971        return 0;
972    }
973
974#if 1
975    char value[PROPERTY_VALUE_MAX];
976    ssize_t index = -1;
977    if (property_get("media.httplive.bw-index", value, NULL)) {
978        char *end;
979        index = strtol(value, &end, 10);
980        CHECK(end > value && *end == '\0');
981
982        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
983            index = mBandwidthItems.size() - 1;
984        }
985    }
986
987    if (index < 0) {
988        int32_t bandwidthBps;
989        if (mHTTPDataSource != NULL
990                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
991            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
992        } else {
993            ALOGV("no bandwidth estimate.");
994            return 0;  // Pick the lowest bandwidth stream by default.
995        }
996
997        char value[PROPERTY_VALUE_MAX];
998        if (property_get("media.httplive.max-bw", value, NULL)) {
999            char *end;
1000            long maxBw = strtoul(value, &end, 10);
1001            if (end > value && *end == '\0') {
1002                if (maxBw > 0 && bandwidthBps > maxBw) {
1003                    ALOGV("bandwidth capped to %ld bps", maxBw);
1004                    bandwidthBps = maxBw;
1005                }
1006            }
1007        }
1008
1009        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
1010
1011        index = mBandwidthItems.size() - 1;
1012        while (index > 0) {
1013            // consider only 80% of the available bandwidth, but if we are switching up,
1014            // be even more conservative (70%) to avoid overestimating and immediately
1015            // switching back.
1016            size_t adjustedBandwidthBps = bandwidthBps;
1017            if (index > mCurBandwidthIndex) {
1018                adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10;
1019            } else {
1020                adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10;
1021            }
1022            if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) {
1023                break;
1024            }
1025            --index;
1026        }
1027    }
1028#elif 0
1029    // Change bandwidth at random()
1030    size_t index = uniformRand() * mBandwidthItems.size();
1031#elif 0
1032    // There's a 50% chance to stay on the current bandwidth and
1033    // a 50% chance to switch to the next higher bandwidth (wrapping around
1034    // to lowest)
1035    const size_t kMinIndex = 0;
1036
1037    static ssize_t mCurBandwidthIndex = -1;
1038
1039    size_t index;
1040    if (mCurBandwidthIndex < 0) {
1041        index = kMinIndex;
1042    } else if (uniformRand() < 0.5) {
1043        index = (size_t)mCurBandwidthIndex;
1044    } else {
1045        index = mCurBandwidthIndex + 1;
1046        if (index == mBandwidthItems.size()) {
1047            index = kMinIndex;
1048        }
1049    }
1050    mCurBandwidthIndex = index;
1051#elif 0
1052    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1053
1054    size_t index = mBandwidthItems.size() - 1;
1055    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1056        --index;
1057    }
1058#elif 1
1059    char value[PROPERTY_VALUE_MAX];
1060    size_t index;
1061    if (property_get("media.httplive.bw-index", value, NULL)) {
1062        char *end;
1063        index = strtoul(value, &end, 10);
1064        CHECK(end > value && *end == '\0');
1065
1066        if (index >= mBandwidthItems.size()) {
1067            index = mBandwidthItems.size() - 1;
1068        }
1069    } else {
1070        index = 0;
1071    }
1072#else
1073    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
1074#endif
1075
1076    CHECK_GE(index, 0);
1077
1078    return index;
1079}
1080
1081int64_t LiveSession::latestMediaSegmentStartTimeUs() {
1082    sp<AMessage> audioMeta = mPacketSources.valueFor(STREAMTYPE_AUDIO)->getLatestDequeuedMeta();
1083    int64_t minSegmentStartTimeUs = -1, videoSegmentStartTimeUs = -1;
1084    if (audioMeta != NULL) {
1085        audioMeta->findInt64("segmentStartTimeUs", &minSegmentStartTimeUs);
1086    }
1087
1088    sp<AMessage> videoMeta = mPacketSources.valueFor(STREAMTYPE_VIDEO)->getLatestDequeuedMeta();
1089    if (videoMeta != NULL
1090            && videoMeta->findInt64("segmentStartTimeUs", &videoSegmentStartTimeUs)) {
1091        if (minSegmentStartTimeUs < 0 || videoSegmentStartTimeUs < minSegmentStartTimeUs) {
1092            minSegmentStartTimeUs = videoSegmentStartTimeUs;
1093        }
1094
1095    }
1096    return minSegmentStartTimeUs;
1097}
1098
1099status_t LiveSession::onSeek(const sp<AMessage> &msg) {
1100    int64_t timeUs;
1101    CHECK(msg->findInt64("timeUs", &timeUs));
1102
1103    if (!mReconfigurationInProgress) {
1104        changeConfiguration(timeUs, mCurBandwidthIndex);
1105        return OK;
1106    } else {
1107        return -EWOULDBLOCK;
1108    }
1109}
1110
1111status_t LiveSession::getDuration(int64_t *durationUs) const {
1112    int64_t maxDurationUs = 0ll;
1113    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1114        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1115
1116        if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) {
1117            maxDurationUs = fetcherDurationUs;
1118        }
1119    }
1120
1121    *durationUs = maxDurationUs;
1122
1123    return OK;
1124}
1125
1126bool LiveSession::isSeekable() const {
1127    int64_t durationUs;
1128    return getDuration(&durationUs) == OK && durationUs >= 0;
1129}
1130
1131bool LiveSession::hasDynamicDuration() const {
1132    return false;
1133}
1134
1135size_t LiveSession::getTrackCount() const {
1136    if (mPlaylist == NULL) {
1137        return 0;
1138    } else {
1139        return mPlaylist->getTrackCount();
1140    }
1141}
1142
1143sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1144    if (mPlaylist == NULL) {
1145        return NULL;
1146    } else {
1147        return mPlaylist->getTrackInfo(trackIndex);
1148    }
1149}
1150
1151status_t LiveSession::selectTrack(size_t index, bool select) {
1152    if (mPlaylist == NULL) {
1153        return INVALID_OPERATION;
1154    }
1155
1156    ++mSubtitleGeneration;
1157    status_t err = mPlaylist->selectTrack(index, select);
1158    if (err == OK) {
1159        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id());
1160        msg->setInt32("bandwidthIndex", mCurBandwidthIndex);
1161        msg->setInt32("pickTrack", select);
1162        msg->post();
1163    }
1164    return err;
1165}
1166
1167bool LiveSession::canSwitchUp() {
1168    // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
1169    status_t err = OK;
1170    for (size_t i = 0; i < mPacketSources.size(); ++i) {
1171        sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
1172        int64_t dur = source->getBufferedDurationUs(&err);
1173        if (err == OK && dur > 10000000) {
1174            return true;
1175        }
1176    }
1177    return false;
1178}
1179
// Starts a configuration change to the variant at |bandwidthIndex|.
//
// |timeUs| is the seek position; a negative value means this is not a seek.
// |pickTrack| is forwarded to the continuation message and, when not
// seeking, decides whether fetchers that are no longer matched are stopped
// right away.
//
// The function cancels any bandwidth switch, marks a reconfiguration as in
// progress, asks every existing fetcher to stop or pause asynchronously and
// stores the continuation message (kWhatChangeConfiguration2 for seeks,
// kWhatChangeConfiguration3 otherwise) in mContinuation. If there are no
// fetchers the continuation is posted immediately.
void LiveSession::changeConfiguration(
        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
    cancelBandwidthSwitch();

    // Only one reconfiguration may be running at a time.
    CHECK(!mReconfigurationInProgress);
    mReconfigurationInProgress = true;

    mCurBandwidthIndex = bandwidthIndex;

    ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
          timeUs, bandwidthIndex, pickTrack);

    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);

    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher

    // Collect the URI of every stream type the selected variant provides.
    AString URIs[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
            streamMask |= indexToType(i);
        }
    }

    // Step 1, stop and discard fetchers that are no longer needed.
    // Pause those that we'll reuse.
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        const AString &uri = mFetcherInfos.keyAt(i);

        // When seeking (timeUs >= 0) every current fetcher is discarded.
        bool discardFetcher = true;

        // When not seeking, decide per fetcher whether it can be reused.
        if (timeUs < 0ll) {
            // delay fetcher removal if not picking tracks
            discardFetcher = pickTrack;

            // A fetcher whose URI matches a wanted stream keeps serving that
            // stream: move the type from streamMask to resumeMask.
            for (size_t j = 0; j < kMaxStreams; ++j) {
                StreamType type = indexToType(j);
                if ((streamMask & type) && uri == URIs[j]) {
                    resumeMask |= type;
                    streamMask &= ~type;
                    discardFetcher = false;
                }
            }
        }

        if (discardFetcher) {
            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
        } else {
            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
        }
    }

    sp<AMessage> msg;
    if (timeUs < 0ll) {
        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
        msg = new AMessage(kWhatChangeConfiguration3, id());
    } else {
        msg = new AMessage(kWhatChangeConfiguration2, id());
    }
    msg->setInt32("streamMask", streamMask);
    msg->setInt32("resumeMask", resumeMask);
    msg->setInt32("pickTrack", pickTrack);
    msg->setInt64("timeUs", timeUs);
    // Attach the URI of every stream that will be fetched, new or resumed.
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if ((streamMask | resumeMask) & indexToType(i)) {
            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
        }
    }

    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
    // fetchers have completed their asynchronous operation, we'll post
    // mContinuation, which then is handled below in onChangeConfiguration2.
    mContinuationCounter = mFetcherInfos.size();
    mContinuation = msg;

    if (mContinuationCounter == 0) {
        // No fetchers to wait for: continue right away.
        msg->post();

        // Answer a pending seek request, if any, with OK.
        if (mSeekReplyID != 0) {
            CHECK(mSeekReply != NULL);
            mSeekReply->setInt32("err", OK);
            mSeekReply->postReply(mSeekReplyID);
            mSeekReplyID = 0;
            mSeekReply.clear();
        }
    }
}
1272
1273void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1274    if (!mReconfigurationInProgress) {
1275        int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex;
1276        msg->findInt32("pickTrack", &pickTrack);
1277        msg->findInt32("bandwidthIndex", &bandwidthIndex);
1278        changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack);
1279    } else {
1280        msg->post(1000000ll); // retry in 1 sec
1281    }
1282}
1283
1284void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1285    mContinuation.clear();
1286
1287    // All fetchers are either suspended or have been removed now.
1288
1289    uint32_t streamMask, resumeMask;
1290    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1291    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1292
1293    // currently onChangeConfiguration2 is only called for seeking;
1294    // remove the following CHECK if using it else where.
1295    CHECK_EQ(resumeMask, 0);
1296    streamMask |= resumeMask;
1297
1298    AString URIs[kMaxStreams];
1299    for (size_t i = 0; i < kMaxStreams; ++i) {
1300        if (streamMask & indexToType(i)) {
1301            const AString &uriKey = mStreams[i].uriKey();
1302            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1303            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1304        }
1305    }
1306
1307    // Determine which decoders to shutdown on the player side,
1308    // a decoder has to be shutdown if either
1309    // 1) its streamtype was active before but now longer isn't.
1310    // or
1311    // 2) its streamtype was already active and still is but the URI
1312    //    has changed.
1313    uint32_t changedMask = 0;
1314    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1315        if (((mStreamMask & streamMask & indexToType(i))
1316                && !(URIs[i] == mStreams[i].mUri))
1317                || (mStreamMask & ~streamMask & indexToType(i))) {
1318            changedMask |= indexToType(i);
1319        }
1320    }
1321
1322    if (changedMask == 0) {
1323        // If nothing changed as far as the audio/video decoders
1324        // are concerned we can proceed.
1325        onChangeConfiguration3(msg);
1326        return;
1327    }
1328
1329    // Something changed, inform the player which will shutdown the
1330    // corresponding decoders and will post the reply once that's done.
1331    // Handling the reply will continue executing below in
1332    // onChangeConfiguration3.
1333    sp<AMessage> notify = mNotify->dup();
1334    notify->setInt32("what", kWhatStreamsChanged);
1335    notify->setInt32("changedMask", changedMask);
1336
1337    msg->setWhat(kWhatChangeConfiguration3);
1338    msg->setTarget(id());
1339
1340    notify->setMessage("reply", msg);
1341    notify->post();
1342}
1343
// Final stage of a configuration change.
//
// Reads "streamMask", "resumeMask", "timeUs" and "pickTrack" from |msg|,
// records the stream URIs, resumes the existing fetchers named by
// resumeMask, creates and starts new fetchers for the types in streamMask,
// reschedules the bandwidth check and clears mReconfigurationInProgress.
//
// Three cases are distinguished below:
//   * timeUs >= 0: a seek;
//   * timeUs < 0 and pickTrack set: a track selection;
//   * timeUs < 0 and pickTrack clear: a bandwidth switch ("switching").
void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
    mContinuation.clear();
    // All remaining fetchers are still suspended, the player has shutdown
    // any decoders that needed it.

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    int64_t timeUs;
    int32_t pickTrack;
    bool switching = false;
    CHECK(msg->findInt64("timeUs", &timeUs));
    CHECK(msg->findInt32("pickTrack", &pickTrack));

    if (timeUs < 0ll) {
        // Not a seek; without pickTrack this is a bandwidth switch.
        if (!pickTrack) {
            switching = true;
        }
        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
    } else {
        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
    }

    // While switching, the URIs are staged in mNewUri; otherwise they
    // replace mUri directly.
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            if (switching) {
                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
            } else {
                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
            }
        }
    }

    mNewStreamMask = streamMask | resumeMask;
    if (switching) {
        // Currently active streams that are not resumed.
        mSwapMask = mStreamMask & ~resumeMask;
    }

    // Of all existing fetchers:
    // * Resume fetchers that are still needed and assign them original packet sources.
    // * Mark otherwise unneeded fetchers for removal.
    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        const AString &uri = mFetcherInfos.keyAt(i);

        sp<AnotherPacketSource> sources[kMaxStreams];
        for (size_t j = 0; j < kMaxStreams; ++j) {
            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));

                if (j != kSubtitleIndex) {
                    ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j));
                    sp<AnotherPacketSource> discontinuityQueue;
                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
                    discontinuityQueue->queueDiscontinuity(
                            ATSParser::DISCONTINUITY_NONE,
                            NULL,
                            true);
                }
            }
        }

        // A fetcher with at least one assigned source is restarted; one with
        // none is only flagged for removal here.
        FetcherInfo &info = mFetcherInfos.editValueAt(i);
        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
                || sources[kSubtitleIndex] != NULL) {
            info.mFetcher->startAsync(
                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
        } else {
            info.mToBeRemoved = true;
        }
    }

    // streamMask now only contains the types that need a new fetcher created.

    if (streamMask != 0) {
        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
    }

    // Find out when the original fetchers have buffered up to and start the new fetchers
    // at a later timestamp.
    for (size_t i = 0; i < kMaxStreams; i++) {
        if (!(indexToType(i) & streamMask)) {
            continue;
        }

        AString uri;
        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;

        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
        CHECK(fetcher != NULL);

        // Start parameters for the new fetcher; -1 means "not determined".
        int32_t latestSeq = -1;
        int64_t startTimeUs = -1;
        int64_t segmentStartTimeUs = -1ll;
        int32_t discontinuitySeq = -1;
        sp<AnotherPacketSource> sources[kMaxStreams];

        if (i == kSubtitleIndex) {
            segmentStartTimeUs = latestMediaSegmentStartTimeUs();
        }

        // One fetcher serves every stream type that shares its URI.
        // TRICKY: looping from i as earlier streams are already removed from streamMask
        for (size_t j = i; j < kMaxStreams; ++j) {
            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
            if ((streamMask & indexToType(j)) && uri == streamUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));

                if (timeUs >= 0) {
                    // Seeking: drop buffered data and queue a seek discontinuity
                    // carrying the target time.
                    sources[j]->clear();
                    startTimeUs = timeUs;

                    sp<AnotherPacketSource> discontinuityQueue;
                    sp<AMessage> extra = new AMessage;
                    extra->setInt64("timeUs", timeUs);
                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
                    discontinuityQueue->queueDiscontinuity(
                            ATSParser::DISCONTINUITY_SEEK, extra, true);
                } else {
                    int32_t type;
                    int64_t srcSegmentStartTimeUs;
                    sp<AMessage> meta;
                    if (pickTrack) {
                        // selecting
                        meta = sources[j]->getLatestDequeuedMeta();
                    } else {
                        // adapting
                        meta = sources[j]->getLatestEnqueuedMeta();
                    }

                    // Use the metadata only if it is not a discontinuity marker;
                    // keep the minimum of each value across the shared streams.
                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
                        int64_t tmpUs;
                        CHECK(meta->findInt64("timeUs", &tmpUs));
                        if (startTimeUs < 0 || tmpUs < startTimeUs) {
                            startTimeUs = tmpUs;
                        }

                        CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs));
                        if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) {
                            segmentStartTimeUs = tmpUs;
                        }

                        int32_t seq;
                        CHECK(meta->findInt32("discontinuitySeq", &seq));
                        if (discontinuitySeq < 0 || seq < discontinuitySeq) {
                            discontinuitySeq = seq;
                        }
                    }

                    if (pickTrack) {
                        // selecting track, queue discontinuities before content
                        sources[j]->clear();
                        if (j == kSubtitleIndex) {
                            // NOTE(review): this break leaves the j loop before the
                            // subtitle bit is cleared from streamMask below -
                            // confirm that is intended.
                            break;
                        }
                        sp<AnotherPacketSource> discontinuityQueue;
                        discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
                        discontinuityQueue->queueDiscontinuity(
                                ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
                    } else {
                        // adapting, queue discontinuities after resume
                        // The new fetcher writes into the secondary packet source.
                        sources[j] = mPacketSources2.valueFor(indexToType(j));
                        sources[j]->clear();
                        // Stream types the new configuration adds get a format
                        // change buffer queued first.
                        uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
                        if (extraStreams & indexToType(j)) {
                            sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false));
                        }
                    }
                }

                // This stream type is now handled by the fetcher created above.
                streamMask &= ~indexToType(j);
            }
        }

        // Fall back to mLastSeekTimeUs when no start time could be determined.
        fetcher->startAsync(
                sources[kAudioIndex],
                sources[kVideoIndex],
                sources[kSubtitleIndex],
                startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs,
                segmentStartTimeUs,
                discontinuitySeq,
                switching);
    }

    // All fetchers have now been started, the configuration change
    // has completed.

    // Restart the periodic bandwidth check with a new generation.
    cancelCheckBandwidthEvent();
    scheduleCheckBandwidthEvent();

    ALOGV("XXX configuration change completed.");
    mReconfigurationInProgress = false;
    if (switching) {
        // mStreamMask is updated later, when the switch finishes.
        mSwitchInProgress = true;
    } else {
        mStreamMask = mNewStreamMask;
    }

    // A disconnect requested in the meantime can now be completed.
    if (mDisconnectReplyID != 0) {
        finishDisconnect();
    }
}
1546
1547void LiveSession::onSwapped(const sp<AMessage> &msg) {
1548    int32_t switchGeneration;
1549    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
1550    if (switchGeneration != mSwitchGeneration) {
1551        return;
1552    }
1553
1554    int32_t stream;
1555    CHECK(msg->findInt32("stream", &stream));
1556
1557    ssize_t idx = typeToIndex(stream);
1558    CHECK(idx >= 0);
1559    if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
1560        ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str());
1561    }
1562    mStreams[idx].mUri = mStreams[idx].mNewUri;
1563    mStreams[idx].mNewUri.clear();
1564
1565    mSwapMask &= ~stream;
1566    if (mSwapMask != 0) {
1567        return;
1568    }
1569
1570    // Check if new variant contains extra streams.
1571    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1572    while (extraStreams) {
1573        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
1574        swapPacketSource(extraStream);
1575        extraStreams &= ~extraStream;
1576
1577        idx = typeToIndex(extraStream);
1578        CHECK(idx >= 0);
1579        if (mStreams[idx].mNewUri.empty()) {
1580            ALOGW("swapping extra stream type %d %s to empty stream",
1581                    extraStream, mStreams[idx].mUri.c_str());
1582        }
1583        mStreams[idx].mUri = mStreams[idx].mNewUri;
1584        mStreams[idx].mNewUri.clear();
1585    }
1586
1587    tryToFinishBandwidthSwitch();
1588}
1589
1590void LiveSession::onCheckSwitchDown() {
1591    if (mSwitchDownMonitor == NULL) {
1592        return;
1593    }
1594
1595    for (size_t i = 0; i < kMaxStreams; ++i) {
1596        int32_t targetDuration;
1597        sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i));
1598        sp<AMessage> meta = packetSource->getLatestDequeuedMeta();
1599
1600        if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) {
1601            int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs();
1602            int64_t targetDurationUs = targetDuration * 1000000ll;
1603
1604            if (bufferedDurationUs < targetDurationUs / 3) {
1605                (new AMessage(kWhatSwitchDown, id()))->post();
1606                break;
1607            }
1608        }
1609    }
1610
1611    mSwitchDownMonitor->post(1000000ll);
1612}
1613
1614void LiveSession::onSwitchDown() {
1615    if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) {
1616        return;
1617    }
1618
1619    ssize_t bandwidthIndex = getBandwidthIndex();
1620    if (bandwidthIndex < mCurBandwidthIndex) {
1621        changeConfiguration(-1, bandwidthIndex, false);
1622        return;
1623    }
1624
1625    changeConfiguration(-1, mCurBandwidthIndex - 1, false);
1626}
1627
1628// Mark switch done when:
1629//   1. all old buffers are swapped out
1630void LiveSession::tryToFinishBandwidthSwitch() {
1631    if (!mSwitchInProgress) {
1632        return;
1633    }
1634
1635    bool needToRemoveFetchers = false;
1636    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1637        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
1638            needToRemoveFetchers = true;
1639            break;
1640        }
1641    }
1642
1643    if (!needToRemoveFetchers && mSwapMask == 0) {
1644        ALOGI("mSwitchInProgress = false");
1645        mStreamMask = mNewStreamMask;
1646        mSwitchInProgress = false;
1647    }
1648}
1649
1650void LiveSession::scheduleCheckBandwidthEvent() {
1651    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1652    msg->setInt32("generation", mCheckBandwidthGeneration);
1653    msg->post(10000000ll);
1654}
1655
1656void LiveSession::cancelCheckBandwidthEvent() {
1657    ++mCheckBandwidthGeneration;
1658}
1659
1660void LiveSession::cancelBandwidthSwitch() {
1661    Mutex::Autolock lock(mSwapMutex);
1662    mSwitchGeneration++;
1663    mSwitchInProgress = false;
1664    mSwapMask = 0;
1665
1666    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1667        FetcherInfo& info = mFetcherInfos.editValueAt(i);
1668        if (info.mToBeRemoved) {
1669            info.mToBeRemoved = false;
1670        }
1671    }
1672
1673    for (size_t i = 0; i < kMaxStreams; ++i) {
1674        if (!mStreams[i].mNewUri.empty()) {
1675            ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri);
1676            if (j < 0) {
1677                mStreams[i].mNewUri.clear();
1678                continue;
1679            }
1680
1681            const FetcherInfo &info = mFetcherInfos.valueAt(j);
1682            info.mFetcher->stopAsync();
1683            mFetcherInfos.removeItemsAt(j);
1684            mStreams[i].mNewUri.clear();
1685        }
1686    }
1687}
1688
1689bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
1690    if (mReconfigurationInProgress || mSwitchInProgress) {
1691        return false;
1692    }
1693
1694    if (mCurBandwidthIndex < 0) {
1695        return true;
1696    }
1697
1698    if (bandwidthIndex == (size_t)mCurBandwidthIndex) {
1699        return false;
1700    } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) {
1701        return canSwitchUp();
1702    } else {
1703        return true;
1704    }
1705}
1706
1707void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) {
1708    size_t bandwidthIndex = getBandwidthIndex();
1709    if (canSwitchBandwidthTo(bandwidthIndex)) {
1710        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1711    } else {
1712        // Come back and check again 10 seconds later in case there is nothing to do now.
1713        // If we DO change configuration, once that completes it'll schedule a new
1714        // check bandwidth event with an incremented mCheckBandwidthGeneration.
1715        msg->post(10000000ll);
1716    }
1717}
1718
1719void LiveSession::postPrepared(status_t err) {
1720    CHECK(mInPreparationPhase);
1721
1722    sp<AMessage> notify = mNotify->dup();
1723    if (err == OK || err == ERROR_END_OF_STREAM) {
1724        notify->setInt32("what", kWhatPrepared);
1725    } else {
1726        notify->setInt32("what", kWhatPreparationFailed);
1727        notify->setInt32("err", err);
1728    }
1729
1730    notify->post();
1731
1732    mInPreparationPhase = false;
1733
1734    mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id());
1735    mSwitchDownMonitor->post();
1736}
1737
1738}  // namespace android
1739
1740