LiveSession.cpp revision 174609765fb9c8cbd6aeb61f489746c3570bfee2
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/IMediaHTTPConnection.h>
31#include <media/IMediaHTTPService.h>
32#include <media/stagefright/foundation/hexdump.h>
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/DataSource.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaErrors.h>
39#include <media/stagefright/MediaHTTP.h>
40#include <media/stagefright/MetaData.h>
41#include <media/stagefright/Utils.h>
42
43#include <utils/Mutex.h>
44
45#include <ctype.h>
46#include <inttypes.h>
47#include <openssl/aes.h>
48#include <openssl/md5.h>
49
50namespace android {
51
52LiveSession::LiveSession(
53        const sp<AMessage> &notify, uint32_t flags,
54        const sp<IMediaHTTPService> &httpService)
55    : mNotify(notify),
56      mFlags(flags),
57      mHTTPService(httpService),
58      mInPreparationPhase(true),
59      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
60      mCurBandwidthIndex(-1),
61      mStreamMask(0),
62      mNewStreamMask(0),
63      mSwapMask(0),
64      mCheckBandwidthGeneration(0),
65      mSwitchGeneration(0),
66      mSubtitleGeneration(0),
67      mLastDequeuedTimeUs(0ll),
68      mRealTimeBaseUs(0ll),
69      mReconfigurationInProgress(false),
70      mSwitchInProgress(false),
71      mDisconnectReplyID(0),
72      mSeekReplyID(0),
73      mFirstTimeUsValid(false),
74      mFirstTimeUs(0),
75      mLastSeekTimeUs(0) {
76
77    mStreams[kAudioIndex] = StreamItem("audio");
78    mStreams[kVideoIndex] = StreamItem("video");
79    mStreams[kSubtitleIndex] = StreamItem("subtitles");
80
81    for (size_t i = 0; i < kMaxStreams; ++i) {
82        mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
83        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
84        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
85        mBuffering[i] = false;
86    }
87}
88
89LiveSession::~LiveSession() {
90}
91
92sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
93    ABuffer *discontinuity = new ABuffer(0);
94    discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
95    discontinuity->meta()->setInt32("swapPacketSource", swap);
96    discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
97    discontinuity->meta()->setInt64("timeUs", -1);
98    return discontinuity;
99}
100
101void LiveSession::swapPacketSource(StreamType stream) {
102    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
103    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
104    sp<AnotherPacketSource> tmp = aps;
105    aps = aps2;
106    aps2 = tmp;
107    aps2->clear();
108}
109
110status_t LiveSession::dequeueAccessUnit(
111        StreamType stream, sp<ABuffer> *accessUnit) {
112    if (!(mStreamMask & stream)) {
113        // return -EWOULDBLOCK to avoid halting the decoder
114        // when switching between audio/video and audio only.
115        return -EWOULDBLOCK;
116    }
117
118    status_t finalResult;
119    sp<AnotherPacketSource> discontinuityQueue  = mDiscontinuities.valueFor(stream);
120    if (discontinuityQueue->hasBufferAvailable(&finalResult)) {
121        discontinuityQueue->dequeueAccessUnit(accessUnit);
122        // seeking, track switching
123        sp<AMessage> extra;
124        int64_t timeUs;
125        if ((*accessUnit)->meta()->findMessage("extra", &extra)
126                && extra != NULL
127                && extra->findInt64("timeUs", &timeUs)) {
128            // seeking only
129            mLastSeekTimeUs = timeUs;
130            mDiscontinuityOffsetTimesUs.clear();
131            mDiscontinuityAbsStartTimesUs.clear();
132        }
133        return INFO_DISCONTINUITY;
134    }
135
136    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
137
138    ssize_t idx = typeToIndex(stream);
139    if (!packetSource->hasBufferAvailable(&finalResult)) {
140        if (finalResult == OK) {
141            mBuffering[idx] = true;
142            return -EAGAIN;
143        } else {
144            return finalResult;
145        }
146    }
147
148    int32_t targetDuration = 0;
149    sp<AMessage> meta = packetSource->getLatestEnqueuedMeta();
150    if (meta != NULL) {
151        meta->findInt32("targetDuration", &targetDuration);
152    }
153
154    int64_t targetDurationUs = targetDuration * 1000000ll;
155    if (targetDurationUs == 0 ||
156            targetDurationUs > PlaylistFetcher::kMinBufferedDurationUs) {
157        // Fetchers limit buffering to
158        // min(3 * targetDuration, kMinBufferedDurationUs)
159        targetDurationUs = PlaylistFetcher::kMinBufferedDurationUs;
160    }
161
162    if (mBuffering[idx]) {
163        if (mSwitchInProgress
164                || packetSource->isFinished(0)
165                || packetSource->getEstimatedDurationUs() > targetDurationUs) {
166            mBuffering[idx] = false;
167        }
168    }
169
170    if (mBuffering[idx]) {
171        return -EAGAIN;
172    }
173
174    // wait for counterpart
175    sp<AnotherPacketSource> otherSource;
176    uint32_t mask = mNewStreamMask & mStreamMask;
177    uint32_t fetchersMask  = 0;
178    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
179        uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask();
180        fetchersMask |= fetcherMask;
181    }
182    mask &= fetchersMask;
183    if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) {
184        otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
185    } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) {
186        otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
187    }
188    if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) {
189        return finalResult == OK ? -EAGAIN : finalResult;
190    }
191
192    status_t err = packetSource->dequeueAccessUnit(accessUnit);
193
194    size_t streamIdx;
195    const char *streamStr;
196    switch (stream) {
197        case STREAMTYPE_AUDIO:
198            streamIdx = kAudioIndex;
199            streamStr = "audio";
200            break;
201        case STREAMTYPE_VIDEO:
202            streamIdx = kVideoIndex;
203            streamStr = "video";
204            break;
205        case STREAMTYPE_SUBTITLES:
206            streamIdx = kSubtitleIndex;
207            streamStr = "subs";
208            break;
209        default:
210            TRESPASS();
211    }
212
213    StreamItem& strm = mStreams[streamIdx];
214    if (err == INFO_DISCONTINUITY) {
215        // adaptive streaming, discontinuities in the playlist
216        int32_t type;
217        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
218
219        sp<AMessage> extra;
220        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
221            extra.clear();
222        }
223
224        ALOGI("[%s] read discontinuity of type %d, extra = %s",
225              streamStr,
226              type,
227              extra == NULL ? "NULL" : extra->debugString().c_str());
228
229        int32_t swap;
230        if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) {
231            int32_t switchGeneration;
232            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
233            {
234                Mutex::Autolock lock(mSwapMutex);
235                if (switchGeneration == mSwitchGeneration) {
236                    swapPacketSource(stream);
237                    sp<AMessage> msg = new AMessage(kWhatSwapped, id());
238                    msg->setInt32("stream", stream);
239                    msg->setInt32("switchGeneration", switchGeneration);
240                    msg->post();
241                }
242            }
243        } else {
244            size_t seq = strm.mCurDiscontinuitySeq;
245            int64_t offsetTimeUs;
246            if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
247                offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
248            } else {
249                offsetTimeUs = 0;
250            }
251
252            seq += 1;
253            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
254                int64_t firstTimeUs;
255                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
256                offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
257                offsetTimeUs += strm.mLastSampleDurationUs;
258            } else {
259                offsetTimeUs += strm.mLastSampleDurationUs;
260            }
261
262            mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
263        }
264    } else if (err == OK) {
265
266        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
267            int64_t timeUs;
268            int32_t discontinuitySeq = 0;
269            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
270            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
271            strm.mCurDiscontinuitySeq = discontinuitySeq;
272
273            int32_t discard = 0;
274            int64_t firstTimeUs;
275            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
276                int64_t durUs; // approximate sample duration
277                if (timeUs > strm.mLastDequeuedTimeUs) {
278                    durUs = timeUs - strm.mLastDequeuedTimeUs;
279                } else {
280                    durUs = strm.mLastDequeuedTimeUs - timeUs;
281                }
282                strm.mLastSampleDurationUs = durUs;
283                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
284            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
285                firstTimeUs = timeUs;
286            } else {
287                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
288                firstTimeUs = timeUs;
289            }
290
291            strm.mLastDequeuedTimeUs = timeUs;
292            if (timeUs >= firstTimeUs) {
293                timeUs -= firstTimeUs;
294            } else {
295                timeUs = 0;
296            }
297            timeUs += mLastSeekTimeUs;
298            if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
299                timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
300            }
301
302            ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs);
303            (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
304            mLastDequeuedTimeUs = timeUs;
305            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
306        } else if (stream == STREAMTYPE_SUBTITLES) {
307            int32_t subtitleGeneration;
308            if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
309                    && subtitleGeneration != mSubtitleGeneration) {
310               return -EAGAIN;
311            };
312            (*accessUnit)->meta()->setInt32(
313                    "trackIndex", mPlaylist->getSelectedIndex());
314            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
315        }
316    } else {
317        ALOGI("[%s] encountered error %d", streamStr, err);
318    }
319
320    return err;
321}
322
323status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
324    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
325    if (!(mStreamMask & stream)) {
326        return UNKNOWN_ERROR;
327    }
328
329    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
330
331    sp<MetaData> meta = packetSource->getFormat();
332
333    if (meta == NULL) {
334        return -EAGAIN;
335    }
336
337    return convertMetaDataToMessage(meta, format);
338}
339
340void LiveSession::connectAsync(
341        const char *url, const KeyedVector<String8, String8> *headers) {
342    sp<AMessage> msg = new AMessage(kWhatConnect, id());
343    msg->setString("url", url);
344
345    if (headers != NULL) {
346        msg->setPointer(
347                "headers",
348                new KeyedVector<String8, String8>(*headers));
349    }
350
351    msg->post();
352}
353
354status_t LiveSession::disconnect() {
355    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
356
357    sp<AMessage> response;
358    status_t err = msg->postAndAwaitResponse(&response);
359
360    return err;
361}
362
363status_t LiveSession::seekTo(int64_t timeUs) {
364    sp<AMessage> msg = new AMessage(kWhatSeek, id());
365    msg->setInt64("timeUs", timeUs);
366
367    sp<AMessage> response;
368    status_t err = msg->postAndAwaitResponse(&response);
369
370    return err;
371}
372
373void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
374    switch (msg->what()) {
375        case kWhatConnect:
376        {
377            onConnect(msg);
378            break;
379        }
380
381        case kWhatDisconnect:
382        {
383            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
384
385            if (mReconfigurationInProgress) {
386                break;
387            }
388
389            finishDisconnect();
390            break;
391        }
392
393        case kWhatSeek:
394        {
395            uint32_t seekReplyID;
396            CHECK(msg->senderAwaitsResponse(&seekReplyID));
397            mSeekReplyID = seekReplyID;
398            mSeekReply = new AMessage;
399
400            status_t err = onSeek(msg);
401
402            if (err != OK) {
403                msg->post(50000);
404            }
405            break;
406        }
407
408        case kWhatFetcherNotify:
409        {
410            int32_t what;
411            CHECK(msg->findInt32("what", &what));
412
413            switch (what) {
414                case PlaylistFetcher::kWhatStarted:
415                    break;
416                case PlaylistFetcher::kWhatPaused:
417                case PlaylistFetcher::kWhatStopped:
418                {
419                    if (what == PlaylistFetcher::kWhatStopped) {
420                        AString uri;
421                        CHECK(msg->findString("uri", &uri));
422                        if (mFetcherInfos.removeItem(uri) < 0) {
423                            // ignore duplicated kWhatStopped messages.
424                            break;
425                        }
426
427                        if (mSwitchInProgress) {
428                            tryToFinishBandwidthSwitch();
429                        }
430                    }
431
432                    if (mContinuation != NULL) {
433                        CHECK_GT(mContinuationCounter, 0);
434                        if (--mContinuationCounter == 0) {
435                            mContinuation->post();
436
437                            if (mSeekReplyID != 0) {
438                                CHECK(mSeekReply != NULL);
439                                mSeekReply->setInt32("err", OK);
440                                mSeekReply->postReply(mSeekReplyID);
441                                mSeekReplyID = 0;
442                                mSeekReply.clear();
443                            }
444                        }
445                    }
446                    break;
447                }
448
449                case PlaylistFetcher::kWhatDurationUpdate:
450                {
451                    AString uri;
452                    CHECK(msg->findString("uri", &uri));
453
454                    int64_t durationUs;
455                    CHECK(msg->findInt64("durationUs", &durationUs));
456
457                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
458                    info->mDurationUs = durationUs;
459                    break;
460                }
461
462                case PlaylistFetcher::kWhatError:
463                {
464                    status_t err;
465                    CHECK(msg->findInt32("err", &err));
466
467                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
468
469                    // handle EOS on subtitle tracks independently
470                    AString uri;
471                    if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
472                        ssize_t i = mFetcherInfos.indexOfKey(uri);
473                        if (i >= 0) {
474                            const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
475                            if (fetcher != NULL) {
476                                uint32_t type = fetcher->getStreamTypeMask();
477                                if (type == STREAMTYPE_SUBTITLES) {
478                                    mPacketSources.valueFor(
479                                            STREAMTYPE_SUBTITLES)->signalEOS(err);;
480                                    break;
481                                }
482                            }
483                        }
484                    }
485
486                    if (mInPreparationPhase) {
487                        postPrepared(err);
488                    }
489
490                    cancelBandwidthSwitch();
491
492                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
493
494                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
495
496                    mPacketSources.valueFor(
497                            STREAMTYPE_SUBTITLES)->signalEOS(err);
498
499                    sp<AMessage> notify = mNotify->dup();
500                    notify->setInt32("what", kWhatError);
501                    notify->setInt32("err", err);
502                    notify->post();
503                    break;
504                }
505
506                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
507                {
508                    AString uri;
509                    CHECK(msg->findString("uri", &uri));
510
511                    if (mFetcherInfos.indexOfKey(uri) < 0) {
512                        ALOGE("couldn't find uri");
513                        break;
514                    }
515                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
516                    info->mIsPrepared = true;
517
518                    if (mInPreparationPhase) {
519                        bool allFetchersPrepared = true;
520                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
521                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
522                                allFetchersPrepared = false;
523                                break;
524                            }
525                        }
526
527                        if (allFetchersPrepared) {
528                            postPrepared(OK);
529                        }
530                    }
531                    break;
532                }
533
534                case PlaylistFetcher::kWhatStartedAt:
535                {
536                    int32_t switchGeneration;
537                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
538
539                    if (switchGeneration != mSwitchGeneration) {
540                        break;
541                    }
542
543                    // Resume fetcher for the original variant; the resumed fetcher should
544                    // continue until the timestamps found in msg, which is stored by the
545                    // new fetcher to indicate where the new variant has started buffering.
546                    for (size_t i = 0; i < mFetcherInfos.size(); i++) {
547                        const FetcherInfo info = mFetcherInfos.valueAt(i);
548                        if (info.mToBeRemoved) {
549                            info.mFetcher->resumeUntilAsync(msg);
550                        }
551                    }
552                    break;
553                }
554
555                default:
556                    TRESPASS();
557            }
558
559            break;
560        }
561
562        case kWhatCheckBandwidth:
563        {
564            int32_t generation;
565            CHECK(msg->findInt32("generation", &generation));
566
567            if (generation != mCheckBandwidthGeneration) {
568                break;
569            }
570
571            onCheckBandwidth(msg);
572            break;
573        }
574
575        case kWhatChangeConfiguration:
576        {
577            onChangeConfiguration(msg);
578            break;
579        }
580
581        case kWhatChangeConfiguration2:
582        {
583            onChangeConfiguration2(msg);
584            break;
585        }
586
587        case kWhatChangeConfiguration3:
588        {
589            onChangeConfiguration3(msg);
590            break;
591        }
592
593        case kWhatFinishDisconnect2:
594        {
595            onFinishDisconnect2();
596            break;
597        }
598
599        case kWhatSwapped:
600        {
601            onSwapped(msg);
602            break;
603        }
604
605        case kWhatCheckSwitchDown:
606        {
607            onCheckSwitchDown();
608            break;
609        }
610
611        case kWhatSwitchDown:
612        {
613            onSwitchDown();
614            break;
615        }
616
617        default:
618            TRESPASS();
619            break;
620    }
621}
622
623// static
624int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
625    if (a->mBandwidth < b->mBandwidth) {
626        return -1;
627    } else if (a->mBandwidth == b->mBandwidth) {
628        return 0;
629    }
630
631    return 1;
632}
633
634// static
635LiveSession::StreamType LiveSession::indexToType(int idx) {
636    CHECK(idx >= 0 && idx < kMaxStreams);
637    return (StreamType)(1 << idx);
638}
639
640// static
641ssize_t LiveSession::typeToIndex(int32_t type) {
642    switch (type) {
643        case STREAMTYPE_AUDIO:
644            return 0;
645        case STREAMTYPE_VIDEO:
646            return 1;
647        case STREAMTYPE_SUBTITLES:
648            return 2;
649        default:
650            return -1;
651    };
652    return -1;
653}
654
655void LiveSession::onConnect(const sp<AMessage> &msg) {
656    AString url;
657    CHECK(msg->findString("url", &url));
658
659    KeyedVector<String8, String8> *headers = NULL;
660    if (!msg->findPointer("headers", (void **)&headers)) {
661        mExtraHeaders.clear();
662    } else {
663        mExtraHeaders = *headers;
664
665        delete headers;
666        headers = NULL;
667    }
668
669    // TODO currently we don't know if we are coming here from incognito mode
670    ALOGI("onConnect %s", uriDebugString(url).c_str());
671
672    mMasterURL = url;
673
674    bool dummy;
675    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
676
677    if (mPlaylist == NULL) {
678        ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str());
679
680        postPrepared(ERROR_IO);
681        return;
682    }
683
684    // We trust the content provider to make a reasonable choice of preferred
685    // initial bandwidth by listing it first in the variant playlist.
686    // At startup we really don't have a good estimate on the available
687    // network bandwidth since we haven't tranferred any data yet. Once
688    // we have we can make a better informed choice.
689    size_t initialBandwidth = 0;
690    size_t initialBandwidthIndex = 0;
691
692    if (mPlaylist->isVariantPlaylist()) {
693        for (size_t i = 0; i < mPlaylist->size(); ++i) {
694            BandwidthItem item;
695
696            item.mPlaylistIndex = i;
697
698            sp<AMessage> meta;
699            AString uri;
700            mPlaylist->itemAt(i, &uri, &meta);
701
702            unsigned long bandwidth;
703            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
704
705            if (initialBandwidth == 0) {
706                initialBandwidth = item.mBandwidth;
707            }
708
709            mBandwidthItems.push(item);
710        }
711
712        CHECK_GT(mBandwidthItems.size(), 0u);
713
714        mBandwidthItems.sort(SortByBandwidth);
715
716        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
717            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
718                initialBandwidthIndex = i;
719                break;
720            }
721        }
722    } else {
723        // dummy item.
724        BandwidthItem item;
725        item.mPlaylistIndex = 0;
726        item.mBandwidth = 0;
727        mBandwidthItems.push(item);
728    }
729
730    mPlaylist->pickRandomMediaItems();
731    changeConfiguration(
732            0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
733}
734
735void LiveSession::finishDisconnect() {
736    // No reconfiguration is currently pending, make sure none will trigger
737    // during disconnection either.
738    cancelCheckBandwidthEvent();
739
740    // Protect mPacketSources from a swapPacketSource race condition through disconnect.
741    // (finishDisconnect, onFinishDisconnect2)
742    cancelBandwidthSwitch();
743
744    // cancel switch down monitor
745    mSwitchDownMonitor.clear();
746
747    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
748        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
749    }
750
751    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
752
753    mContinuationCounter = mFetcherInfos.size();
754    mContinuation = msg;
755
756    if (mContinuationCounter == 0) {
757        msg->post();
758    }
759}
760
761void LiveSession::onFinishDisconnect2() {
762    mContinuation.clear();
763
764    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
765    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
766
767    mPacketSources.valueFor(
768            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
769
770    sp<AMessage> response = new AMessage;
771    response->setInt32("err", OK);
772
773    response->postReply(mDisconnectReplyID);
774    mDisconnectReplyID = 0;
775}
776
777sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
778    ssize_t index = mFetcherInfos.indexOfKey(uri);
779
780    if (index >= 0) {
781        return NULL;
782    }
783
784    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
785    notify->setString("uri", uri);
786    notify->setInt32("switchGeneration", mSwitchGeneration);
787
788    FetcherInfo info;
789    info.mFetcher = new PlaylistFetcher(notify, this, uri, mSubtitleGeneration);
790    info.mDurationUs = -1ll;
791    info.mIsPrepared = false;
792    info.mToBeRemoved = false;
793    looper()->registerHandler(info.mFetcher);
794
795    mFetcherInfos.add(uri, info);
796
797    return info.mFetcher;
798}
799
800/*
801 * Illustration of parameters:
802 *
803 * 0      `range_offset`
804 * +------------+-------------------------------------------------------+--+--+
805 * |            |                                 | next block to fetch |  |  |
806 * |            | `source` handle => `out` buffer |                     |  |  |
807 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
808 * |            |<----------- `range_length` / buffer capacity ----------->|  |
809 * |<------------------------------ file_size ------------------------------->|
810 *
811 * Special parameter values:
812 * - range_length == -1 means entire file
813 * - block_size == 0 means entire range
814 *
815 */
816ssize_t LiveSession::fetchFile(
817        const char *url, sp<ABuffer> *out,
818        int64_t range_offset, int64_t range_length,
819        uint32_t block_size, /* download block size */
820        sp<DataSource> *source, /* to return and reuse source */
821        String8 *actualUrl) {
822    off64_t size;
823    sp<DataSource> temp_source;
824    if (source == NULL) {
825        source = &temp_source;
826    }
827
828    if (*source == NULL) {
829        if (!strncasecmp(url, "file://", 7)) {
830            *source = new FileSource(url + 7);
831        } else if (strncasecmp(url, "http://", 7)
832                && strncasecmp(url, "https://", 8)) {
833            return ERROR_UNSUPPORTED;
834        } else {
835            KeyedVector<String8, String8> headers = mExtraHeaders;
836            if (range_offset > 0 || range_length >= 0) {
837                headers.add(
838                        String8("Range"),
839                        String8(
840                            StringPrintf(
841                                "bytes=%lld-%s",
842                                range_offset,
843                                range_length < 0
844                                    ? "" : StringPrintf("%lld",
845                                            range_offset + range_length - 1).c_str()).c_str()));
846            }
847            status_t err = mHTTPDataSource->connect(url, &headers);
848
849            if (err != OK) {
850                return err;
851            }
852
853            *source = mHTTPDataSource;
854        }
855    }
856
857    status_t getSizeErr = (*source)->getSize(&size);
858    if (getSizeErr != OK) {
859        size = 65536;
860    }
861
862    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
863    if (*out == NULL) {
864        buffer->setRange(0, 0);
865    }
866
867    ssize_t bytesRead = 0;
868    // adjust range_length if only reading partial block
869    if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
870        range_length = buffer->size() + block_size;
871    }
872    for (;;) {
873        // Only resize when we don't know the size.
874        size_t bufferRemaining = buffer->capacity() - buffer->size();
875        if (bufferRemaining == 0 && getSizeErr != OK) {
876            bufferRemaining = 32768;
877
878            ALOGV("increasing download buffer to %zu bytes",
879                 buffer->size() + bufferRemaining);
880
881            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
882            memcpy(copy->data(), buffer->data(), buffer->size());
883            copy->setRange(0, buffer->size());
884
885            buffer = copy;
886        }
887
888        size_t maxBytesToRead = bufferRemaining;
889        if (range_length >= 0) {
890            int64_t bytesLeftInRange = range_length - buffer->size();
891            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
892                maxBytesToRead = bytesLeftInRange;
893
894                if (bytesLeftInRange == 0) {
895                    break;
896                }
897            }
898        }
899
900        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
901        // to help us break out of the loop.
902        ssize_t n = (*source)->readAt(
903                buffer->size(), buffer->data() + buffer->size(),
904                maxBytesToRead);
905
906        if (n < 0) {
907            return n;
908        }
909
910        if (n == 0) {
911            break;
912        }
913
914        buffer->setRange(0, buffer->size() + (size_t)n);
915        bytesRead += n;
916    }
917
918    *out = buffer;
919    if (actualUrl != NULL) {
920        *actualUrl = (*source)->getUri();
921        if (actualUrl->isEmpty()) {
922            *actualUrl = url;
923        }
924    }
925
926    return bytesRead;
927}
928
929sp<M3UParser> LiveSession::fetchPlaylist(
930        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
931    ALOGV("fetchPlaylist '%s'", url);
932
933    *unchanged = false;
934
935    sp<ABuffer> buffer;
936    String8 actualUrl;
937    ssize_t  err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
938
939    if (err <= 0) {
940        return NULL;
941    }
942
943    // MD5 functionality is not available on the simulator, treat all
944    // playlists as changed.
945
946#if defined(HAVE_ANDROID_OS)
947    uint8_t hash[16];
948
949    MD5_CTX m;
950    MD5_Init(&m);
951    MD5_Update(&m, buffer->data(), buffer->size());
952
953    MD5_Final(hash, &m);
954
955    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
956        // playlist unchanged
957        *unchanged = true;
958
959        return NULL;
960    }
961
962    if (curPlaylistHash != NULL) {
963        memcpy(curPlaylistHash, hash, sizeof(hash));
964    }
965#endif
966
967    sp<M3UParser> playlist =
968        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
969
970    if (playlist->initCheck() != OK) {
971        ALOGE("failed to parse .m3u8 playlist");
972
973        return NULL;
974    }
975
976    return playlist;
977}
978
979static double uniformRand() {
980    return (double)rand() / RAND_MAX;
981}
982
983size_t LiveSession::getBandwidthIndex() {
984    if (mBandwidthItems.size() == 0) {
985        return 0;
986    }
987
988#if 1
989    char value[PROPERTY_VALUE_MAX];
990    ssize_t index = -1;
991    if (property_get("media.httplive.bw-index", value, NULL)) {
992        char *end;
993        index = strtol(value, &end, 10);
994        CHECK(end > value && *end == '\0');
995
996        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
997            index = mBandwidthItems.size() - 1;
998        }
999    }
1000
1001    if (index < 0) {
1002        int32_t bandwidthBps;
1003        if (mHTTPDataSource != NULL
1004                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
1005            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
1006        } else {
1007            ALOGV("no bandwidth estimate.");
1008            return 0;  // Pick the lowest bandwidth stream by default.
1009        }
1010
1011        char value[PROPERTY_VALUE_MAX];
1012        if (property_get("media.httplive.max-bw", value, NULL)) {
1013            char *end;
1014            long maxBw = strtoul(value, &end, 10);
1015            if (end > value && *end == '\0') {
1016                if (maxBw > 0 && bandwidthBps > maxBw) {
1017                    ALOGV("bandwidth capped to %ld bps", maxBw);
1018                    bandwidthBps = maxBw;
1019                }
1020            }
1021        }
1022
1023        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
1024
1025        index = mBandwidthItems.size() - 1;
1026        while (index > 0) {
1027            // consider only 80% of the available bandwidth, but if we are switching up,
1028            // be even more conservative (70%) to avoid overestimating and immediately
1029            // switching back.
1030            size_t adjustedBandwidthBps = bandwidthBps;
1031            if (index > mCurBandwidthIndex) {
1032                adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10;
1033            } else {
1034                adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10;
1035            }
1036            if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) {
1037                break;
1038            }
1039            --index;
1040        }
1041    }
1042#elif 0
1043    // Change bandwidth at random()
1044    size_t index = uniformRand() * mBandwidthItems.size();
1045#elif 0
1046    // There's a 50% chance to stay on the current bandwidth and
1047    // a 50% chance to switch to the next higher bandwidth (wrapping around
1048    // to lowest)
1049    const size_t kMinIndex = 0;
1050
1051    static ssize_t mCurBandwidthIndex = -1;
1052
1053    size_t index;
1054    if (mCurBandwidthIndex < 0) {
1055        index = kMinIndex;
1056    } else if (uniformRand() < 0.5) {
1057        index = (size_t)mCurBandwidthIndex;
1058    } else {
1059        index = mCurBandwidthIndex + 1;
1060        if (index == mBandwidthItems.size()) {
1061            index = kMinIndex;
1062        }
1063    }
1064    mCurBandwidthIndex = index;
1065#elif 0
1066    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1067
1068    size_t index = mBandwidthItems.size() - 1;
1069    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1070        --index;
1071    }
1072#elif 1
1073    char value[PROPERTY_VALUE_MAX];
1074    size_t index;
1075    if (property_get("media.httplive.bw-index", value, NULL)) {
1076        char *end;
1077        index = strtoul(value, &end, 10);
1078        CHECK(end > value && *end == '\0');
1079
1080        if (index >= mBandwidthItems.size()) {
1081            index = mBandwidthItems.size() - 1;
1082        }
1083    } else {
1084        index = 0;
1085    }
1086#else
1087    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
1088#endif
1089
1090    CHECK_GE(index, 0);
1091
1092    return index;
1093}
1094
1095int64_t LiveSession::latestMediaSegmentStartTimeUs() {
1096    sp<AMessage> audioMeta = mPacketSources.valueFor(STREAMTYPE_AUDIO)->getLatestDequeuedMeta();
1097    int64_t minSegmentStartTimeUs = -1, videoSegmentStartTimeUs = -1;
1098    if (audioMeta != NULL) {
1099        audioMeta->findInt64("segmentStartTimeUs", &minSegmentStartTimeUs);
1100    }
1101
1102    sp<AMessage> videoMeta = mPacketSources.valueFor(STREAMTYPE_VIDEO)->getLatestDequeuedMeta();
1103    if (videoMeta != NULL
1104            && videoMeta->findInt64("segmentStartTimeUs", &videoSegmentStartTimeUs)) {
1105        if (minSegmentStartTimeUs < 0 || videoSegmentStartTimeUs < minSegmentStartTimeUs) {
1106            minSegmentStartTimeUs = videoSegmentStartTimeUs;
1107        }
1108
1109    }
1110    return minSegmentStartTimeUs;
1111}
1112
1113status_t LiveSession::onSeek(const sp<AMessage> &msg) {
1114    int64_t timeUs;
1115    CHECK(msg->findInt64("timeUs", &timeUs));
1116
1117    if (!mReconfigurationInProgress) {
1118        changeConfiguration(timeUs, mCurBandwidthIndex);
1119        return OK;
1120    } else {
1121        return -EWOULDBLOCK;
1122    }
1123}
1124
1125status_t LiveSession::getDuration(int64_t *durationUs) const {
1126    int64_t maxDurationUs = -1ll;
1127    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1128        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1129
1130        if (fetcherDurationUs > maxDurationUs) {
1131            maxDurationUs = fetcherDurationUs;
1132        }
1133    }
1134
1135    *durationUs = maxDurationUs;
1136
1137    return OK;
1138}
1139
1140bool LiveSession::isSeekable() const {
1141    int64_t durationUs;
1142    return getDuration(&durationUs) == OK && durationUs >= 0;
1143}
1144
1145bool LiveSession::hasDynamicDuration() const {
1146    return false;
1147}
1148
1149size_t LiveSession::getTrackCount() const {
1150    if (mPlaylist == NULL) {
1151        return 0;
1152    } else {
1153        return mPlaylist->getTrackCount();
1154    }
1155}
1156
1157sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1158    if (mPlaylist == NULL) {
1159        return NULL;
1160    } else {
1161        return mPlaylist->getTrackInfo(trackIndex);
1162    }
1163}
1164
1165status_t LiveSession::selectTrack(size_t index, bool select) {
1166    if (mPlaylist == NULL) {
1167        return INVALID_OPERATION;
1168    }
1169
1170    ++mSubtitleGeneration;
1171    status_t err = mPlaylist->selectTrack(index, select);
1172    if (err == OK) {
1173        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id());
1174        msg->setInt32("bandwidthIndex", mCurBandwidthIndex);
1175        msg->setInt32("pickTrack", select);
1176        msg->post();
1177    }
1178    return err;
1179}
1180
1181ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
1182    if (mPlaylist == NULL) {
1183        return -1;
1184    } else {
1185        return mPlaylist->getSelectedTrack(type);
1186    }
1187}
1188
1189bool LiveSession::canSwitchUp() {
1190    // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
1191    status_t err = OK;
1192    for (size_t i = 0; i < mPacketSources.size(); ++i) {
1193        sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
1194        int64_t dur = source->getBufferedDurationUs(&err);
1195        if (err == OK && dur > 10000000) {
1196            return true;
1197        }
1198    }
1199    return false;
1200}
1201
1202void LiveSession::changeConfiguration(
1203        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
1204    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
1205    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
1206    cancelBandwidthSwitch();
1207
1208    CHECK(!mReconfigurationInProgress);
1209    mReconfigurationInProgress = true;
1210
1211    mCurBandwidthIndex = bandwidthIndex;
1212
1213    ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
1214          timeUs, bandwidthIndex, pickTrack);
1215
1216    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
1217    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
1218
1219    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1220    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1221
1222    AString URIs[kMaxStreams];
1223    for (size_t i = 0; i < kMaxStreams; ++i) {
1224        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1225            streamMask |= indexToType(i);
1226        }
1227    }
1228
1229    // Step 1, stop and discard fetchers that are no longer needed.
1230    // Pause those that we'll reuse.
1231    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1232        const AString &uri = mFetcherInfos.keyAt(i);
1233
1234        bool discardFetcher = true;
1235
1236        // If we're seeking all current fetchers are discarded.
1237        if (timeUs < 0ll) {
1238            // delay fetcher removal if not picking tracks
1239            discardFetcher = pickTrack;
1240
1241            for (size_t j = 0; j < kMaxStreams; ++j) {
1242                StreamType type = indexToType(j);
1243                if ((streamMask & type) && uri == URIs[j]) {
1244                    resumeMask |= type;
1245                    streamMask &= ~type;
1246                    discardFetcher = false;
1247                }
1248            }
1249        }
1250
1251        if (discardFetcher) {
1252            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1253        } else {
1254            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
1255        }
1256    }
1257
1258    sp<AMessage> msg;
1259    if (timeUs < 0ll) {
1260        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1261        msg = new AMessage(kWhatChangeConfiguration3, id());
1262    } else {
1263        msg = new AMessage(kWhatChangeConfiguration2, id());
1264    }
1265    msg->setInt32("streamMask", streamMask);
1266    msg->setInt32("resumeMask", resumeMask);
1267    msg->setInt32("pickTrack", pickTrack);
1268    msg->setInt64("timeUs", timeUs);
1269    for (size_t i = 0; i < kMaxStreams; ++i) {
1270        if ((streamMask | resumeMask) & indexToType(i)) {
1271            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1272        }
1273    }
1274
1275    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1276    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1277    // fetchers have completed their asynchronous operation, we'll post
1278    // mContinuation, which then is handled below in onChangeConfiguration2.
1279    mContinuationCounter = mFetcherInfos.size();
1280    mContinuation = msg;
1281
1282    if (mContinuationCounter == 0) {
1283        msg->post();
1284
1285        if (mSeekReplyID != 0) {
1286            CHECK(mSeekReply != NULL);
1287            mSeekReply->setInt32("err", OK);
1288            mSeekReply->postReply(mSeekReplyID);
1289            mSeekReplyID = 0;
1290            mSeekReply.clear();
1291        }
1292    }
1293}
1294
1295void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1296    if (!mReconfigurationInProgress) {
1297        int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex;
1298        msg->findInt32("pickTrack", &pickTrack);
1299        msg->findInt32("bandwidthIndex", &bandwidthIndex);
1300        changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack);
1301    } else {
1302        msg->post(1000000ll); // retry in 1 sec
1303    }
1304}
1305
1306void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1307    mContinuation.clear();
1308
1309    // All fetchers are either suspended or have been removed now.
1310
1311    uint32_t streamMask, resumeMask;
1312    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1313    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1314
1315    // currently onChangeConfiguration2 is only called for seeking;
1316    // remove the following CHECK if using it else where.
1317    CHECK_EQ(resumeMask, 0);
1318    streamMask |= resumeMask;
1319
1320    AString URIs[kMaxStreams];
1321    for (size_t i = 0; i < kMaxStreams; ++i) {
1322        if (streamMask & indexToType(i)) {
1323            const AString &uriKey = mStreams[i].uriKey();
1324            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1325            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1326        }
1327    }
1328
1329    // Determine which decoders to shutdown on the player side,
1330    // a decoder has to be shutdown if either
1331    // 1) its streamtype was active before but now longer isn't.
1332    // or
1333    // 2) its streamtype was already active and still is but the URI
1334    //    has changed.
1335    uint32_t changedMask = 0;
1336    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1337        if (((mStreamMask & streamMask & indexToType(i))
1338                && !(URIs[i] == mStreams[i].mUri))
1339                || (mStreamMask & ~streamMask & indexToType(i))) {
1340            changedMask |= indexToType(i);
1341        }
1342    }
1343
1344    if (changedMask == 0) {
1345        // If nothing changed as far as the audio/video decoders
1346        // are concerned we can proceed.
1347        onChangeConfiguration3(msg);
1348        return;
1349    }
1350
1351    // Something changed, inform the player which will shutdown the
1352    // corresponding decoders and will post the reply once that's done.
1353    // Handling the reply will continue executing below in
1354    // onChangeConfiguration3.
1355    sp<AMessage> notify = mNotify->dup();
1356    notify->setInt32("what", kWhatStreamsChanged);
1357    notify->setInt32("changedMask", changedMask);
1358
1359    msg->setWhat(kWhatChangeConfiguration3);
1360    msg->setTarget(id());
1361
1362    notify->setMessage("reply", msg);
1363    notify->post();
1364}
1365
1366void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1367    mContinuation.clear();
1368    // All remaining fetchers are still suspended, the player has shutdown
1369    // any decoders that needed it.
1370
1371    uint32_t streamMask, resumeMask;
1372    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1373    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1374
1375    int64_t timeUs;
1376    int32_t pickTrack;
1377    bool switching = false;
1378    CHECK(msg->findInt64("timeUs", &timeUs));
1379    CHECK(msg->findInt32("pickTrack", &pickTrack));
1380
1381    if (timeUs < 0ll) {
1382        if (!pickTrack) {
1383            switching = true;
1384        }
1385        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
1386    } else {
1387        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1388    }
1389
1390    for (size_t i = 0; i < kMaxStreams; ++i) {
1391        if (streamMask & indexToType(i)) {
1392            if (switching) {
1393                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
1394            } else {
1395                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1396            }
1397        }
1398    }
1399
1400    mNewStreamMask = streamMask | resumeMask;
1401    if (switching) {
1402        mSwapMask = mStreamMask & ~resumeMask;
1403    }
1404
1405    // Of all existing fetchers:
1406    // * Resume fetchers that are still needed and assign them original packet sources.
1407    // * Mark otherwise unneeded fetchers for removal.
1408    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1409    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1410        const AString &uri = mFetcherInfos.keyAt(i);
1411
1412        sp<AnotherPacketSource> sources[kMaxStreams];
1413        for (size_t j = 0; j < kMaxStreams; ++j) {
1414            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
1415                sources[j] = mPacketSources.valueFor(indexToType(j));
1416
1417                if (j != kSubtitleIndex) {
1418                    ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j));
1419                    sp<AnotherPacketSource> discontinuityQueue;
1420                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1421                    discontinuityQueue->queueDiscontinuity(
1422                            ATSParser::DISCONTINUITY_NONE,
1423                            NULL,
1424                            true);
1425                }
1426            }
1427        }
1428
1429        FetcherInfo &info = mFetcherInfos.editValueAt(i);
1430        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
1431                || sources[kSubtitleIndex] != NULL) {
1432            info.mFetcher->startAsync(
1433                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
1434        } else {
1435            info.mToBeRemoved = true;
1436        }
1437    }
1438
1439    // streamMask now only contains the types that need a new fetcher created.
1440
1441    if (streamMask != 0) {
1442        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1443    }
1444
1445    // Find out when the original fetchers have buffered up to and start the new fetchers
1446    // at a later timestamp.
1447    for (size_t i = 0; i < kMaxStreams; i++) {
1448        if (!(indexToType(i) & streamMask)) {
1449            continue;
1450        }
1451
1452        AString uri;
1453        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
1454
1455        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1456        CHECK(fetcher != NULL);
1457
1458        int32_t latestSeq = -1;
1459        int64_t startTimeUs = -1;
1460        int64_t segmentStartTimeUs = -1ll;
1461        int32_t discontinuitySeq = -1;
1462        sp<AnotherPacketSource> sources[kMaxStreams];
1463
1464        if (i == kSubtitleIndex) {
1465            segmentStartTimeUs = latestMediaSegmentStartTimeUs();
1466        }
1467
1468        // TRICKY: looping from i as earlier streams are already removed from streamMask
1469        for (size_t j = i; j < kMaxStreams; ++j) {
1470            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
1471            if ((streamMask & indexToType(j)) && uri == streamUri) {
1472                sources[j] = mPacketSources.valueFor(indexToType(j));
1473
1474                if (timeUs >= 0) {
1475                    sources[j]->clear();
1476                    startTimeUs = timeUs;
1477
1478                    sp<AnotherPacketSource> discontinuityQueue;
1479                    sp<AMessage> extra = new AMessage;
1480                    extra->setInt64("timeUs", timeUs);
1481                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1482                    discontinuityQueue->queueDiscontinuity(
1483                            ATSParser::DISCONTINUITY_TIME, extra, true);
1484                } else {
1485                    int32_t type;
1486                    int64_t srcSegmentStartTimeUs;
1487                    sp<AMessage> meta;
1488                    if (pickTrack) {
1489                        // selecting
1490                        meta = sources[j]->getLatestDequeuedMeta();
1491                    } else {
1492                        // adapting
1493                        meta = sources[j]->getLatestEnqueuedMeta();
1494                    }
1495
1496                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
1497                        int64_t tmpUs;
1498                        CHECK(meta->findInt64("timeUs", &tmpUs));
1499                        if (startTimeUs < 0 || tmpUs < startTimeUs) {
1500                            startTimeUs = tmpUs;
1501                        }
1502
1503                        CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs));
1504                        if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) {
1505                            segmentStartTimeUs = tmpUs;
1506                        }
1507
1508                        int32_t seq;
1509                        CHECK(meta->findInt32("discontinuitySeq", &seq));
1510                        if (discontinuitySeq < 0 || seq < discontinuitySeq) {
1511                            discontinuitySeq = seq;
1512                        }
1513                    }
1514
1515                    if (pickTrack) {
1516                        // selecting track, queue discontinuities before content
1517                        sources[j]->clear();
1518                        if (j == kSubtitleIndex) {
1519                            break;
1520                        }
1521                        sp<AnotherPacketSource> discontinuityQueue;
1522                        discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1523                        discontinuityQueue->queueDiscontinuity(
1524                                ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1525                    } else {
1526                        // adapting, queue discontinuities after resume
1527                        sources[j] = mPacketSources2.valueFor(indexToType(j));
1528                        sources[j]->clear();
1529                        uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1530                        if (extraStreams & indexToType(j)) {
1531                            sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false));
1532                        }
1533                    }
1534                }
1535
1536                streamMask &= ~indexToType(j);
1537            }
1538        }
1539
1540        fetcher->startAsync(
1541                sources[kAudioIndex],
1542                sources[kVideoIndex],
1543                sources[kSubtitleIndex],
1544                startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs,
1545                segmentStartTimeUs,
1546                discontinuitySeq,
1547                switching);
1548    }
1549
1550    // All fetchers have now been started, the configuration change
1551    // has completed.
1552
1553    cancelCheckBandwidthEvent();
1554    scheduleCheckBandwidthEvent();
1555
1556    ALOGV("XXX configuration change completed.");
1557    mReconfigurationInProgress = false;
1558    if (switching) {
1559        mSwitchInProgress = true;
1560    } else {
1561        mStreamMask = mNewStreamMask;
1562    }
1563
1564    if (mDisconnectReplyID != 0) {
1565        finishDisconnect();
1566    }
1567}
1568
1569void LiveSession::onSwapped(const sp<AMessage> &msg) {
1570    int32_t switchGeneration;
1571    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
1572    if (switchGeneration != mSwitchGeneration) {
1573        return;
1574    }
1575
1576    int32_t stream;
1577    CHECK(msg->findInt32("stream", &stream));
1578
1579    ssize_t idx = typeToIndex(stream);
1580    CHECK(idx >= 0);
1581    if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
1582        ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str());
1583    }
1584    mStreams[idx].mUri = mStreams[idx].mNewUri;
1585    mStreams[idx].mNewUri.clear();
1586
1587    mSwapMask &= ~stream;
1588    if (mSwapMask != 0) {
1589        return;
1590    }
1591
1592    // Check if new variant contains extra streams.
1593    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1594    while (extraStreams) {
1595        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
1596        swapPacketSource(extraStream);
1597        extraStreams &= ~extraStream;
1598
1599        idx = typeToIndex(extraStream);
1600        CHECK(idx >= 0);
1601        if (mStreams[idx].mNewUri.empty()) {
1602            ALOGW("swapping extra stream type %d %s to empty stream",
1603                    extraStream, mStreams[idx].mUri.c_str());
1604        }
1605        mStreams[idx].mUri = mStreams[idx].mNewUri;
1606        mStreams[idx].mNewUri.clear();
1607    }
1608
1609    tryToFinishBandwidthSwitch();
1610}
1611
1612void LiveSession::onCheckSwitchDown() {
1613    if (mSwitchDownMonitor == NULL) {
1614        return;
1615    }
1616
1617    for (size_t i = 0; i < kMaxStreams; ++i) {
1618        int32_t targetDuration;
1619        sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i));
1620        sp<AMessage> meta = packetSource->getLatestDequeuedMeta();
1621
1622        if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) {
1623            int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs();
1624            int64_t targetDurationUs = targetDuration * 1000000ll;
1625
1626            if (bufferedDurationUs < targetDurationUs / 3) {
1627                (new AMessage(kWhatSwitchDown, id()))->post();
1628                break;
1629            }
1630        }
1631    }
1632
1633    mSwitchDownMonitor->post(1000000ll);
1634}
1635
1636void LiveSession::onSwitchDown() {
1637    if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) {
1638        return;
1639    }
1640
1641    ssize_t bandwidthIndex = getBandwidthIndex();
1642    if (bandwidthIndex < mCurBandwidthIndex) {
1643        changeConfiguration(-1, bandwidthIndex, false);
1644        return;
1645    }
1646
1647    changeConfiguration(-1, mCurBandwidthIndex - 1, false);
1648}
1649
1650// Mark switch done when:
1651//   1. all old buffers are swapped out
1652void LiveSession::tryToFinishBandwidthSwitch() {
1653    if (!mSwitchInProgress) {
1654        return;
1655    }
1656
1657    bool needToRemoveFetchers = false;
1658    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1659        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
1660            needToRemoveFetchers = true;
1661            break;
1662        }
1663    }
1664
1665    if (!needToRemoveFetchers && mSwapMask == 0) {
1666        ALOGI("mSwitchInProgress = false");
1667        mStreamMask = mNewStreamMask;
1668        mSwitchInProgress = false;
1669    }
1670}
1671
1672void LiveSession::scheduleCheckBandwidthEvent() {
1673    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1674    msg->setInt32("generation", mCheckBandwidthGeneration);
1675    msg->post(10000000ll);
1676}
1677
1678void LiveSession::cancelCheckBandwidthEvent() {
1679    ++mCheckBandwidthGeneration;
1680}
1681
1682void LiveSession::cancelBandwidthSwitch() {
1683    Mutex::Autolock lock(mSwapMutex);
1684    mSwitchGeneration++;
1685    mSwitchInProgress = false;
1686    mSwapMask = 0;
1687
1688    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1689        FetcherInfo& info = mFetcherInfos.editValueAt(i);
1690        if (info.mToBeRemoved) {
1691            info.mToBeRemoved = false;
1692        }
1693    }
1694
1695    for (size_t i = 0; i < kMaxStreams; ++i) {
1696        if (!mStreams[i].mNewUri.empty()) {
1697            ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri);
1698            if (j < 0) {
1699                mStreams[i].mNewUri.clear();
1700                continue;
1701            }
1702
1703            const FetcherInfo &info = mFetcherInfos.valueAt(j);
1704            info.mFetcher->stopAsync();
1705            mFetcherInfos.removeItemsAt(j);
1706            mStreams[i].mNewUri.clear();
1707        }
1708    }
1709}
1710
1711bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
1712    if (mReconfigurationInProgress || mSwitchInProgress) {
1713        return false;
1714    }
1715
1716    if (mCurBandwidthIndex < 0) {
1717        return true;
1718    }
1719
1720    if (bandwidthIndex == (size_t)mCurBandwidthIndex) {
1721        return false;
1722    } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) {
1723        return canSwitchUp();
1724    } else {
1725        return true;
1726    }
1727}
1728
1729void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) {
1730    size_t bandwidthIndex = getBandwidthIndex();
1731    if (canSwitchBandwidthTo(bandwidthIndex)) {
1732        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1733    } else {
1734        // Come back and check again 10 seconds later in case there is nothing to do now.
1735        // If we DO change configuration, once that completes it'll schedule a new
1736        // check bandwidth event with an incremented mCheckBandwidthGeneration.
1737        msg->post(10000000ll);
1738    }
1739}
1740
1741void LiveSession::postPrepared(status_t err) {
1742    CHECK(mInPreparationPhase);
1743
1744    sp<AMessage> notify = mNotify->dup();
1745    if (err == OK || err == ERROR_END_OF_STREAM) {
1746        notify->setInt32("what", kWhatPrepared);
1747    } else {
1748        notify->setInt32("what", kWhatPreparationFailed);
1749        notify->setInt32("err", err);
1750    }
1751
1752    notify->post();
1753
1754    mInPreparationPhase = false;
1755
1756    mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id());
1757    mSwitchDownMonitor->post();
1758}
1759
1760}  // namespace android
1761
1762