LiveSession.cpp revision f6d0c1fd6d9e697bb3a891fae14c7e9d4b685de6
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/stagefright/foundation/hexdump.h>
31#include <media/stagefright/foundation/ABuffer.h>
32#include <media/stagefright/foundation/ADebug.h>
33#include <media/stagefright/foundation/AMessage.h>
34#include <media/stagefright/DataSource.h>
35#include <media/stagefright/FileSource.h>
36#include <media/stagefright/MediaErrors.h>
37#include <media/stagefright/MetaData.h>
38#include <media/stagefright/Utils.h>
39
40#include <ctype.h>
41#include <inttypes.h>
42#include <openssl/aes.h>
43#include <openssl/md5.h>
44
45namespace android {
46
47LiveSession::LiveSession(
48        const sp<AMessage> &notify, uint32_t flags, bool uidValid, uid_t uid)
49    : mNotify(notify),
50      mFlags(flags),
51      mUIDValid(uidValid),
52      mUID(uid),
53      mInPreparationPhase(true),
54      mHTTPDataSource(
55              HTTPBase::Create(
56                  (mFlags & kFlagIncognito)
57                    ? HTTPBase::kFlagIncognito
58                    : 0)),
59      mPrevBandwidthIndex(-1),
60      mStreamMask(0),
61      mCheckBandwidthGeneration(0),
62      mLastDequeuedTimeUs(0ll),
63      mRealTimeBaseUs(0ll),
64      mReconfigurationInProgress(false),
65      mDisconnectReplyID(0) {
66    if (mUIDValid) {
67        mHTTPDataSource->setUID(mUID);
68    }
69
70    mPacketSources.add(
71            STREAMTYPE_AUDIO, new AnotherPacketSource(NULL /* meta */));
72
73    mPacketSources.add(
74            STREAMTYPE_VIDEO, new AnotherPacketSource(NULL /* meta */));
75
76    mPacketSources.add(
77            STREAMTYPE_SUBTITLES, new AnotherPacketSource(NULL /* meta */));
78}
79
80LiveSession::~LiveSession() {
81}
82
83status_t LiveSession::dequeueAccessUnit(
84        StreamType stream, sp<ABuffer> *accessUnit) {
85    if (!(mStreamMask & stream)) {
86        return UNKNOWN_ERROR;
87    }
88
89    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
90
91    status_t finalResult;
92    if (!packetSource->hasBufferAvailable(&finalResult)) {
93        return finalResult == OK ? -EAGAIN : finalResult;
94    }
95
96    status_t err = packetSource->dequeueAccessUnit(accessUnit);
97
98    const char *streamStr;
99    switch (stream) {
100        case STREAMTYPE_AUDIO:
101            streamStr = "audio";
102            break;
103        case STREAMTYPE_VIDEO:
104            streamStr = "video";
105            break;
106        case STREAMTYPE_SUBTITLES:
107            streamStr = "subs";
108            break;
109        default:
110            TRESPASS();
111    }
112
113    if (err == INFO_DISCONTINUITY) {
114        int32_t type;
115        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
116
117        sp<AMessage> extra;
118        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
119            extra.clear();
120        }
121
122        ALOGI("[%s] read discontinuity of type %d, extra = %s",
123              streamStr,
124              type,
125              extra == NULL ? "NULL" : extra->debugString().c_str());
126    } else if (err == OK) {
127        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
128            int64_t timeUs;
129            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
130            ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs);
131
132            mLastDequeuedTimeUs = timeUs;
133            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
134        } else if (stream == STREAMTYPE_SUBTITLES) {
135            (*accessUnit)->meta()->setInt32(
136                    "trackIndex", mPlaylist->getSelectedIndex());
137            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
138        }
139    } else {
140        ALOGI("[%s] encountered error %d", streamStr, err);
141    }
142
143    return err;
144}
145
146status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
147    if (!(mStreamMask & stream)) {
148        return UNKNOWN_ERROR;
149    }
150
151    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
152
153    sp<MetaData> meta = packetSource->getFormat();
154
155    if (meta == NULL) {
156        return -EAGAIN;
157    }
158
159    return convertMetaDataToMessage(meta, format);
160}
161
162void LiveSession::connectAsync(
163        const char *url, const KeyedVector<String8, String8> *headers) {
164    sp<AMessage> msg = new AMessage(kWhatConnect, id());
165    msg->setString("url", url);
166
167    if (headers != NULL) {
168        msg->setPointer(
169                "headers",
170                new KeyedVector<String8, String8>(*headers));
171    }
172
173    msg->post();
174}
175
176status_t LiveSession::disconnect() {
177    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
178
179    sp<AMessage> response;
180    status_t err = msg->postAndAwaitResponse(&response);
181
182    return err;
183}
184
185status_t LiveSession::seekTo(int64_t timeUs) {
186    sp<AMessage> msg = new AMessage(kWhatSeek, id());
187    msg->setInt64("timeUs", timeUs);
188
189    sp<AMessage> response;
190    status_t err = msg->postAndAwaitResponse(&response);
191
192    return err;
193}
194
195void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
196    switch (msg->what()) {
197        case kWhatConnect:
198        {
199            onConnect(msg);
200            break;
201        }
202
203        case kWhatDisconnect:
204        {
205            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
206
207            if (mReconfigurationInProgress) {
208                break;
209            }
210
211            finishDisconnect();
212            break;
213        }
214
215        case kWhatSeek:
216        {
217            uint32_t replyID;
218            CHECK(msg->senderAwaitsResponse(&replyID));
219
220            status_t err = onSeek(msg);
221
222            sp<AMessage> response = new AMessage;
223            response->setInt32("err", err);
224
225            response->postReply(replyID);
226            break;
227        }
228
229        case kWhatFetcherNotify:
230        {
231            int32_t what;
232            CHECK(msg->findInt32("what", &what));
233
234            switch (what) {
235                case PlaylistFetcher::kWhatStarted:
236                    break;
237                case PlaylistFetcher::kWhatPaused:
238                case PlaylistFetcher::kWhatStopped:
239                {
240                    if (what == PlaylistFetcher::kWhatStopped) {
241                        AString uri;
242                        CHECK(msg->findString("uri", &uri));
243                        mFetcherInfos.removeItem(uri);
244                    }
245
246                    if (mContinuation != NULL) {
247                        CHECK_GT(mContinuationCounter, 0);
248                        if (--mContinuationCounter == 0) {
249                            mContinuation->post();
250                        }
251                    }
252                    break;
253                }
254
255                case PlaylistFetcher::kWhatDurationUpdate:
256                {
257                    AString uri;
258                    CHECK(msg->findString("uri", &uri));
259
260                    int64_t durationUs;
261                    CHECK(msg->findInt64("durationUs", &durationUs));
262
263                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
264                    info->mDurationUs = durationUs;
265                    break;
266                }
267
268                case PlaylistFetcher::kWhatError:
269                {
270                    status_t err;
271                    CHECK(msg->findInt32("err", &err));
272
273                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
274
275                    if (mInPreparationPhase) {
276                        postPrepared(err);
277                    }
278
279                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
280
281                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
282
283                    mPacketSources.valueFor(
284                            STREAMTYPE_SUBTITLES)->signalEOS(err);
285
286                    sp<AMessage> notify = mNotify->dup();
287                    notify->setInt32("what", kWhatError);
288                    notify->setInt32("err", err);
289                    notify->post();
290                    break;
291                }
292
293                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
294                {
295                    AString uri;
296                    CHECK(msg->findString("uri", &uri));
297
298                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
299                    info->mIsPrepared = true;
300
301                    if (mInPreparationPhase) {
302                        bool allFetchersPrepared = true;
303                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
304                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
305                                allFetchersPrepared = false;
306                                break;
307                            }
308                        }
309
310                        if (allFetchersPrepared) {
311                            postPrepared(OK);
312                        }
313                    }
314                    break;
315                }
316
317                default:
318                    TRESPASS();
319            }
320
321            break;
322        }
323
324        case kWhatCheckBandwidth:
325        {
326            int32_t generation;
327            CHECK(msg->findInt32("generation", &generation));
328
329            if (generation != mCheckBandwidthGeneration) {
330                break;
331            }
332
333            onCheckBandwidth();
334            break;
335        }
336
337        case kWhatChangeConfiguration:
338        {
339            onChangeConfiguration(msg);
340            break;
341        }
342
343        case kWhatChangeConfiguration2:
344        {
345            onChangeConfiguration2(msg);
346            break;
347        }
348
349        case kWhatChangeConfiguration3:
350        {
351            onChangeConfiguration3(msg);
352            break;
353        }
354
355        case kWhatFinishDisconnect2:
356        {
357            onFinishDisconnect2();
358            break;
359        }
360
361        default:
362            TRESPASS();
363            break;
364    }
365}
366
367// static
368int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
369    if (a->mBandwidth < b->mBandwidth) {
370        return -1;
371    } else if (a->mBandwidth == b->mBandwidth) {
372        return 0;
373    }
374
375    return 1;
376}
377
378void LiveSession::onConnect(const sp<AMessage> &msg) {
379    AString url;
380    CHECK(msg->findString("url", &url));
381
382    KeyedVector<String8, String8> *headers = NULL;
383    if (!msg->findPointer("headers", (void **)&headers)) {
384        mExtraHeaders.clear();
385    } else {
386        mExtraHeaders = *headers;
387
388        delete headers;
389        headers = NULL;
390    }
391
392#if 1
393    ALOGI("onConnect <URL suppressed>");
394#else
395    ALOGI("onConnect %s", url.c_str());
396#endif
397
398    mMasterURL = url;
399
400    bool dummy;
401    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
402
403    if (mPlaylist == NULL) {
404        ALOGE("unable to fetch master playlist <URL suppressed>.");
405
406        postPrepared(ERROR_IO);
407        return;
408    }
409
410    // We trust the content provider to make a reasonable choice of preferred
411    // initial bandwidth by listing it first in the variant playlist.
412    // At startup we really don't have a good estimate on the available
413    // network bandwidth since we haven't tranferred any data yet. Once
414    // we have we can make a better informed choice.
415    size_t initialBandwidth = 0;
416    size_t initialBandwidthIndex = 0;
417
418    if (mPlaylist->isVariantPlaylist()) {
419        for (size_t i = 0; i < mPlaylist->size(); ++i) {
420            BandwidthItem item;
421
422            item.mPlaylistIndex = i;
423
424            sp<AMessage> meta;
425            AString uri;
426            mPlaylist->itemAt(i, &uri, &meta);
427
428            unsigned long bandwidth;
429            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
430
431            if (initialBandwidth == 0) {
432                initialBandwidth = item.mBandwidth;
433            }
434
435            mBandwidthItems.push(item);
436        }
437
438        CHECK_GT(mBandwidthItems.size(), 0u);
439
440        mBandwidthItems.sort(SortByBandwidth);
441
442        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
443            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
444                initialBandwidthIndex = i;
445                break;
446            }
447        }
448    } else {
449        // dummy item.
450        BandwidthItem item;
451        item.mPlaylistIndex = 0;
452        item.mBandwidth = 0;
453        mBandwidthItems.push(item);
454    }
455
456    changeConfiguration(
457            0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */);
458}
459
460void LiveSession::finishDisconnect() {
461    // No reconfiguration is currently pending, make sure none will trigger
462    // during disconnection either.
463    cancelCheckBandwidthEvent();
464
465    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
466        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
467    }
468
469    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
470
471    mContinuationCounter = mFetcherInfos.size();
472    mContinuation = msg;
473
474    if (mContinuationCounter == 0) {
475        msg->post();
476    }
477}
478
479void LiveSession::onFinishDisconnect2() {
480    mContinuation.clear();
481
482    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
483    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
484
485    mPacketSources.valueFor(
486            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
487
488    sp<AMessage> response = new AMessage;
489    response->setInt32("err", OK);
490
491    response->postReply(mDisconnectReplyID);
492    mDisconnectReplyID = 0;
493}
494
495sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
496    ssize_t index = mFetcherInfos.indexOfKey(uri);
497
498    if (index >= 0) {
499        return NULL;
500    }
501
502    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
503    notify->setString("uri", uri);
504
505    FetcherInfo info;
506    info.mFetcher = new PlaylistFetcher(notify, this, uri);
507    info.mDurationUs = -1ll;
508    info.mIsPrepared = false;
509    looper()->registerHandler(info.mFetcher);
510
511    mFetcherInfos.add(uri, info);
512
513    return info.mFetcher;
514}
515
516status_t LiveSession::fetchFile(
517        const char *url, sp<ABuffer> *out,
518        int64_t range_offset, int64_t range_length,
519        String8 *actualUrl) {
520    *out = NULL;
521
522    sp<DataSource> source;
523
524    if (!strncasecmp(url, "file://", 7)) {
525        source = new FileSource(url + 7);
526    } else if (strncasecmp(url, "http://", 7)
527            && strncasecmp(url, "https://", 8)) {
528        return ERROR_UNSUPPORTED;
529    } else {
530        KeyedVector<String8, String8> headers = mExtraHeaders;
531        if (range_offset > 0 || range_length >= 0) {
532            headers.add(
533                    String8("Range"),
534                    String8(
535                        StringPrintf(
536                            "bytes=%lld-%s",
537                            range_offset,
538                            range_length < 0
539                                ? "" : StringPrintf("%lld", range_offset + range_length - 1).c_str()).c_str()));
540        }
541        status_t err = mHTTPDataSource->connect(url, &headers);
542
543        if (err != OK) {
544            return err;
545        }
546
547        source = mHTTPDataSource;
548    }
549
550    off64_t size;
551    status_t err = source->getSize(&size);
552
553    if (err != OK) {
554        size = 65536;
555    }
556
557    sp<ABuffer> buffer = new ABuffer(size);
558    buffer->setRange(0, 0);
559
560    for (;;) {
561        size_t bufferRemaining = buffer->capacity() - buffer->size();
562
563        if (bufferRemaining == 0) {
564            bufferRemaining = 32768;
565
566            ALOGV("increasing download buffer to %zu bytes",
567                 buffer->size() + bufferRemaining);
568
569            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
570            memcpy(copy->data(), buffer->data(), buffer->size());
571            copy->setRange(0, buffer->size());
572
573            buffer = copy;
574        }
575
576        size_t maxBytesToRead = bufferRemaining;
577        if (range_length >= 0) {
578            int64_t bytesLeftInRange = range_length - buffer->size();
579            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
580                maxBytesToRead = bytesLeftInRange;
581
582                if (bytesLeftInRange == 0) {
583                    break;
584                }
585            }
586        }
587
588        ssize_t n = source->readAt(
589                buffer->size(), buffer->data() + buffer->size(),
590                maxBytesToRead);
591
592        if (n < 0) {
593            return n;
594        }
595
596        if (n == 0) {
597            break;
598        }
599
600        buffer->setRange(0, buffer->size() + (size_t)n);
601    }
602
603    *out = buffer;
604    if (actualUrl != NULL) {
605        *actualUrl = source->getUri();
606        if (actualUrl->isEmpty()) {
607            *actualUrl = url;
608        }
609    }
610
611    return OK;
612}
613
614sp<M3UParser> LiveSession::fetchPlaylist(
615        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
616    ALOGV("fetchPlaylist '%s'", url);
617
618    *unchanged = false;
619
620    sp<ABuffer> buffer;
621    String8 actualUrl;
622    status_t err = fetchFile(url, &buffer, 0, -1, &actualUrl);
623
624    if (err != OK) {
625        return NULL;
626    }
627
628    // MD5 functionality is not available on the simulator, treat all
629    // playlists as changed.
630
631#if defined(HAVE_ANDROID_OS)
632    uint8_t hash[16];
633
634    MD5_CTX m;
635    MD5_Init(&m);
636    MD5_Update(&m, buffer->data(), buffer->size());
637
638    MD5_Final(hash, &m);
639
640    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
641        // playlist unchanged
642        *unchanged = true;
643
644        return NULL;
645    }
646
647    if (curPlaylistHash != NULL) {
648        memcpy(curPlaylistHash, hash, sizeof(hash));
649    }
650#endif
651
652    sp<M3UParser> playlist =
653        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
654
655    if (playlist->initCheck() != OK) {
656        ALOGE("failed to parse .m3u8 playlist");
657
658        return NULL;
659    }
660
661    return playlist;
662}
663
664static double uniformRand() {
665    return (double)rand() / RAND_MAX;
666}
667
668size_t LiveSession::getBandwidthIndex() {
669    if (mBandwidthItems.size() == 0) {
670        return 0;
671    }
672
673#if 1
674    char value[PROPERTY_VALUE_MAX];
675    ssize_t index = -1;
676    if (property_get("media.httplive.bw-index", value, NULL)) {
677        char *end;
678        index = strtol(value, &end, 10);
679        CHECK(end > value && *end == '\0');
680
681        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
682            index = mBandwidthItems.size() - 1;
683        }
684    }
685
686    if (index < 0) {
687        int32_t bandwidthBps;
688        if (mHTTPDataSource != NULL
689                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
690            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
691        } else {
692            ALOGV("no bandwidth estimate.");
693            return 0;  // Pick the lowest bandwidth stream by default.
694        }
695
696        char value[PROPERTY_VALUE_MAX];
697        if (property_get("media.httplive.max-bw", value, NULL)) {
698            char *end;
699            long maxBw = strtoul(value, &end, 10);
700            if (end > value && *end == '\0') {
701                if (maxBw > 0 && bandwidthBps > maxBw) {
702                    ALOGV("bandwidth capped to %ld bps", maxBw);
703                    bandwidthBps = maxBw;
704                }
705            }
706        }
707
708        // Consider only 80% of the available bandwidth usable.
709        bandwidthBps = (bandwidthBps * 8) / 10;
710
711        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
712
713        index = mBandwidthItems.size() - 1;
714        while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth
715                                > (size_t)bandwidthBps) {
716            --index;
717        }
718    }
719#elif 0
720    // Change bandwidth at random()
721    size_t index = uniformRand() * mBandwidthItems.size();
722#elif 0
723    // There's a 50% chance to stay on the current bandwidth and
724    // a 50% chance to switch to the next higher bandwidth (wrapping around
725    // to lowest)
726    const size_t kMinIndex = 0;
727
728    static ssize_t mPrevBandwidthIndex = -1;
729
730    size_t index;
731    if (mPrevBandwidthIndex < 0) {
732        index = kMinIndex;
733    } else if (uniformRand() < 0.5) {
734        index = (size_t)mPrevBandwidthIndex;
735    } else {
736        index = mPrevBandwidthIndex + 1;
737        if (index == mBandwidthItems.size()) {
738            index = kMinIndex;
739        }
740    }
741    mPrevBandwidthIndex = index;
742#elif 0
743    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
744
745    size_t index = mBandwidthItems.size() - 1;
746    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
747        --index;
748    }
749#elif 1
750    char value[PROPERTY_VALUE_MAX];
751    size_t index;
752    if (property_get("media.httplive.bw-index", value, NULL)) {
753        char *end;
754        index = strtoul(value, &end, 10);
755        CHECK(end > value && *end == '\0');
756
757        if (index >= mBandwidthItems.size()) {
758            index = mBandwidthItems.size() - 1;
759        }
760    } else {
761        index = 0;
762    }
763#else
764    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
765#endif
766
767    CHECK_GE(index, 0);
768
769    return index;
770}
771
772status_t LiveSession::onSeek(const sp<AMessage> &msg) {
773    int64_t timeUs;
774    CHECK(msg->findInt64("timeUs", &timeUs));
775
776    if (!mReconfigurationInProgress) {
777        changeConfiguration(timeUs, getBandwidthIndex());
778    }
779
780    return OK;
781}
782
783status_t LiveSession::getDuration(int64_t *durationUs) const {
784    int64_t maxDurationUs = 0ll;
785    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
786        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
787
788        if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) {
789            maxDurationUs = fetcherDurationUs;
790        }
791    }
792
793    *durationUs = maxDurationUs;
794
795    return OK;
796}
797
798bool LiveSession::isSeekable() const {
799    int64_t durationUs;
800    return getDuration(&durationUs) == OK && durationUs >= 0;
801}
802
803bool LiveSession::hasDynamicDuration() const {
804    return false;
805}
806
807status_t LiveSession::getTrackInfo(Parcel *reply) const {
808    return mPlaylist->getTrackInfo(reply);
809}
810
811status_t LiveSession::selectTrack(size_t index, bool select) {
812    status_t err = mPlaylist->selectTrack(index, select);
813    if (err == OK) {
814        (new AMessage(kWhatChangeConfiguration, id()))->post();
815    }
816    return err;
817}
818
819void LiveSession::changeConfiguration(
820        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
821    CHECK(!mReconfigurationInProgress);
822    mReconfigurationInProgress = true;
823
824    mPrevBandwidthIndex = bandwidthIndex;
825
826    ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
827          timeUs, bandwidthIndex, pickTrack);
828
829    if (pickTrack) {
830        mPlaylist->pickRandomMediaItems();
831    }
832
833    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
834    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
835
836    uint32_t streamMask = 0;
837
838    AString audioURI;
839    if (mPlaylist->getAudioURI(item.mPlaylistIndex, &audioURI)) {
840        streamMask |= STREAMTYPE_AUDIO;
841    }
842
843    AString videoURI;
844    if (mPlaylist->getVideoURI(item.mPlaylistIndex, &videoURI)) {
845        streamMask |= STREAMTYPE_VIDEO;
846    }
847
848    AString subtitleURI;
849    if (mPlaylist->getSubtitleURI(item.mPlaylistIndex, &subtitleURI)) {
850        streamMask |= STREAMTYPE_SUBTITLES;
851    }
852
853    // Step 1, stop and discard fetchers that are no longer needed.
854    // Pause those that we'll reuse.
855    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
856        const AString &uri = mFetcherInfos.keyAt(i);
857
858        bool discardFetcher = true;
859
860        // If we're seeking all current fetchers are discarded.
861        if (timeUs < 0ll) {
862            if (((streamMask & STREAMTYPE_AUDIO) && uri == audioURI)
863                    || ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI)
864                    || ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI)) {
865                discardFetcher = false;
866            }
867        }
868
869        if (discardFetcher) {
870            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
871        } else {
872            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
873        }
874    }
875
876    sp<AMessage> msg = new AMessage(kWhatChangeConfiguration2, id());
877    msg->setInt32("streamMask", streamMask);
878    msg->setInt64("timeUs", timeUs);
879    if (streamMask & STREAMTYPE_AUDIO) {
880        msg->setString("audioURI", audioURI.c_str());
881    }
882    if (streamMask & STREAMTYPE_VIDEO) {
883        msg->setString("videoURI", videoURI.c_str());
884    }
885    if (streamMask & STREAMTYPE_SUBTITLES) {
886        msg->setString("subtitleURI", subtitleURI.c_str());
887    }
888
889    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
890    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
891    // fetchers have completed their asynchronous operation, we'll post
892    // mContinuation, which then is handled below in onChangeConfiguration2.
893    mContinuationCounter = mFetcherInfos.size();
894    mContinuation = msg;
895
896    if (mContinuationCounter == 0) {
897        msg->post();
898    }
899}
900
901void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
902    if (!mReconfigurationInProgress) {
903        changeConfiguration(-1ll /* timeUs */, getBandwidthIndex());
904    } else {
905        msg->post(1000000ll); // retry in 1 sec
906    }
907}
908
909void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
910    mContinuation.clear();
911
912    // All fetchers are either suspended or have been removed now.
913
914    uint32_t streamMask;
915    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
916
917    AString audioURI, videoURI, subtitleURI;
918    if (streamMask & STREAMTYPE_AUDIO) {
919        CHECK(msg->findString("audioURI", &audioURI));
920        ALOGV("audioURI = '%s'", audioURI.c_str());
921    }
922    if (streamMask & STREAMTYPE_VIDEO) {
923        CHECK(msg->findString("videoURI", &videoURI));
924        ALOGV("videoURI = '%s'", videoURI.c_str());
925    }
926    if (streamMask & STREAMTYPE_SUBTITLES) {
927        CHECK(msg->findString("subtitleURI", &subtitleURI));
928        ALOGV("subtitleURI = '%s'", subtitleURI.c_str());
929    }
930
931    // Determine which decoders to shutdown on the player side,
932    // a decoder has to be shutdown if either
933    // 1) its streamtype was active before but now longer isn't.
934    // or
935    // 2) its streamtype was already active and still is but the URI
936    //    has changed.
937    uint32_t changedMask = 0;
938    if (((mStreamMask & streamMask & STREAMTYPE_AUDIO)
939                && !(audioURI == mAudioURI))
940        || (mStreamMask & ~streamMask & STREAMTYPE_AUDIO)) {
941        changedMask |= STREAMTYPE_AUDIO;
942    }
943    if (((mStreamMask & streamMask & STREAMTYPE_VIDEO)
944                && !(videoURI == mVideoURI))
945        || (mStreamMask & ~streamMask & STREAMTYPE_VIDEO)) {
946        changedMask |= STREAMTYPE_VIDEO;
947    }
948
949    if (changedMask == 0) {
950        // If nothing changed as far as the audio/video decoders
951        // are concerned we can proceed.
952        onChangeConfiguration3(msg);
953        return;
954    }
955
956    // Something changed, inform the player which will shutdown the
957    // corresponding decoders and will post the reply once that's done.
958    // Handling the reply will continue executing below in
959    // onChangeConfiguration3.
960    sp<AMessage> notify = mNotify->dup();
961    notify->setInt32("what", kWhatStreamsChanged);
962    notify->setInt32("changedMask", changedMask);
963
964    msg->setWhat(kWhatChangeConfiguration3);
965    msg->setTarget(id());
966
967    notify->setMessage("reply", msg);
968    notify->post();
969}
970
971void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
972    // All remaining fetchers are still suspended, the player has shutdown
973    // any decoders that needed it.
974
975    uint32_t streamMask;
976    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
977
978    AString audioURI, videoURI, subtitleURI;
979    if (streamMask & STREAMTYPE_AUDIO) {
980        CHECK(msg->findString("audioURI", &audioURI));
981    }
982    if (streamMask & STREAMTYPE_VIDEO) {
983        CHECK(msg->findString("videoURI", &videoURI));
984    }
985    if (streamMask & STREAMTYPE_SUBTITLES) {
986        CHECK(msg->findString("subtitleURI", &subtitleURI));
987    }
988
989    int64_t timeUs;
990    CHECK(msg->findInt64("timeUs", &timeUs));
991
992    if (timeUs < 0ll) {
993        timeUs = mLastDequeuedTimeUs;
994    }
995    mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
996
997    mStreamMask = streamMask;
998    mAudioURI = audioURI;
999    mVideoURI = videoURI;
1000    mSubtitleURI = subtitleURI;
1001
1002    // Resume all existing fetchers and assign them packet sources.
1003    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1004        const AString &uri = mFetcherInfos.keyAt(i);
1005
1006        uint32_t resumeMask = 0;
1007
1008        sp<AnotherPacketSource> audioSource;
1009        if ((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) {
1010            audioSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
1011            resumeMask |= STREAMTYPE_AUDIO;
1012        }
1013
1014        sp<AnotherPacketSource> videoSource;
1015        if ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) {
1016            videoSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
1017            resumeMask |= STREAMTYPE_VIDEO;
1018        }
1019
1020        sp<AnotherPacketSource> subtitleSource;
1021        if ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI) {
1022            subtitleSource = mPacketSources.valueFor(STREAMTYPE_SUBTITLES);
1023            resumeMask |= STREAMTYPE_SUBTITLES;
1024        }
1025
1026        CHECK_NE(resumeMask, 0u);
1027
1028        ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1029
1030        streamMask &= ~resumeMask;
1031
1032        mFetcherInfos.valueAt(i).mFetcher->startAsync(
1033                audioSource, videoSource, subtitleSource);
1034    }
1035
1036    // streamMask now only contains the types that need a new fetcher created.
1037
1038    if (streamMask != 0) {
1039        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1040    }
1041
1042    while (streamMask != 0) {
1043        StreamType streamType = (StreamType)(streamMask & ~(streamMask - 1));
1044
1045        AString uri;
1046        switch (streamType) {
1047            case STREAMTYPE_AUDIO:
1048                uri = audioURI;
1049                break;
1050            case STREAMTYPE_VIDEO:
1051                uri = videoURI;
1052                break;
1053            case STREAMTYPE_SUBTITLES:
1054                uri = subtitleURI;
1055                break;
1056            default:
1057                TRESPASS();
1058        }
1059
1060        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1061        CHECK(fetcher != NULL);
1062
1063        sp<AnotherPacketSource> audioSource;
1064        if ((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) {
1065            audioSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
1066            audioSource->clear();
1067
1068            streamMask &= ~STREAMTYPE_AUDIO;
1069        }
1070
1071        sp<AnotherPacketSource> videoSource;
1072        if ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) {
1073            videoSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
1074            videoSource->clear();
1075
1076            streamMask &= ~STREAMTYPE_VIDEO;
1077        }
1078
1079        sp<AnotherPacketSource> subtitleSource;
1080        if ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI) {
1081            subtitleSource = mPacketSources.valueFor(STREAMTYPE_SUBTITLES);
1082            subtitleSource->clear();
1083
1084            streamMask &= ~STREAMTYPE_SUBTITLES;
1085        }
1086
1087        fetcher->startAsync(audioSource, videoSource, subtitleSource, timeUs);
1088    }
1089
1090    // All fetchers have now been started, the configuration change
1091    // has completed.
1092
1093    scheduleCheckBandwidthEvent();
1094
1095    ALOGV("XXX configuration change completed.");
1096
1097    mReconfigurationInProgress = false;
1098
1099    if (mDisconnectReplyID != 0) {
1100        finishDisconnect();
1101    }
1102}
1103
1104void LiveSession::scheduleCheckBandwidthEvent() {
1105    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1106    msg->setInt32("generation", mCheckBandwidthGeneration);
1107    msg->post(10000000ll);
1108}
1109
1110void LiveSession::cancelCheckBandwidthEvent() {
1111    ++mCheckBandwidthGeneration;
1112}
1113
1114void LiveSession::onCheckBandwidth() {
1115    if (mReconfigurationInProgress) {
1116        scheduleCheckBandwidthEvent();
1117        return;
1118    }
1119
1120    size_t bandwidthIndex = getBandwidthIndex();
1121    if (mPrevBandwidthIndex < 0
1122            || bandwidthIndex != (size_t)mPrevBandwidthIndex) {
1123        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1124    }
1125
1126    // Handling the kWhatCheckBandwidth even here does _not_ automatically
1127    // schedule another one on return, only an explicit call to
1128    // scheduleCheckBandwidthEvent will do that.
1129    // This ensures that only one configuration change is ongoing at any
1130    // one time, once that completes it'll schedule another check bandwidth
1131    // event.
1132}
1133
1134void LiveSession::postPrepared(status_t err) {
1135    CHECK(mInPreparationPhase);
1136
1137    sp<AMessage> notify = mNotify->dup();
1138    if (err == OK || err == ERROR_END_OF_STREAM) {
1139        notify->setInt32("what", kWhatPrepared);
1140    } else {
1141        notify->setInt32("what", kWhatPreparationFailed);
1142        notify->setInt32("err", err);
1143    }
1144
1145    notify->post();
1146
1147    mInPreparationPhase = false;
1148}
1149
1150}  // namespace android
1151
1152