LiveSession.cpp revision 94dcc94b16cc6c2a7aa02df2d0d6b8743d738d78
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/stagefright/foundation/hexdump.h>
31#include <media/stagefright/foundation/ABuffer.h>
32#include <media/stagefright/foundation/ADebug.h>
33#include <media/stagefright/foundation/AMessage.h>
34#include <media/stagefright/DataSource.h>
35#include <media/stagefright/FileSource.h>
36#include <media/stagefright/MediaErrors.h>
37#include <media/stagefright/MetaData.h>
38#include <media/stagefright/Utils.h>
39
40#include <ctype.h>
41#include <openssl/aes.h>
42#include <openssl/md5.h>
43
44namespace android {
45
46LiveSession::LiveSession(
47        const sp<AMessage> &notify, uint32_t flags, bool uidValid, uid_t uid)
48    : mNotify(notify),
49      mFlags(flags),
50      mUIDValid(uidValid),
51      mUID(uid),
52      mInPreparationPhase(true),
53      mHTTPDataSource(
54              HTTPBase::Create(
55                  (mFlags & kFlagIncognito)
56                    ? HTTPBase::kFlagIncognito
57                    : 0)),
58      mPrevBandwidthIndex(-1),
59      mStreamMask(0),
60      mCheckBandwidthGeneration(0),
61      mLastDequeuedTimeUs(0ll),
62      mRealTimeBaseUs(0ll),
63      mReconfigurationInProgress(false),
64      mDisconnectReplyID(0) {
65    if (mUIDValid) {
66        mHTTPDataSource->setUID(mUID);
67    }
68
69    mPacketSources.add(
70            STREAMTYPE_AUDIO, new AnotherPacketSource(NULL /* meta */));
71
72    mPacketSources.add(
73            STREAMTYPE_VIDEO, new AnotherPacketSource(NULL /* meta */));
74
75    mPacketSources.add(
76            STREAMTYPE_SUBTITLES, new AnotherPacketSource(NULL /* meta */));
77}
78
79LiveSession::~LiveSession() {
80}
81
82status_t LiveSession::dequeueAccessUnit(
83        StreamType stream, sp<ABuffer> *accessUnit) {
84    if (!(mStreamMask & stream)) {
85        return UNKNOWN_ERROR;
86    }
87
88    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
89
90    status_t finalResult;
91    if (!packetSource->hasBufferAvailable(&finalResult)) {
92        return finalResult == OK ? -EAGAIN : finalResult;
93    }
94
95    status_t err = packetSource->dequeueAccessUnit(accessUnit);
96
97    const char *streamStr;
98    switch (stream) {
99        case STREAMTYPE_AUDIO:
100            streamStr = "audio";
101            break;
102        case STREAMTYPE_VIDEO:
103            streamStr = "video";
104            break;
105        case STREAMTYPE_SUBTITLES:
106            streamStr = "subs";
107            break;
108        default:
109            TRESPASS();
110    }
111
112    if (err == INFO_DISCONTINUITY) {
113        int32_t type;
114        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
115
116        sp<AMessage> extra;
117        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
118            extra.clear();
119        }
120
121        ALOGI("[%s] read discontinuity of type %d, extra = %s",
122              streamStr,
123              type,
124              extra == NULL ? "NULL" : extra->debugString().c_str());
125    } else if (err == OK) {
126        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
127            int64_t timeUs;
128            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
129            ALOGV("[%s] read buffer at time %lld us", streamStr, timeUs);
130
131            mLastDequeuedTimeUs = timeUs;
132            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
133        } else if (stream == STREAMTYPE_SUBTITLES) {
134            (*accessUnit)->meta()->setInt32(
135                    "trackIndex", mPlaylist->getSelectedIndex());
136            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
137        }
138    } else {
139        ALOGI("[%s] encountered error %d", streamStr, err);
140    }
141
142    return err;
143}
144
145status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
146    if (!(mStreamMask & stream)) {
147        return UNKNOWN_ERROR;
148    }
149
150    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
151
152    sp<MetaData> meta = packetSource->getFormat();
153
154    if (meta == NULL) {
155        return -EAGAIN;
156    }
157
158    return convertMetaDataToMessage(meta, format);
159}
160
161void LiveSession::connectAsync(
162        const char *url, const KeyedVector<String8, String8> *headers) {
163    sp<AMessage> msg = new AMessage(kWhatConnect, id());
164    msg->setString("url", url);
165
166    if (headers != NULL) {
167        msg->setPointer(
168                "headers",
169                new KeyedVector<String8, String8>(*headers));
170    }
171
172    msg->post();
173}
174
175status_t LiveSession::disconnect() {
176    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
177
178    sp<AMessage> response;
179    status_t err = msg->postAndAwaitResponse(&response);
180
181    return err;
182}
183
184status_t LiveSession::seekTo(int64_t timeUs) {
185    sp<AMessage> msg = new AMessage(kWhatSeek, id());
186    msg->setInt64("timeUs", timeUs);
187
188    sp<AMessage> response;
189    status_t err = msg->postAndAwaitResponse(&response);
190
191    return err;
192}
193
194void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
195    switch (msg->what()) {
196        case kWhatConnect:
197        {
198            onConnect(msg);
199            break;
200        }
201
202        case kWhatDisconnect:
203        {
204            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
205
206            if (mReconfigurationInProgress) {
207                break;
208            }
209
210            finishDisconnect();
211            break;
212        }
213
214        case kWhatSeek:
215        {
216            uint32_t replyID;
217            CHECK(msg->senderAwaitsResponse(&replyID));
218
219            status_t err = onSeek(msg);
220
221            sp<AMessage> response = new AMessage;
222            response->setInt32("err", err);
223
224            response->postReply(replyID);
225            break;
226        }
227
228        case kWhatFetcherNotify:
229        {
230            int32_t what;
231            CHECK(msg->findInt32("what", &what));
232
233            switch (what) {
234                case PlaylistFetcher::kWhatStarted:
235                    break;
236                case PlaylistFetcher::kWhatPaused:
237                case PlaylistFetcher::kWhatStopped:
238                {
239                    if (what == PlaylistFetcher::kWhatStopped) {
240                        AString uri;
241                        CHECK(msg->findString("uri", &uri));
242                        mFetcherInfos.removeItem(uri);
243                    }
244
245                    if (mContinuation != NULL) {
246                        CHECK_GT(mContinuationCounter, 0);
247                        if (--mContinuationCounter == 0) {
248                            mContinuation->post();
249                        }
250                    }
251                    break;
252                }
253
254                case PlaylistFetcher::kWhatDurationUpdate:
255                {
256                    AString uri;
257                    CHECK(msg->findString("uri", &uri));
258
259                    int64_t durationUs;
260                    CHECK(msg->findInt64("durationUs", &durationUs));
261
262                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
263                    info->mDurationUs = durationUs;
264                    break;
265                }
266
267                case PlaylistFetcher::kWhatError:
268                {
269                    status_t err;
270                    CHECK(msg->findInt32("err", &err));
271
272                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
273
274                    if (mInPreparationPhase) {
275                        postPrepared(err);
276                    }
277
278                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
279
280                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
281
282                    mPacketSources.valueFor(
283                            STREAMTYPE_SUBTITLES)->signalEOS(err);
284
285                    sp<AMessage> notify = mNotify->dup();
286                    notify->setInt32("what", kWhatError);
287                    notify->setInt32("err", err);
288                    notify->post();
289                    break;
290                }
291
292                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
293                {
294                    AString uri;
295                    CHECK(msg->findString("uri", &uri));
296
297                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
298                    info->mIsPrepared = true;
299
300                    if (mInPreparationPhase) {
301                        bool allFetchersPrepared = true;
302                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
303                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
304                                allFetchersPrepared = false;
305                                break;
306                            }
307                        }
308
309                        if (allFetchersPrepared) {
310                            postPrepared(OK);
311                        }
312                    }
313                    break;
314                }
315
316                default:
317                    TRESPASS();
318            }
319
320            break;
321        }
322
323        case kWhatCheckBandwidth:
324        {
325            int32_t generation;
326            CHECK(msg->findInt32("generation", &generation));
327
328            if (generation != mCheckBandwidthGeneration) {
329                break;
330            }
331
332            onCheckBandwidth();
333            break;
334        }
335
336        case kWhatChangeConfiguration:
337        {
338            onChangeConfiguration(msg);
339            break;
340        }
341
342        case kWhatChangeConfiguration2:
343        {
344            onChangeConfiguration2(msg);
345            break;
346        }
347
348        case kWhatChangeConfiguration3:
349        {
350            onChangeConfiguration3(msg);
351            break;
352        }
353
354        case kWhatFinishDisconnect2:
355        {
356            onFinishDisconnect2();
357            break;
358        }
359
360        default:
361            TRESPASS();
362            break;
363    }
364}
365
366// static
367int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
368    if (a->mBandwidth < b->mBandwidth) {
369        return -1;
370    } else if (a->mBandwidth == b->mBandwidth) {
371        return 0;
372    }
373
374    return 1;
375}
376
377void LiveSession::onConnect(const sp<AMessage> &msg) {
378    AString url;
379    CHECK(msg->findString("url", &url));
380
381    KeyedVector<String8, String8> *headers = NULL;
382    if (!msg->findPointer("headers", (void **)&headers)) {
383        mExtraHeaders.clear();
384    } else {
385        mExtraHeaders = *headers;
386
387        delete headers;
388        headers = NULL;
389    }
390
391#if 1
392    ALOGI("onConnect <URL suppressed>");
393#else
394    ALOGI("onConnect %s", url.c_str());
395#endif
396
397    mMasterURL = url;
398
399    bool dummy;
400    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
401
402    if (mPlaylist == NULL) {
403        ALOGE("unable to fetch master playlist '%s'.", url.c_str());
404
405        postPrepared(ERROR_IO);
406        return;
407    }
408
409    // We trust the content provider to make a reasonable choice of preferred
410    // initial bandwidth by listing it first in the variant playlist.
411    // At startup we really don't have a good estimate on the available
412    // network bandwidth since we haven't tranferred any data yet. Once
413    // we have we can make a better informed choice.
414    size_t initialBandwidth = 0;
415    size_t initialBandwidthIndex = 0;
416
417    if (mPlaylist->isVariantPlaylist()) {
418        for (size_t i = 0; i < mPlaylist->size(); ++i) {
419            BandwidthItem item;
420
421            item.mPlaylistIndex = i;
422
423            sp<AMessage> meta;
424            AString uri;
425            mPlaylist->itemAt(i, &uri, &meta);
426
427            unsigned long bandwidth;
428            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
429
430            if (initialBandwidth == 0) {
431                initialBandwidth = item.mBandwidth;
432            }
433
434            mBandwidthItems.push(item);
435        }
436
437        CHECK_GT(mBandwidthItems.size(), 0u);
438
439        mBandwidthItems.sort(SortByBandwidth);
440
441        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
442            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
443                initialBandwidthIndex = i;
444                break;
445            }
446        }
447    } else {
448        // dummy item.
449        BandwidthItem item;
450        item.mPlaylistIndex = 0;
451        item.mBandwidth = 0;
452        mBandwidthItems.push(item);
453    }
454
455    changeConfiguration(
456            0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */);
457}
458
459void LiveSession::finishDisconnect() {
460    // No reconfiguration is currently pending, make sure none will trigger
461    // during disconnection either.
462    cancelCheckBandwidthEvent();
463
464    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
465        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
466    }
467
468    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
469
470    mContinuationCounter = mFetcherInfos.size();
471    mContinuation = msg;
472
473    if (mContinuationCounter == 0) {
474        msg->post();
475    }
476}
477
478void LiveSession::onFinishDisconnect2() {
479    mContinuation.clear();
480
481    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
482    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
483
484    mPacketSources.valueFor(
485            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
486
487    sp<AMessage> response = new AMessage;
488    response->setInt32("err", OK);
489
490    response->postReply(mDisconnectReplyID);
491    mDisconnectReplyID = 0;
492}
493
494sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
495    ssize_t index = mFetcherInfos.indexOfKey(uri);
496
497    if (index >= 0) {
498        return NULL;
499    }
500
501    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
502    notify->setString("uri", uri);
503
504    FetcherInfo info;
505    info.mFetcher = new PlaylistFetcher(notify, this, uri);
506    info.mDurationUs = -1ll;
507    info.mIsPrepared = false;
508    looper()->registerHandler(info.mFetcher);
509
510    mFetcherInfos.add(uri, info);
511
512    return info.mFetcher;
513}
514
515status_t LiveSession::fetchFile(
516        const char *url, sp<ABuffer> *out,
517        int64_t range_offset, int64_t range_length) {
518    *out = NULL;
519
520    sp<DataSource> source;
521
522    if (!strncasecmp(url, "file://", 7)) {
523        source = new FileSource(url + 7);
524    } else if (strncasecmp(url, "http://", 7)
525            && strncasecmp(url, "https://", 8)) {
526        return ERROR_UNSUPPORTED;
527    } else {
528        KeyedVector<String8, String8> headers = mExtraHeaders;
529        if (range_offset > 0 || range_length >= 0) {
530            headers.add(
531                    String8("Range"),
532                    String8(
533                        StringPrintf(
534                            "bytes=%lld-%s",
535                            range_offset,
536                            range_length < 0
537                                ? "" : StringPrintf("%lld", range_offset + range_length - 1).c_str()).c_str()));
538        }
539        status_t err = mHTTPDataSource->connect(url, &headers);
540
541        if (err != OK) {
542            return err;
543        }
544
545        source = mHTTPDataSource;
546    }
547
548    off64_t size;
549    status_t err = source->getSize(&size);
550
551    if (err != OK) {
552        size = 65536;
553    }
554
555    sp<ABuffer> buffer = new ABuffer(size);
556    buffer->setRange(0, 0);
557
558    for (;;) {
559        size_t bufferRemaining = buffer->capacity() - buffer->size();
560
561        if (bufferRemaining == 0) {
562            bufferRemaining = 32768;
563
564            ALOGV("increasing download buffer to %d bytes",
565                 buffer->size() + bufferRemaining);
566
567            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
568            memcpy(copy->data(), buffer->data(), buffer->size());
569            copy->setRange(0, buffer->size());
570
571            buffer = copy;
572        }
573
574        size_t maxBytesToRead = bufferRemaining;
575        if (range_length >= 0) {
576            int64_t bytesLeftInRange = range_length - buffer->size();
577            if (bytesLeftInRange < maxBytesToRead) {
578                maxBytesToRead = bytesLeftInRange;
579
580                if (bytesLeftInRange == 0) {
581                    break;
582                }
583            }
584        }
585
586        ssize_t n = source->readAt(
587                buffer->size(), buffer->data() + buffer->size(),
588                maxBytesToRead);
589
590        if (n < 0) {
591            return n;
592        }
593
594        if (n == 0) {
595            break;
596        }
597
598        buffer->setRange(0, buffer->size() + (size_t)n);
599    }
600
601    *out = buffer;
602
603    return OK;
604}
605
606sp<M3UParser> LiveSession::fetchPlaylist(
607        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
608    ALOGV("fetchPlaylist '%s'", url);
609
610    *unchanged = false;
611
612    sp<ABuffer> buffer;
613    status_t err = fetchFile(url, &buffer);
614
615    if (err != OK) {
616        return NULL;
617    }
618
619    // MD5 functionality is not available on the simulator, treat all
620    // playlists as changed.
621
622#if defined(HAVE_ANDROID_OS)
623    uint8_t hash[16];
624
625    MD5_CTX m;
626    MD5_Init(&m);
627    MD5_Update(&m, buffer->data(), buffer->size());
628
629    MD5_Final(hash, &m);
630
631    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
632        // playlist unchanged
633        *unchanged = true;
634
635        return NULL;
636    }
637
638    if (curPlaylistHash != NULL) {
639        memcpy(curPlaylistHash, hash, sizeof(hash));
640    }
641#endif
642
643    sp<M3UParser> playlist =
644        new M3UParser(url, buffer->data(), buffer->size());
645
646    if (playlist->initCheck() != OK) {
647        ALOGE("failed to parse .m3u8 playlist");
648
649        return NULL;
650    }
651
652    return playlist;
653}
654
655static double uniformRand() {
656    return (double)rand() / RAND_MAX;
657}
658
659size_t LiveSession::getBandwidthIndex() {
660    if (mBandwidthItems.size() == 0) {
661        return 0;
662    }
663
664#if 1
665    char value[PROPERTY_VALUE_MAX];
666    ssize_t index = -1;
667    if (property_get("media.httplive.bw-index", value, NULL)) {
668        char *end;
669        index = strtol(value, &end, 10);
670        CHECK(end > value && *end == '\0');
671
672        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
673            index = mBandwidthItems.size() - 1;
674        }
675    }
676
677    if (index < 0) {
678        int32_t bandwidthBps;
679        if (mHTTPDataSource != NULL
680                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
681            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
682        } else {
683            ALOGV("no bandwidth estimate.");
684            return 0;  // Pick the lowest bandwidth stream by default.
685        }
686
687        char value[PROPERTY_VALUE_MAX];
688        if (property_get("media.httplive.max-bw", value, NULL)) {
689            char *end;
690            long maxBw = strtoul(value, &end, 10);
691            if (end > value && *end == '\0') {
692                if (maxBw > 0 && bandwidthBps > maxBw) {
693                    ALOGV("bandwidth capped to %ld bps", maxBw);
694                    bandwidthBps = maxBw;
695                }
696            }
697        }
698
699        // Consider only 80% of the available bandwidth usable.
700        bandwidthBps = (bandwidthBps * 8) / 10;
701
702        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
703
704        index = mBandwidthItems.size() - 1;
705        while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth
706                                > (size_t)bandwidthBps) {
707            --index;
708        }
709    }
710#elif 0
711    // Change bandwidth at random()
712    size_t index = uniformRand() * mBandwidthItems.size();
713#elif 0
714    // There's a 50% chance to stay on the current bandwidth and
715    // a 50% chance to switch to the next higher bandwidth (wrapping around
716    // to lowest)
717    const size_t kMinIndex = 0;
718
719    static ssize_t mPrevBandwidthIndex = -1;
720
721    size_t index;
722    if (mPrevBandwidthIndex < 0) {
723        index = kMinIndex;
724    } else if (uniformRand() < 0.5) {
725        index = (size_t)mPrevBandwidthIndex;
726    } else {
727        index = mPrevBandwidthIndex + 1;
728        if (index == mBandwidthItems.size()) {
729            index = kMinIndex;
730        }
731    }
732    mPrevBandwidthIndex = index;
733#elif 0
734    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
735
736    size_t index = mBandwidthItems.size() - 1;
737    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
738        --index;
739    }
740#elif 1
741    char value[PROPERTY_VALUE_MAX];
742    size_t index;
743    if (property_get("media.httplive.bw-index", value, NULL)) {
744        char *end;
745        index = strtoul(value, &end, 10);
746        CHECK(end > value && *end == '\0');
747
748        if (index >= mBandwidthItems.size()) {
749            index = mBandwidthItems.size() - 1;
750        }
751    } else {
752        index = 0;
753    }
754#else
755    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
756#endif
757
758    CHECK_GE(index, 0);
759
760    return index;
761}
762
763status_t LiveSession::onSeek(const sp<AMessage> &msg) {
764    int64_t timeUs;
765    CHECK(msg->findInt64("timeUs", &timeUs));
766
767    if (!mReconfigurationInProgress) {
768        changeConfiguration(timeUs, getBandwidthIndex());
769    }
770
771    return OK;
772}
773
774status_t LiveSession::getDuration(int64_t *durationUs) const {
775    int64_t maxDurationUs = 0ll;
776    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
777        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
778
779        if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) {
780            maxDurationUs = fetcherDurationUs;
781        }
782    }
783
784    *durationUs = maxDurationUs;
785
786    return OK;
787}
788
789bool LiveSession::isSeekable() const {
790    int64_t durationUs;
791    return getDuration(&durationUs) == OK && durationUs >= 0;
792}
793
794bool LiveSession::hasDynamicDuration() const {
795    return false;
796}
797
798status_t LiveSession::getTrackInfo(Parcel *reply) const {
799    return mPlaylist->getTrackInfo(reply);
800}
801
802status_t LiveSession::selectTrack(size_t index, bool select) {
803    status_t err = mPlaylist->selectTrack(index, select);
804    if (err == OK) {
805        (new AMessage(kWhatChangeConfiguration, id()))->post();
806    }
807    return err;
808}
809
810void LiveSession::changeConfiguration(
811        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
812    CHECK(!mReconfigurationInProgress);
813    mReconfigurationInProgress = true;
814
815    mPrevBandwidthIndex = bandwidthIndex;
816
817    ALOGV("changeConfiguration => timeUs:%lld us, bwIndex:%d, pickTrack:%d",
818          timeUs, bandwidthIndex, pickTrack);
819
820    if (pickTrack) {
821        mPlaylist->pickRandomMediaItems();
822    }
823
824    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
825    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
826
827    uint32_t streamMask = 0;
828
829    AString audioURI;
830    if (mPlaylist->getAudioURI(item.mPlaylistIndex, &audioURI)) {
831        streamMask |= STREAMTYPE_AUDIO;
832    }
833
834    AString videoURI;
835    if (mPlaylist->getVideoURI(item.mPlaylistIndex, &videoURI)) {
836        streamMask |= STREAMTYPE_VIDEO;
837    }
838
839    AString subtitleURI;
840    if (mPlaylist->getSubtitleURI(item.mPlaylistIndex, &subtitleURI)) {
841        streamMask |= STREAMTYPE_SUBTITLES;
842    }
843
844    // Step 1, stop and discard fetchers that are no longer needed.
845    // Pause those that we'll reuse.
846    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
847        const AString &uri = mFetcherInfos.keyAt(i);
848
849        bool discardFetcher = true;
850
851        // If we're seeking all current fetchers are discarded.
852        if (timeUs < 0ll) {
853            if (((streamMask & STREAMTYPE_AUDIO) && uri == audioURI)
854                    || ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI)
855                    || ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI)) {
856                discardFetcher = false;
857            }
858        }
859
860        if (discardFetcher) {
861            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
862        } else {
863            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
864        }
865    }
866
867    sp<AMessage> msg = new AMessage(kWhatChangeConfiguration2, id());
868    msg->setInt32("streamMask", streamMask);
869    msg->setInt64("timeUs", timeUs);
870    if (streamMask & STREAMTYPE_AUDIO) {
871        msg->setString("audioURI", audioURI.c_str());
872    }
873    if (streamMask & STREAMTYPE_VIDEO) {
874        msg->setString("videoURI", videoURI.c_str());
875    }
876    if (streamMask & STREAMTYPE_SUBTITLES) {
877        msg->setString("subtitleURI", subtitleURI.c_str());
878    }
879
880    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
881    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
882    // fetchers have completed their asynchronous operation, we'll post
883    // mContinuation, which then is handled below in onChangeConfiguration2.
884    mContinuationCounter = mFetcherInfos.size();
885    mContinuation = msg;
886
887    if (mContinuationCounter == 0) {
888        msg->post();
889    }
890}
891
892void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
893    if (!mReconfigurationInProgress) {
894        changeConfiguration(-1ll /* timeUs */, getBandwidthIndex());
895    } else {
896        msg->post(1000000ll); // retry in 1 sec
897    }
898}
899
900void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
901    mContinuation.clear();
902
903    // All fetchers are either suspended or have been removed now.
904
905    uint32_t streamMask;
906    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
907
908    AString audioURI, videoURI, subtitleURI;
909    if (streamMask & STREAMTYPE_AUDIO) {
910        CHECK(msg->findString("audioURI", &audioURI));
911        ALOGV("audioURI = '%s'", audioURI.c_str());
912    }
913    if (streamMask & STREAMTYPE_VIDEO) {
914        CHECK(msg->findString("videoURI", &videoURI));
915        ALOGV("videoURI = '%s'", videoURI.c_str());
916    }
917    if (streamMask & STREAMTYPE_SUBTITLES) {
918        CHECK(msg->findString("subtitleURI", &subtitleURI));
919        ALOGV("subtitleURI = '%s'", subtitleURI.c_str());
920    }
921
922    // Determine which decoders to shutdown on the player side,
923    // a decoder has to be shutdown if either
924    // 1) its streamtype was active before but now longer isn't.
925    // or
926    // 2) its streamtype was already active and still is but the URI
927    //    has changed.
928    uint32_t changedMask = 0;
929    if (((mStreamMask & streamMask & STREAMTYPE_AUDIO)
930                && !(audioURI == mAudioURI))
931        || (mStreamMask & ~streamMask & STREAMTYPE_AUDIO)) {
932        changedMask |= STREAMTYPE_AUDIO;
933    }
934    if (((mStreamMask & streamMask & STREAMTYPE_VIDEO)
935                && !(videoURI == mVideoURI))
936        || (mStreamMask & ~streamMask & STREAMTYPE_VIDEO)) {
937        changedMask |= STREAMTYPE_VIDEO;
938    }
939
940    if (changedMask == 0) {
941        // If nothing changed as far as the audio/video decoders
942        // are concerned we can proceed.
943        onChangeConfiguration3(msg);
944        return;
945    }
946
947    // Something changed, inform the player which will shutdown the
948    // corresponding decoders and will post the reply once that's done.
949    // Handling the reply will continue executing below in
950    // onChangeConfiguration3.
951    sp<AMessage> notify = mNotify->dup();
952    notify->setInt32("what", kWhatStreamsChanged);
953    notify->setInt32("changedMask", changedMask);
954
955    msg->setWhat(kWhatChangeConfiguration3);
956    msg->setTarget(id());
957
958    notify->setMessage("reply", msg);
959    notify->post();
960}
961
962void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
963    // All remaining fetchers are still suspended, the player has shutdown
964    // any decoders that needed it.
965
966    uint32_t streamMask;
967    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
968
969    AString audioURI, videoURI, subtitleURI;
970    if (streamMask & STREAMTYPE_AUDIO) {
971        CHECK(msg->findString("audioURI", &audioURI));
972    }
973    if (streamMask & STREAMTYPE_VIDEO) {
974        CHECK(msg->findString("videoURI", &videoURI));
975    }
976    if (streamMask & STREAMTYPE_SUBTITLES) {
977        CHECK(msg->findString("subtitleURI", &subtitleURI));
978    }
979
980    int64_t timeUs;
981    CHECK(msg->findInt64("timeUs", &timeUs));
982
983    if (timeUs < 0ll) {
984        timeUs = mLastDequeuedTimeUs;
985    }
986    mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
987
988    mStreamMask = streamMask;
989    mAudioURI = audioURI;
990    mVideoURI = videoURI;
991    mSubtitleURI = subtitleURI;
992
993    // Resume all existing fetchers and assign them packet sources.
994    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
995        const AString &uri = mFetcherInfos.keyAt(i);
996
997        uint32_t resumeMask = 0;
998
999        sp<AnotherPacketSource> audioSource;
1000        if ((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) {
1001            audioSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
1002            resumeMask |= STREAMTYPE_AUDIO;
1003        }
1004
1005        sp<AnotherPacketSource> videoSource;
1006        if ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) {
1007            videoSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
1008            resumeMask |= STREAMTYPE_VIDEO;
1009        }
1010
1011        sp<AnotherPacketSource> subtitleSource;
1012        if ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI) {
1013            subtitleSource = mPacketSources.valueFor(STREAMTYPE_SUBTITLES);
1014            resumeMask |= STREAMTYPE_SUBTITLES;
1015        }
1016
1017        CHECK_NE(resumeMask, 0u);
1018
1019        ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1020
1021        streamMask &= ~resumeMask;
1022
1023        mFetcherInfos.valueAt(i).mFetcher->startAsync(
1024                audioSource, videoSource, subtitleSource);
1025    }
1026
1027    // streamMask now only contains the types that need a new fetcher created.
1028
1029    if (streamMask != 0) {
1030        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1031    }
1032
1033    while (streamMask != 0) {
1034        StreamType streamType = (StreamType)(streamMask & ~(streamMask - 1));
1035
1036        AString uri;
1037        switch (streamType) {
1038            case STREAMTYPE_AUDIO:
1039                uri = audioURI;
1040                break;
1041            case STREAMTYPE_VIDEO:
1042                uri = videoURI;
1043                break;
1044            case STREAMTYPE_SUBTITLES:
1045                uri = subtitleURI;
1046                break;
1047            default:
1048                TRESPASS();
1049        }
1050
1051        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1052        CHECK(fetcher != NULL);
1053
1054        sp<AnotherPacketSource> audioSource;
1055        if ((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) {
1056            audioSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
1057            audioSource->clear();
1058
1059            streamMask &= ~STREAMTYPE_AUDIO;
1060        }
1061
1062        sp<AnotherPacketSource> videoSource;
1063        if ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) {
1064            videoSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
1065            videoSource->clear();
1066
1067            streamMask &= ~STREAMTYPE_VIDEO;
1068        }
1069
1070        sp<AnotherPacketSource> subtitleSource;
1071        if ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI) {
1072            subtitleSource = mPacketSources.valueFor(STREAMTYPE_SUBTITLES);
1073            subtitleSource->clear();
1074
1075            streamMask &= ~STREAMTYPE_SUBTITLES;
1076        }
1077
1078        fetcher->startAsync(audioSource, videoSource, subtitleSource, timeUs);
1079    }
1080
1081    // All fetchers have now been started, the configuration change
1082    // has completed.
1083
1084    scheduleCheckBandwidthEvent();
1085
1086    ALOGV("XXX configuration change completed.");
1087
1088    mReconfigurationInProgress = false;
1089
1090    if (mDisconnectReplyID != 0) {
1091        finishDisconnect();
1092    }
1093}
1094
1095void LiveSession::scheduleCheckBandwidthEvent() {
1096    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1097    msg->setInt32("generation", mCheckBandwidthGeneration);
1098    msg->post(10000000ll);
1099}
1100
1101void LiveSession::cancelCheckBandwidthEvent() {
1102    ++mCheckBandwidthGeneration;
1103}
1104
1105void LiveSession::onCheckBandwidth() {
1106    if (mReconfigurationInProgress) {
1107        scheduleCheckBandwidthEvent();
1108        return;
1109    }
1110
1111    size_t bandwidthIndex = getBandwidthIndex();
1112    if (mPrevBandwidthIndex < 0
1113            || bandwidthIndex != (size_t)mPrevBandwidthIndex) {
1114        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1115    }
1116
1117    // Handling the kWhatCheckBandwidth even here does _not_ automatically
1118    // schedule another one on return, only an explicit call to
1119    // scheduleCheckBandwidthEvent will do that.
1120    // This ensures that only one configuration change is ongoing at any
1121    // one time, once that completes it'll schedule another check bandwidth
1122    // event.
1123}
1124
1125void LiveSession::postPrepared(status_t err) {
1126    CHECK(mInPreparationPhase);
1127
1128    sp<AMessage> notify = mNotify->dup();
1129    if (err == OK || err == ERROR_END_OF_STREAM) {
1130        notify->setInt32("what", kWhatPrepared);
1131    } else {
1132        notify->setInt32("what", kWhatPreparationFailed);
1133        notify->setInt32("err", err);
1134    }
1135
1136    notify->post();
1137
1138    mInPreparationPhase = false;
1139}
1140
1141}  // namespace android
1142
1143