LiveSession.cpp revision b44ce2f84691559672cfaf6bb8fd3a9ac43904f2
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/IMediaHTTPConnection.h>
31#include <media/IMediaHTTPService.h>
32#include <media/stagefright/foundation/hexdump.h>
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/DataSource.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaErrors.h>
39#include <media/stagefright/MediaHTTP.h>
40#include <media/stagefright/MetaData.h>
41#include <media/stagefright/Utils.h>
42
43#include <utils/Mutex.h>
44
45#include <ctype.h>
46#include <inttypes.h>
47#include <openssl/aes.h>
48#include <openssl/md5.h>
49
50namespace android {
51
52LiveSession::LiveSession(
53        const sp<AMessage> &notify, uint32_t flags,
54        const sp<IMediaHTTPService> &httpService)
55    : mNotify(notify),
56      mFlags(flags),
57      mHTTPService(httpService),
58      mInPreparationPhase(true),
59      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
60      mCurBandwidthIndex(-1),
61      mStreamMask(0),
62      mNewStreamMask(0),
63      mSwapMask(0),
64      mCheckBandwidthGeneration(0),
65      mSwitchGeneration(0),
66      mSubtitleGeneration(0),
67      mLastDequeuedTimeUs(0ll),
68      mRealTimeBaseUs(0ll),
69      mReconfigurationInProgress(false),
70      mSwitchInProgress(false),
71      mDisconnectReplyID(0),
72      mSeekReplyID(0),
73      mFirstTimeUsValid(false),
74      mFirstTimeUs(0),
75      mLastSeekTimeUs(0) {
76
77    mStreams[kAudioIndex] = StreamItem("audio");
78    mStreams[kVideoIndex] = StreamItem("video");
79    mStreams[kSubtitleIndex] = StreamItem("subtitles");
80
81    for (size_t i = 0; i < kMaxStreams; ++i) {
82        mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
83        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
84        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
85        mBuffering[i] = false;
86    }
87}
88
89LiveSession::~LiveSession() {
90}
91
92sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
93    ABuffer *discontinuity = new ABuffer(0);
94    discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
95    discontinuity->meta()->setInt32("swapPacketSource", swap);
96    discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
97    discontinuity->meta()->setInt64("timeUs", -1);
98    return discontinuity;
99}
100
101void LiveSession::swapPacketSource(StreamType stream) {
102    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
103    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
104    sp<AnotherPacketSource> tmp = aps;
105    aps = aps2;
106    aps2 = tmp;
107    aps2->clear();
108}
109
110status_t LiveSession::dequeueAccessUnit(
111        StreamType stream, sp<ABuffer> *accessUnit) {
112    if (!(mStreamMask & stream)) {
113        // return -EWOULDBLOCK to avoid halting the decoder
114        // when switching between audio/video and audio only.
115        return -EWOULDBLOCK;
116    }
117
118    status_t finalResult;
119    sp<AnotherPacketSource> discontinuityQueue  = mDiscontinuities.valueFor(stream);
120    if (discontinuityQueue->hasBufferAvailable(&finalResult)) {
121        discontinuityQueue->dequeueAccessUnit(accessUnit);
122        // seeking, track switching
123        sp<AMessage> extra;
124        int64_t timeUs;
125        if ((*accessUnit)->meta()->findMessage("extra", &extra)
126                && extra != NULL
127                && extra->findInt64("timeUs", &timeUs)) {
128            // seeking only
129            mLastSeekTimeUs = timeUs;
130            mDiscontinuityOffsetTimesUs.clear();
131            mDiscontinuityAbsStartTimesUs.clear();
132        }
133        return INFO_DISCONTINUITY;
134    }
135
136    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
137
138    ssize_t idx = typeToIndex(stream);
139    if (!packetSource->hasBufferAvailable(&finalResult)) {
140        if (finalResult == OK) {
141            mBuffering[idx] = true;
142            return -EAGAIN;
143        } else {
144            return finalResult;
145        }
146    }
147
148    if (mBuffering[idx]) {
149        if (mSwitchInProgress
150                || packetSource->isFinished(0)
151                || packetSource->getEstimatedDurationUs() > 10000000ll) {
152            mBuffering[idx] = false;
153        }
154    }
155
156    if (mBuffering[idx]) {
157        return -EAGAIN;
158    }
159
160    // wait for counterpart
161    sp<AnotherPacketSource> otherSource;
162    uint32_t mask = mNewStreamMask & mStreamMask;
163    uint32_t fetchersMask  = 0;
164    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
165        uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask();
166        fetchersMask |= fetcherMask;
167    }
168    mask &= fetchersMask;
169    if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) {
170        otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
171    } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) {
172        otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
173    }
174    if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) {
175        return finalResult == OK ? -EAGAIN : finalResult;
176    }
177
178    status_t err = packetSource->dequeueAccessUnit(accessUnit);
179
180    size_t streamIdx;
181    const char *streamStr;
182    switch (stream) {
183        case STREAMTYPE_AUDIO:
184            streamIdx = kAudioIndex;
185            streamStr = "audio";
186            break;
187        case STREAMTYPE_VIDEO:
188            streamIdx = kVideoIndex;
189            streamStr = "video";
190            break;
191        case STREAMTYPE_SUBTITLES:
192            streamIdx = kSubtitleIndex;
193            streamStr = "subs";
194            break;
195        default:
196            TRESPASS();
197    }
198
199    StreamItem& strm = mStreams[streamIdx];
200    if (err == INFO_DISCONTINUITY) {
201        // adaptive streaming, discontinuities in the playlist
202        int32_t type;
203        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
204
205        sp<AMessage> extra;
206        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
207            extra.clear();
208        }
209
210        ALOGI("[%s] read discontinuity of type %d, extra = %s",
211              streamStr,
212              type,
213              extra == NULL ? "NULL" : extra->debugString().c_str());
214
215        int32_t swap;
216        if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) {
217            int32_t switchGeneration;
218            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
219            {
220                Mutex::Autolock lock(mSwapMutex);
221                if (switchGeneration == mSwitchGeneration) {
222                    swapPacketSource(stream);
223                    sp<AMessage> msg = new AMessage(kWhatSwapped, id());
224                    msg->setInt32("stream", stream);
225                    msg->setInt32("switchGeneration", switchGeneration);
226                    msg->post();
227                }
228            }
229        } else {
230            size_t seq = strm.mCurDiscontinuitySeq;
231            int64_t offsetTimeUs;
232            if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
233                offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
234            } else {
235                offsetTimeUs = 0;
236            }
237
238            seq += 1;
239            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
240                int64_t firstTimeUs;
241                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
242                offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
243                offsetTimeUs += strm.mLastSampleDurationUs;
244            } else {
245                offsetTimeUs += strm.mLastSampleDurationUs;
246            }
247
248            mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
249        }
250    } else if (err == OK) {
251
252        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
253            int64_t timeUs;
254            int32_t discontinuitySeq = 0;
255            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
256            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
257            strm.mCurDiscontinuitySeq = discontinuitySeq;
258
259            int32_t discard = 0;
260            int64_t firstTimeUs;
261            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
262                int64_t durUs; // approximate sample duration
263                if (timeUs > strm.mLastDequeuedTimeUs) {
264                    durUs = timeUs - strm.mLastDequeuedTimeUs;
265                } else {
266                    durUs = strm.mLastDequeuedTimeUs - timeUs;
267                }
268                strm.mLastSampleDurationUs = durUs;
269                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
270            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
271                firstTimeUs = timeUs;
272            } else {
273                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
274                firstTimeUs = timeUs;
275            }
276
277            strm.mLastDequeuedTimeUs = timeUs;
278            if (timeUs >= firstTimeUs) {
279                timeUs -= firstTimeUs;
280            } else {
281                timeUs = 0;
282            }
283            timeUs += mLastSeekTimeUs;
284            if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
285                timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
286            }
287
288            ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs);
289            (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
290            mLastDequeuedTimeUs = timeUs;
291            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
292        } else if (stream == STREAMTYPE_SUBTITLES) {
293            int32_t subtitleGeneration;
294            if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
295                    && subtitleGeneration != mSubtitleGeneration) {
296               return -EAGAIN;
297            };
298            (*accessUnit)->meta()->setInt32(
299                    "trackIndex", mPlaylist->getSelectedIndex());
300            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
301        }
302    } else {
303        ALOGI("[%s] encountered error %d", streamStr, err);
304    }
305
306    return err;
307}
308
309status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
310    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
311    if (!(mStreamMask & stream)) {
312        return UNKNOWN_ERROR;
313    }
314
315    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
316
317    sp<MetaData> meta = packetSource->getFormat();
318
319    if (meta == NULL) {
320        return -EAGAIN;
321    }
322
323    return convertMetaDataToMessage(meta, format);
324}
325
326void LiveSession::connectAsync(
327        const char *url, const KeyedVector<String8, String8> *headers) {
328    sp<AMessage> msg = new AMessage(kWhatConnect, id());
329    msg->setString("url", url);
330
331    if (headers != NULL) {
332        msg->setPointer(
333                "headers",
334                new KeyedVector<String8, String8>(*headers));
335    }
336
337    msg->post();
338}
339
340status_t LiveSession::disconnect() {
341    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
342
343    sp<AMessage> response;
344    status_t err = msg->postAndAwaitResponse(&response);
345
346    return err;
347}
348
349status_t LiveSession::seekTo(int64_t timeUs) {
350    sp<AMessage> msg = new AMessage(kWhatSeek, id());
351    msg->setInt64("timeUs", timeUs);
352
353    sp<AMessage> response;
354    status_t err = msg->postAndAwaitResponse(&response);
355
356    uint32_t replyID;
357    CHECK(response == mSeekReply && 0 != mSeekReplyID);
358    mSeekReply.clear();
359    mSeekReplyID = 0;
360    return err;
361}
362
363void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
364    switch (msg->what()) {
365        case kWhatConnect:
366        {
367            onConnect(msg);
368            break;
369        }
370
371        case kWhatDisconnect:
372        {
373            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
374
375            if (mReconfigurationInProgress) {
376                break;
377            }
378
379            finishDisconnect();
380            break;
381        }
382
383        case kWhatSeek:
384        {
385            CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
386
387            status_t err = onSeek(msg);
388
389            mSeekReply = new AMessage;
390            mSeekReply->setInt32("err", err);
391            break;
392        }
393
394        case kWhatFetcherNotify:
395        {
396            int32_t what;
397            CHECK(msg->findInt32("what", &what));
398
399            switch (what) {
400                case PlaylistFetcher::kWhatStarted:
401                    break;
402                case PlaylistFetcher::kWhatPaused:
403                case PlaylistFetcher::kWhatStopped:
404                {
405                    if (what == PlaylistFetcher::kWhatStopped) {
406                        AString uri;
407                        CHECK(msg->findString("uri", &uri));
408                        if (mFetcherInfos.removeItem(uri) < 0) {
409                            // ignore duplicated kWhatStopped messages.
410                            break;
411                        }
412
413                        if (mSwitchInProgress) {
414                            tryToFinishBandwidthSwitch();
415                        }
416                    }
417
418                    if (mContinuation != NULL) {
419                        CHECK_GT(mContinuationCounter, 0);
420                        if (--mContinuationCounter == 0) {
421                            mContinuation->post();
422
423                            if (mSeekReplyID != 0) {
424                                CHECK(mSeekReply != NULL);
425                                mSeekReply->postReply(mSeekReplyID);
426                            }
427                        }
428                    }
429                    break;
430                }
431
432                case PlaylistFetcher::kWhatDurationUpdate:
433                {
434                    AString uri;
435                    CHECK(msg->findString("uri", &uri));
436
437                    int64_t durationUs;
438                    CHECK(msg->findInt64("durationUs", &durationUs));
439
440                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
441                    info->mDurationUs = durationUs;
442                    break;
443                }
444
445                case PlaylistFetcher::kWhatError:
446                {
447                    status_t err;
448                    CHECK(msg->findInt32("err", &err));
449
450                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
451
452                    // handle EOS on subtitle tracks independently
453                    AString uri;
454                    if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
455                        ssize_t i = mFetcherInfos.indexOfKey(uri);
456                        if (i >= 0) {
457                            const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
458                            if (fetcher != NULL) {
459                                uint32_t type = fetcher->getStreamTypeMask();
460                                if (type == STREAMTYPE_SUBTITLES) {
461                                    mPacketSources.valueFor(
462                                            STREAMTYPE_SUBTITLES)->signalEOS(err);;
463                                    break;
464                                }
465                            }
466                        }
467                    }
468
469                    if (mInPreparationPhase) {
470                        postPrepared(err);
471                    }
472
473                    cancelBandwidthSwitch();
474
475                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
476
477                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
478
479                    mPacketSources.valueFor(
480                            STREAMTYPE_SUBTITLES)->signalEOS(err);
481
482                    sp<AMessage> notify = mNotify->dup();
483                    notify->setInt32("what", kWhatError);
484                    notify->setInt32("err", err);
485                    notify->post();
486                    break;
487                }
488
489                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
490                {
491                    AString uri;
492                    CHECK(msg->findString("uri", &uri));
493
494                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
495                    info->mIsPrepared = true;
496
497                    if (mInPreparationPhase) {
498                        bool allFetchersPrepared = true;
499                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
500                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
501                                allFetchersPrepared = false;
502                                break;
503                            }
504                        }
505
506                        if (allFetchersPrepared) {
507                            postPrepared(OK);
508                        }
509                    }
510                    break;
511                }
512
513                case PlaylistFetcher::kWhatStartedAt:
514                {
515                    int32_t switchGeneration;
516                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
517
518                    if (switchGeneration != mSwitchGeneration) {
519                        break;
520                    }
521
522                    // Resume fetcher for the original variant; the resumed fetcher should
523                    // continue until the timestamps found in msg, which is stored by the
524                    // new fetcher to indicate where the new variant has started buffering.
525                    for (size_t i = 0; i < mFetcherInfos.size(); i++) {
526                        const FetcherInfo info = mFetcherInfos.valueAt(i);
527                        if (info.mToBeRemoved) {
528                            info.mFetcher->resumeUntilAsync(msg);
529                        }
530                    }
531                    break;
532                }
533
534                default:
535                    TRESPASS();
536            }
537
538            break;
539        }
540
541        case kWhatCheckBandwidth:
542        {
543            int32_t generation;
544            CHECK(msg->findInt32("generation", &generation));
545
546            if (generation != mCheckBandwidthGeneration) {
547                break;
548            }
549
550            onCheckBandwidth(msg);
551            break;
552        }
553
554        case kWhatChangeConfiguration:
555        {
556            onChangeConfiguration(msg);
557            break;
558        }
559
560        case kWhatChangeConfiguration2:
561        {
562            onChangeConfiguration2(msg);
563            break;
564        }
565
566        case kWhatChangeConfiguration3:
567        {
568            onChangeConfiguration3(msg);
569            break;
570        }
571
572        case kWhatFinishDisconnect2:
573        {
574            onFinishDisconnect2();
575            break;
576        }
577
578        case kWhatSwapped:
579        {
580            onSwapped(msg);
581            break;
582        }
583
584        case kWhatCheckSwitchDown:
585        {
586            onCheckSwitchDown();
587            break;
588        }
589
590        case kWhatSwitchDown:
591        {
592            onSwitchDown();
593            break;
594        }
595
596        default:
597            TRESPASS();
598            break;
599    }
600}
601
602// static
603int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
604    if (a->mBandwidth < b->mBandwidth) {
605        return -1;
606    } else if (a->mBandwidth == b->mBandwidth) {
607        return 0;
608    }
609
610    return 1;
611}
612
613// static
614LiveSession::StreamType LiveSession::indexToType(int idx) {
615    CHECK(idx >= 0 && idx < kMaxStreams);
616    return (StreamType)(1 << idx);
617}
618
619// static
620ssize_t LiveSession::typeToIndex(int32_t type) {
621    switch (type) {
622        case STREAMTYPE_AUDIO:
623            return 0;
624        case STREAMTYPE_VIDEO:
625            return 1;
626        case STREAMTYPE_SUBTITLES:
627            return 2;
628        default:
629            return -1;
630    };
631    return -1;
632}
633
634void LiveSession::onConnect(const sp<AMessage> &msg) {
635    AString url;
636    CHECK(msg->findString("url", &url));
637
638    KeyedVector<String8, String8> *headers = NULL;
639    if (!msg->findPointer("headers", (void **)&headers)) {
640        mExtraHeaders.clear();
641    } else {
642        mExtraHeaders = *headers;
643
644        delete headers;
645        headers = NULL;
646    }
647
648    // TODO currently we don't know if we are coming here from incognito mode
649    ALOGI("onConnect %s", uriDebugString(url).c_str());
650
651    mMasterURL = url;
652
653    bool dummy;
654    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
655
656    if (mPlaylist == NULL) {
657        ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str());
658
659        postPrepared(ERROR_IO);
660        return;
661    }
662
663    // We trust the content provider to make a reasonable choice of preferred
664    // initial bandwidth by listing it first in the variant playlist.
665    // At startup we really don't have a good estimate on the available
666    // network bandwidth since we haven't tranferred any data yet. Once
667    // we have we can make a better informed choice.
668    size_t initialBandwidth = 0;
669    size_t initialBandwidthIndex = 0;
670
671    if (mPlaylist->isVariantPlaylist()) {
672        for (size_t i = 0; i < mPlaylist->size(); ++i) {
673            BandwidthItem item;
674
675            item.mPlaylistIndex = i;
676
677            sp<AMessage> meta;
678            AString uri;
679            mPlaylist->itemAt(i, &uri, &meta);
680
681            unsigned long bandwidth;
682            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
683
684            if (initialBandwidth == 0) {
685                initialBandwidth = item.mBandwidth;
686            }
687
688            mBandwidthItems.push(item);
689        }
690
691        CHECK_GT(mBandwidthItems.size(), 0u);
692
693        mBandwidthItems.sort(SortByBandwidth);
694
695        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
696            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
697                initialBandwidthIndex = i;
698                break;
699            }
700        }
701    } else {
702        // dummy item.
703        BandwidthItem item;
704        item.mPlaylistIndex = 0;
705        item.mBandwidth = 0;
706        mBandwidthItems.push(item);
707    }
708
709    mPlaylist->pickRandomMediaItems();
710    changeConfiguration(
711            0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
712}
713
714void LiveSession::finishDisconnect() {
715    // No reconfiguration is currently pending, make sure none will trigger
716    // during disconnection either.
717    cancelCheckBandwidthEvent();
718
719    // Protect mPacketSources from a swapPacketSource race condition through disconnect.
720    // (finishDisconnect, onFinishDisconnect2)
721    cancelBandwidthSwitch();
722
723    // cancel switch down monitor
724    mSwitchDownMonitor.clear();
725
726    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
727        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
728    }
729
730    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
731
732    mContinuationCounter = mFetcherInfos.size();
733    mContinuation = msg;
734
735    if (mContinuationCounter == 0) {
736        msg->post();
737    }
738}
739
740void LiveSession::onFinishDisconnect2() {
741    mContinuation.clear();
742
743    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
744    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
745
746    mPacketSources.valueFor(
747            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
748
749    sp<AMessage> response = new AMessage;
750    response->setInt32("err", OK);
751
752    response->postReply(mDisconnectReplyID);
753    mDisconnectReplyID = 0;
754}
755
756sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
757    ssize_t index = mFetcherInfos.indexOfKey(uri);
758
759    if (index >= 0) {
760        return NULL;
761    }
762
763    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
764    notify->setString("uri", uri);
765    notify->setInt32("switchGeneration", mSwitchGeneration);
766
767    FetcherInfo info;
768    info.mFetcher = new PlaylistFetcher(notify, this, uri, mSubtitleGeneration);
769    info.mDurationUs = -1ll;
770    info.mIsPrepared = false;
771    info.mToBeRemoved = false;
772    looper()->registerHandler(info.mFetcher);
773
774    mFetcherInfos.add(uri, info);
775
776    return info.mFetcher;
777}
778
779/*
780 * Illustration of parameters:
781 *
782 * 0      `range_offset`
783 * +------------+-------------------------------------------------------+--+--+
784 * |            |                                 | next block to fetch |  |  |
785 * |            | `source` handle => `out` buffer |                     |  |  |
786 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
787 * |            |<----------- `range_length` / buffer capacity ----------->|  |
788 * |<------------------------------ file_size ------------------------------->|
789 *
790 * Special parameter values:
791 * - range_length == -1 means entire file
792 * - block_size == 0 means entire range
793 *
794 */
795ssize_t LiveSession::fetchFile(
796        const char *url, sp<ABuffer> *out,
797        int64_t range_offset, int64_t range_length,
798        uint32_t block_size, /* download block size */
799        sp<DataSource> *source, /* to return and reuse source */
800        String8 *actualUrl) {
801    off64_t size;
802    sp<DataSource> temp_source;
803    if (source == NULL) {
804        source = &temp_source;
805    }
806
807    if (*source == NULL) {
808        if (!strncasecmp(url, "file://", 7)) {
809            *source = new FileSource(url + 7);
810        } else if (strncasecmp(url, "http://", 7)
811                && strncasecmp(url, "https://", 8)) {
812            return ERROR_UNSUPPORTED;
813        } else {
814            KeyedVector<String8, String8> headers = mExtraHeaders;
815            if (range_offset > 0 || range_length >= 0) {
816                headers.add(
817                        String8("Range"),
818                        String8(
819                            StringPrintf(
820                                "bytes=%lld-%s",
821                                range_offset,
822                                range_length < 0
823                                    ? "" : StringPrintf("%lld",
824                                            range_offset + range_length - 1).c_str()).c_str()));
825            }
826            status_t err = mHTTPDataSource->connect(url, &headers);
827
828            if (err != OK) {
829                return err;
830            }
831
832            *source = mHTTPDataSource;
833        }
834    }
835
836    status_t getSizeErr = (*source)->getSize(&size);
837    if (getSizeErr != OK) {
838        size = 65536;
839    }
840
841    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
842    if (*out == NULL) {
843        buffer->setRange(0, 0);
844    }
845
846    ssize_t bytesRead = 0;
847    // adjust range_length if only reading partial block
848    if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
849        range_length = buffer->size() + block_size;
850    }
851    for (;;) {
852        // Only resize when we don't know the size.
853        size_t bufferRemaining = buffer->capacity() - buffer->size();
854        if (bufferRemaining == 0 && getSizeErr != OK) {
855            bufferRemaining = 32768;
856
857            ALOGV("increasing download buffer to %zu bytes",
858                 buffer->size() + bufferRemaining);
859
860            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
861            memcpy(copy->data(), buffer->data(), buffer->size());
862            copy->setRange(0, buffer->size());
863
864            buffer = copy;
865        }
866
867        size_t maxBytesToRead = bufferRemaining;
868        if (range_length >= 0) {
869            int64_t bytesLeftInRange = range_length - buffer->size();
870            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
871                maxBytesToRead = bytesLeftInRange;
872
873                if (bytesLeftInRange == 0) {
874                    break;
875                }
876            }
877        }
878
879        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
880        // to help us break out of the loop.
881        ssize_t n = (*source)->readAt(
882                buffer->size(), buffer->data() + buffer->size(),
883                maxBytesToRead);
884
885        if (n < 0) {
886            return n;
887        }
888
889        if (n == 0) {
890            break;
891        }
892
893        buffer->setRange(0, buffer->size() + (size_t)n);
894        bytesRead += n;
895    }
896
897    *out = buffer;
898    if (actualUrl != NULL) {
899        *actualUrl = (*source)->getUri();
900        if (actualUrl->isEmpty()) {
901            *actualUrl = url;
902        }
903    }
904
905    return bytesRead;
906}
907
908sp<M3UParser> LiveSession::fetchPlaylist(
909        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
910    ALOGV("fetchPlaylist '%s'", url);
911
912    *unchanged = false;
913
914    sp<ABuffer> buffer;
915    String8 actualUrl;
916    ssize_t  err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
917
918    if (err <= 0) {
919        return NULL;
920    }
921
922    // MD5 functionality is not available on the simulator, treat all
923    // playlists as changed.
924
925#if defined(HAVE_ANDROID_OS)
926    uint8_t hash[16];
927
928    MD5_CTX m;
929    MD5_Init(&m);
930    MD5_Update(&m, buffer->data(), buffer->size());
931
932    MD5_Final(hash, &m);
933
934    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
935        // playlist unchanged
936        *unchanged = true;
937
938        return NULL;
939    }
940
941    if (curPlaylistHash != NULL) {
942        memcpy(curPlaylistHash, hash, sizeof(hash));
943    }
944#endif
945
946    sp<M3UParser> playlist =
947        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
948
949    if (playlist->initCheck() != OK) {
950        ALOGE("failed to parse .m3u8 playlist");
951
952        return NULL;
953    }
954
955    return playlist;
956}
957
958static double uniformRand() {
959    return (double)rand() / RAND_MAX;
960}
961
962size_t LiveSession::getBandwidthIndex() {
963    if (mBandwidthItems.size() == 0) {
964        return 0;
965    }
966
967#if 1
968    char value[PROPERTY_VALUE_MAX];
969    ssize_t index = -1;
970    if (property_get("media.httplive.bw-index", value, NULL)) {
971        char *end;
972        index = strtol(value, &end, 10);
973        CHECK(end > value && *end == '\0');
974
975        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
976            index = mBandwidthItems.size() - 1;
977        }
978    }
979
980    if (index < 0) {
981        int32_t bandwidthBps;
982        if (mHTTPDataSource != NULL
983                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
984            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
985        } else {
986            ALOGV("no bandwidth estimate.");
987            return 0;  // Pick the lowest bandwidth stream by default.
988        }
989
990        char value[PROPERTY_VALUE_MAX];
991        if (property_get("media.httplive.max-bw", value, NULL)) {
992            char *end;
993            long maxBw = strtoul(value, &end, 10);
994            if (end > value && *end == '\0') {
995                if (maxBw > 0 && bandwidthBps > maxBw) {
996                    ALOGV("bandwidth capped to %ld bps", maxBw);
997                    bandwidthBps = maxBw;
998                }
999            }
1000        }
1001
1002        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
1003
1004        index = mBandwidthItems.size() - 1;
1005        while (index > 0) {
1006            // consider only 80% of the available bandwidth, but if we are switching up,
1007            // be even more conservative (70%) to avoid overestimating and immediately
1008            // switching back.
1009            size_t adjustedBandwidthBps = bandwidthBps;
1010            if (index > mCurBandwidthIndex) {
1011                adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10;
1012            } else {
1013                adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10;
1014            }
1015            if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) {
1016                break;
1017            }
1018            --index;
1019        }
1020    }
1021#elif 0
1022    // Change bandwidth at random()
1023    size_t index = uniformRand() * mBandwidthItems.size();
1024#elif 0
1025    // There's a 50% chance to stay on the current bandwidth and
1026    // a 50% chance to switch to the next higher bandwidth (wrapping around
1027    // to lowest)
1028    const size_t kMinIndex = 0;
1029
1030    static ssize_t mCurBandwidthIndex = -1;
1031
1032    size_t index;
1033    if (mCurBandwidthIndex < 0) {
1034        index = kMinIndex;
1035    } else if (uniformRand() < 0.5) {
1036        index = (size_t)mCurBandwidthIndex;
1037    } else {
1038        index = mCurBandwidthIndex + 1;
1039        if (index == mBandwidthItems.size()) {
1040            index = kMinIndex;
1041        }
1042    }
1043    mCurBandwidthIndex = index;
1044#elif 0
1045    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1046
1047    size_t index = mBandwidthItems.size() - 1;
1048    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1049        --index;
1050    }
1051#elif 1
1052    char value[PROPERTY_VALUE_MAX];
1053    size_t index;
1054    if (property_get("media.httplive.bw-index", value, NULL)) {
1055        char *end;
1056        index = strtoul(value, &end, 10);
1057        CHECK(end > value && *end == '\0');
1058
1059        if (index >= mBandwidthItems.size()) {
1060            index = mBandwidthItems.size() - 1;
1061        }
1062    } else {
1063        index = 0;
1064    }
1065#else
1066    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
1067#endif
1068
1069    CHECK_GE(index, 0);
1070
1071    return index;
1072}
1073
1074int64_t LiveSession::latestMediaSegmentStartTimeUs() {
1075    sp<AMessage> audioMeta = mPacketSources.valueFor(STREAMTYPE_AUDIO)->getLatestDequeuedMeta();
1076    int64_t minSegmentStartTimeUs = -1, videoSegmentStartTimeUs = -1;
1077    if (audioMeta != NULL) {
1078        audioMeta->findInt64("segmentStartTimeUs", &minSegmentStartTimeUs);
1079    }
1080
1081    sp<AMessage> videoMeta = mPacketSources.valueFor(STREAMTYPE_VIDEO)->getLatestDequeuedMeta();
1082    if (videoMeta != NULL
1083            && videoMeta->findInt64("segmentStartTimeUs", &videoSegmentStartTimeUs)) {
1084        if (minSegmentStartTimeUs < 0 || videoSegmentStartTimeUs < minSegmentStartTimeUs) {
1085            minSegmentStartTimeUs = videoSegmentStartTimeUs;
1086        }
1087
1088    }
1089    return minSegmentStartTimeUs;
1090}
1091
1092status_t LiveSession::onSeek(const sp<AMessage> &msg) {
1093    int64_t timeUs;
1094    CHECK(msg->findInt64("timeUs", &timeUs));
1095
1096    if (!mReconfigurationInProgress) {
1097        changeConfiguration(timeUs, getBandwidthIndex());
1098    }
1099
1100    return OK;
1101}
1102
1103status_t LiveSession::getDuration(int64_t *durationUs) const {
1104    int64_t maxDurationUs = 0ll;
1105    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1106        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1107
1108        if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) {
1109            maxDurationUs = fetcherDurationUs;
1110        }
1111    }
1112
1113    *durationUs = maxDurationUs;
1114
1115    return OK;
1116}
1117
1118bool LiveSession::isSeekable() const {
1119    int64_t durationUs;
1120    return getDuration(&durationUs) == OK && durationUs >= 0;
1121}
1122
1123bool LiveSession::hasDynamicDuration() const {
1124    return false;
1125}
1126
1127size_t LiveSession::getTrackCount() const {
1128    if (mPlaylist == NULL) {
1129        return 0;
1130    } else {
1131        return mPlaylist->getTrackCount();
1132    }
1133}
1134
1135sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1136    if (mPlaylist == NULL) {
1137        return NULL;
1138    } else {
1139        return mPlaylist->getTrackInfo(trackIndex);
1140    }
1141}
1142
1143status_t LiveSession::selectTrack(size_t index, bool select) {
1144    if (mPlaylist == NULL) {
1145        return INVALID_OPERATION;
1146    }
1147
1148    ++mSubtitleGeneration;
1149    status_t err = mPlaylist->selectTrack(index, select);
1150    if (err == OK) {
1151        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id());
1152        msg->setInt32("bandwidthIndex", mCurBandwidthIndex);
1153        msg->setInt32("pickTrack", select);
1154        msg->post();
1155    }
1156    return err;
1157}
1158
1159bool LiveSession::canSwitchUp() {
1160    // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
1161    status_t err = OK;
1162    for (size_t i = 0; i < mPacketSources.size(); ++i) {
1163        sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
1164        int64_t dur = source->getBufferedDurationUs(&err);
1165        if (err == OK && dur > 10000000) {
1166            return true;
1167        }
1168    }
1169    return false;
1170}
1171
1172void LiveSession::changeConfiguration(
1173        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
1174    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
1175    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
1176    cancelBandwidthSwitch();
1177
1178    CHECK(!mReconfigurationInProgress);
1179    mReconfigurationInProgress = true;
1180
1181    mCurBandwidthIndex = bandwidthIndex;
1182
1183    ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
1184          timeUs, bandwidthIndex, pickTrack);
1185
1186    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
1187    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
1188
1189    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1190    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1191
1192    AString URIs[kMaxStreams];
1193    for (size_t i = 0; i < kMaxStreams; ++i) {
1194        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1195            streamMask |= indexToType(i);
1196        }
1197    }
1198
1199    // Step 1, stop and discard fetchers that are no longer needed.
1200    // Pause those that we'll reuse.
1201    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1202        const AString &uri = mFetcherInfos.keyAt(i);
1203
1204        bool discardFetcher = true;
1205
1206        // If we're seeking all current fetchers are discarded.
1207        if (timeUs < 0ll) {
1208            // delay fetcher removal if not picking tracks
1209            discardFetcher = pickTrack;
1210
1211            for (size_t j = 0; j < kMaxStreams; ++j) {
1212                StreamType type = indexToType(j);
1213                if ((streamMask & type) && uri == URIs[j]) {
1214                    resumeMask |= type;
1215                    streamMask &= ~type;
1216                    discardFetcher = false;
1217                }
1218            }
1219        }
1220
1221        if (discardFetcher) {
1222            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1223        } else {
1224            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
1225        }
1226    }
1227
1228    sp<AMessage> msg;
1229    if (timeUs < 0ll) {
1230        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1231        msg = new AMessage(kWhatChangeConfiguration3, id());
1232    } else {
1233        msg = new AMessage(kWhatChangeConfiguration2, id());
1234    }
1235    msg->setInt32("streamMask", streamMask);
1236    msg->setInt32("resumeMask", resumeMask);
1237    msg->setInt32("pickTrack", pickTrack);
1238    msg->setInt64("timeUs", timeUs);
1239    for (size_t i = 0; i < kMaxStreams; ++i) {
1240        if ((streamMask | resumeMask) & indexToType(i)) {
1241            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1242        }
1243    }
1244
1245    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1246    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1247    // fetchers have completed their asynchronous operation, we'll post
1248    // mContinuation, which then is handled below in onChangeConfiguration2.
1249    mContinuationCounter = mFetcherInfos.size();
1250    mContinuation = msg;
1251
1252    if (mContinuationCounter == 0) {
1253        msg->post();
1254
1255        if (mSeekReplyID != 0) {
1256            CHECK(mSeekReply != NULL);
1257            mSeekReply->postReply(mSeekReplyID);
1258        }
1259    }
1260}
1261
1262void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1263    if (!mReconfigurationInProgress) {
1264        int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex;
1265        msg->findInt32("pickTrack", &pickTrack);
1266        msg->findInt32("bandwidthIndex", &bandwidthIndex);
1267        changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack);
1268    } else {
1269        msg->post(1000000ll); // retry in 1 sec
1270    }
1271}
1272
1273void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1274    mContinuation.clear();
1275
1276    // All fetchers are either suspended or have been removed now.
1277
1278    uint32_t streamMask, resumeMask;
1279    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1280    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1281
1282    // currently onChangeConfiguration2 is only called for seeking;
1283    // remove the following CHECK if using it else where.
1284    CHECK_EQ(resumeMask, 0);
1285    streamMask |= resumeMask;
1286
1287    AString URIs[kMaxStreams];
1288    for (size_t i = 0; i < kMaxStreams; ++i) {
1289        if (streamMask & indexToType(i)) {
1290            const AString &uriKey = mStreams[i].uriKey();
1291            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1292            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1293        }
1294    }
1295
1296    // Determine which decoders to shutdown on the player side,
1297    // a decoder has to be shutdown if either
1298    // 1) its streamtype was active before but now longer isn't.
1299    // or
1300    // 2) its streamtype was already active and still is but the URI
1301    //    has changed.
1302    uint32_t changedMask = 0;
1303    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1304        if (((mStreamMask & streamMask & indexToType(i))
1305                && !(URIs[i] == mStreams[i].mUri))
1306                || (mStreamMask & ~streamMask & indexToType(i))) {
1307            changedMask |= indexToType(i);
1308        }
1309    }
1310
1311    if (changedMask == 0) {
1312        // If nothing changed as far as the audio/video decoders
1313        // are concerned we can proceed.
1314        onChangeConfiguration3(msg);
1315        return;
1316    }
1317
1318    // Something changed, inform the player which will shutdown the
1319    // corresponding decoders and will post the reply once that's done.
1320    // Handling the reply will continue executing below in
1321    // onChangeConfiguration3.
1322    sp<AMessage> notify = mNotify->dup();
1323    notify->setInt32("what", kWhatStreamsChanged);
1324    notify->setInt32("changedMask", changedMask);
1325
1326    msg->setWhat(kWhatChangeConfiguration3);
1327    msg->setTarget(id());
1328
1329    notify->setMessage("reply", msg);
1330    notify->post();
1331}
1332
1333void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1334    mContinuation.clear();
1335    // All remaining fetchers are still suspended, the player has shutdown
1336    // any decoders that needed it.
1337
1338    uint32_t streamMask, resumeMask;
1339    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1340    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1341
1342    int64_t timeUs;
1343    int32_t pickTrack;
1344    bool switching = false;
1345    CHECK(msg->findInt64("timeUs", &timeUs));
1346    CHECK(msg->findInt32("pickTrack", &pickTrack));
1347
1348    if (timeUs < 0ll) {
1349        if (!pickTrack) {
1350            switching = true;
1351        }
1352        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
1353    } else {
1354        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1355    }
1356
1357    for (size_t i = 0; i < kMaxStreams; ++i) {
1358        if (streamMask & indexToType(i)) {
1359            if (switching) {
1360                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
1361            } else {
1362                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1363            }
1364        }
1365    }
1366
1367    mNewStreamMask = streamMask | resumeMask;
1368    if (switching) {
1369        mSwapMask = mStreamMask & ~resumeMask;
1370    }
1371
1372    // Of all existing fetchers:
1373    // * Resume fetchers that are still needed and assign them original packet sources.
1374    // * Mark otherwise unneeded fetchers for removal.
1375    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1376    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1377        const AString &uri = mFetcherInfos.keyAt(i);
1378
1379        sp<AnotherPacketSource> sources[kMaxStreams];
1380        for (size_t j = 0; j < kMaxStreams; ++j) {
1381            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
1382                sources[j] = mPacketSources.valueFor(indexToType(j));
1383
1384                if (j != kSubtitleIndex) {
1385                    ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j));
1386                    sp<AnotherPacketSource> discontinuityQueue;
1387                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1388                    discontinuityQueue->queueDiscontinuity(
1389                            ATSParser::DISCONTINUITY_NONE,
1390                            NULL,
1391                            true);
1392                }
1393            }
1394        }
1395
1396        FetcherInfo &info = mFetcherInfos.editValueAt(i);
1397        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
1398                || sources[kSubtitleIndex] != NULL) {
1399            info.mFetcher->startAsync(
1400                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
1401        } else {
1402            info.mToBeRemoved = true;
1403        }
1404    }
1405
1406    // streamMask now only contains the types that need a new fetcher created.
1407
1408    if (streamMask != 0) {
1409        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1410    }
1411
1412    // Find out when the original fetchers have buffered up to and start the new fetchers
1413    // at a later timestamp.
1414    for (size_t i = 0; i < kMaxStreams; i++) {
1415        if (!(indexToType(i) & streamMask)) {
1416            continue;
1417        }
1418
1419        AString uri;
1420        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
1421
1422        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1423        CHECK(fetcher != NULL);
1424
1425        int32_t latestSeq = -1;
1426        int64_t startTimeUs = -1;
1427        int64_t segmentStartTimeUs = -1ll;
1428        int32_t discontinuitySeq = -1;
1429        sp<AnotherPacketSource> sources[kMaxStreams];
1430
1431        if (i == kSubtitleIndex) {
1432            segmentStartTimeUs = latestMediaSegmentStartTimeUs();
1433        }
1434
1435        // TRICKY: looping from i as earlier streams are already removed from streamMask
1436        for (size_t j = i; j < kMaxStreams; ++j) {
1437            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
1438            if ((streamMask & indexToType(j)) && uri == streamUri) {
1439                sources[j] = mPacketSources.valueFor(indexToType(j));
1440
1441                if (timeUs >= 0) {
1442                    sources[j]->clear();
1443                    startTimeUs = timeUs;
1444
1445                    sp<AnotherPacketSource> discontinuityQueue;
1446                    sp<AMessage> extra = new AMessage;
1447                    extra->setInt64("timeUs", timeUs);
1448                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1449                    discontinuityQueue->queueDiscontinuity(
1450                            ATSParser::DISCONTINUITY_SEEK, extra, true);
1451                } else {
1452                    int32_t type;
1453                    int64_t srcSegmentStartTimeUs;
1454                    sp<AMessage> meta;
1455                    if (pickTrack) {
1456                        // selecting
1457                        meta = sources[j]->getLatestDequeuedMeta();
1458                    } else {
1459                        // adapting
1460                        meta = sources[j]->getLatestEnqueuedMeta();
1461                    }
1462
1463                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
1464                        int64_t tmpUs;
1465                        CHECK(meta->findInt64("timeUs", &tmpUs));
1466                        if (startTimeUs < 0 || tmpUs < startTimeUs) {
1467                            startTimeUs = tmpUs;
1468                        }
1469
1470                        CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs));
1471                        if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) {
1472                            segmentStartTimeUs = tmpUs;
1473                        }
1474
1475                        int32_t seq;
1476                        CHECK(meta->findInt32("discontinuitySeq", &seq));
1477                        if (discontinuitySeq < 0 || seq < discontinuitySeq) {
1478                            discontinuitySeq = seq;
1479                        }
1480                    }
1481
1482                    if (pickTrack) {
1483                        // selecting track, queue discontinuities before content
1484                        sources[j]->clear();
1485                        if (j == kSubtitleIndex) {
1486                            break;
1487                        }
1488                        sp<AnotherPacketSource> discontinuityQueue;
1489                        discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
1490                        discontinuityQueue->queueDiscontinuity(
1491                                ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1492                    } else {
1493                        // adapting, queue discontinuities after resume
1494                        sources[j] = mPacketSources2.valueFor(indexToType(j));
1495                        sources[j]->clear();
1496                        uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1497                        if (extraStreams & indexToType(j)) {
1498                            sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false));
1499                        }
1500                    }
1501                }
1502
1503                streamMask &= ~indexToType(j);
1504            }
1505        }
1506
1507        fetcher->startAsync(
1508                sources[kAudioIndex],
1509                sources[kVideoIndex],
1510                sources[kSubtitleIndex],
1511                startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs,
1512                segmentStartTimeUs,
1513                discontinuitySeq,
1514                switching);
1515    }
1516
1517    // All fetchers have now been started, the configuration change
1518    // has completed.
1519
1520    cancelCheckBandwidthEvent();
1521    scheduleCheckBandwidthEvent();
1522
1523    ALOGV("XXX configuration change completed.");
1524    mReconfigurationInProgress = false;
1525    if (switching) {
1526        mSwitchInProgress = true;
1527    } else {
1528        mStreamMask = mNewStreamMask;
1529    }
1530
1531    if (mDisconnectReplyID != 0) {
1532        finishDisconnect();
1533    }
1534}
1535
1536void LiveSession::onSwapped(const sp<AMessage> &msg) {
1537    int32_t switchGeneration;
1538    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
1539    if (switchGeneration != mSwitchGeneration) {
1540        return;
1541    }
1542
1543    int32_t stream;
1544    CHECK(msg->findInt32("stream", &stream));
1545
1546    ssize_t idx = typeToIndex(stream);
1547    CHECK(idx >= 0);
1548    if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
1549        ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str());
1550    }
1551    mStreams[idx].mUri = mStreams[idx].mNewUri;
1552    mStreams[idx].mNewUri.clear();
1553
1554    mSwapMask &= ~stream;
1555    if (mSwapMask != 0) {
1556        return;
1557    }
1558
1559    // Check if new variant contains extra streams.
1560    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1561    while (extraStreams) {
1562        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
1563        swapPacketSource(extraStream);
1564        extraStreams &= ~extraStream;
1565
1566        idx = typeToIndex(extraStream);
1567        CHECK(idx >= 0);
1568        if (mStreams[idx].mNewUri.empty()) {
1569            ALOGW("swapping extra stream type %d %s to empty stream",
1570                    extraStream, mStreams[idx].mUri.c_str());
1571        }
1572        mStreams[idx].mUri = mStreams[idx].mNewUri;
1573        mStreams[idx].mNewUri.clear();
1574    }
1575
1576    tryToFinishBandwidthSwitch();
1577}
1578
1579void LiveSession::onCheckSwitchDown() {
1580    if (mSwitchDownMonitor == NULL) {
1581        return;
1582    }
1583
1584    for (size_t i = 0; i < kMaxStreams; ++i) {
1585        int32_t targetDuration;
1586        sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i));
1587        sp<AMessage> meta = packetSource->getLatestDequeuedMeta();
1588
1589        if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) {
1590            int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs();
1591            int64_t targetDurationUs = targetDuration * 1000000ll;
1592
1593            if (bufferedDurationUs < targetDurationUs / 3) {
1594                (new AMessage(kWhatSwitchDown, id()))->post();
1595                break;
1596            }
1597        }
1598    }
1599
1600    mSwitchDownMonitor->post(1000000ll);
1601}
1602
1603void LiveSession::onSwitchDown() {
1604    if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) {
1605        return;
1606    }
1607
1608    ssize_t bandwidthIndex = getBandwidthIndex();
1609    if (bandwidthIndex < mCurBandwidthIndex) {
1610        changeConfiguration(-1, bandwidthIndex, false);
1611        return;
1612    }
1613
1614    changeConfiguration(-1, mCurBandwidthIndex - 1, false);
1615}
1616
1617// Mark switch done when:
1618//   1. all old buffers are swapped out
1619void LiveSession::tryToFinishBandwidthSwitch() {
1620    if (!mSwitchInProgress) {
1621        return;
1622    }
1623
1624    bool needToRemoveFetchers = false;
1625    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1626        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
1627            needToRemoveFetchers = true;
1628            break;
1629        }
1630    }
1631
1632    if (!needToRemoveFetchers && mSwapMask == 0) {
1633        ALOGI("mSwitchInProgress = false");
1634        mStreamMask = mNewStreamMask;
1635        mSwitchInProgress = false;
1636    }
1637}
1638
1639void LiveSession::scheduleCheckBandwidthEvent() {
1640    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1641    msg->setInt32("generation", mCheckBandwidthGeneration);
1642    msg->post(10000000ll);
1643}
1644
1645void LiveSession::cancelCheckBandwidthEvent() {
1646    ++mCheckBandwidthGeneration;
1647}
1648
1649void LiveSession::cancelBandwidthSwitch() {
1650    Mutex::Autolock lock(mSwapMutex);
1651    mSwitchGeneration++;
1652    mSwitchInProgress = false;
1653    mSwapMask = 0;
1654
1655    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1656        FetcherInfo& info = mFetcherInfos.editValueAt(i);
1657        if (info.mToBeRemoved) {
1658            info.mToBeRemoved = false;
1659        }
1660    }
1661
1662    for (size_t i = 0; i < kMaxStreams; ++i) {
1663        if (!mStreams[i].mNewUri.empty()) {
1664            ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri);
1665            if (j < 0) {
1666                mStreams[i].mNewUri.clear();
1667                continue;
1668            }
1669
1670            const FetcherInfo &info = mFetcherInfos.valueAt(j);
1671            info.mFetcher->stopAsync();
1672            mFetcherInfos.removeItemsAt(j);
1673            mStreams[i].mNewUri.clear();
1674        }
1675    }
1676}
1677
1678bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
1679    if (mReconfigurationInProgress || mSwitchInProgress) {
1680        return false;
1681    }
1682
1683    if (mCurBandwidthIndex < 0) {
1684        return true;
1685    }
1686
1687    if (bandwidthIndex == (size_t)mCurBandwidthIndex) {
1688        return false;
1689    } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) {
1690        return canSwitchUp();
1691    } else {
1692        return true;
1693    }
1694}
1695
1696void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) {
1697    size_t bandwidthIndex = getBandwidthIndex();
1698    if (canSwitchBandwidthTo(bandwidthIndex)) {
1699        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1700    } else {
1701        // Come back and check again 10 seconds later in case there is nothing to do now.
1702        // If we DO change configuration, once that completes it'll schedule a new
1703        // check bandwidth event with an incremented mCheckBandwidthGeneration.
1704        msg->post(10000000ll);
1705    }
1706}
1707
1708void LiveSession::postPrepared(status_t err) {
1709    CHECK(mInPreparationPhase);
1710
1711    sp<AMessage> notify = mNotify->dup();
1712    if (err == OK || err == ERROR_END_OF_STREAM) {
1713        notify->setInt32("what", kWhatPrepared);
1714    } else {
1715        notify->setInt32("what", kWhatPreparationFailed);
1716        notify->setInt32("err", err);
1717    }
1718
1719    notify->post();
1720
1721    mInPreparationPhase = false;
1722
1723    mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id());
1724    mSwitchDownMonitor->post();
1725}
1726
1727}  // namespace android
1728
1729