1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/stagefright/foundation/hexdump.h>
31#include <media/stagefright/foundation/ABuffer.h>
32#include <media/stagefright/foundation/ADebug.h>
33#include <media/stagefright/foundation/AMessage.h>
34#include <media/stagefright/DataSource.h>
35#include <media/stagefright/FileSource.h>
36#include <media/stagefright/MediaErrors.h>
37#include <media/stagefright/MetaData.h>
38#include <media/stagefright/Utils.h>
39
40#include <ctype.h>
41#include <openssl/aes.h>
42#include <openssl/md5.h>
43
44namespace android {
45
46LiveSession::LiveSession(
47        const sp<AMessage> &notify, uint32_t flags, bool uidValid, uid_t uid)
48    : mNotify(notify),
49      mFlags(flags),
50      mUIDValid(uidValid),
51      mUID(uid),
52      mInPreparationPhase(true),
53      mHTTPDataSource(
54              HTTPBase::Create(
55                  (mFlags & kFlagIncognito)
56                    ? HTTPBase::kFlagIncognito
57                    : 0)),
58      mPrevBandwidthIndex(-1),
59      mStreamMask(0),
60      mCheckBandwidthGeneration(0),
61      mLastDequeuedTimeUs(0ll),
62      mRealTimeBaseUs(0ll),
63      mReconfigurationInProgress(false),
64      mDisconnectReplyID(0) {
65    if (mUIDValid) {
66        mHTTPDataSource->setUID(mUID);
67    }
68
69    mPacketSources.add(
70            STREAMTYPE_AUDIO, new AnotherPacketSource(NULL /* meta */));
71
72    mPacketSources.add(
73            STREAMTYPE_VIDEO, new AnotherPacketSource(NULL /* meta */));
74
75    mPacketSources.add(
76            STREAMTYPE_SUBTITLES, new AnotherPacketSource(NULL /* meta */));
77}
78
79LiveSession::~LiveSession() {
80}
81
82status_t LiveSession::dequeueAccessUnit(
83        StreamType stream, sp<ABuffer> *accessUnit) {
84    if (!(mStreamMask & stream)) {
85        return UNKNOWN_ERROR;
86    }
87
88    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
89
90    status_t finalResult;
91    if (!packetSource->hasBufferAvailable(&finalResult)) {
92        return finalResult == OK ? -EAGAIN : finalResult;
93    }
94
95    status_t err = packetSource->dequeueAccessUnit(accessUnit);
96
97    const char *streamStr;
98    switch (stream) {
99        case STREAMTYPE_AUDIO:
100            streamStr = "audio";
101            break;
102        case STREAMTYPE_VIDEO:
103            streamStr = "video";
104            break;
105        case STREAMTYPE_SUBTITLES:
106            streamStr = "subs";
107            break;
108        default:
109            TRESPASS();
110    }
111
112    if (err == INFO_DISCONTINUITY) {
113        int32_t type;
114        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
115
116        sp<AMessage> extra;
117        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
118            extra.clear();
119        }
120
121        ALOGI("[%s] read discontinuity of type %d, extra = %s",
122              streamStr,
123              type,
124              extra == NULL ? "NULL" : extra->debugString().c_str());
125    } else if (err == OK) {
126        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
127            int64_t timeUs;
128            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
129            ALOGV("[%s] read buffer at time %lld us", streamStr, timeUs);
130
131            mLastDequeuedTimeUs = timeUs;
132            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
133        } else if (stream == STREAMTYPE_SUBTITLES) {
134            (*accessUnit)->meta()->setInt32(
135                    "trackIndex", mPlaylist->getSelectedIndex());
136            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
137        }
138    } else {
139        ALOGI("[%s] encountered error %d", streamStr, err);
140    }
141
142    return err;
143}
144
145status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
146    if (!(mStreamMask & stream)) {
147        return UNKNOWN_ERROR;
148    }
149
150    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
151
152    sp<MetaData> meta = packetSource->getFormat();
153
154    if (meta == NULL) {
155        return -EAGAIN;
156    }
157
158    return convertMetaDataToMessage(meta, format);
159}
160
161void LiveSession::connectAsync(
162        const char *url, const KeyedVector<String8, String8> *headers) {
163    sp<AMessage> msg = new AMessage(kWhatConnect, id());
164    msg->setString("url", url);
165
166    if (headers != NULL) {
167        msg->setPointer(
168                "headers",
169                new KeyedVector<String8, String8>(*headers));
170    }
171
172    msg->post();
173}
174
175status_t LiveSession::disconnect() {
176    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
177
178    sp<AMessage> response;
179    status_t err = msg->postAndAwaitResponse(&response);
180
181    return err;
182}
183
184status_t LiveSession::seekTo(int64_t timeUs) {
185    sp<AMessage> msg = new AMessage(kWhatSeek, id());
186    msg->setInt64("timeUs", timeUs);
187
188    sp<AMessage> response;
189    status_t err = msg->postAndAwaitResponse(&response);
190
191    return err;
192}
193
194void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
195    switch (msg->what()) {
196        case kWhatConnect:
197        {
198            onConnect(msg);
199            break;
200        }
201
202        case kWhatDisconnect:
203        {
204            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
205
206            if (mReconfigurationInProgress) {
207                break;
208            }
209
210            finishDisconnect();
211            break;
212        }
213
214        case kWhatSeek:
215        {
216            uint32_t replyID;
217            CHECK(msg->senderAwaitsResponse(&replyID));
218
219            status_t err = onSeek(msg);
220
221            sp<AMessage> response = new AMessage;
222            response->setInt32("err", err);
223
224            response->postReply(replyID);
225            break;
226        }
227
228        case kWhatFetcherNotify:
229        {
230            int32_t what;
231            CHECK(msg->findInt32("what", &what));
232
233            switch (what) {
234                case PlaylistFetcher::kWhatStarted:
235                    break;
236                case PlaylistFetcher::kWhatPaused:
237                case PlaylistFetcher::kWhatStopped:
238                {
239                    if (what == PlaylistFetcher::kWhatStopped) {
240                        AString uri;
241                        CHECK(msg->findString("uri", &uri));
242                        mFetcherInfos.removeItem(uri);
243                    }
244
245                    if (mContinuation != NULL) {
246                        CHECK_GT(mContinuationCounter, 0);
247                        if (--mContinuationCounter == 0) {
248                            mContinuation->post();
249                        }
250                    }
251                    break;
252                }
253
254                case PlaylistFetcher::kWhatDurationUpdate:
255                {
256                    AString uri;
257                    CHECK(msg->findString("uri", &uri));
258
259                    int64_t durationUs;
260                    CHECK(msg->findInt64("durationUs", &durationUs));
261
262                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
263                    info->mDurationUs = durationUs;
264                    break;
265                }
266
267                case PlaylistFetcher::kWhatError:
268                {
269                    status_t err;
270                    CHECK(msg->findInt32("err", &err));
271
272                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
273
274                    if (mInPreparationPhase) {
275                        postPrepared(err);
276                    }
277
278                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
279
280                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
281
282                    mPacketSources.valueFor(
283                            STREAMTYPE_SUBTITLES)->signalEOS(err);
284
285                    sp<AMessage> notify = mNotify->dup();
286                    notify->setInt32("what", kWhatError);
287                    notify->setInt32("err", err);
288                    notify->post();
289                    break;
290                }
291
292                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
293                {
294                    AString uri;
295                    CHECK(msg->findString("uri", &uri));
296
297                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
298                    info->mIsPrepared = true;
299
300                    if (mInPreparationPhase) {
301                        bool allFetchersPrepared = true;
302                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
303                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
304                                allFetchersPrepared = false;
305                                break;
306                            }
307                        }
308
309                        if (allFetchersPrepared) {
310                            postPrepared(OK);
311                        }
312                    }
313                    break;
314                }
315
316                default:
317                    TRESPASS();
318            }
319
320            break;
321        }
322
323        case kWhatCheckBandwidth:
324        {
325            int32_t generation;
326            CHECK(msg->findInt32("generation", &generation));
327
328            if (generation != mCheckBandwidthGeneration) {
329                break;
330            }
331
332            onCheckBandwidth();
333            break;
334        }
335
336        case kWhatChangeConfiguration:
337        {
338            onChangeConfiguration(msg);
339            break;
340        }
341
342        case kWhatChangeConfiguration2:
343        {
344            onChangeConfiguration2(msg);
345            break;
346        }
347
348        case kWhatChangeConfiguration3:
349        {
350            onChangeConfiguration3(msg);
351            break;
352        }
353
354        case kWhatFinishDisconnect2:
355        {
356            onFinishDisconnect2();
357            break;
358        }
359
360        default:
361            TRESPASS();
362            break;
363    }
364}
365
366// static
367int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
368    if (a->mBandwidth < b->mBandwidth) {
369        return -1;
370    } else if (a->mBandwidth == b->mBandwidth) {
371        return 0;
372    }
373
374    return 1;
375}
376
377void LiveSession::onConnect(const sp<AMessage> &msg) {
378    AString url;
379    CHECK(msg->findString("url", &url));
380
381    KeyedVector<String8, String8> *headers = NULL;
382    if (!msg->findPointer("headers", (void **)&headers)) {
383        mExtraHeaders.clear();
384    } else {
385        mExtraHeaders = *headers;
386
387        delete headers;
388        headers = NULL;
389    }
390
391#if 1
392    ALOGI("onConnect <URL suppressed>");
393#else
394    ALOGI("onConnect %s", url.c_str());
395#endif
396
397    mMasterURL = url;
398
399    bool dummy;
400    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
401
402    if (mPlaylist == NULL) {
403        ALOGE("unable to fetch master playlist '%s'.", url.c_str());
404
405        postPrepared(ERROR_IO);
406        return;
407    }
408
409    // We trust the content provider to make a reasonable choice of preferred
410    // initial bandwidth by listing it first in the variant playlist.
411    // At startup we really don't have a good estimate on the available
412    // network bandwidth since we haven't tranferred any data yet. Once
413    // we have we can make a better informed choice.
414    size_t initialBandwidth = 0;
415    size_t initialBandwidthIndex = 0;
416
417    if (mPlaylist->isVariantPlaylist()) {
418        for (size_t i = 0; i < mPlaylist->size(); ++i) {
419            BandwidthItem item;
420
421            item.mPlaylistIndex = i;
422
423            sp<AMessage> meta;
424            AString uri;
425            mPlaylist->itemAt(i, &uri, &meta);
426
427            unsigned long bandwidth;
428            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
429
430            if (initialBandwidth == 0) {
431                initialBandwidth = item.mBandwidth;
432            }
433
434            mBandwidthItems.push(item);
435        }
436
437        CHECK_GT(mBandwidthItems.size(), 0u);
438
439        mBandwidthItems.sort(SortByBandwidth);
440
441        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
442            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
443                initialBandwidthIndex = i;
444                break;
445            }
446        }
447    } else {
448        // dummy item.
449        BandwidthItem item;
450        item.mPlaylistIndex = 0;
451        item.mBandwidth = 0;
452        mBandwidthItems.push(item);
453    }
454
455    changeConfiguration(
456            0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */);
457}
458
459void LiveSession::finishDisconnect() {
460    // No reconfiguration is currently pending, make sure none will trigger
461    // during disconnection either.
462    cancelCheckBandwidthEvent();
463
464    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
465        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
466    }
467
468    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
469
470    mContinuationCounter = mFetcherInfos.size();
471    mContinuation = msg;
472
473    if (mContinuationCounter == 0) {
474        msg->post();
475    }
476}
477
478void LiveSession::onFinishDisconnect2() {
479    mContinuation.clear();
480
481    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
482    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
483
484    mPacketSources.valueFor(
485            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
486
487    sp<AMessage> response = new AMessage;
488    response->setInt32("err", OK);
489
490    response->postReply(mDisconnectReplyID);
491    mDisconnectReplyID = 0;
492}
493
494sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
495    ssize_t index = mFetcherInfos.indexOfKey(uri);
496
497    if (index >= 0) {
498        return NULL;
499    }
500
501    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
502    notify->setString("uri", uri);
503
504    FetcherInfo info;
505    info.mFetcher = new PlaylistFetcher(notify, this, uri);
506    info.mDurationUs = -1ll;
507    info.mIsPrepared = false;
508    looper()->registerHandler(info.mFetcher);
509
510    mFetcherInfos.add(uri, info);
511
512    return info.mFetcher;
513}
514
515status_t LiveSession::fetchFile(
516        const char *url, sp<ABuffer> *out,
517        int64_t range_offset, int64_t range_length) {
518    *out = NULL;
519
520    sp<DataSource> source;
521
522    if (!strncasecmp(url, "file://", 7)) {
523        source = new FileSource(url + 7);
524    } else if (strncasecmp(url, "http://", 7)
525            && strncasecmp(url, "https://", 8)) {
526        return ERROR_UNSUPPORTED;
527    } else {
528        KeyedVector<String8, String8> headers = mExtraHeaders;
529        if (range_offset > 0 || range_length >= 0) {
530            headers.add(
531                    String8("Range"),
532                    String8(
533                        StringPrintf(
534                            "bytes=%lld-%s",
535                            range_offset,
536                            range_length < 0
537                                ? "" : StringPrintf("%lld", range_offset + range_length - 1).c_str()).c_str()));
538        }
539        status_t err = mHTTPDataSource->connect(url, &headers);
540
541        if (err != OK) {
542            return err;
543        }
544
545        source = mHTTPDataSource;
546    }
547
548    off64_t size;
549    status_t err = source->getSize(&size);
550
551    if (err != OK) {
552        size = 65536;
553    }
554
555    sp<ABuffer> buffer = new ABuffer(size);
556    buffer->setRange(0, 0);
557
558    for (;;) {
559        size_t bufferRemaining = buffer->capacity() - buffer->size();
560
561        if (bufferRemaining == 0) {
562            bufferRemaining = 32768;
563
564            ALOGV("increasing download buffer to %d bytes",
565                 buffer->size() + bufferRemaining);
566
567            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
568            memcpy(copy->data(), buffer->data(), buffer->size());
569            copy->setRange(0, buffer->size());
570
571            buffer = copy;
572        }
573
574        size_t maxBytesToRead = bufferRemaining;
575        if (range_length >= 0) {
576            int64_t bytesLeftInRange = range_length - buffer->size();
577            if (bytesLeftInRange < maxBytesToRead) {
578                maxBytesToRead = bytesLeftInRange;
579
580                if (bytesLeftInRange == 0) {
581                    break;
582                }
583            }
584        }
585
586        ssize_t n = source->readAt(
587                buffer->size(), buffer->data() + buffer->size(),
588                maxBytesToRead);
589
590        if (n < 0) {
591            return n;
592        }
593
594        if (n == 0) {
595            break;
596        }
597
598        buffer->setRange(0, buffer->size() + (size_t)n);
599    }
600
601    *out = buffer;
602
603    return OK;
604}
605
606sp<M3UParser> LiveSession::fetchPlaylist(
607        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
608    ALOGV("fetchPlaylist '%s'", url);
609
610    *unchanged = false;
611
612    sp<ABuffer> buffer;
613    status_t err = fetchFile(url, &buffer);
614
615    if (err != OK) {
616        return NULL;
617    }
618
619    // MD5 functionality is not available on the simulator, treat all
620    // playlists as changed.
621
622#if defined(HAVE_ANDROID_OS)
623    uint8_t hash[16];
624
625    MD5_CTX m;
626    MD5_Init(&m);
627    MD5_Update(&m, buffer->data(), buffer->size());
628
629    MD5_Final(hash, &m);
630
631    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
632        // playlist unchanged
633        *unchanged = true;
634
635        ALOGV("Playlist unchanged, refresh state is now %d",
636             (int)mRefreshState);
637
638        return NULL;
639    }
640
641    if (curPlaylistHash != NULL) {
642        memcpy(curPlaylistHash, hash, sizeof(hash));
643    }
644#endif
645
646    sp<M3UParser> playlist =
647        new M3UParser(url, buffer->data(), buffer->size());
648
649    if (playlist->initCheck() != OK) {
650        ALOGE("failed to parse .m3u8 playlist");
651
652        return NULL;
653    }
654
655    return playlist;
656}
657
// Returns rand() scaled into the closed interval [0.0, 1.0].
static double uniformRand() {
    const double sample = static_cast<double>(rand());
    return sample / RAND_MAX;
}
661
662size_t LiveSession::getBandwidthIndex() {
663    if (mBandwidthItems.size() == 0) {
664        return 0;
665    }
666
667#if 1
668    char value[PROPERTY_VALUE_MAX];
669    ssize_t index = -1;
670    if (property_get("media.httplive.bw-index", value, NULL)) {
671        char *end;
672        index = strtol(value, &end, 10);
673        CHECK(end > value && *end == '\0');
674
675        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
676            index = mBandwidthItems.size() - 1;
677        }
678    }
679
680    if (index < 0) {
681        int32_t bandwidthBps;
682        if (mHTTPDataSource != NULL
683                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
684            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
685        } else {
686            ALOGV("no bandwidth estimate.");
687            return 0;  // Pick the lowest bandwidth stream by default.
688        }
689
690        char value[PROPERTY_VALUE_MAX];
691        if (property_get("media.httplive.max-bw", value, NULL)) {
692            char *end;
693            long maxBw = strtoul(value, &end, 10);
694            if (end > value && *end == '\0') {
695                if (maxBw > 0 && bandwidthBps > maxBw) {
696                    ALOGV("bandwidth capped to %ld bps", maxBw);
697                    bandwidthBps = maxBw;
698                }
699            }
700        }
701
702        // Consider only 80% of the available bandwidth usable.
703        bandwidthBps = (bandwidthBps * 8) / 10;
704
705        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
706
707        index = mBandwidthItems.size() - 1;
708        while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth
709                                > (size_t)bandwidthBps) {
710            --index;
711        }
712    }
713#elif 0
714    // Change bandwidth at random()
715    size_t index = uniformRand() * mBandwidthItems.size();
716#elif 0
717    // There's a 50% chance to stay on the current bandwidth and
718    // a 50% chance to switch to the next higher bandwidth (wrapping around
719    // to lowest)
720    const size_t kMinIndex = 0;
721
722    static ssize_t mPrevBandwidthIndex = -1;
723
724    size_t index;
725    if (mPrevBandwidthIndex < 0) {
726        index = kMinIndex;
727    } else if (uniformRand() < 0.5) {
728        index = (size_t)mPrevBandwidthIndex;
729    } else {
730        index = mPrevBandwidthIndex + 1;
731        if (index == mBandwidthItems.size()) {
732            index = kMinIndex;
733        }
734    }
735    mPrevBandwidthIndex = index;
736#elif 0
737    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
738
739    size_t index = mBandwidthItems.size() - 1;
740    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
741        --index;
742    }
743#elif 1
744    char value[PROPERTY_VALUE_MAX];
745    size_t index;
746    if (property_get("media.httplive.bw-index", value, NULL)) {
747        char *end;
748        index = strtoul(value, &end, 10);
749        CHECK(end > value && *end == '\0');
750
751        if (index >= mBandwidthItems.size()) {
752            index = mBandwidthItems.size() - 1;
753        }
754    } else {
755        index = 0;
756    }
757#else
758    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
759#endif
760
761    CHECK_GE(index, 0);
762
763    return index;
764}
765
766status_t LiveSession::onSeek(const sp<AMessage> &msg) {
767    int64_t timeUs;
768    CHECK(msg->findInt64("timeUs", &timeUs));
769
770    if (!mReconfigurationInProgress) {
771        changeConfiguration(timeUs, getBandwidthIndex());
772    }
773
774    return OK;
775}
776
777status_t LiveSession::getDuration(int64_t *durationUs) const {
778    int64_t maxDurationUs = 0ll;
779    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
780        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
781
782        if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) {
783            maxDurationUs = fetcherDurationUs;
784        }
785    }
786
787    *durationUs = maxDurationUs;
788
789    return OK;
790}
791
792bool LiveSession::isSeekable() const {
793    int64_t durationUs;
794    return getDuration(&durationUs) == OK && durationUs >= 0;
795}
796
797bool LiveSession::hasDynamicDuration() const {
798    return false;
799}
800
801status_t LiveSession::getTrackInfo(Parcel *reply) const {
802    return mPlaylist->getTrackInfo(reply);
803}
804
805status_t LiveSession::selectTrack(size_t index, bool select) {
806    status_t err = mPlaylist->selectTrack(index, select);
807    if (err == OK) {
808        (new AMessage(kWhatChangeConfiguration, id()))->post();
809    }
810    return err;
811}
812
813void LiveSession::changeConfiguration(
814        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
815    CHECK(!mReconfigurationInProgress);
816    mReconfigurationInProgress = true;
817
818    mPrevBandwidthIndex = bandwidthIndex;
819
820    ALOGV("changeConfiguration => timeUs:%lld us, bwIndex:%d, pickTrack:%d",
821          timeUs, bandwidthIndex, pickTrack);
822
823    if (pickTrack) {
824        mPlaylist->pickRandomMediaItems();
825    }
826
827    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
828    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
829
830    uint32_t streamMask = 0;
831
832    AString audioURI;
833    if (mPlaylist->getAudioURI(item.mPlaylistIndex, &audioURI)) {
834        streamMask |= STREAMTYPE_AUDIO;
835    }
836
837    AString videoURI;
838    if (mPlaylist->getVideoURI(item.mPlaylistIndex, &videoURI)) {
839        streamMask |= STREAMTYPE_VIDEO;
840    }
841
842    AString subtitleURI;
843    if (mPlaylist->getSubtitleURI(item.mPlaylistIndex, &subtitleURI)) {
844        streamMask |= STREAMTYPE_SUBTITLES;
845    }
846
847    // Step 1, stop and discard fetchers that are no longer needed.
848    // Pause those that we'll reuse.
849    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
850        const AString &uri = mFetcherInfos.keyAt(i);
851
852        bool discardFetcher = true;
853
854        // If we're seeking all current fetchers are discarded.
855        if (timeUs < 0ll) {
856            if (((streamMask & STREAMTYPE_AUDIO) && uri == audioURI)
857                    || ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI)
858                    || ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI)) {
859                discardFetcher = false;
860            }
861        }
862
863        if (discardFetcher) {
864            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
865        } else {
866            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
867        }
868    }
869
870    sp<AMessage> msg = new AMessage(kWhatChangeConfiguration2, id());
871    msg->setInt32("streamMask", streamMask);
872    msg->setInt64("timeUs", timeUs);
873    if (streamMask & STREAMTYPE_AUDIO) {
874        msg->setString("audioURI", audioURI.c_str());
875    }
876    if (streamMask & STREAMTYPE_VIDEO) {
877        msg->setString("videoURI", videoURI.c_str());
878    }
879    if (streamMask & STREAMTYPE_SUBTITLES) {
880        msg->setString("subtitleURI", subtitleURI.c_str());
881    }
882
883    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
884    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
885    // fetchers have completed their asynchronous operation, we'll post
886    // mContinuation, which then is handled below in onChangeConfiguration2.
887    mContinuationCounter = mFetcherInfos.size();
888    mContinuation = msg;
889
890    if (mContinuationCounter == 0) {
891        msg->post();
892    }
893}
894
895void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
896    if (!mReconfigurationInProgress) {
897        changeConfiguration(-1ll /* timeUs */, getBandwidthIndex());
898    } else {
899        msg->post(1000000ll); // retry in 1 sec
900    }
901}
902
903void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
904    mContinuation.clear();
905
906    // All fetchers are either suspended or have been removed now.
907
908    uint32_t streamMask;
909    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
910
911    AString audioURI, videoURI, subtitleURI;
912    if (streamMask & STREAMTYPE_AUDIO) {
913        CHECK(msg->findString("audioURI", &audioURI));
914        ALOGV("audioURI = '%s'", audioURI.c_str());
915    }
916    if (streamMask & STREAMTYPE_VIDEO) {
917        CHECK(msg->findString("videoURI", &videoURI));
918        ALOGV("videoURI = '%s'", videoURI.c_str());
919    }
920    if (streamMask & STREAMTYPE_SUBTITLES) {
921        CHECK(msg->findString("subtitleURI", &subtitleURI));
922        ALOGV("subtitleURI = '%s'", subtitleURI.c_str());
923    }
924
925    // Determine which decoders to shutdown on the player side,
926    // a decoder has to be shutdown if either
927    // 1) its streamtype was active before but now longer isn't.
928    // or
929    // 2) its streamtype was already active and still is but the URI
930    //    has changed.
931    uint32_t changedMask = 0;
932    if (((mStreamMask & streamMask & STREAMTYPE_AUDIO)
933                && !(audioURI == mAudioURI))
934        || (mStreamMask & ~streamMask & STREAMTYPE_AUDIO)) {
935        changedMask |= STREAMTYPE_AUDIO;
936    }
937    if (((mStreamMask & streamMask & STREAMTYPE_VIDEO)
938                && !(videoURI == mVideoURI))
939        || (mStreamMask & ~streamMask & STREAMTYPE_VIDEO)) {
940        changedMask |= STREAMTYPE_VIDEO;
941    }
942
943    if (changedMask == 0) {
944        // If nothing changed as far as the audio/video decoders
945        // are concerned we can proceed.
946        onChangeConfiguration3(msg);
947        return;
948    }
949
950    // Something changed, inform the player which will shutdown the
951    // corresponding decoders and will post the reply once that's done.
952    // Handling the reply will continue executing below in
953    // onChangeConfiguration3.
954    sp<AMessage> notify = mNotify->dup();
955    notify->setInt32("what", kWhatStreamsChanged);
956    notify->setInt32("changedMask", changedMask);
957
958    msg->setWhat(kWhatChangeConfiguration3);
959    msg->setTarget(id());
960
961    notify->setMessage("reply", msg);
962    notify->post();
963}
964
965void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
966    // All remaining fetchers are still suspended, the player has shutdown
967    // any decoders that needed it.
968
969    uint32_t streamMask;
970    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
971
972    AString audioURI, videoURI, subtitleURI;
973    if (streamMask & STREAMTYPE_AUDIO) {
974        CHECK(msg->findString("audioURI", &audioURI));
975    }
976    if (streamMask & STREAMTYPE_VIDEO) {
977        CHECK(msg->findString("videoURI", &videoURI));
978    }
979    if (streamMask & STREAMTYPE_SUBTITLES) {
980        CHECK(msg->findString("subtitleURI", &subtitleURI));
981    }
982
983    int64_t timeUs;
984    CHECK(msg->findInt64("timeUs", &timeUs));
985
986    if (timeUs < 0ll) {
987        timeUs = mLastDequeuedTimeUs;
988    }
989    mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
990
991    mStreamMask = streamMask;
992    mAudioURI = audioURI;
993    mVideoURI = videoURI;
994    mSubtitleURI = subtitleURI;
995
996    // Resume all existing fetchers and assign them packet sources.
997    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
998        const AString &uri = mFetcherInfos.keyAt(i);
999
1000        uint32_t resumeMask = 0;
1001
1002        sp<AnotherPacketSource> audioSource;
1003        if ((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) {
1004            audioSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
1005            resumeMask |= STREAMTYPE_AUDIO;
1006        }
1007
1008        sp<AnotherPacketSource> videoSource;
1009        if ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) {
1010            videoSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
1011            resumeMask |= STREAMTYPE_VIDEO;
1012        }
1013
1014        sp<AnotherPacketSource> subtitleSource;
1015        if ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI) {
1016            subtitleSource = mPacketSources.valueFor(STREAMTYPE_SUBTITLES);
1017            resumeMask |= STREAMTYPE_SUBTITLES;
1018        }
1019
1020        CHECK_NE(resumeMask, 0u);
1021
1022        ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1023
1024        streamMask &= ~resumeMask;
1025
1026        mFetcherInfos.valueAt(i).mFetcher->startAsync(
1027                audioSource, videoSource, subtitleSource);
1028    }
1029
1030    // streamMask now only contains the types that need a new fetcher created.
1031
1032    if (streamMask != 0) {
1033        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1034    }
1035
1036    while (streamMask != 0) {
1037        StreamType streamType = (StreamType)(streamMask & ~(streamMask - 1));
1038
1039        AString uri;
1040        switch (streamType) {
1041            case STREAMTYPE_AUDIO:
1042                uri = audioURI;
1043                break;
1044            case STREAMTYPE_VIDEO:
1045                uri = videoURI;
1046                break;
1047            case STREAMTYPE_SUBTITLES:
1048                uri = subtitleURI;
1049                break;
1050            default:
1051                TRESPASS();
1052        }
1053
1054        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1055        CHECK(fetcher != NULL);
1056
1057        sp<AnotherPacketSource> audioSource;
1058        if ((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) {
1059            audioSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
1060            audioSource->clear();
1061
1062            streamMask &= ~STREAMTYPE_AUDIO;
1063        }
1064
1065        sp<AnotherPacketSource> videoSource;
1066        if ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) {
1067            videoSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
1068            videoSource->clear();
1069
1070            streamMask &= ~STREAMTYPE_VIDEO;
1071        }
1072
1073        sp<AnotherPacketSource> subtitleSource;
1074        if ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI) {
1075            subtitleSource = mPacketSources.valueFor(STREAMTYPE_SUBTITLES);
1076            subtitleSource->clear();
1077
1078            streamMask &= ~STREAMTYPE_SUBTITLES;
1079        }
1080
1081        fetcher->startAsync(audioSource, videoSource, subtitleSource, timeUs);
1082    }
1083
1084    // All fetchers have now been started, the configuration change
1085    // has completed.
1086
1087    scheduleCheckBandwidthEvent();
1088
1089    ALOGV("XXX configuration change completed.");
1090
1091    mReconfigurationInProgress = false;
1092
1093    if (mDisconnectReplyID != 0) {
1094        finishDisconnect();
1095    }
1096}
1097
1098void LiveSession::scheduleCheckBandwidthEvent() {
1099    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1100    msg->setInt32("generation", mCheckBandwidthGeneration);
1101    msg->post(10000000ll);
1102}
1103
1104void LiveSession::cancelCheckBandwidthEvent() {
1105    ++mCheckBandwidthGeneration;
1106}
1107
1108void LiveSession::onCheckBandwidth() {
1109    if (mReconfigurationInProgress) {
1110        scheduleCheckBandwidthEvent();
1111        return;
1112    }
1113
1114    size_t bandwidthIndex = getBandwidthIndex();
1115    if (mPrevBandwidthIndex < 0
1116            || bandwidthIndex != (size_t)mPrevBandwidthIndex) {
1117        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1118    }
1119
1120    // Handling the kWhatCheckBandwidth even here does _not_ automatically
1121    // schedule another one on return, only an explicit call to
1122    // scheduleCheckBandwidthEvent will do that.
1123    // This ensures that only one configuration change is ongoing at any
1124    // one time, once that completes it'll schedule another check bandwidth
1125    // event.
1126}
1127
1128void LiveSession::postPrepared(status_t err) {
1129    CHECK(mInPreparationPhase);
1130
1131    sp<AMessage> notify = mNotify->dup();
1132    if (err == OK || err == ERROR_END_OF_STREAM) {
1133        notify->setInt32("what", kWhatPrepared);
1134    } else {
1135        notify->setInt32("what", kWhatPreparationFailed);
1136        notify->setInt32("err", err);
1137    }
1138
1139    notify->post();
1140
1141    mInPreparationPhase = false;
1142}
1143
1144}  // namespace android
1145
1146