LiveSession.cpp revision 784faaf1d76902be6b36d3af01fb5325f0d45a04
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/stagefright/foundation/hexdump.h>
31#include <media/stagefright/foundation/ABuffer.h>
32#include <media/stagefright/foundation/ADebug.h>
33#include <media/stagefright/foundation/AMessage.h>
34#include <media/stagefright/DataSource.h>
35#include <media/stagefright/FileSource.h>
36#include <media/stagefright/MediaErrors.h>
37#include <media/stagefright/MetaData.h>
38#include <media/stagefright/Utils.h>
39
40#include <ctype.h>
41#include <openssl/aes.h>
42#include <openssl/md5.h>
43
44namespace android {
45
46LiveSession::LiveSession(
47        const sp<AMessage> &notify, uint32_t flags, bool uidValid, uid_t uid)
48    : mNotify(notify),
49      mFlags(flags),
50      mUIDValid(uidValid),
51      mUID(uid),
52      mInPreparationPhase(true),
53      mHTTPDataSource(
54              HTTPBase::Create(
55                  (mFlags & kFlagIncognito)
56                    ? HTTPBase::kFlagIncognito
57                    : 0)),
58      mPrevBandwidthIndex(-1),
59      mStreamMask(0),
60      mCheckBandwidthGeneration(0),
61      mLastDequeuedTimeUs(0ll),
62      mRealTimeBaseUs(0ll),
63      mReconfigurationInProgress(false),
64      mDisconnectReplyID(0) {
65    if (mUIDValid) {
66        mHTTPDataSource->setUID(mUID);
67    }
68
69    mPacketSources.add(
70            STREAMTYPE_AUDIO, new AnotherPacketSource(NULL /* meta */));
71
72    mPacketSources.add(
73            STREAMTYPE_VIDEO, new AnotherPacketSource(NULL /* meta */));
74
75    mPacketSources.add(
76            STREAMTYPE_SUBTITLES, new AnotherPacketSource(NULL /* meta */));
77}
78
79LiveSession::~LiveSession() {
80}
81
82status_t LiveSession::dequeueAccessUnit(
83        StreamType stream, sp<ABuffer> *accessUnit) {
84    if (!(mStreamMask & stream)) {
85        return UNKNOWN_ERROR;
86    }
87
88    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
89
90    status_t finalResult;
91    if (!packetSource->hasBufferAvailable(&finalResult)) {
92        return finalResult == OK ? -EAGAIN : finalResult;
93    }
94
95    status_t err = packetSource->dequeueAccessUnit(accessUnit);
96
97    const char *streamStr;
98    switch (stream) {
99        case STREAMTYPE_AUDIO:
100            streamStr = "audio";
101            break;
102        case STREAMTYPE_VIDEO:
103            streamStr = "video";
104            break;
105        case STREAMTYPE_SUBTITLES:
106            streamStr = "subs";
107            break;
108        default:
109            TRESPASS();
110    }
111
112    if (err == INFO_DISCONTINUITY) {
113        int32_t type;
114        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
115
116        sp<AMessage> extra;
117        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
118            extra.clear();
119        }
120
121        ALOGI("[%s] read discontinuity of type %d, extra = %s",
122              streamStr,
123              type,
124              extra == NULL ? "NULL" : extra->debugString().c_str());
125    } else if (err == OK) {
126        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
127            int64_t timeUs;
128            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
129            ALOGV("[%s] read buffer at time %lld us", streamStr, timeUs);
130
131            mLastDequeuedTimeUs = timeUs;
132            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
133        } else if (stream == STREAMTYPE_SUBTITLES) {
134            (*accessUnit)->meta()->setInt32(
135                    "trackIndex", mPlaylist->getSelectedIndex());
136            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
137        }
138    } else {
139        ALOGI("[%s] encountered error %d", streamStr, err);
140    }
141
142    return err;
143}
144
145status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
146    if (!(mStreamMask & stream)) {
147        return UNKNOWN_ERROR;
148    }
149
150    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
151
152    sp<MetaData> meta = packetSource->getFormat();
153
154    if (meta == NULL) {
155        return -EAGAIN;
156    }
157
158    return convertMetaDataToMessage(meta, format);
159}
160
161void LiveSession::connectAsync(
162        const char *url, const KeyedVector<String8, String8> *headers) {
163    sp<AMessage> msg = new AMessage(kWhatConnect, id());
164    msg->setString("url", url);
165
166    if (headers != NULL) {
167        msg->setPointer(
168                "headers",
169                new KeyedVector<String8, String8>(*headers));
170    }
171
172    msg->post();
173}
174
175status_t LiveSession::disconnect() {
176    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
177
178    sp<AMessage> response;
179    status_t err = msg->postAndAwaitResponse(&response);
180
181    return err;
182}
183
184status_t LiveSession::seekTo(int64_t timeUs) {
185    sp<AMessage> msg = new AMessage(kWhatSeek, id());
186    msg->setInt64("timeUs", timeUs);
187
188    sp<AMessage> response;
189    status_t err = msg->postAndAwaitResponse(&response);
190
191    return err;
192}
193
194void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
195    switch (msg->what()) {
196        case kWhatConnect:
197        {
198            onConnect(msg);
199            break;
200        }
201
202        case kWhatDisconnect:
203        {
204            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
205
206            if (mReconfigurationInProgress) {
207                break;
208            }
209
210            finishDisconnect();
211            break;
212        }
213
214        case kWhatSeek:
215        {
216            uint32_t replyID;
217            CHECK(msg->senderAwaitsResponse(&replyID));
218
219            status_t err = onSeek(msg);
220
221            sp<AMessage> response = new AMessage;
222            response->setInt32("err", err);
223
224            response->postReply(replyID);
225            break;
226        }
227
228        case kWhatFetcherNotify:
229        {
230            int32_t what;
231            CHECK(msg->findInt32("what", &what));
232
233            switch (what) {
234                case PlaylistFetcher::kWhatStarted:
235                    break;
236                case PlaylistFetcher::kWhatPaused:
237                case PlaylistFetcher::kWhatStopped:
238                {
239                    if (what == PlaylistFetcher::kWhatStopped) {
240                        AString uri;
241                        CHECK(msg->findString("uri", &uri));
242                        mFetcherInfos.removeItem(uri);
243                    }
244
245                    if (mContinuation != NULL) {
246                        CHECK_GT(mContinuationCounter, 0);
247                        if (--mContinuationCounter == 0) {
248                            mContinuation->post();
249                        }
250                    }
251                    break;
252                }
253
254                case PlaylistFetcher::kWhatDurationUpdate:
255                {
256                    AString uri;
257                    CHECK(msg->findString("uri", &uri));
258
259                    int64_t durationUs;
260                    CHECK(msg->findInt64("durationUs", &durationUs));
261
262                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
263                    info->mDurationUs = durationUs;
264                    break;
265                }
266
267                case PlaylistFetcher::kWhatError:
268                {
269                    status_t err;
270                    CHECK(msg->findInt32("err", &err));
271
272                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
273
274                    if (mInPreparationPhase) {
275                        postPrepared(err);
276                    }
277
278                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
279
280                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
281
282                    mPacketSources.valueFor(
283                            STREAMTYPE_SUBTITLES)->signalEOS(err);
284
285                    sp<AMessage> notify = mNotify->dup();
286                    notify->setInt32("what", kWhatError);
287                    notify->setInt32("err", err);
288                    notify->post();
289                    break;
290                }
291
292                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
293                {
294                    AString uri;
295                    CHECK(msg->findString("uri", &uri));
296
297                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
298                    info->mIsPrepared = true;
299
300                    if (mInPreparationPhase) {
301                        bool allFetchersPrepared = true;
302                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
303                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
304                                allFetchersPrepared = false;
305                                break;
306                            }
307                        }
308
309                        if (allFetchersPrepared) {
310                            postPrepared(OK);
311                        }
312                    }
313                    break;
314                }
315
316                default:
317                    TRESPASS();
318            }
319
320            break;
321        }
322
323        case kWhatCheckBandwidth:
324        {
325            int32_t generation;
326            CHECK(msg->findInt32("generation", &generation));
327
328            if (generation != mCheckBandwidthGeneration) {
329                break;
330            }
331
332            onCheckBandwidth();
333            break;
334        }
335
336        case kWhatChangeConfiguration:
337        {
338            onChangeConfiguration(msg);
339            break;
340        }
341
342        case kWhatChangeConfiguration2:
343        {
344            onChangeConfiguration2(msg);
345            break;
346        }
347
348        case kWhatChangeConfiguration3:
349        {
350            onChangeConfiguration3(msg);
351            break;
352        }
353
354        case kWhatFinishDisconnect2:
355        {
356            onFinishDisconnect2();
357            break;
358        }
359
360        default:
361            TRESPASS();
362            break;
363    }
364}
365
366// static
367int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
368    if (a->mBandwidth < b->mBandwidth) {
369        return -1;
370    } else if (a->mBandwidth == b->mBandwidth) {
371        return 0;
372    }
373
374    return 1;
375}
376
377void LiveSession::onConnect(const sp<AMessage> &msg) {
378    AString url;
379    CHECK(msg->findString("url", &url));
380
381    KeyedVector<String8, String8> *headers = NULL;
382    if (!msg->findPointer("headers", (void **)&headers)) {
383        mExtraHeaders.clear();
384    } else {
385        mExtraHeaders = *headers;
386
387        delete headers;
388        headers = NULL;
389    }
390
391#if 1
392    ALOGI("onConnect <URL suppressed>");
393#else
394    ALOGI("onConnect %s", url.c_str());
395#endif
396
397    mMasterURL = url;
398
399    bool dummy;
400    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
401
402    if (mPlaylist == NULL) {
403        ALOGE("unable to fetch master playlist '%s'.", url.c_str());
404
405        postPrepared(ERROR_IO);
406        return;
407    }
408
409    // We trust the content provider to make a reasonable choice of preferred
410    // initial bandwidth by listing it first in the variant playlist.
411    // At startup we really don't have a good estimate on the available
412    // network bandwidth since we haven't tranferred any data yet. Once
413    // we have we can make a better informed choice.
414    size_t initialBandwidth = 0;
415    size_t initialBandwidthIndex = 0;
416
417    if (mPlaylist->isVariantPlaylist()) {
418        for (size_t i = 0; i < mPlaylist->size(); ++i) {
419            BandwidthItem item;
420
421            item.mPlaylistIndex = i;
422
423            sp<AMessage> meta;
424            AString uri;
425            mPlaylist->itemAt(i, &uri, &meta);
426
427            unsigned long bandwidth;
428            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
429
430            if (initialBandwidth == 0) {
431                initialBandwidth = item.mBandwidth;
432            }
433
434            mBandwidthItems.push(item);
435        }
436
437        CHECK_GT(mBandwidthItems.size(), 0u);
438
439        mBandwidthItems.sort(SortByBandwidth);
440
441        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
442            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
443                initialBandwidthIndex = i;
444                break;
445            }
446        }
447    } else {
448        // dummy item.
449        BandwidthItem item;
450        item.mPlaylistIndex = 0;
451        item.mBandwidth = 0;
452        mBandwidthItems.push(item);
453    }
454
455    changeConfiguration(
456            0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */);
457}
458
459void LiveSession::finishDisconnect() {
460    // No reconfiguration is currently pending, make sure none will trigger
461    // during disconnection either.
462    cancelCheckBandwidthEvent();
463
464    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
465        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
466    }
467
468    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
469
470    mContinuationCounter = mFetcherInfos.size();
471    mContinuation = msg;
472
473    if (mContinuationCounter == 0) {
474        msg->post();
475    }
476}
477
478void LiveSession::onFinishDisconnect2() {
479    mContinuation.clear();
480
481    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
482    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
483
484    mPacketSources.valueFor(
485            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
486
487    sp<AMessage> response = new AMessage;
488    response->setInt32("err", OK);
489
490    response->postReply(mDisconnectReplyID);
491    mDisconnectReplyID = 0;
492}
493
494sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
495    ssize_t index = mFetcherInfos.indexOfKey(uri);
496
497    if (index >= 0) {
498        return NULL;
499    }
500
501    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
502    notify->setString("uri", uri);
503
504    FetcherInfo info;
505    info.mFetcher = new PlaylistFetcher(notify, this, uri);
506    info.mDurationUs = -1ll;
507    info.mIsPrepared = false;
508    looper()->registerHandler(info.mFetcher);
509
510    mFetcherInfos.add(uri, info);
511
512    return info.mFetcher;
513}
514
515status_t LiveSession::fetchFile(
516        const char *url, sp<ABuffer> *out,
517        int64_t range_offset, int64_t range_length,
518        String8 *actualUrl) {
519    *out = NULL;
520
521    sp<DataSource> source;
522
523    if (!strncasecmp(url, "file://", 7)) {
524        source = new FileSource(url + 7);
525    } else if (strncasecmp(url, "http://", 7)
526            && strncasecmp(url, "https://", 8)) {
527        return ERROR_UNSUPPORTED;
528    } else {
529        KeyedVector<String8, String8> headers = mExtraHeaders;
530        if (range_offset > 0 || range_length >= 0) {
531            headers.add(
532                    String8("Range"),
533                    String8(
534                        StringPrintf(
535                            "bytes=%lld-%s",
536                            range_offset,
537                            range_length < 0
538                                ? "" : StringPrintf("%lld", range_offset + range_length - 1).c_str()).c_str()));
539        }
540        status_t err = mHTTPDataSource->connect(url, &headers);
541
542        if (err != OK) {
543            return err;
544        }
545
546        source = mHTTPDataSource;
547    }
548
549    off64_t size;
550    status_t err = source->getSize(&size);
551
552    if (err != OK) {
553        size = 65536;
554    }
555
556    sp<ABuffer> buffer = new ABuffer(size);
557    buffer->setRange(0, 0);
558
559    for (;;) {
560        size_t bufferRemaining = buffer->capacity() - buffer->size();
561
562        if (bufferRemaining == 0) {
563            bufferRemaining = 32768;
564
565            ALOGV("increasing download buffer to %d bytes",
566                 buffer->size() + bufferRemaining);
567
568            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
569            memcpy(copy->data(), buffer->data(), buffer->size());
570            copy->setRange(0, buffer->size());
571
572            buffer = copy;
573        }
574
575        size_t maxBytesToRead = bufferRemaining;
576        if (range_length >= 0) {
577            int64_t bytesLeftInRange = range_length - buffer->size();
578            if (bytesLeftInRange < maxBytesToRead) {
579                maxBytesToRead = bytesLeftInRange;
580
581                if (bytesLeftInRange == 0) {
582                    break;
583                }
584            }
585        }
586
587        ssize_t n = source->readAt(
588                buffer->size(), buffer->data() + buffer->size(),
589                maxBytesToRead);
590
591        if (n < 0) {
592            return n;
593        }
594
595        if (n == 0) {
596            break;
597        }
598
599        buffer->setRange(0, buffer->size() + (size_t)n);
600    }
601
602    *out = buffer;
603    if (actualUrl != NULL) {
604        *actualUrl = source->getUri();
605        if (actualUrl->isEmpty()) {
606            *actualUrl = url;
607        }
608    }
609
610    return OK;
611}
612
613sp<M3UParser> LiveSession::fetchPlaylist(
614        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
615    ALOGV("fetchPlaylist '%s'", url);
616
617    *unchanged = false;
618
619    sp<ABuffer> buffer;
620    String8 actualUrl;
621    status_t err = fetchFile(url, &buffer, 0, -1, &actualUrl);
622
623    if (err != OK) {
624        return NULL;
625    }
626
627    // MD5 functionality is not available on the simulator, treat all
628    // playlists as changed.
629
630#if defined(HAVE_ANDROID_OS)
631    uint8_t hash[16];
632
633    MD5_CTX m;
634    MD5_Init(&m);
635    MD5_Update(&m, buffer->data(), buffer->size());
636
637    MD5_Final(hash, &m);
638
639    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
640        // playlist unchanged
641        *unchanged = true;
642
643        ALOGV("Playlist unchanged, refresh state is now %d",
644             (int)mRefreshState);
645
646        return NULL;
647    }
648
649    if (curPlaylistHash != NULL) {
650        memcpy(curPlaylistHash, hash, sizeof(hash));
651    }
652#endif
653
654    sp<M3UParser> playlist =
655        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
656
657    if (playlist->initCheck() != OK) {
658        ALOGE("failed to parse .m3u8 playlist");
659
660        return NULL;
661    }
662
663    return playlist;
664}
665
// Returns rand() scaled by RAND_MAX, i.e. a value in the range [0, 1].
static double uniformRand() {
    const double sample = rand();
    return sample / RAND_MAX;
}
669
670size_t LiveSession::getBandwidthIndex() {
671    if (mBandwidthItems.size() == 0) {
672        return 0;
673    }
674
675#if 1
676    char value[PROPERTY_VALUE_MAX];
677    ssize_t index = -1;
678    if (property_get("media.httplive.bw-index", value, NULL)) {
679        char *end;
680        index = strtol(value, &end, 10);
681        CHECK(end > value && *end == '\0');
682
683        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
684            index = mBandwidthItems.size() - 1;
685        }
686    }
687
688    if (index < 0) {
689        int32_t bandwidthBps;
690        if (mHTTPDataSource != NULL
691                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
692            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
693        } else {
694            ALOGV("no bandwidth estimate.");
695            return 0;  // Pick the lowest bandwidth stream by default.
696        }
697
698        char value[PROPERTY_VALUE_MAX];
699        if (property_get("media.httplive.max-bw", value, NULL)) {
700            char *end;
701            long maxBw = strtoul(value, &end, 10);
702            if (end > value && *end == '\0') {
703                if (maxBw > 0 && bandwidthBps > maxBw) {
704                    ALOGV("bandwidth capped to %ld bps", maxBw);
705                    bandwidthBps = maxBw;
706                }
707            }
708        }
709
710        // Consider only 80% of the available bandwidth usable.
711        bandwidthBps = (bandwidthBps * 8) / 10;
712
713        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
714
715        index = mBandwidthItems.size() - 1;
716        while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth
717                                > (size_t)bandwidthBps) {
718            --index;
719        }
720    }
721#elif 0
722    // Change bandwidth at random()
723    size_t index = uniformRand() * mBandwidthItems.size();
724#elif 0
725    // There's a 50% chance to stay on the current bandwidth and
726    // a 50% chance to switch to the next higher bandwidth (wrapping around
727    // to lowest)
728    const size_t kMinIndex = 0;
729
730    static ssize_t mPrevBandwidthIndex = -1;
731
732    size_t index;
733    if (mPrevBandwidthIndex < 0) {
734        index = kMinIndex;
735    } else if (uniformRand() < 0.5) {
736        index = (size_t)mPrevBandwidthIndex;
737    } else {
738        index = mPrevBandwidthIndex + 1;
739        if (index == mBandwidthItems.size()) {
740            index = kMinIndex;
741        }
742    }
743    mPrevBandwidthIndex = index;
744#elif 0
745    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
746
747    size_t index = mBandwidthItems.size() - 1;
748    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
749        --index;
750    }
751#elif 1
752    char value[PROPERTY_VALUE_MAX];
753    size_t index;
754    if (property_get("media.httplive.bw-index", value, NULL)) {
755        char *end;
756        index = strtoul(value, &end, 10);
757        CHECK(end > value && *end == '\0');
758
759        if (index >= mBandwidthItems.size()) {
760            index = mBandwidthItems.size() - 1;
761        }
762    } else {
763        index = 0;
764    }
765#else
766    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
767#endif
768
769    CHECK_GE(index, 0);
770
771    return index;
772}
773
774status_t LiveSession::onSeek(const sp<AMessage> &msg) {
775    int64_t timeUs;
776    CHECK(msg->findInt64("timeUs", &timeUs));
777
778    if (!mReconfigurationInProgress) {
779        changeConfiguration(timeUs, getBandwidthIndex());
780    }
781
782    return OK;
783}
784
785status_t LiveSession::getDuration(int64_t *durationUs) const {
786    int64_t maxDurationUs = 0ll;
787    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
788        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
789
790        if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) {
791            maxDurationUs = fetcherDurationUs;
792        }
793    }
794
795    *durationUs = maxDurationUs;
796
797    return OK;
798}
799
800bool LiveSession::isSeekable() const {
801    int64_t durationUs;
802    return getDuration(&durationUs) == OK && durationUs >= 0;
803}
804
805bool LiveSession::hasDynamicDuration() const {
806    return false;
807}
808
809status_t LiveSession::getTrackInfo(Parcel *reply) const {
810    return mPlaylist->getTrackInfo(reply);
811}
812
813status_t LiveSession::selectTrack(size_t index, bool select) {
814    status_t err = mPlaylist->selectTrack(index, select);
815    if (err == OK) {
816        (new AMessage(kWhatChangeConfiguration, id()))->post();
817    }
818    return err;
819}
820
821void LiveSession::changeConfiguration(
822        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
823    CHECK(!mReconfigurationInProgress);
824    mReconfigurationInProgress = true;
825
826    mPrevBandwidthIndex = bandwidthIndex;
827
828    ALOGV("changeConfiguration => timeUs:%lld us, bwIndex:%d, pickTrack:%d",
829          timeUs, bandwidthIndex, pickTrack);
830
831    if (pickTrack) {
832        mPlaylist->pickRandomMediaItems();
833    }
834
835    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
836    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
837
838    uint32_t streamMask = 0;
839
840    AString audioURI;
841    if (mPlaylist->getAudioURI(item.mPlaylistIndex, &audioURI)) {
842        streamMask |= STREAMTYPE_AUDIO;
843    }
844
845    AString videoURI;
846    if (mPlaylist->getVideoURI(item.mPlaylistIndex, &videoURI)) {
847        streamMask |= STREAMTYPE_VIDEO;
848    }
849
850    AString subtitleURI;
851    if (mPlaylist->getSubtitleURI(item.mPlaylistIndex, &subtitleURI)) {
852        streamMask |= STREAMTYPE_SUBTITLES;
853    }
854
855    // Step 1, stop and discard fetchers that are no longer needed.
856    // Pause those that we'll reuse.
857    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
858        const AString &uri = mFetcherInfos.keyAt(i);
859
860        bool discardFetcher = true;
861
862        // If we're seeking all current fetchers are discarded.
863        if (timeUs < 0ll) {
864            if (((streamMask & STREAMTYPE_AUDIO) && uri == audioURI)
865                    || ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI)
866                    || ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI)) {
867                discardFetcher = false;
868            }
869        }
870
871        if (discardFetcher) {
872            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
873        } else {
874            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
875        }
876    }
877
878    sp<AMessage> msg = new AMessage(kWhatChangeConfiguration2, id());
879    msg->setInt32("streamMask", streamMask);
880    msg->setInt64("timeUs", timeUs);
881    if (streamMask & STREAMTYPE_AUDIO) {
882        msg->setString("audioURI", audioURI.c_str());
883    }
884    if (streamMask & STREAMTYPE_VIDEO) {
885        msg->setString("videoURI", videoURI.c_str());
886    }
887    if (streamMask & STREAMTYPE_SUBTITLES) {
888        msg->setString("subtitleURI", subtitleURI.c_str());
889    }
890
891    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
892    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
893    // fetchers have completed their asynchronous operation, we'll post
894    // mContinuation, which then is handled below in onChangeConfiguration2.
895    mContinuationCounter = mFetcherInfos.size();
896    mContinuation = msg;
897
898    if (mContinuationCounter == 0) {
899        msg->post();
900    }
901}
902
903void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
904    if (!mReconfigurationInProgress) {
905        changeConfiguration(-1ll /* timeUs */, getBandwidthIndex());
906    } else {
907        msg->post(1000000ll); // retry in 1 sec
908    }
909}
910
911void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
912    mContinuation.clear();
913
914    // All fetchers are either suspended or have been removed now.
915
916    uint32_t streamMask;
917    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
918
919    AString audioURI, videoURI, subtitleURI;
920    if (streamMask & STREAMTYPE_AUDIO) {
921        CHECK(msg->findString("audioURI", &audioURI));
922        ALOGV("audioURI = '%s'", audioURI.c_str());
923    }
924    if (streamMask & STREAMTYPE_VIDEO) {
925        CHECK(msg->findString("videoURI", &videoURI));
926        ALOGV("videoURI = '%s'", videoURI.c_str());
927    }
928    if (streamMask & STREAMTYPE_SUBTITLES) {
929        CHECK(msg->findString("subtitleURI", &subtitleURI));
930        ALOGV("subtitleURI = '%s'", subtitleURI.c_str());
931    }
932
933    // Determine which decoders to shutdown on the player side,
934    // a decoder has to be shutdown if either
935    // 1) its streamtype was active before but now longer isn't.
936    // or
937    // 2) its streamtype was already active and still is but the URI
938    //    has changed.
939    uint32_t changedMask = 0;
940    if (((mStreamMask & streamMask & STREAMTYPE_AUDIO)
941                && !(audioURI == mAudioURI))
942        || (mStreamMask & ~streamMask & STREAMTYPE_AUDIO)) {
943        changedMask |= STREAMTYPE_AUDIO;
944    }
945    if (((mStreamMask & streamMask & STREAMTYPE_VIDEO)
946                && !(videoURI == mVideoURI))
947        || (mStreamMask & ~streamMask & STREAMTYPE_VIDEO)) {
948        changedMask |= STREAMTYPE_VIDEO;
949    }
950
951    if (changedMask == 0) {
952        // If nothing changed as far as the audio/video decoders
953        // are concerned we can proceed.
954        onChangeConfiguration3(msg);
955        return;
956    }
957
958    // Something changed, inform the player which will shutdown the
959    // corresponding decoders and will post the reply once that's done.
960    // Handling the reply will continue executing below in
961    // onChangeConfiguration3.
962    sp<AMessage> notify = mNotify->dup();
963    notify->setInt32("what", kWhatStreamsChanged);
964    notify->setInt32("changedMask", changedMask);
965
966    msg->setWhat(kWhatChangeConfiguration3);
967    msg->setTarget(id());
968
969    notify->setMessage("reply", msg);
970    notify->post();
971}
972
973void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
974    // All remaining fetchers are still suspended, the player has shutdown
975    // any decoders that needed it.
976
977    uint32_t streamMask;
978    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
979
980    AString audioURI, videoURI, subtitleURI;
981    if (streamMask & STREAMTYPE_AUDIO) {
982        CHECK(msg->findString("audioURI", &audioURI));
983    }
984    if (streamMask & STREAMTYPE_VIDEO) {
985        CHECK(msg->findString("videoURI", &videoURI));
986    }
987    if (streamMask & STREAMTYPE_SUBTITLES) {
988        CHECK(msg->findString("subtitleURI", &subtitleURI));
989    }
990
991    int64_t timeUs;
992    CHECK(msg->findInt64("timeUs", &timeUs));
993
994    if (timeUs < 0ll) {
995        timeUs = mLastDequeuedTimeUs;
996    }
997    mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
998
999    mStreamMask = streamMask;
1000    mAudioURI = audioURI;
1001    mVideoURI = videoURI;
1002    mSubtitleURI = subtitleURI;
1003
1004    // Resume all existing fetchers and assign them packet sources.
1005    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1006        const AString &uri = mFetcherInfos.keyAt(i);
1007
1008        uint32_t resumeMask = 0;
1009
1010        sp<AnotherPacketSource> audioSource;
1011        if ((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) {
1012            audioSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
1013            resumeMask |= STREAMTYPE_AUDIO;
1014        }
1015
1016        sp<AnotherPacketSource> videoSource;
1017        if ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) {
1018            videoSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
1019            resumeMask |= STREAMTYPE_VIDEO;
1020        }
1021
1022        sp<AnotherPacketSource> subtitleSource;
1023        if ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI) {
1024            subtitleSource = mPacketSources.valueFor(STREAMTYPE_SUBTITLES);
1025            resumeMask |= STREAMTYPE_SUBTITLES;
1026        }
1027
1028        CHECK_NE(resumeMask, 0u);
1029
1030        ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1031
1032        streamMask &= ~resumeMask;
1033
1034        mFetcherInfos.valueAt(i).mFetcher->startAsync(
1035                audioSource, videoSource, subtitleSource);
1036    }
1037
1038    // streamMask now only contains the types that need a new fetcher created.
1039
1040    if (streamMask != 0) {
1041        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1042    }
1043
1044    while (streamMask != 0) {
1045        StreamType streamType = (StreamType)(streamMask & ~(streamMask - 1));
1046
1047        AString uri;
1048        switch (streamType) {
1049            case STREAMTYPE_AUDIO:
1050                uri = audioURI;
1051                break;
1052            case STREAMTYPE_VIDEO:
1053                uri = videoURI;
1054                break;
1055            case STREAMTYPE_SUBTITLES:
1056                uri = subtitleURI;
1057                break;
1058            default:
1059                TRESPASS();
1060        }
1061
1062        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1063        CHECK(fetcher != NULL);
1064
1065        sp<AnotherPacketSource> audioSource;
1066        if ((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) {
1067            audioSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
1068            audioSource->clear();
1069
1070            streamMask &= ~STREAMTYPE_AUDIO;
1071        }
1072
1073        sp<AnotherPacketSource> videoSource;
1074        if ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) {
1075            videoSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
1076            videoSource->clear();
1077
1078            streamMask &= ~STREAMTYPE_VIDEO;
1079        }
1080
1081        sp<AnotherPacketSource> subtitleSource;
1082        if ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI) {
1083            subtitleSource = mPacketSources.valueFor(STREAMTYPE_SUBTITLES);
1084            subtitleSource->clear();
1085
1086            streamMask &= ~STREAMTYPE_SUBTITLES;
1087        }
1088
1089        fetcher->startAsync(audioSource, videoSource, subtitleSource, timeUs);
1090    }
1091
1092    // All fetchers have now been started, the configuration change
1093    // has completed.
1094
1095    scheduleCheckBandwidthEvent();
1096
1097    ALOGV("XXX configuration change completed.");
1098
1099    mReconfigurationInProgress = false;
1100
1101    if (mDisconnectReplyID != 0) {
1102        finishDisconnect();
1103    }
1104}
1105
1106void LiveSession::scheduleCheckBandwidthEvent() {
1107    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1108    msg->setInt32("generation", mCheckBandwidthGeneration);
1109    msg->post(10000000ll);
1110}
1111
1112void LiveSession::cancelCheckBandwidthEvent() {
1113    ++mCheckBandwidthGeneration;
1114}
1115
1116void LiveSession::onCheckBandwidth() {
1117    if (mReconfigurationInProgress) {
1118        scheduleCheckBandwidthEvent();
1119        return;
1120    }
1121
1122    size_t bandwidthIndex = getBandwidthIndex();
1123    if (mPrevBandwidthIndex < 0
1124            || bandwidthIndex != (size_t)mPrevBandwidthIndex) {
1125        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1126    }
1127
1128    // Handling the kWhatCheckBandwidth even here does _not_ automatically
1129    // schedule another one on return, only an explicit call to
1130    // scheduleCheckBandwidthEvent will do that.
1131    // This ensures that only one configuration change is ongoing at any
1132    // one time, once that completes it'll schedule another check bandwidth
1133    // event.
1134}
1135
1136void LiveSession::postPrepared(status_t err) {
1137    CHECK(mInPreparationPhase);
1138
1139    sp<AMessage> notify = mNotify->dup();
1140    if (err == OK || err == ERROR_END_OF_STREAM) {
1141        notify->setInt32("what", kWhatPrepared);
1142    } else {
1143        notify->setInt32("what", kWhatPreparationFailed);
1144        notify->setInt32("err", err);
1145    }
1146
1147    notify->post();
1148
1149    mInPreparationPhase = false;
1150}
1151
1152}  // namespace android
1153
1154