LiveSession.cpp revision 309aa8bf5e4cd66fe988adf2654cac3fadc2a1c3
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/IMediaHTTPConnection.h>
31#include <media/IMediaHTTPService.h>
32#include <media/stagefright/foundation/hexdump.h>
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/DataSource.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaErrors.h>
39#include <media/stagefright/MediaHTTP.h>
40#include <media/stagefright/MetaData.h>
41#include <media/stagefright/Utils.h>
42
43#include <utils/Mutex.h>
44
45#include <ctype.h>
46#include <inttypes.h>
47#include <openssl/aes.h>
48#include <openssl/md5.h>
49
50namespace android {
51
52LiveSession::LiveSession(
53        const sp<AMessage> &notify, uint32_t flags,
54        const sp<IMediaHTTPService> &httpService)
55    : mNotify(notify),
56      mFlags(flags),
57      mHTTPService(httpService),
58      mInPreparationPhase(true),
59      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
60      mCurBandwidthIndex(-1),
61      mStreamMask(0),
62      mNewStreamMask(0),
63      mSwapMask(0),
64      mCheckBandwidthGeneration(0),
65      mSwitchGeneration(0),
66      mLastDequeuedTimeUs(0ll),
67      mRealTimeBaseUs(0ll),
68      mReconfigurationInProgress(false),
69      mSwitchInProgress(false),
70      mDisconnectReplyID(0),
71      mSeekReplyID(0),
72      mFirstTimeUsValid(false),
73      mFirstTimeUs(0),
74      mLastSeekTimeUs(0) {
75
76    mStreams[kAudioIndex] = StreamItem("audio");
77    mStreams[kVideoIndex] = StreamItem("video");
78    mStreams[kSubtitleIndex] = StreamItem("subtitles");
79
80    for (size_t i = 0; i < kMaxStreams; ++i) {
81        mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
82        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
83        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
84    }
85}
86
87LiveSession::~LiveSession() {
88}
89
90sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
91    ABuffer *discontinuity = new ABuffer(0);
92    discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
93    discontinuity->meta()->setInt32("swapPacketSource", swap);
94    discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
95    discontinuity->meta()->setInt64("timeUs", -1);
96    return discontinuity;
97}
98
99void LiveSession::swapPacketSource(StreamType stream) {
100    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
101    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
102    sp<AnotherPacketSource> tmp = aps;
103    aps = aps2;
104    aps2 = tmp;
105    aps2->clear();
106}
107
108status_t LiveSession::dequeueAccessUnit(
109        StreamType stream, sp<ABuffer> *accessUnit) {
110    if (!(mStreamMask & stream)) {
111        // return -EWOULDBLOCK to avoid halting the decoder
112        // when switching between audio/video and audio only.
113        return -EWOULDBLOCK;
114    }
115
116    status_t finalResult;
117    sp<AnotherPacketSource> discontinuityQueue  = mDiscontinuities.valueFor(stream);
118    if (discontinuityQueue->hasBufferAvailable(&finalResult)) {
119        discontinuityQueue->dequeueAccessUnit(accessUnit);
120        // seeking, track switching
121        sp<AMessage> extra;
122        int64_t timeUs;
123        if ((*accessUnit)->meta()->findMessage("extra", &extra)
124                && extra != NULL
125                && extra->findInt64("timeUs", &timeUs)) {
126            // seeking only
127            mLastSeekTimeUs = timeUs;
128            mDiscontinuityOffsetTimesUs.clear();
129            mDiscontinuityAbsStartTimesUs.clear();
130        }
131        return INFO_DISCONTINUITY;
132    }
133
134    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
135
136    if (!packetSource->hasBufferAvailable(&finalResult)) {
137        return finalResult == OK ? -EAGAIN : finalResult;
138    }
139
140    // wait for counterpart
141    sp<AnotherPacketSource> otherSource;
142    if (stream == STREAMTYPE_AUDIO && (mStreamMask & STREAMTYPE_VIDEO)) {
143        otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
144    } else if (stream == STREAMTYPE_VIDEO && (mStreamMask & STREAMTYPE_AUDIO)) {
145        otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
146    }
147    if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) {
148        return finalResult == OK ? -EAGAIN : finalResult;
149    }
150
151    status_t err = packetSource->dequeueAccessUnit(accessUnit);
152
153    size_t streamIdx;
154    const char *streamStr;
155    switch (stream) {
156        case STREAMTYPE_AUDIO:
157            streamIdx = kAudioIndex;
158            streamStr = "audio";
159            break;
160        case STREAMTYPE_VIDEO:
161            streamIdx = kVideoIndex;
162            streamStr = "video";
163            break;
164        case STREAMTYPE_SUBTITLES:
165            streamIdx = kSubtitleIndex;
166            streamStr = "subs";
167            break;
168        default:
169            TRESPASS();
170    }
171
172    StreamItem& strm = mStreams[streamIdx];
173    if (err == INFO_DISCONTINUITY) {
174        // adaptive streaming, discontinuities in the playlist
175        int32_t type;
176        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
177
178        sp<AMessage> extra;
179        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
180            extra.clear();
181        }
182
183        ALOGI("[%s] read discontinuity of type %d, extra = %s",
184              streamStr,
185              type,
186              extra == NULL ? "NULL" : extra->debugString().c_str());
187
188        int32_t swap;
189        if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) {
190            int32_t switchGeneration;
191            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
192            {
193                Mutex::Autolock lock(mSwapMutex);
194                if (switchGeneration == mSwitchGeneration) {
195                    swapPacketSource(stream);
196                    sp<AMessage> msg = new AMessage(kWhatSwapped, id());
197                    msg->setInt32("stream", stream);
198                    msg->setInt32("switchGeneration", switchGeneration);
199                    msg->post();
200                }
201            }
202        } else {
203            size_t seq = strm.mCurDiscontinuitySeq;
204            int64_t offsetTimeUs;
205            if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
206                offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
207            } else {
208                offsetTimeUs = 0;
209            }
210
211            seq += 1;
212            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
213                int64_t firstTimeUs;
214                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
215                offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
216                offsetTimeUs += strm.mLastSampleDurationUs;
217            } else {
218                offsetTimeUs += strm.mLastSampleDurationUs;
219            }
220
221            mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
222        }
223    } else if (err == OK) {
224
225        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
226            int64_t timeUs;
227            int32_t discontinuitySeq = 0;
228            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
229            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
230            strm.mCurDiscontinuitySeq = discontinuitySeq;
231
232            int32_t discard = 0;
233            int64_t firstTimeUs;
234            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
235                int64_t durUs; // approximate sample duration
236                if (timeUs > strm.mLastDequeuedTimeUs) {
237                    durUs = timeUs - strm.mLastDequeuedTimeUs;
238                } else {
239                    durUs = strm.mLastDequeuedTimeUs - timeUs;
240                }
241                strm.mLastSampleDurationUs = durUs;
242                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
243            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
244                firstTimeUs = timeUs;
245            } else {
246                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
247                firstTimeUs = timeUs;
248            }
249
250            strm.mLastDequeuedTimeUs = timeUs;
251            if (timeUs >= firstTimeUs) {
252                timeUs -= firstTimeUs;
253            } else {
254                timeUs = 0;
255            }
256            timeUs += mLastSeekTimeUs;
257            if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
258                timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
259            }
260
261            ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs);
262            (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
263            mLastDequeuedTimeUs = timeUs;
264            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
265        } else if (stream == STREAMTYPE_SUBTITLES) {
266            (*accessUnit)->meta()->setInt32(
267                    "trackIndex", mPlaylist->getSelectedIndex());
268            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
269        }
270    } else {
271        ALOGI("[%s] encountered error %d", streamStr, err);
272    }
273
274    return err;
275}
276
277status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
278    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
279    if (!(mStreamMask & stream)) {
280        return UNKNOWN_ERROR;
281    }
282
283    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
284
285    sp<MetaData> meta = packetSource->getFormat();
286
287    if (meta == NULL) {
288        return -EAGAIN;
289    }
290
291    return convertMetaDataToMessage(meta, format);
292}
293
294void LiveSession::connectAsync(
295        const char *url, const KeyedVector<String8, String8> *headers) {
296    sp<AMessage> msg = new AMessage(kWhatConnect, id());
297    msg->setString("url", url);
298
299    if (headers != NULL) {
300        msg->setPointer(
301                "headers",
302                new KeyedVector<String8, String8>(*headers));
303    }
304
305    msg->post();
306}
307
308status_t LiveSession::disconnect() {
309    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
310
311    sp<AMessage> response;
312    status_t err = msg->postAndAwaitResponse(&response);
313
314    return err;
315}
316
317status_t LiveSession::seekTo(int64_t timeUs) {
318    sp<AMessage> msg = new AMessage(kWhatSeek, id());
319    msg->setInt64("timeUs", timeUs);
320
321    sp<AMessage> response;
322    status_t err = msg->postAndAwaitResponse(&response);
323
324    uint32_t replyID;
325    CHECK(response == mSeekReply && 0 != mSeekReplyID);
326    mSeekReply.clear();
327    mSeekReplyID = 0;
328    return err;
329}
330
331void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
332    switch (msg->what()) {
333        case kWhatConnect:
334        {
335            onConnect(msg);
336            break;
337        }
338
339        case kWhatDisconnect:
340        {
341            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
342
343            if (mReconfigurationInProgress) {
344                break;
345            }
346
347            finishDisconnect();
348            break;
349        }
350
351        case kWhatSeek:
352        {
353            CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
354
355            status_t err = onSeek(msg);
356
357            mSeekReply = new AMessage;
358            mSeekReply->setInt32("err", err);
359            break;
360        }
361
362        case kWhatFetcherNotify:
363        {
364            int32_t what;
365            CHECK(msg->findInt32("what", &what));
366
367            switch (what) {
368                case PlaylistFetcher::kWhatStarted:
369                    break;
370                case PlaylistFetcher::kWhatPaused:
371                case PlaylistFetcher::kWhatStopped:
372                {
373                    if (what == PlaylistFetcher::kWhatStopped) {
374                        AString uri;
375                        CHECK(msg->findString("uri", &uri));
376                        if (mFetcherInfos.removeItem(uri) < 0) {
377                            // ignore duplicated kWhatStopped messages.
378                            break;
379                        }
380
381                        if (mSwitchInProgress) {
382                            tryToFinishBandwidthSwitch();
383                        }
384                    }
385
386                    if (mContinuation != NULL) {
387                        CHECK_GT(mContinuationCounter, 0);
388                        if (--mContinuationCounter == 0) {
389                            mContinuation->post();
390
391                            if (mSeekReplyID != 0) {
392                                CHECK(mSeekReply != NULL);
393                                mSeekReply->postReply(mSeekReplyID);
394                            }
395                        }
396                    }
397                    break;
398                }
399
400                case PlaylistFetcher::kWhatDurationUpdate:
401                {
402                    AString uri;
403                    CHECK(msg->findString("uri", &uri));
404
405                    int64_t durationUs;
406                    CHECK(msg->findInt64("durationUs", &durationUs));
407
408                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
409                    info->mDurationUs = durationUs;
410                    break;
411                }
412
413                case PlaylistFetcher::kWhatError:
414                {
415                    status_t err;
416                    CHECK(msg->findInt32("err", &err));
417
418                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
419
420                    if (mInPreparationPhase) {
421                        postPrepared(err);
422                    }
423
424                    cancelBandwidthSwitch();
425
426                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
427
428                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
429
430                    mPacketSources.valueFor(
431                            STREAMTYPE_SUBTITLES)->signalEOS(err);
432
433                    sp<AMessage> notify = mNotify->dup();
434                    notify->setInt32("what", kWhatError);
435                    notify->setInt32("err", err);
436                    notify->post();
437                    break;
438                }
439
440                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
441                {
442                    AString uri;
443                    CHECK(msg->findString("uri", &uri));
444
445                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
446                    info->mIsPrepared = true;
447
448                    if (mInPreparationPhase) {
449                        bool allFetchersPrepared = true;
450                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
451                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
452                                allFetchersPrepared = false;
453                                break;
454                            }
455                        }
456
457                        if (allFetchersPrepared) {
458                            postPrepared(OK);
459                        }
460                    }
461                    break;
462                }
463
464                case PlaylistFetcher::kWhatStartedAt:
465                {
466                    int32_t switchGeneration;
467                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
468
469                    if (switchGeneration != mSwitchGeneration) {
470                        break;
471                    }
472
473                    // Resume fetcher for the original variant; the resumed fetcher should
474                    // continue until the timestamps found in msg, which is stored by the
475                    // new fetcher to indicate where the new variant has started buffering.
476                    for (size_t i = 0; i < mFetcherInfos.size(); i++) {
477                        const FetcherInfo info = mFetcherInfos.valueAt(i);
478                        if (info.mToBeRemoved) {
479                            info.mFetcher->resumeUntilAsync(msg);
480                        }
481                    }
482                    break;
483                }
484
485                default:
486                    TRESPASS();
487            }
488
489            break;
490        }
491
492        case kWhatCheckBandwidth:
493        {
494            int32_t generation;
495            CHECK(msg->findInt32("generation", &generation));
496
497            if (generation != mCheckBandwidthGeneration) {
498                break;
499            }
500
501            onCheckBandwidth();
502            break;
503        }
504
505        case kWhatChangeConfiguration:
506        {
507            onChangeConfiguration(msg);
508            break;
509        }
510
511        case kWhatChangeConfiguration2:
512        {
513            onChangeConfiguration2(msg);
514            break;
515        }
516
517        case kWhatChangeConfiguration3:
518        {
519            onChangeConfiguration3(msg);
520            break;
521        }
522
523        case kWhatFinishDisconnect2:
524        {
525            onFinishDisconnect2();
526            break;
527        }
528
529        case kWhatSwapped:
530        {
531            onSwapped(msg);
532            break;
533        }
534        default:
535            TRESPASS();
536            break;
537    }
538}
539
540// static
541int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
542    if (a->mBandwidth < b->mBandwidth) {
543        return -1;
544    } else if (a->mBandwidth == b->mBandwidth) {
545        return 0;
546    }
547
548    return 1;
549}
550
551// static
552LiveSession::StreamType LiveSession::indexToType(int idx) {
553    CHECK(idx >= 0 && idx < kMaxStreams);
554    return (StreamType)(1 << idx);
555}
556
557void LiveSession::onConnect(const sp<AMessage> &msg) {
558    AString url;
559    CHECK(msg->findString("url", &url));
560
561    KeyedVector<String8, String8> *headers = NULL;
562    if (!msg->findPointer("headers", (void **)&headers)) {
563        mExtraHeaders.clear();
564    } else {
565        mExtraHeaders = *headers;
566
567        delete headers;
568        headers = NULL;
569    }
570
571    // TODO currently we don't know if we are coming here from incognito mode
572    ALOGI("onConnect %s", uriDebugString(url).c_str());
573
574    mMasterURL = url;
575
576    bool dummy;
577    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
578
579    if (mPlaylist == NULL) {
580        ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str());
581
582        postPrepared(ERROR_IO);
583        return;
584    }
585
586    // We trust the content provider to make a reasonable choice of preferred
587    // initial bandwidth by listing it first in the variant playlist.
588    // At startup we really don't have a good estimate on the available
589    // network bandwidth since we haven't tranferred any data yet. Once
590    // we have we can make a better informed choice.
591    size_t initialBandwidth = 0;
592    size_t initialBandwidthIndex = 0;
593
594    if (mPlaylist->isVariantPlaylist()) {
595        for (size_t i = 0; i < mPlaylist->size(); ++i) {
596            BandwidthItem item;
597
598            item.mPlaylistIndex = i;
599
600            sp<AMessage> meta;
601            AString uri;
602            mPlaylist->itemAt(i, &uri, &meta);
603
604            unsigned long bandwidth;
605            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
606
607            if (initialBandwidth == 0) {
608                initialBandwidth = item.mBandwidth;
609            }
610
611            mBandwidthItems.push(item);
612        }
613
614        CHECK_GT(mBandwidthItems.size(), 0u);
615
616        mBandwidthItems.sort(SortByBandwidth);
617
618        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
619            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
620                initialBandwidthIndex = i;
621                break;
622            }
623        }
624    } else {
625        // dummy item.
626        BandwidthItem item;
627        item.mPlaylistIndex = 0;
628        item.mBandwidth = 0;
629        mBandwidthItems.push(item);
630    }
631
632    mPlaylist->pickRandomMediaItems();
633    changeConfiguration(
634            0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
635}
636
637void LiveSession::finishDisconnect() {
638    // No reconfiguration is currently pending, make sure none will trigger
639    // during disconnection either.
640    cancelCheckBandwidthEvent();
641
642    // Protect mPacketSources from a swapPacketSource race condition through disconnect.
643    // (finishDisconnect, onFinishDisconnect2)
644    cancelBandwidthSwitch();
645
646    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
647        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
648    }
649
650    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
651
652    mContinuationCounter = mFetcherInfos.size();
653    mContinuation = msg;
654
655    if (mContinuationCounter == 0) {
656        msg->post();
657    }
658}
659
660void LiveSession::onFinishDisconnect2() {
661    mContinuation.clear();
662
663    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
664    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
665
666    mPacketSources.valueFor(
667            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
668
669    sp<AMessage> response = new AMessage;
670    response->setInt32("err", OK);
671
672    response->postReply(mDisconnectReplyID);
673    mDisconnectReplyID = 0;
674}
675
676sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
677    ssize_t index = mFetcherInfos.indexOfKey(uri);
678
679    if (index >= 0) {
680        return NULL;
681    }
682
683    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
684    notify->setString("uri", uri);
685    notify->setInt32("switchGeneration", mSwitchGeneration);
686
687    FetcherInfo info;
688    info.mFetcher = new PlaylistFetcher(notify, this, uri);
689    info.mDurationUs = -1ll;
690    info.mIsPrepared = false;
691    info.mToBeRemoved = false;
692    looper()->registerHandler(info.mFetcher);
693
694    mFetcherInfos.add(uri, info);
695
696    return info.mFetcher;
697}
698
699/*
700 * Illustration of parameters:
701 *
702 * 0      `range_offset`
703 * +------------+-------------------------------------------------------+--+--+
704 * |            |                                 | next block to fetch |  |  |
705 * |            | `source` handle => `out` buffer |                     |  |  |
706 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
707 * |            |<----------- `range_length` / buffer capacity ----------->|  |
708 * |<------------------------------ file_size ------------------------------->|
709 *
710 * Special parameter values:
711 * - range_length == -1 means entire file
712 * - block_size == 0 means entire range
713 *
714 */
715ssize_t LiveSession::fetchFile(
716        const char *url, sp<ABuffer> *out,
717        int64_t range_offset, int64_t range_length,
718        uint32_t block_size, /* download block size */
719        sp<DataSource> *source, /* to return and reuse source */
720        String8 *actualUrl) {
721    off64_t size;
722    sp<DataSource> temp_source;
723    if (source == NULL) {
724        source = &temp_source;
725    }
726
727    if (*source == NULL) {
728        if (!strncasecmp(url, "file://", 7)) {
729            *source = new FileSource(url + 7);
730        } else if (strncasecmp(url, "http://", 7)
731                && strncasecmp(url, "https://", 8)) {
732            return ERROR_UNSUPPORTED;
733        } else {
734            KeyedVector<String8, String8> headers = mExtraHeaders;
735            if (range_offset > 0 || range_length >= 0) {
736                headers.add(
737                        String8("Range"),
738                        String8(
739                            StringPrintf(
740                                "bytes=%lld-%s",
741                                range_offset,
742                                range_length < 0
743                                    ? "" : StringPrintf("%lld",
744                                            range_offset + range_length - 1).c_str()).c_str()));
745            }
746            status_t err = mHTTPDataSource->connect(url, &headers);
747
748            if (err != OK) {
749                return err;
750            }
751
752            *source = mHTTPDataSource;
753        }
754    }
755
756    status_t getSizeErr = (*source)->getSize(&size);
757    if (getSizeErr != OK) {
758        size = 65536;
759    }
760
761    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
762    if (*out == NULL) {
763        buffer->setRange(0, 0);
764    }
765
766    ssize_t bytesRead = 0;
767    // adjust range_length if only reading partial block
768    if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
769        range_length = buffer->size() + block_size;
770    }
771    for (;;) {
772        // Only resize when we don't know the size.
773        size_t bufferRemaining = buffer->capacity() - buffer->size();
774        if (bufferRemaining == 0 && getSizeErr != OK) {
775            bufferRemaining = 32768;
776
777            ALOGV("increasing download buffer to %zu bytes",
778                 buffer->size() + bufferRemaining);
779
780            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
781            memcpy(copy->data(), buffer->data(), buffer->size());
782            copy->setRange(0, buffer->size());
783
784            buffer = copy;
785        }
786
787        size_t maxBytesToRead = bufferRemaining;
788        if (range_length >= 0) {
789            int64_t bytesLeftInRange = range_length - buffer->size();
790            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
791                maxBytesToRead = bytesLeftInRange;
792
793                if (bytesLeftInRange == 0) {
794                    break;
795                }
796            }
797        }
798
799        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
800        // to help us break out of the loop.
801        ssize_t n = (*source)->readAt(
802                buffer->size(), buffer->data() + buffer->size(),
803                maxBytesToRead);
804
805        if (n < 0) {
806            return n;
807        }
808
809        if (n == 0) {
810            break;
811        }
812
813        buffer->setRange(0, buffer->size() + (size_t)n);
814        bytesRead += n;
815    }
816
817    *out = buffer;
818    if (actualUrl != NULL) {
819        *actualUrl = (*source)->getUri();
820        if (actualUrl->isEmpty()) {
821            *actualUrl = url;
822        }
823    }
824
825    return bytesRead;
826}
827
828sp<M3UParser> LiveSession::fetchPlaylist(
829        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
830    ALOGV("fetchPlaylist '%s'", url);
831
832    *unchanged = false;
833
834    sp<ABuffer> buffer;
835    String8 actualUrl;
836    ssize_t  err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
837
838    if (err <= 0) {
839        return NULL;
840    }
841
842    // MD5 functionality is not available on the simulator, treat all
843    // playlists as changed.
844
845#if defined(HAVE_ANDROID_OS)
846    uint8_t hash[16];
847
848    MD5_CTX m;
849    MD5_Init(&m);
850    MD5_Update(&m, buffer->data(), buffer->size());
851
852    MD5_Final(hash, &m);
853
854    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
855        // playlist unchanged
856        *unchanged = true;
857
858        return NULL;
859    }
860
861    if (curPlaylistHash != NULL) {
862        memcpy(curPlaylistHash, hash, sizeof(hash));
863    }
864#endif
865
866    sp<M3UParser> playlist =
867        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
868
869    if (playlist->initCheck() != OK) {
870        ALOGE("failed to parse .m3u8 playlist");
871
872        return NULL;
873    }
874
875    return playlist;
876}
877
// Returns rand() scaled by RAND_MAX, i.e. a pseudo-random value in [0, 1].
static double uniformRand() {
    const double sample = static_cast<double>(rand());
    return sample / RAND_MAX;
}
881
882size_t LiveSession::getBandwidthIndex() {
883    if (mBandwidthItems.size() == 0) {
884        return 0;
885    }
886
887#if 1
888    char value[PROPERTY_VALUE_MAX];
889    ssize_t index = -1;
890    if (property_get("media.httplive.bw-index", value, NULL)) {
891        char *end;
892        index = strtol(value, &end, 10);
893        CHECK(end > value && *end == '\0');
894
895        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
896            index = mBandwidthItems.size() - 1;
897        }
898    }
899
900    if (index < 0) {
901        int32_t bandwidthBps;
902        if (mHTTPDataSource != NULL
903                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
904            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
905        } else {
906            ALOGV("no bandwidth estimate.");
907            return 0;  // Pick the lowest bandwidth stream by default.
908        }
909
910        char value[PROPERTY_VALUE_MAX];
911        if (property_get("media.httplive.max-bw", value, NULL)) {
912            char *end;
913            long maxBw = strtoul(value, &end, 10);
914            if (end > value && *end == '\0') {
915                if (maxBw > 0 && bandwidthBps > maxBw) {
916                    ALOGV("bandwidth capped to %ld bps", maxBw);
917                    bandwidthBps = maxBw;
918                }
919            }
920        }
921
922        // Consider only 80% of the available bandwidth usable.
923        bandwidthBps = (bandwidthBps * 8) / 10;
924
925        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
926
927        index = mBandwidthItems.size() - 1;
928        while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth
929                                > (size_t)bandwidthBps) {
930            --index;
931        }
932    }
933#elif 0
934    // Change bandwidth at random()
935    size_t index = uniformRand() * mBandwidthItems.size();
936#elif 0
937    // There's a 50% chance to stay on the current bandwidth and
938    // a 50% chance to switch to the next higher bandwidth (wrapping around
939    // to lowest)
940    const size_t kMinIndex = 0;
941
942    static ssize_t mCurBandwidthIndex = -1;
943
944    size_t index;
945    if (mCurBandwidthIndex < 0) {
946        index = kMinIndex;
947    } else if (uniformRand() < 0.5) {
948        index = (size_t)mCurBandwidthIndex;
949    } else {
950        index = mCurBandwidthIndex + 1;
951        if (index == mBandwidthItems.size()) {
952            index = kMinIndex;
953        }
954    }
955    mCurBandwidthIndex = index;
956#elif 0
957    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
958
959    size_t index = mBandwidthItems.size() - 1;
960    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
961        --index;
962    }
963#elif 1
964    char value[PROPERTY_VALUE_MAX];
965    size_t index;
966    if (property_get("media.httplive.bw-index", value, NULL)) {
967        char *end;
968        index = strtoul(value, &end, 10);
969        CHECK(end > value && *end == '\0');
970
971        if (index >= mBandwidthItems.size()) {
972            index = mBandwidthItems.size() - 1;
973        }
974    } else {
975        index = 0;
976    }
977#else
978    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
979#endif
980
981    CHECK_GE(index, 0);
982
983    return index;
984}
985
986status_t LiveSession::onSeek(const sp<AMessage> &msg) {
987    int64_t timeUs;
988    CHECK(msg->findInt64("timeUs", &timeUs));
989
990    if (!mReconfigurationInProgress) {
991        changeConfiguration(timeUs, getBandwidthIndex());
992    }
993
994    return OK;
995}
996
997status_t LiveSession::getDuration(int64_t *durationUs) const {
998    int64_t maxDurationUs = 0ll;
999    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1000        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1001
1002        if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) {
1003            maxDurationUs = fetcherDurationUs;
1004        }
1005    }
1006
1007    *durationUs = maxDurationUs;
1008
1009    return OK;
1010}
1011
1012bool LiveSession::isSeekable() const {
1013    int64_t durationUs;
1014    return getDuration(&durationUs) == OK && durationUs >= 0;
1015}
1016
1017bool LiveSession::hasDynamicDuration() const {
1018    return false;
1019}
1020
1021size_t LiveSession::getTrackCount() const {
1022    return mPlaylist->getTrackCount();
1023}
1024
1025sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1026    return mPlaylist->getTrackInfo(trackIndex);
1027}
1028
1029status_t LiveSession::selectTrack(size_t index, bool select) {
1030    status_t err = mPlaylist->selectTrack(index, select);
1031    if (err == OK) {
1032        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id());
1033        msg->setInt32("bandwidthIndex", mCurBandwidthIndex);
1034        msg->setInt32("pickTrack", select);
1035        msg->post();
1036    }
1037    return err;
1038}
1039
1040bool LiveSession::canSwitchUp() {
1041    // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
1042    status_t err = OK;
1043    for (size_t i = 0; i < mPacketSources.size(); ++i) {
1044        sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
1045        int64_t dur = source->getBufferedDurationUs(&err);
1046        if (err == OK && dur > 10000000) {
1047            return true;
1048        }
1049    }
1050    return false;
1051}
1052
1053void LiveSession::changeConfiguration(
1054        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
1055    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
1056    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
1057    cancelBandwidthSwitch();
1058
1059    CHECK(!mReconfigurationInProgress);
1060    mReconfigurationInProgress = true;
1061
1062    mCurBandwidthIndex = bandwidthIndex;
1063
1064    ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
1065          timeUs, bandwidthIndex, pickTrack);
1066
1067    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
1068    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
1069
1070    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1071    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1072
1073    AString URIs[kMaxStreams];
1074    for (size_t i = 0; i < kMaxStreams; ++i) {
1075        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1076            streamMask |= indexToType(i);
1077        }
1078    }
1079
1080    // Step 1, stop and discard fetchers that are no longer needed.
1081    // Pause those that we'll reuse.
1082    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1083        const AString &uri = mFetcherInfos.keyAt(i);
1084
1085        bool discardFetcher = true;
1086
1087        // If we're seeking all current fetchers are discarded.
1088        if (timeUs < 0ll) {
1089            // delay fetcher removal if not picking tracks
1090            discardFetcher = pickTrack;
1091
1092            for (size_t j = 0; j < kMaxStreams; ++j) {
1093                StreamType type = indexToType(j);
1094                if ((streamMask & type) && uri == URIs[j]) {
1095                    resumeMask |= type;
1096                    streamMask &= ~type;
1097                    discardFetcher = false;
1098                }
1099            }
1100        }
1101
1102        if (discardFetcher) {
1103            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1104        } else {
1105            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
1106        }
1107    }
1108
1109    sp<AMessage> msg;
1110    if (timeUs < 0ll) {
1111        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1112        msg = new AMessage(kWhatChangeConfiguration3, id());
1113    } else {
1114        msg = new AMessage(kWhatChangeConfiguration2, id());
1115    }
1116    msg->setInt32("streamMask", streamMask);
1117    msg->setInt32("resumeMask", resumeMask);
1118    msg->setInt32("pickTrack", pickTrack);
1119    msg->setInt64("timeUs", timeUs);
1120    for (size_t i = 0; i < kMaxStreams; ++i) {
1121        if ((streamMask | resumeMask) & indexToType(i)) {
1122            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1123        }
1124    }
1125
1126    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1127    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1128    // fetchers have completed their asynchronous operation, we'll post
1129    // mContinuation, which then is handled below in onChangeConfiguration2.
1130    mContinuationCounter = mFetcherInfos.size();
1131    mContinuation = msg;
1132
1133    if (mContinuationCounter == 0) {
1134        msg->post();
1135
1136        if (mSeekReplyID != 0) {
1137            CHECK(mSeekReply != NULL);
1138            mSeekReply->postReply(mSeekReplyID);
1139        }
1140    }
1141}
1142
1143void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1144    if (!mReconfigurationInProgress) {
1145        int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex;
1146        msg->findInt32("pickTrack", &pickTrack);
1147        msg->findInt32("bandwidthIndex", &bandwidthIndex);
1148        changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack);
1149    } else {
1150        msg->post(1000000ll); // retry in 1 sec
1151    }
1152}
1153
1154void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1155    mContinuation.clear();
1156
1157    // All fetchers are either suspended or have been removed now.
1158
1159    uint32_t streamMask, resumeMask;
1160    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1161    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1162
1163    // currently onChangeConfiguration2 is only called for seeking;
1164    // remove the following CHECK if using it else where.
1165    CHECK_EQ(resumeMask, 0);
1166    streamMask |= resumeMask;
1167
1168    AString URIs[kMaxStreams];
1169    for (size_t i = 0; i < kMaxStreams; ++i) {
1170        if (streamMask & indexToType(i)) {
1171            const AString &uriKey = mStreams[i].uriKey();
1172            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1173            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1174        }
1175    }
1176
1177    // Determine which decoders to shutdown on the player side,
1178    // a decoder has to be shutdown if either
1179    // 1) its streamtype was active before but now longer isn't.
1180    // or
1181    // 2) its streamtype was already active and still is but the URI
1182    //    has changed.
1183    uint32_t changedMask = 0;
1184    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1185        if (((mStreamMask & streamMask & indexToType(i))
1186                && !(URIs[i] == mStreams[i].mUri))
1187                || (mStreamMask & ~streamMask & indexToType(i))) {
1188            changedMask |= indexToType(i);
1189        }
1190    }
1191
1192    if (changedMask == 0) {
1193        // If nothing changed as far as the audio/video decoders
1194        // are concerned we can proceed.
1195        onChangeConfiguration3(msg);
1196        return;
1197    }
1198
1199    // Something changed, inform the player which will shutdown the
1200    // corresponding decoders and will post the reply once that's done.
1201    // Handling the reply will continue executing below in
1202    // onChangeConfiguration3.
1203    sp<AMessage> notify = mNotify->dup();
1204    notify->setInt32("what", kWhatStreamsChanged);
1205    notify->setInt32("changedMask", changedMask);
1206
1207    msg->setWhat(kWhatChangeConfiguration3);
1208    msg->setTarget(id());
1209
1210    notify->setMessage("reply", msg);
1211    notify->post();
1212}
1213
1214void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1215    mContinuation.clear();
1216    // All remaining fetchers are still suspended, the player has shutdown
1217    // any decoders that needed it.
1218
1219    uint32_t streamMask, resumeMask;
1220    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1221    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1222
1223    for (size_t i = 0; i < kMaxStreams; ++i) {
1224        if (streamMask & indexToType(i)) {
1225            CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1226        }
1227    }
1228
1229    int64_t timeUs;
1230    int32_t pickTrack;
1231    bool switching = false;
1232    CHECK(msg->findInt64("timeUs", &timeUs));
1233    CHECK(msg->findInt32("pickTrack", &pickTrack));
1234
1235    if (timeUs < 0ll) {
1236        if (!pickTrack) {
1237            switching = true;
1238        }
1239        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
1240    } else {
1241        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1242    }
1243
1244    mNewStreamMask = streamMask | resumeMask;
1245
1246    // Of all existing fetchers:
1247    // * Resume fetchers that are still needed and assign them original packet sources.
1248    // * Mark otherwise unneeded fetchers for removal.
1249    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1250    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1251        const AString &uri = mFetcherInfos.keyAt(i);
1252
1253        sp<AnotherPacketSource> sources[kMaxStreams];
1254        for (size_t j = 0; j < kMaxStreams; ++j) {
1255            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
1256                sources[j] = mPacketSources.valueFor(indexToType(j));
1257
1258                if (j != kSubtitleIndex) {
1259                    ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j));
1260                    sp<AnotherPacketSource> discontinuityQueue;
1261                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1262                    discontinuityQueue->queueDiscontinuity(
1263                            ATSParser::DISCONTINUITY_NONE,
1264                            NULL,
1265                            true);
1266                }
1267            }
1268        }
1269
1270        FetcherInfo &info = mFetcherInfos.editValueAt(i);
1271        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
1272                || sources[kSubtitleIndex] != NULL) {
1273            info.mFetcher->startAsync(
1274                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
1275        } else {
1276            info.mToBeRemoved = true;
1277        }
1278    }
1279
1280    // streamMask now only contains the types that need a new fetcher created.
1281
1282    if (streamMask != 0) {
1283        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1284    }
1285
1286    // Find out when the original fetchers have buffered up to and start the new fetchers
1287    // at a later timestamp.
1288    for (size_t i = 0; i < kMaxStreams; i++) {
1289        if (!(indexToType(i) & streamMask)) {
1290            continue;
1291        }
1292
1293        AString uri;
1294        uri = mStreams[i].mUri;
1295
1296        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1297        CHECK(fetcher != NULL);
1298
1299        int32_t latestSeq = -1;
1300        int64_t startTimeUs = -1;
1301        int64_t segmentStartTimeUs = -1ll;
1302        int32_t discontinuitySeq = -1;
1303        sp<AnotherPacketSource> sources[kMaxStreams];
1304
1305        // TRICKY: looping from i as earlier streams are already removed from streamMask
1306        for (size_t j = i; j < kMaxStreams; ++j) {
1307            if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) {
1308                sources[j] = mPacketSources.valueFor(indexToType(j));
1309
1310                if (timeUs >= 0) {
1311                    sources[j]->clear();
1312                    startTimeUs = timeUs;
1313
1314                    sp<AnotherPacketSource> discontinuityQueue;
1315                    sp<AMessage> extra = new AMessage;
1316                    extra->setInt64("timeUs", timeUs);
1317                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1318                    discontinuityQueue->queueDiscontinuity(
1319                            ATSParser::DISCONTINUITY_SEEK, extra, true);
1320                } else {
1321                    int32_t type;
1322                    int64_t srcSegmentStartTimeUs;
1323                    sp<AMessage> meta;
1324                    if (pickTrack) {
1325                        // selecting
1326                        meta = sources[j]->getLatestDequeuedMeta();
1327                    } else {
1328                        // adapting
1329                        meta = sources[j]->getLatestEnqueuedMeta();
1330                    }
1331
1332                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
1333                        int64_t tmpUs;
1334                        CHECK(meta->findInt64("timeUs", &tmpUs));
1335                        if (startTimeUs < 0 || tmpUs < startTimeUs) {
1336                            startTimeUs = tmpUs;
1337                        }
1338
1339                        CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs));
1340                        if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) {
1341                            segmentStartTimeUs = tmpUs;
1342                        }
1343
1344                        int32_t seq;
1345                        CHECK(meta->findInt32("discontinuitySeq", &seq));
1346                        if (discontinuitySeq < 0 || seq < discontinuitySeq) {
1347                            discontinuitySeq = seq;
1348                        }
1349                    }
1350
1351                    if (pickTrack) {
1352                        // selecting track, queue discontinuities before content
1353                        sources[j]->clear();
1354                        if (j == kSubtitleIndex) {
1355                            break;
1356                        }
1357                        sp<AnotherPacketSource> discontinuityQueue;
1358                        discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1359                        discontinuityQueue->queueDiscontinuity(
1360                                ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1361                    } else {
1362                        // adapting, queue discontinuities after resume
1363                        sources[j] = mPacketSources2.valueFor(indexToType(j));
1364                        sources[j]->clear();
1365                        uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1366                        if (extraStreams & indexToType(j)) {
1367                            sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false));
1368                        }
1369                    }
1370                }
1371
1372                streamMask &= ~indexToType(j);
1373            }
1374        }
1375
1376        fetcher->startAsync(
1377                sources[kAudioIndex],
1378                sources[kVideoIndex],
1379                sources[kSubtitleIndex],
1380                startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs,
1381                segmentStartTimeUs,
1382                discontinuitySeq,
1383                switching);
1384    }
1385
1386    // All fetchers have now been started, the configuration change
1387    // has completed.
1388
1389    scheduleCheckBandwidthEvent();
1390
1391    ALOGV("XXX configuration change completed.");
1392    mReconfigurationInProgress = false;
1393    if (switching) {
1394        mSwitchInProgress = true;
1395        mSwapMask = streamMask;
1396    } else {
1397        mStreamMask = mNewStreamMask;
1398    }
1399
1400    if (mDisconnectReplyID != 0) {
1401        finishDisconnect();
1402    }
1403}
1404
1405void LiveSession::onSwapped(const sp<AMessage> &msg) {
1406    int32_t switchGeneration;
1407    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
1408    if (switchGeneration != mSwitchGeneration) {
1409        return;
1410    }
1411
1412    int32_t stream;
1413    CHECK(msg->findInt32("stream", &stream));
1414    mSwapMask &= ~stream;
1415    if (mSwapMask != 0) {
1416        return;
1417    }
1418
1419    // Check if new variant contains extra streams.
1420    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1421    while (extraStreams) {
1422        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
1423        swapPacketSource(extraStream);
1424        extraStreams &= ~extraStream;
1425    }
1426
1427    tryToFinishBandwidthSwitch();
1428}
1429
1430// Mark switch done when:
1431//   1. all old buffers are swapped out
1432void LiveSession::tryToFinishBandwidthSwitch() {
1433    if (!mSwitchInProgress) {
1434        return;
1435    }
1436
1437    bool needToRemoveFetchers = false;
1438    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1439        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
1440            needToRemoveFetchers = true;
1441            break;
1442        }
1443    }
1444
1445    if (!needToRemoveFetchers && mSwapMask == 0) {
1446        ALOGI("mSwitchInProgress = false");
1447        mStreamMask = mNewStreamMask;
1448        mSwitchInProgress = false;
1449    }
1450}
1451
1452void LiveSession::scheduleCheckBandwidthEvent() {
1453    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1454    msg->setInt32("generation", mCheckBandwidthGeneration);
1455    msg->post(10000000ll);
1456}
1457
1458void LiveSession::cancelCheckBandwidthEvent() {
1459    ++mCheckBandwidthGeneration;
1460}
1461
1462void LiveSession::cancelBandwidthSwitch() {
1463    Mutex::Autolock lock(mSwapMutex);
1464    mSwitchGeneration++;
1465    mSwitchInProgress = false;
1466    mSwapMask = 0;
1467}
1468
1469bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
1470    if (mReconfigurationInProgress || mSwitchInProgress) {
1471        return false;
1472    }
1473
1474    if (mCurBandwidthIndex < 0) {
1475        return true;
1476    }
1477
1478    if (bandwidthIndex == (size_t)mCurBandwidthIndex) {
1479        return false;
1480    } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) {
1481        return canSwitchUp();
1482    } else {
1483        return true;
1484    }
1485}
1486
1487void LiveSession::onCheckBandwidth() {
1488    size_t bandwidthIndex = getBandwidthIndex();
1489    if (canSwitchBandwidthTo(bandwidthIndex)) {
1490        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1491    } else {
1492        scheduleCheckBandwidthEvent();
1493    }
1494
1495    // Handling the kWhatCheckBandwidth even here does _not_ automatically
1496    // schedule another one on return, only an explicit call to
1497    // scheduleCheckBandwidthEvent will do that.
1498    // This ensures that only one configuration change is ongoing at any
1499    // one time, once that completes it'll schedule another check bandwidth
1500    // event.
1501}
1502
1503void LiveSession::postPrepared(status_t err) {
1504    CHECK(mInPreparationPhase);
1505
1506    sp<AMessage> notify = mNotify->dup();
1507    if (err == OK || err == ERROR_END_OF_STREAM) {
1508        notify->setInt32("what", kWhatPrepared);
1509    } else {
1510        notify->setInt32("what", kWhatPreparationFailed);
1511        notify->setInt32("err", err);
1512    }
1513
1514    notify->post();
1515
1516    mInPreparationPhase = false;
1517}
1518
1519}  // namespace android
1520
1521