LiveSession.cpp revision a8b8488f703bb6bda039d7d98f87e4f9d845664d
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/stagefright/foundation/hexdump.h>
31#include <media/stagefright/foundation/ABuffer.h>
32#include <media/stagefright/foundation/ADebug.h>
33#include <media/stagefright/foundation/AMessage.h>
34#include <media/stagefright/DataSource.h>
35#include <media/stagefright/FileSource.h>
36#include <media/stagefright/MediaErrors.h>
37#include <media/stagefright/MetaData.h>
38#include <media/stagefright/Utils.h>
39
40#include <ctype.h>
41#include <openssl/aes.h>
42#include <openssl/md5.h>
43
44namespace android {
45
46LiveSession::LiveSession(
47        const sp<AMessage> &notify, uint32_t flags, bool uidValid, uid_t uid)
48    : mNotify(notify),
49      mFlags(flags),
50      mUIDValid(uidValid),
51      mUID(uid),
52      mInPreparationPhase(true),
53      mHTTPDataSource(
54              HTTPBase::Create(
55                  (mFlags & kFlagIncognito)
56                    ? HTTPBase::kFlagIncognito
57                    : 0)),
58      mPrevBandwidthIndex(-1),
59      mStreamMask(0),
60      mCheckBandwidthGeneration(0),
61      mLastDequeuedTimeUs(0ll),
62      mRealTimeBaseUs(0ll),
63      mReconfigurationInProgress(false),
64      mDisconnectReplyID(0) {
65    if (mUIDValid) {
66        mHTTPDataSource->setUID(mUID);
67    }
68
69    mPacketSources.add(
70            STREAMTYPE_AUDIO, new AnotherPacketSource(NULL /* meta */));
71
72    mPacketSources.add(
73            STREAMTYPE_VIDEO, new AnotherPacketSource(NULL /* meta */));
74
75    mPacketSources.add(
76            STREAMTYPE_SUBTITLES, new AnotherPacketSource(NULL /* meta */));
77}
78
79LiveSession::~LiveSession() {
80}
81
82status_t LiveSession::dequeueAccessUnit(
83        StreamType stream, sp<ABuffer> *accessUnit) {
84    if (!(mStreamMask & stream)) {
85        return UNKNOWN_ERROR;
86    }
87
88    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
89
90    status_t finalResult;
91    if (!packetSource->hasBufferAvailable(&finalResult)) {
92        return finalResult == OK ? -EAGAIN : finalResult;
93    }
94
95    status_t err = packetSource->dequeueAccessUnit(accessUnit);
96
97    const char *streamStr;
98    switch (stream) {
99        case STREAMTYPE_AUDIO:
100            streamStr = "audio";
101            break;
102        case STREAMTYPE_VIDEO:
103            streamStr = "video";
104            break;
105        case STREAMTYPE_SUBTITLES:
106            streamStr = "subs";
107            break;
108        default:
109            TRESPASS();
110    }
111
112    if (err == INFO_DISCONTINUITY) {
113        int32_t type;
114        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
115
116        sp<AMessage> extra;
117        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
118            extra.clear();
119        }
120
121        ALOGI("[%s] read discontinuity of type %d, extra = %s",
122              streamStr,
123              type,
124              extra == NULL ? "NULL" : extra->debugString().c_str());
125    } else if (err == OK) {
126        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
127            int64_t timeUs;
128            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
129            ALOGV("[%s] read buffer at time %lld us", streamStr, timeUs);
130
131            mLastDequeuedTimeUs = timeUs;
132            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
133        } else if (stream == STREAMTYPE_SUBTITLES) {
134            (*accessUnit)->meta()->setInt32(
135                    "trackIndex", mPlaylist->getSelectedIndex());
136            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
137        }
138    } else {
139        ALOGI("[%s] encountered error %d", streamStr, err);
140    }
141
142    return err;
143}
144
145status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
146    if (!(mStreamMask & stream)) {
147        return UNKNOWN_ERROR;
148    }
149
150    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
151
152    sp<MetaData> meta = packetSource->getFormat();
153
154    if (meta == NULL) {
155        return -EAGAIN;
156    }
157
158    return convertMetaDataToMessage(meta, format);
159}
160
161void LiveSession::connectAsync(
162        const char *url, const KeyedVector<String8, String8> *headers) {
163    sp<AMessage> msg = new AMessage(kWhatConnect, id());
164    msg->setString("url", url);
165
166    if (headers != NULL) {
167        msg->setPointer(
168                "headers",
169                new KeyedVector<String8, String8>(*headers));
170    }
171
172    msg->post();
173}
174
175status_t LiveSession::disconnect() {
176    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
177
178    sp<AMessage> response;
179    status_t err = msg->postAndAwaitResponse(&response);
180
181    return err;
182}
183
184status_t LiveSession::seekTo(int64_t timeUs) {
185    sp<AMessage> msg = new AMessage(kWhatSeek, id());
186    msg->setInt64("timeUs", timeUs);
187
188    sp<AMessage> response;
189    status_t err = msg->postAndAwaitResponse(&response);
190
191    return err;
192}
193
194void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
195    switch (msg->what()) {
196        case kWhatConnect:
197        {
198            onConnect(msg);
199            break;
200        }
201
202        case kWhatDisconnect:
203        {
204            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
205
206            if (mReconfigurationInProgress) {
207                break;
208            }
209
210            finishDisconnect();
211            break;
212        }
213
214        case kWhatSeek:
215        {
216            uint32_t replyID;
217            CHECK(msg->senderAwaitsResponse(&replyID));
218
219            status_t err = onSeek(msg);
220
221            sp<AMessage> response = new AMessage;
222            response->setInt32("err", err);
223
224            response->postReply(replyID);
225            break;
226        }
227
228        case kWhatFetcherNotify:
229        {
230            int32_t what;
231            CHECK(msg->findInt32("what", &what));
232
233            switch (what) {
234                case PlaylistFetcher::kWhatStarted:
235                    break;
236                case PlaylistFetcher::kWhatPaused:
237                case PlaylistFetcher::kWhatStopped:
238                {
239                    if (what == PlaylistFetcher::kWhatStopped) {
240                        AString uri;
241                        CHECK(msg->findString("uri", &uri));
242                        mFetcherInfos.removeItem(uri);
243                    }
244
245                    if (mContinuation != NULL) {
246                        CHECK_GT(mContinuationCounter, 0);
247                        if (--mContinuationCounter == 0) {
248                            mContinuation->post();
249                        }
250                    }
251                    break;
252                }
253
254                case PlaylistFetcher::kWhatDurationUpdate:
255                {
256                    AString uri;
257                    CHECK(msg->findString("uri", &uri));
258
259                    int64_t durationUs;
260                    CHECK(msg->findInt64("durationUs", &durationUs));
261
262                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
263                    info->mDurationUs = durationUs;
264                    break;
265                }
266
267                case PlaylistFetcher::kWhatError:
268                {
269                    status_t err;
270                    CHECK(msg->findInt32("err", &err));
271
272                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
273
274                    if (mInPreparationPhase) {
275                        postPrepared(err);
276                    }
277
278                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
279
280                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
281
282                    mPacketSources.valueFor(
283                            STREAMTYPE_SUBTITLES)->signalEOS(err);
284
285                    sp<AMessage> notify = mNotify->dup();
286                    notify->setInt32("what", kWhatError);
287                    notify->setInt32("err", err);
288                    notify->post();
289                    break;
290                }
291
292                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
293                {
294                    AString uri;
295                    CHECK(msg->findString("uri", &uri));
296
297                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
298                    info->mIsPrepared = true;
299
300                    if (mInPreparationPhase) {
301                        bool allFetchersPrepared = true;
302                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
303                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
304                                allFetchersPrepared = false;
305                                break;
306                            }
307                        }
308
309                        if (allFetchersPrepared) {
310                            postPrepared(OK);
311                        }
312                    }
313                    break;
314                }
315
316                default:
317                    TRESPASS();
318            }
319
320            break;
321        }
322
323        case kWhatCheckBandwidth:
324        {
325            int32_t generation;
326            CHECK(msg->findInt32("generation", &generation));
327
328            if (generation != mCheckBandwidthGeneration) {
329                break;
330            }
331
332            onCheckBandwidth();
333            break;
334        }
335
336        case kWhatChangeConfiguration:
337        {
338            onChangeConfiguration(msg);
339            break;
340        }
341
342        case kWhatChangeConfiguration2:
343        {
344            onChangeConfiguration2(msg);
345            break;
346        }
347
348        case kWhatChangeConfiguration3:
349        {
350            onChangeConfiguration3(msg);
351            break;
352        }
353
354        case kWhatFinishDisconnect2:
355        {
356            onFinishDisconnect2();
357            break;
358        }
359
360        default:
361            TRESPASS();
362            break;
363    }
364}
365
366// static
367int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
368    if (a->mBandwidth < b->mBandwidth) {
369        return -1;
370    } else if (a->mBandwidth == b->mBandwidth) {
371        return 0;
372    }
373
374    return 1;
375}
376
377void LiveSession::onConnect(const sp<AMessage> &msg) {
378    AString url;
379    CHECK(msg->findString("url", &url));
380
381    KeyedVector<String8, String8> *headers = NULL;
382    if (!msg->findPointer("headers", (void **)&headers)) {
383        mExtraHeaders.clear();
384    } else {
385        mExtraHeaders = *headers;
386
387        delete headers;
388        headers = NULL;
389    }
390
391#if 1
392    ALOGI("onConnect <URL suppressed>");
393#else
394    ALOGI("onConnect %s", url.c_str());
395#endif
396
397    mMasterURL = url;
398
399    bool dummy;
400    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
401
402    if (mPlaylist == NULL) {
403        ALOGE("unable to fetch master playlist <URL suppressed>.");
404
405        postPrepared(ERROR_IO);
406        return;
407    }
408
409    // We trust the content provider to make a reasonable choice of preferred
410    // initial bandwidth by listing it first in the variant playlist.
411    // At startup we really don't have a good estimate on the available
412    // network bandwidth since we haven't tranferred any data yet. Once
413    // we have we can make a better informed choice.
414    size_t initialBandwidth = 0;
415    size_t initialBandwidthIndex = 0;
416
417    if (mPlaylist->isVariantPlaylist()) {
418        for (size_t i = 0; i < mPlaylist->size(); ++i) {
419            BandwidthItem item;
420
421            item.mPlaylistIndex = i;
422
423            sp<AMessage> meta;
424            AString uri;
425            mPlaylist->itemAt(i, &uri, &meta);
426
427            unsigned long bandwidth;
428            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
429
430            if (initialBandwidth == 0) {
431                initialBandwidth = item.mBandwidth;
432            }
433
434            mBandwidthItems.push(item);
435        }
436
437        CHECK_GT(mBandwidthItems.size(), 0u);
438
439        mBandwidthItems.sort(SortByBandwidth);
440
441        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
442            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
443                initialBandwidthIndex = i;
444                break;
445            }
446        }
447    } else {
448        // dummy item.
449        BandwidthItem item;
450        item.mPlaylistIndex = 0;
451        item.mBandwidth = 0;
452        mBandwidthItems.push(item);
453    }
454
455    changeConfiguration(
456            0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */);
457}
458
459void LiveSession::finishDisconnect() {
460    // No reconfiguration is currently pending, make sure none will trigger
461    // during disconnection either.
462    cancelCheckBandwidthEvent();
463
464    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
465        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
466    }
467
468    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
469
470    mContinuationCounter = mFetcherInfos.size();
471    mContinuation = msg;
472
473    if (mContinuationCounter == 0) {
474        msg->post();
475    }
476}
477
478void LiveSession::onFinishDisconnect2() {
479    mContinuation.clear();
480
481    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
482    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
483
484    mPacketSources.valueFor(
485            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
486
487    sp<AMessage> response = new AMessage;
488    response->setInt32("err", OK);
489
490    response->postReply(mDisconnectReplyID);
491    mDisconnectReplyID = 0;
492}
493
494sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
495    ssize_t index = mFetcherInfos.indexOfKey(uri);
496
497    if (index >= 0) {
498        return NULL;
499    }
500
501    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
502    notify->setString("uri", uri);
503
504    FetcherInfo info;
505    info.mFetcher = new PlaylistFetcher(notify, this, uri);
506    info.mDurationUs = -1ll;
507    info.mIsPrepared = false;
508    looper()->registerHandler(info.mFetcher);
509
510    mFetcherInfos.add(uri, info);
511
512    return info.mFetcher;
513}
514
515status_t LiveSession::fetchFile(
516        const char *url, sp<ABuffer> *out,
517        int64_t range_offset, int64_t range_length,
518        String8 *actualUrl) {
519    *out = NULL;
520
521    sp<DataSource> source;
522
523    if (!strncasecmp(url, "file://", 7)) {
524        source = new FileSource(url + 7);
525    } else if (strncasecmp(url, "http://", 7)
526            && strncasecmp(url, "https://", 8)) {
527        return ERROR_UNSUPPORTED;
528    } else {
529        KeyedVector<String8, String8> headers = mExtraHeaders;
530        if (range_offset > 0 || range_length >= 0) {
531            headers.add(
532                    String8("Range"),
533                    String8(
534                        StringPrintf(
535                            "bytes=%lld-%s",
536                            range_offset,
537                            range_length < 0
538                                ? "" : StringPrintf("%lld", range_offset + range_length - 1).c_str()).c_str()));
539        }
540        status_t err = mHTTPDataSource->connect(url, &headers);
541
542        if (err != OK) {
543            return err;
544        }
545
546        source = mHTTPDataSource;
547    }
548
549    off64_t size;
550    status_t err = source->getSize(&size);
551
552    if (err != OK) {
553        size = 65536;
554    }
555
556    sp<ABuffer> buffer = new ABuffer(size);
557    buffer->setRange(0, 0);
558
559    for (;;) {
560        size_t bufferRemaining = buffer->capacity() - buffer->size();
561
562        if (bufferRemaining == 0) {
563            bufferRemaining = 32768;
564
565            ALOGV("increasing download buffer to %d bytes",
566                 buffer->size() + bufferRemaining);
567
568            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
569            memcpy(copy->data(), buffer->data(), buffer->size());
570            copy->setRange(0, buffer->size());
571
572            buffer = copy;
573        }
574
575        size_t maxBytesToRead = bufferRemaining;
576        if (range_length >= 0) {
577            int64_t bytesLeftInRange = range_length - buffer->size();
578            if (bytesLeftInRange < maxBytesToRead) {
579                maxBytesToRead = bytesLeftInRange;
580
581                if (bytesLeftInRange == 0) {
582                    break;
583                }
584            }
585        }
586
587        ssize_t n = source->readAt(
588                buffer->size(), buffer->data() + buffer->size(),
589                maxBytesToRead);
590
591        if (n < 0) {
592            return n;
593        }
594
595        if (n == 0) {
596            break;
597        }
598
599        buffer->setRange(0, buffer->size() + (size_t)n);
600    }
601
602    *out = buffer;
603    if (actualUrl != NULL) {
604        *actualUrl = source->getUri();
605        if (actualUrl->isEmpty()) {
606            *actualUrl = url;
607        }
608    }
609
610    return OK;
611}
612
613sp<M3UParser> LiveSession::fetchPlaylist(
614        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
615    ALOGV("fetchPlaylist '%s'", url);
616
617    *unchanged = false;
618
619    sp<ABuffer> buffer;
620    String8 actualUrl;
621    status_t err = fetchFile(url, &buffer, 0, -1, &actualUrl);
622
623    if (err != OK) {
624        return NULL;
625    }
626
627    // MD5 functionality is not available on the simulator, treat all
628    // playlists as changed.
629
630#if defined(HAVE_ANDROID_OS)
631    uint8_t hash[16];
632
633    MD5_CTX m;
634    MD5_Init(&m);
635    MD5_Update(&m, buffer->data(), buffer->size());
636
637    MD5_Final(hash, &m);
638
639    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
640        // playlist unchanged
641        *unchanged = true;
642
643        return NULL;
644    }
645
646    if (curPlaylistHash != NULL) {
647        memcpy(curPlaylistHash, hash, sizeof(hash));
648    }
649#endif
650
651    sp<M3UParser> playlist =
652        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
653
654    if (playlist->initCheck() != OK) {
655        ALOGE("failed to parse .m3u8 playlist");
656
657        return NULL;
658    }
659
660    return playlist;
661}
662
// Returns rand() scaled by RAND_MAX, i.e. a value in [0, 1].
static double uniformRand() {
    const double sample = static_cast<double>(rand());
    return sample / RAND_MAX;
}
666
667size_t LiveSession::getBandwidthIndex() {
668    if (mBandwidthItems.size() == 0) {
669        return 0;
670    }
671
672#if 1
673    char value[PROPERTY_VALUE_MAX];
674    ssize_t index = -1;
675    if (property_get("media.httplive.bw-index", value, NULL)) {
676        char *end;
677        index = strtol(value, &end, 10);
678        CHECK(end > value && *end == '\0');
679
680        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
681            index = mBandwidthItems.size() - 1;
682        }
683    }
684
685    if (index < 0) {
686        int32_t bandwidthBps;
687        if (mHTTPDataSource != NULL
688                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
689            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
690        } else {
691            ALOGV("no bandwidth estimate.");
692            return 0;  // Pick the lowest bandwidth stream by default.
693        }
694
695        char value[PROPERTY_VALUE_MAX];
696        if (property_get("media.httplive.max-bw", value, NULL)) {
697            char *end;
698            long maxBw = strtoul(value, &end, 10);
699            if (end > value && *end == '\0') {
700                if (maxBw > 0 && bandwidthBps > maxBw) {
701                    ALOGV("bandwidth capped to %ld bps", maxBw);
702                    bandwidthBps = maxBw;
703                }
704            }
705        }
706
707        // Consider only 80% of the available bandwidth usable.
708        bandwidthBps = (bandwidthBps * 8) / 10;
709
710        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
711
712        index = mBandwidthItems.size() - 1;
713        while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth
714                                > (size_t)bandwidthBps) {
715            --index;
716        }
717    }
718#elif 0
719    // Change bandwidth at random()
720    size_t index = uniformRand() * mBandwidthItems.size();
721#elif 0
722    // There's a 50% chance to stay on the current bandwidth and
723    // a 50% chance to switch to the next higher bandwidth (wrapping around
724    // to lowest)
725    const size_t kMinIndex = 0;
726
727    static ssize_t mPrevBandwidthIndex = -1;
728
729    size_t index;
730    if (mPrevBandwidthIndex < 0) {
731        index = kMinIndex;
732    } else if (uniformRand() < 0.5) {
733        index = (size_t)mPrevBandwidthIndex;
734    } else {
735        index = mPrevBandwidthIndex + 1;
736        if (index == mBandwidthItems.size()) {
737            index = kMinIndex;
738        }
739    }
740    mPrevBandwidthIndex = index;
741#elif 0
742    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
743
744    size_t index = mBandwidthItems.size() - 1;
745    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
746        --index;
747    }
748#elif 1
749    char value[PROPERTY_VALUE_MAX];
750    size_t index;
751    if (property_get("media.httplive.bw-index", value, NULL)) {
752        char *end;
753        index = strtoul(value, &end, 10);
754        CHECK(end > value && *end == '\0');
755
756        if (index >= mBandwidthItems.size()) {
757            index = mBandwidthItems.size() - 1;
758        }
759    } else {
760        index = 0;
761    }
762#else
763    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
764#endif
765
766    CHECK_GE(index, 0);
767
768    return index;
769}
770
771status_t LiveSession::onSeek(const sp<AMessage> &msg) {
772    int64_t timeUs;
773    CHECK(msg->findInt64("timeUs", &timeUs));
774
775    if (!mReconfigurationInProgress) {
776        changeConfiguration(timeUs, getBandwidthIndex());
777    }
778
779    return OK;
780}
781
782status_t LiveSession::getDuration(int64_t *durationUs) const {
783    int64_t maxDurationUs = 0ll;
784    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
785        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
786
787        if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) {
788            maxDurationUs = fetcherDurationUs;
789        }
790    }
791
792    *durationUs = maxDurationUs;
793
794    return OK;
795}
796
797bool LiveSession::isSeekable() const {
798    int64_t durationUs;
799    return getDuration(&durationUs) == OK && durationUs >= 0;
800}
801
802bool LiveSession::hasDynamicDuration() const {
803    return false;
804}
805
806status_t LiveSession::getTrackInfo(Parcel *reply) const {
807    return mPlaylist->getTrackInfo(reply);
808}
809
810status_t LiveSession::selectTrack(size_t index, bool select) {
811    status_t err = mPlaylist->selectTrack(index, select);
812    if (err == OK) {
813        (new AMessage(kWhatChangeConfiguration, id()))->post();
814    }
815    return err;
816}
817
818void LiveSession::changeConfiguration(
819        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
820    CHECK(!mReconfigurationInProgress);
821    mReconfigurationInProgress = true;
822
823    mPrevBandwidthIndex = bandwidthIndex;
824
825    ALOGV("changeConfiguration => timeUs:%lld us, bwIndex:%d, pickTrack:%d",
826          timeUs, bandwidthIndex, pickTrack);
827
828    if (pickTrack) {
829        mPlaylist->pickRandomMediaItems();
830    }
831
832    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
833    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
834
835    uint32_t streamMask = 0;
836
837    AString audioURI;
838    if (mPlaylist->getAudioURI(item.mPlaylistIndex, &audioURI)) {
839        streamMask |= STREAMTYPE_AUDIO;
840    }
841
842    AString videoURI;
843    if (mPlaylist->getVideoURI(item.mPlaylistIndex, &videoURI)) {
844        streamMask |= STREAMTYPE_VIDEO;
845    }
846
847    AString subtitleURI;
848    if (mPlaylist->getSubtitleURI(item.mPlaylistIndex, &subtitleURI)) {
849        streamMask |= STREAMTYPE_SUBTITLES;
850    }
851
852    // Step 1, stop and discard fetchers that are no longer needed.
853    // Pause those that we'll reuse.
854    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
855        const AString &uri = mFetcherInfos.keyAt(i);
856
857        bool discardFetcher = true;
858
859        // If we're seeking all current fetchers are discarded.
860        if (timeUs < 0ll) {
861            if (((streamMask & STREAMTYPE_AUDIO) && uri == audioURI)
862                    || ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI)
863                    || ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI)) {
864                discardFetcher = false;
865            }
866        }
867
868        if (discardFetcher) {
869            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
870        } else {
871            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
872        }
873    }
874
875    sp<AMessage> msg = new AMessage(kWhatChangeConfiguration2, id());
876    msg->setInt32("streamMask", streamMask);
877    msg->setInt64("timeUs", timeUs);
878    if (streamMask & STREAMTYPE_AUDIO) {
879        msg->setString("audioURI", audioURI.c_str());
880    }
881    if (streamMask & STREAMTYPE_VIDEO) {
882        msg->setString("videoURI", videoURI.c_str());
883    }
884    if (streamMask & STREAMTYPE_SUBTITLES) {
885        msg->setString("subtitleURI", subtitleURI.c_str());
886    }
887
888    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
889    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
890    // fetchers have completed their asynchronous operation, we'll post
891    // mContinuation, which then is handled below in onChangeConfiguration2.
892    mContinuationCounter = mFetcherInfos.size();
893    mContinuation = msg;
894
895    if (mContinuationCounter == 0) {
896        msg->post();
897    }
898}
899
900void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
901    if (!mReconfigurationInProgress) {
902        changeConfiguration(-1ll /* timeUs */, getBandwidthIndex());
903    } else {
904        msg->post(1000000ll); // retry in 1 sec
905    }
906}
907
908void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
909    mContinuation.clear();
910
911    // All fetchers are either suspended or have been removed now.
912
913    uint32_t streamMask;
914    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
915
916    AString audioURI, videoURI, subtitleURI;
917    if (streamMask & STREAMTYPE_AUDIO) {
918        CHECK(msg->findString("audioURI", &audioURI));
919        ALOGV("audioURI = '%s'", audioURI.c_str());
920    }
921    if (streamMask & STREAMTYPE_VIDEO) {
922        CHECK(msg->findString("videoURI", &videoURI));
923        ALOGV("videoURI = '%s'", videoURI.c_str());
924    }
925    if (streamMask & STREAMTYPE_SUBTITLES) {
926        CHECK(msg->findString("subtitleURI", &subtitleURI));
927        ALOGV("subtitleURI = '%s'", subtitleURI.c_str());
928    }
929
930    // Determine which decoders to shutdown on the player side,
931    // a decoder has to be shutdown if either
932    // 1) its streamtype was active before but now longer isn't.
933    // or
934    // 2) its streamtype was already active and still is but the URI
935    //    has changed.
936    uint32_t changedMask = 0;
937    if (((mStreamMask & streamMask & STREAMTYPE_AUDIO)
938                && !(audioURI == mAudioURI))
939        || (mStreamMask & ~streamMask & STREAMTYPE_AUDIO)) {
940        changedMask |= STREAMTYPE_AUDIO;
941    }
942    if (((mStreamMask & streamMask & STREAMTYPE_VIDEO)
943                && !(videoURI == mVideoURI))
944        || (mStreamMask & ~streamMask & STREAMTYPE_VIDEO)) {
945        changedMask |= STREAMTYPE_VIDEO;
946    }
947
948    if (changedMask == 0) {
949        // If nothing changed as far as the audio/video decoders
950        // are concerned we can proceed.
951        onChangeConfiguration3(msg);
952        return;
953    }
954
955    // Something changed, inform the player which will shutdown the
956    // corresponding decoders and will post the reply once that's done.
957    // Handling the reply will continue executing below in
958    // onChangeConfiguration3.
959    sp<AMessage> notify = mNotify->dup();
960    notify->setInt32("what", kWhatStreamsChanged);
961    notify->setInt32("changedMask", changedMask);
962
963    msg->setWhat(kWhatChangeConfiguration3);
964    msg->setTarget(id());
965
966    notify->setMessage("reply", msg);
967    notify->post();
968}
969
970void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
971    // All remaining fetchers are still suspended, the player has shutdown
972    // any decoders that needed it.
973
974    uint32_t streamMask;
975    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
976
977    AString audioURI, videoURI, subtitleURI;
978    if (streamMask & STREAMTYPE_AUDIO) {
979        CHECK(msg->findString("audioURI", &audioURI));
980    }
981    if (streamMask & STREAMTYPE_VIDEO) {
982        CHECK(msg->findString("videoURI", &videoURI));
983    }
984    if (streamMask & STREAMTYPE_SUBTITLES) {
985        CHECK(msg->findString("subtitleURI", &subtitleURI));
986    }
987
988    int64_t timeUs;
989    CHECK(msg->findInt64("timeUs", &timeUs));
990
991    if (timeUs < 0ll) {
992        timeUs = mLastDequeuedTimeUs;
993    }
994    mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
995
996    mStreamMask = streamMask;
997    mAudioURI = audioURI;
998    mVideoURI = videoURI;
999    mSubtitleURI = subtitleURI;
1000
1001    // Resume all existing fetchers and assign them packet sources.
1002    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1003        const AString &uri = mFetcherInfos.keyAt(i);
1004
1005        uint32_t resumeMask = 0;
1006
1007        sp<AnotherPacketSource> audioSource;
1008        if ((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) {
1009            audioSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
1010            resumeMask |= STREAMTYPE_AUDIO;
1011        }
1012
1013        sp<AnotherPacketSource> videoSource;
1014        if ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) {
1015            videoSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
1016            resumeMask |= STREAMTYPE_VIDEO;
1017        }
1018
1019        sp<AnotherPacketSource> subtitleSource;
1020        if ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI) {
1021            subtitleSource = mPacketSources.valueFor(STREAMTYPE_SUBTITLES);
1022            resumeMask |= STREAMTYPE_SUBTITLES;
1023        }
1024
1025        CHECK_NE(resumeMask, 0u);
1026
1027        ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1028
1029        streamMask &= ~resumeMask;
1030
1031        mFetcherInfos.valueAt(i).mFetcher->startAsync(
1032                audioSource, videoSource, subtitleSource);
1033    }
1034
1035    // streamMask now only contains the types that need a new fetcher created.
1036
1037    if (streamMask != 0) {
1038        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1039    }
1040
1041    while (streamMask != 0) {
1042        StreamType streamType = (StreamType)(streamMask & ~(streamMask - 1));
1043
1044        AString uri;
1045        switch (streamType) {
1046            case STREAMTYPE_AUDIO:
1047                uri = audioURI;
1048                break;
1049            case STREAMTYPE_VIDEO:
1050                uri = videoURI;
1051                break;
1052            case STREAMTYPE_SUBTITLES:
1053                uri = subtitleURI;
1054                break;
1055            default:
1056                TRESPASS();
1057        }
1058
1059        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1060        CHECK(fetcher != NULL);
1061
1062        sp<AnotherPacketSource> audioSource;
1063        if ((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) {
1064            audioSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
1065            audioSource->clear();
1066
1067            streamMask &= ~STREAMTYPE_AUDIO;
1068        }
1069
1070        sp<AnotherPacketSource> videoSource;
1071        if ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) {
1072            videoSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
1073            videoSource->clear();
1074
1075            streamMask &= ~STREAMTYPE_VIDEO;
1076        }
1077
1078        sp<AnotherPacketSource> subtitleSource;
1079        if ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI) {
1080            subtitleSource = mPacketSources.valueFor(STREAMTYPE_SUBTITLES);
1081            subtitleSource->clear();
1082
1083            streamMask &= ~STREAMTYPE_SUBTITLES;
1084        }
1085
1086        fetcher->startAsync(audioSource, videoSource, subtitleSource, timeUs);
1087    }
1088
1089    // All fetchers have now been started, the configuration change
1090    // has completed.
1091
1092    scheduleCheckBandwidthEvent();
1093
1094    ALOGV("XXX configuration change completed.");
1095
1096    mReconfigurationInProgress = false;
1097
1098    if (mDisconnectReplyID != 0) {
1099        finishDisconnect();
1100    }
1101}
1102
1103void LiveSession::scheduleCheckBandwidthEvent() {
1104    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1105    msg->setInt32("generation", mCheckBandwidthGeneration);
1106    msg->post(10000000ll);
1107}
1108
1109void LiveSession::cancelCheckBandwidthEvent() {
1110    ++mCheckBandwidthGeneration;
1111}
1112
1113void LiveSession::onCheckBandwidth() {
1114    if (mReconfigurationInProgress) {
1115        scheduleCheckBandwidthEvent();
1116        return;
1117    }
1118
1119    size_t bandwidthIndex = getBandwidthIndex();
1120    if (mPrevBandwidthIndex < 0
1121            || bandwidthIndex != (size_t)mPrevBandwidthIndex) {
1122        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1123    }
1124
1125    // Handling the kWhatCheckBandwidth even here does _not_ automatically
1126    // schedule another one on return, only an explicit call to
1127    // scheduleCheckBandwidthEvent will do that.
1128    // This ensures that only one configuration change is ongoing at any
1129    // one time, once that completes it'll schedule another check bandwidth
1130    // event.
1131}
1132
1133void LiveSession::postPrepared(status_t err) {
1134    CHECK(mInPreparationPhase);
1135
1136    sp<AMessage> notify = mNotify->dup();
1137    if (err == OK || err == ERROR_END_OF_STREAM) {
1138        notify->setInt32("what", kWhatPrepared);
1139    } else {
1140        notify->setInt32("what", kWhatPreparationFailed);
1141        notify->setInt32("err", err);
1142    }
1143
1144    notify->post();
1145
1146    mInPreparationPhase = false;
1147}
1148
1149}  // namespace android
1150
1151