LiveSession.cpp revision 6c8495c8f1ccc35db972ee7ac0dbb8baf5843548
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/stagefright/foundation/hexdump.h>
31#include <media/stagefright/foundation/ABuffer.h>
32#include <media/stagefright/foundation/ADebug.h>
33#include <media/stagefright/foundation/AMessage.h>
34#include <media/stagefright/DataSource.h>
35#include <media/stagefright/FileSource.h>
36#include <media/stagefright/MediaErrors.h>
37#include <media/stagefright/MetaData.h>
38#include <media/stagefright/Utils.h>
39
40#include <ctype.h>
41#include <openssl/aes.h>
42#include <openssl/md5.h>
43
44namespace android {
45
46LiveSession::LiveSession(
47        const sp<AMessage> &notify, uint32_t flags, bool uidValid, uid_t uid)
48    : mNotify(notify),
49      mFlags(flags),
50      mUIDValid(uidValid),
51      mUID(uid),
52      mInPreparationPhase(true),
53      mHTTPDataSource(
54              HTTPBase::Create(
55                  (mFlags & kFlagIncognito)
56                    ? HTTPBase::kFlagIncognito
57                    : 0)),
58      mPrevBandwidthIndex(-1),
59      mStreamMask(0),
60      mCheckBandwidthGeneration(0),
61      mLastDequeuedTimeUs(0ll),
62      mRealTimeBaseUs(0ll),
63      mReconfigurationInProgress(false),
64      mDisconnectReplyID(0) {
65    if (mUIDValid) {
66        mHTTPDataSource->setUID(mUID);
67    }
68
69    mStreams[kAudioIndex] = StreamItem("audio");
70    mStreams[kVideoIndex] = StreamItem("video");
71    mStreams[kSubtitleIndex] = StreamItem("subtitle");
72
73    for (size_t i = 0; i < kMaxStreams; ++i) {
74        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
75    }
76}
77
78LiveSession::~LiveSession() {
79}
80
81status_t LiveSession::dequeueAccessUnit(
82        StreamType stream, sp<ABuffer> *accessUnit) {
83    if (!(mStreamMask & stream)) {
84        return UNKNOWN_ERROR;
85    }
86
87    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
88
89    status_t finalResult;
90    if (!packetSource->hasBufferAvailable(&finalResult)) {
91        return finalResult == OK ? -EAGAIN : finalResult;
92    }
93
94    status_t err = packetSource->dequeueAccessUnit(accessUnit);
95
96    const char *streamStr;
97    switch (stream) {
98        case STREAMTYPE_AUDIO:
99            streamStr = "audio";
100            break;
101        case STREAMTYPE_VIDEO:
102            streamStr = "video";
103            break;
104        case STREAMTYPE_SUBTITLES:
105            streamStr = "subs";
106            break;
107        default:
108            TRESPASS();
109    }
110
111    if (err == INFO_DISCONTINUITY) {
112        int32_t type;
113        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
114
115        sp<AMessage> extra;
116        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
117            extra.clear();
118        }
119
120        ALOGI("[%s] read discontinuity of type %d, extra = %s",
121              streamStr,
122              type,
123              extra == NULL ? "NULL" : extra->debugString().c_str());
124    } else if (err == OK) {
125        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
126            int64_t timeUs;
127            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
128            ALOGV("[%s] read buffer at time %lld us", streamStr, timeUs);
129
130            mLastDequeuedTimeUs = timeUs;
131            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
132        } else if (stream == STREAMTYPE_SUBTITLES) {
133            (*accessUnit)->meta()->setInt32(
134                    "trackIndex", mPlaylist->getSelectedIndex());
135            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
136        }
137    } else {
138        ALOGI("[%s] encountered error %d", streamStr, err);
139    }
140
141    return err;
142}
143
144status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
145    if (!(mStreamMask & stream)) {
146        return UNKNOWN_ERROR;
147    }
148
149    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
150
151    sp<MetaData> meta = packetSource->getFormat();
152
153    if (meta == NULL) {
154        return -EAGAIN;
155    }
156
157    return convertMetaDataToMessage(meta, format);
158}
159
160void LiveSession::connectAsync(
161        const char *url, const KeyedVector<String8, String8> *headers) {
162    sp<AMessage> msg = new AMessage(kWhatConnect, id());
163    msg->setString("url", url);
164
165    if (headers != NULL) {
166        msg->setPointer(
167                "headers",
168                new KeyedVector<String8, String8>(*headers));
169    }
170
171    msg->post();
172}
173
174status_t LiveSession::disconnect() {
175    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
176
177    sp<AMessage> response;
178    status_t err = msg->postAndAwaitResponse(&response);
179
180    return err;
181}
182
183status_t LiveSession::seekTo(int64_t timeUs) {
184    sp<AMessage> msg = new AMessage(kWhatSeek, id());
185    msg->setInt64("timeUs", timeUs);
186
187    sp<AMessage> response;
188    status_t err = msg->postAndAwaitResponse(&response);
189
190    return err;
191}
192
193void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
194    switch (msg->what()) {
195        case kWhatConnect:
196        {
197            onConnect(msg);
198            break;
199        }
200
201        case kWhatDisconnect:
202        {
203            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
204
205            if (mReconfigurationInProgress) {
206                break;
207            }
208
209            finishDisconnect();
210            break;
211        }
212
213        case kWhatSeek:
214        {
215            uint32_t replyID;
216            CHECK(msg->senderAwaitsResponse(&replyID));
217
218            status_t err = onSeek(msg);
219
220            sp<AMessage> response = new AMessage;
221            response->setInt32("err", err);
222
223            response->postReply(replyID);
224            break;
225        }
226
227        case kWhatFetcherNotify:
228        {
229            int32_t what;
230            CHECK(msg->findInt32("what", &what));
231
232            switch (what) {
233                case PlaylistFetcher::kWhatStarted:
234                    break;
235                case PlaylistFetcher::kWhatPaused:
236                case PlaylistFetcher::kWhatStopped:
237                {
238                    if (what == PlaylistFetcher::kWhatStopped) {
239                        AString uri;
240                        CHECK(msg->findString("uri", &uri));
241                        mFetcherInfos.removeItem(uri);
242                    }
243
244                    if (mContinuation != NULL) {
245                        CHECK_GT(mContinuationCounter, 0);
246                        if (--mContinuationCounter == 0) {
247                            mContinuation->post();
248                        }
249                    }
250                    break;
251                }
252
253                case PlaylistFetcher::kWhatDurationUpdate:
254                {
255                    AString uri;
256                    CHECK(msg->findString("uri", &uri));
257
258                    int64_t durationUs;
259                    CHECK(msg->findInt64("durationUs", &durationUs));
260
261                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
262                    info->mDurationUs = durationUs;
263                    break;
264                }
265
266                case PlaylistFetcher::kWhatError:
267                {
268                    status_t err;
269                    CHECK(msg->findInt32("err", &err));
270
271                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
272
273                    if (mInPreparationPhase) {
274                        postPrepared(err);
275                    }
276
277                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
278
279                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
280
281                    mPacketSources.valueFor(
282                            STREAMTYPE_SUBTITLES)->signalEOS(err);
283
284                    sp<AMessage> notify = mNotify->dup();
285                    notify->setInt32("what", kWhatError);
286                    notify->setInt32("err", err);
287                    notify->post();
288                    break;
289                }
290
291                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
292                {
293                    AString uri;
294                    CHECK(msg->findString("uri", &uri));
295
296                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
297                    info->mIsPrepared = true;
298
299                    if (mInPreparationPhase) {
300                        bool allFetchersPrepared = true;
301                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
302                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
303                                allFetchersPrepared = false;
304                                break;
305                            }
306                        }
307
308                        if (allFetchersPrepared) {
309                            postPrepared(OK);
310                        }
311                    }
312                    break;
313                }
314
315                default:
316                    TRESPASS();
317            }
318
319            break;
320        }
321
322        case kWhatCheckBandwidth:
323        {
324            int32_t generation;
325            CHECK(msg->findInt32("generation", &generation));
326
327            if (generation != mCheckBandwidthGeneration) {
328                break;
329            }
330
331            onCheckBandwidth();
332            break;
333        }
334
335        case kWhatChangeConfiguration:
336        {
337            onChangeConfiguration(msg);
338            break;
339        }
340
341        case kWhatChangeConfiguration2:
342        {
343            onChangeConfiguration2(msg);
344            break;
345        }
346
347        case kWhatChangeConfiguration3:
348        {
349            onChangeConfiguration3(msg);
350            break;
351        }
352
353        case kWhatFinishDisconnect2:
354        {
355            onFinishDisconnect2();
356            break;
357        }
358
359        default:
360            TRESPASS();
361            break;
362    }
363}
364
365// static
366int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
367    if (a->mBandwidth < b->mBandwidth) {
368        return -1;
369    } else if (a->mBandwidth == b->mBandwidth) {
370        return 0;
371    }
372
373    return 1;
374}
375
376// static
377LiveSession::StreamType LiveSession::indexToType(int idx) {
378    CHECK(idx >= 0 && idx < kMaxStreams);
379    return (StreamType)(1 << idx);
380}
381
382void LiveSession::onConnect(const sp<AMessage> &msg) {
383    AString url;
384    CHECK(msg->findString("url", &url));
385
386    KeyedVector<String8, String8> *headers = NULL;
387    if (!msg->findPointer("headers", (void **)&headers)) {
388        mExtraHeaders.clear();
389    } else {
390        mExtraHeaders = *headers;
391
392        delete headers;
393        headers = NULL;
394    }
395
396#if 1
397    ALOGI("onConnect <URL suppressed>");
398#else
399    ALOGI("onConnect %s", url.c_str());
400#endif
401
402    mMasterURL = url;
403
404    bool dummy;
405    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
406
407    if (mPlaylist == NULL) {
408        ALOGE("unable to fetch master playlist '%s'.", url.c_str());
409
410        postPrepared(ERROR_IO);
411        return;
412    }
413
414    // We trust the content provider to make a reasonable choice of preferred
415    // initial bandwidth by listing it first in the variant playlist.
416    // At startup we really don't have a good estimate on the available
417    // network bandwidth since we haven't tranferred any data yet. Once
418    // we have we can make a better informed choice.
419    size_t initialBandwidth = 0;
420    size_t initialBandwidthIndex = 0;
421
422    if (mPlaylist->isVariantPlaylist()) {
423        for (size_t i = 0; i < mPlaylist->size(); ++i) {
424            BandwidthItem item;
425
426            item.mPlaylistIndex = i;
427
428            sp<AMessage> meta;
429            AString uri;
430            mPlaylist->itemAt(i, &uri, &meta);
431
432            unsigned long bandwidth;
433            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
434
435            if (initialBandwidth == 0) {
436                initialBandwidth = item.mBandwidth;
437            }
438
439            mBandwidthItems.push(item);
440        }
441
442        CHECK_GT(mBandwidthItems.size(), 0u);
443
444        mBandwidthItems.sort(SortByBandwidth);
445
446        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
447            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
448                initialBandwidthIndex = i;
449                break;
450            }
451        }
452    } else {
453        // dummy item.
454        BandwidthItem item;
455        item.mPlaylistIndex = 0;
456        item.mBandwidth = 0;
457        mBandwidthItems.push(item);
458    }
459
460    changeConfiguration(
461            0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */);
462}
463
464void LiveSession::finishDisconnect() {
465    // No reconfiguration is currently pending, make sure none will trigger
466    // during disconnection either.
467    cancelCheckBandwidthEvent();
468
469    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
470        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
471    }
472
473    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
474
475    mContinuationCounter = mFetcherInfos.size();
476    mContinuation = msg;
477
478    if (mContinuationCounter == 0) {
479        msg->post();
480    }
481}
482
483void LiveSession::onFinishDisconnect2() {
484    mContinuation.clear();
485
486    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
487    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
488
489    mPacketSources.valueFor(
490            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
491
492    sp<AMessage> response = new AMessage;
493    response->setInt32("err", OK);
494
495    response->postReply(mDisconnectReplyID);
496    mDisconnectReplyID = 0;
497}
498
499sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
500    ssize_t index = mFetcherInfos.indexOfKey(uri);
501
502    if (index >= 0) {
503        return NULL;
504    }
505
506    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
507    notify->setString("uri", uri);
508
509    FetcherInfo info;
510    info.mFetcher = new PlaylistFetcher(notify, this, uri);
511    info.mDurationUs = -1ll;
512    info.mIsPrepared = false;
513    looper()->registerHandler(info.mFetcher);
514
515    mFetcherInfos.add(uri, info);
516
517    return info.mFetcher;
518}
519
520/*
521 * Illustration of parameters:
522 *
523 * 0      `range_offset`
524 * +------------+-------------------------------------------------------+--+--+
525 * |            |                                 | next block to fetch |  |  |
526 * |            | `source` handle => `out` buffer |                     |  |  |
527 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
528 * |            |<----------- `range_length` / buffer capacity ----------->|  |
529 * |<------------------------------ file_size ------------------------------->|
530 *
531 * Special parameter values:
532 * - range_length == -1 means entire file
533 * - block_size == 0 means entire range
534 *
535 */
536status_t LiveSession::fetchFile(
537        const char *url, sp<ABuffer> *out,
538        int64_t range_offset, int64_t range_length,
539        uint32_t block_size, /* download block size */
540        sp<DataSource> *source /* to return and reuse source */) {
541    off64_t size;
542    sp<DataSource> temp_source;
543    if (source == NULL) {
544        source = &temp_source;
545    }
546
547    if (*source == NULL) {
548        if (!strncasecmp(url, "file://", 7)) {
549            *source = new FileSource(url + 7);
550        } else if (strncasecmp(url, "http://", 7)
551                && strncasecmp(url, "https://", 8)) {
552            return ERROR_UNSUPPORTED;
553        } else {
554            KeyedVector<String8, String8> headers = mExtraHeaders;
555            if (range_offset > 0 || range_length >= 0) {
556                headers.add(
557                        String8("Range"),
558                        String8(
559                            StringPrintf(
560                                "bytes=%lld-%s",
561                                range_offset,
562                                range_length < 0
563                                    ? "" : StringPrintf("%lld",
564                                            range_offset + range_length - 1).c_str()).c_str()));
565            }
566            status_t err = mHTTPDataSource->connect(url, &headers);
567
568            if (err != OK) {
569                return err;
570            }
571
572            *source = mHTTPDataSource;
573        }
574    }
575
576    status_t getSizeErr = (*source)->getSize(&size);
577    if (getSizeErr != OK) {
578        size = 65536;
579    }
580
581    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
582    if (*out == NULL) {
583        buffer->setRange(0, 0);
584    }
585
586    // adjust range_length if only reading partial block
587    if (block_size > 0 && (range_length == -1 || buffer->size() + block_size < range_length)) {
588        range_length = buffer->size() + block_size;
589    }
590    for (;;) {
591        // Only resize when we don't know the size.
592        size_t bufferRemaining = buffer->capacity() - buffer->size();
593        if (bufferRemaining == 0 && getSizeErr != OK) {
594            bufferRemaining = 32768;
595
596            ALOGV("increasing download buffer to %d bytes",
597                 buffer->size() + bufferRemaining);
598
599            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
600            memcpy(copy->data(), buffer->data(), buffer->size());
601            copy->setRange(0, buffer->size());
602
603            buffer = copy;
604        }
605
606        size_t maxBytesToRead = bufferRemaining;
607        if (range_length >= 0) {
608            int64_t bytesLeftInRange = range_length - buffer->size();
609            if (bytesLeftInRange < maxBytesToRead) {
610                maxBytesToRead = bytesLeftInRange;
611
612                if (bytesLeftInRange == 0) {
613                    break;
614                }
615            }
616        }
617
618        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
619        // to help us break out of the loop.
620        ssize_t n = (*source)->readAt(
621                buffer->size(), buffer->data() + buffer->size(),
622                maxBytesToRead);
623
624        if (n < 0) {
625            return n;
626        }
627
628        if (n == 0) {
629            break;
630        }
631
632        buffer->setRange(0, buffer->size() + (size_t)n);
633    }
634
635    *out = buffer;
636
637    return OK;
638}
639
640sp<M3UParser> LiveSession::fetchPlaylist(
641        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
642    ALOGV("fetchPlaylist '%s'", url);
643
644    *unchanged = false;
645
646    sp<ABuffer> buffer;
647    status_t err = fetchFile(url, &buffer);
648
649    if (err != OK) {
650        return NULL;
651    }
652
653    // MD5 functionality is not available on the simulator, treat all
654    // playlists as changed.
655
656#if defined(HAVE_ANDROID_OS)
657    uint8_t hash[16];
658
659    MD5_CTX m;
660    MD5_Init(&m);
661    MD5_Update(&m, buffer->data(), buffer->size());
662
663    MD5_Final(hash, &m);
664
665    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
666        // playlist unchanged
667        *unchanged = true;
668
669        ALOGV("Playlist unchanged, refresh state is now %d",
670             (int)mRefreshState);
671
672        return NULL;
673    }
674
675    if (curPlaylistHash != NULL) {
676        memcpy(curPlaylistHash, hash, sizeof(hash));
677    }
678#endif
679
680    sp<M3UParser> playlist =
681        new M3UParser(url, buffer->data(), buffer->size());
682
683    if (playlist->initCheck() != OK) {
684        ALOGE("failed to parse .m3u8 playlist");
685
686        return NULL;
687    }
688
689    return playlist;
690}
691
// Returns the next rand() value scaled into [0, 1].
static double uniformRand() {
    const double sample = rand();
    const double span = RAND_MAX;

    return sample / span;
}
695
696size_t LiveSession::getBandwidthIndex() {
697    if (mBandwidthItems.size() == 0) {
698        return 0;
699    }
700
701#if 1
702    char value[PROPERTY_VALUE_MAX];
703    ssize_t index = -1;
704    if (property_get("media.httplive.bw-index", value, NULL)) {
705        char *end;
706        index = strtol(value, &end, 10);
707        CHECK(end > value && *end == '\0');
708
709        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
710            index = mBandwidthItems.size() - 1;
711        }
712    }
713
714    if (index < 0) {
715        int32_t bandwidthBps;
716        if (mHTTPDataSource != NULL
717                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
718            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
719        } else {
720            ALOGV("no bandwidth estimate.");
721            return 0;  // Pick the lowest bandwidth stream by default.
722        }
723
724        char value[PROPERTY_VALUE_MAX];
725        if (property_get("media.httplive.max-bw", value, NULL)) {
726            char *end;
727            long maxBw = strtoul(value, &end, 10);
728            if (end > value && *end == '\0') {
729                if (maxBw > 0 && bandwidthBps > maxBw) {
730                    ALOGV("bandwidth capped to %ld bps", maxBw);
731                    bandwidthBps = maxBw;
732                }
733            }
734        }
735
736        // Consider only 80% of the available bandwidth usable.
737        bandwidthBps = (bandwidthBps * 8) / 10;
738
739        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
740
741        index = mBandwidthItems.size() - 1;
742        while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth
743                                > (size_t)bandwidthBps) {
744            --index;
745        }
746    }
747#elif 0
748    // Change bandwidth at random()
749    size_t index = uniformRand() * mBandwidthItems.size();
750#elif 0
751    // There's a 50% chance to stay on the current bandwidth and
752    // a 50% chance to switch to the next higher bandwidth (wrapping around
753    // to lowest)
754    const size_t kMinIndex = 0;
755
756    static ssize_t mPrevBandwidthIndex = -1;
757
758    size_t index;
759    if (mPrevBandwidthIndex < 0) {
760        index = kMinIndex;
761    } else if (uniformRand() < 0.5) {
762        index = (size_t)mPrevBandwidthIndex;
763    } else {
764        index = mPrevBandwidthIndex + 1;
765        if (index == mBandwidthItems.size()) {
766            index = kMinIndex;
767        }
768    }
769    mPrevBandwidthIndex = index;
770#elif 0
771    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
772
773    size_t index = mBandwidthItems.size() - 1;
774    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
775        --index;
776    }
777#elif 1
778    char value[PROPERTY_VALUE_MAX];
779    size_t index;
780    if (property_get("media.httplive.bw-index", value, NULL)) {
781        char *end;
782        index = strtoul(value, &end, 10);
783        CHECK(end > value && *end == '\0');
784
785        if (index >= mBandwidthItems.size()) {
786            index = mBandwidthItems.size() - 1;
787        }
788    } else {
789        index = 0;
790    }
791#else
792    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
793#endif
794
795    CHECK_GE(index, 0);
796
797    return index;
798}
799
800status_t LiveSession::onSeek(const sp<AMessage> &msg) {
801    int64_t timeUs;
802    CHECK(msg->findInt64("timeUs", &timeUs));
803
804    if (!mReconfigurationInProgress) {
805        changeConfiguration(timeUs, getBandwidthIndex());
806    }
807
808    return OK;
809}
810
811status_t LiveSession::getDuration(int64_t *durationUs) const {
812    int64_t maxDurationUs = 0ll;
813    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
814        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
815
816        if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) {
817            maxDurationUs = fetcherDurationUs;
818        }
819    }
820
821    *durationUs = maxDurationUs;
822
823    return OK;
824}
825
826bool LiveSession::isSeekable() const {
827    int64_t durationUs;
828    return getDuration(&durationUs) == OK && durationUs >= 0;
829}
830
831bool LiveSession::hasDynamicDuration() const {
832    return false;
833}
834
835status_t LiveSession::getTrackInfo(Parcel *reply) const {
836    return mPlaylist->getTrackInfo(reply);
837}
838
839status_t LiveSession::selectTrack(size_t index, bool select) {
840    status_t err = mPlaylist->selectTrack(index, select);
841    if (err == OK) {
842        (new AMessage(kWhatChangeConfiguration, id()))->post();
843    }
844    return err;
845}
846
847void LiveSession::changeConfiguration(
848        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
849    CHECK(!mReconfigurationInProgress);
850    mReconfigurationInProgress = true;
851
852    mPrevBandwidthIndex = bandwidthIndex;
853
854    ALOGV("changeConfiguration => timeUs:%lld us, bwIndex:%d, pickTrack:%d",
855          timeUs, bandwidthIndex, pickTrack);
856
857    if (pickTrack) {
858        mPlaylist->pickRandomMediaItems();
859    }
860
861    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
862    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
863
864    uint32_t streamMask = 0;
865
866    AString URIs[kMaxStreams];
867    for (size_t i = 0; i < kMaxStreams; ++i) {
868        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
869            streamMask |= indexToType(i);
870        }
871    }
872
873    // Step 1, stop and discard fetchers that are no longer needed.
874    // Pause those that we'll reuse.
875    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
876        const AString &uri = mFetcherInfos.keyAt(i);
877
878        bool discardFetcher = true;
879
880        // If we're seeking all current fetchers are discarded.
881        if (timeUs < 0ll) {
882            for (size_t j = 0; j < kMaxStreams; ++j) {
883                if ((streamMask & indexToType(j)) && uri == URIs[j]) {
884                    discardFetcher = false;
885                }
886            }
887        }
888
889        if (discardFetcher) {
890            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
891        } else {
892            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
893        }
894    }
895
896    sp<AMessage> msg = new AMessage(kWhatChangeConfiguration2, id());
897    msg->setInt32("streamMask", streamMask);
898    msg->setInt64("timeUs", timeUs);
899    for (size_t i = 0; i < kMaxStreams; ++i) {
900        if (streamMask & indexToType(i)) {
901            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
902        }
903    }
904
905    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
906    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
907    // fetchers have completed their asynchronous operation, we'll post
908    // mContinuation, which then is handled below in onChangeConfiguration2.
909    mContinuationCounter = mFetcherInfos.size();
910    mContinuation = msg;
911
912    if (mContinuationCounter == 0) {
913        msg->post();
914    }
915}
916
917void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
918    if (!mReconfigurationInProgress) {
919        changeConfiguration(-1ll /* timeUs */, getBandwidthIndex());
920    } else {
921        msg->post(1000000ll); // retry in 1 sec
922    }
923}
924
925void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
926    mContinuation.clear();
927
928    // All fetchers are either suspended or have been removed now.
929
930    uint32_t streamMask;
931    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
932
933    AString URIs[kMaxStreams];
934    for (size_t i = 0; i < kMaxStreams; ++i) {
935        if (streamMask & indexToType(i)) {
936            const AString &uriKey = mStreams[i].uriKey();
937            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
938            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
939        }
940    }
941
942    // Determine which decoders to shutdown on the player side,
943    // a decoder has to be shutdown if either
944    // 1) its streamtype was active before but now longer isn't.
945    // or
946    // 2) its streamtype was already active and still is but the URI
947    //    has changed.
948    uint32_t changedMask = 0;
949    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
950        if (((mStreamMask & streamMask & indexToType(i))
951                && !(URIs[i] == mStreams[i].mUri))
952                || (mStreamMask & ~streamMask & indexToType(i))) {
953            changedMask |= indexToType(i);
954        }
955    }
956
957    if (changedMask == 0) {
958        // If nothing changed as far as the audio/video decoders
959        // are concerned we can proceed.
960        onChangeConfiguration3(msg);
961        return;
962    }
963
964    // Something changed, inform the player which will shutdown the
965    // corresponding decoders and will post the reply once that's done.
966    // Handling the reply will continue executing below in
967    // onChangeConfiguration3.
968    sp<AMessage> notify = mNotify->dup();
969    notify->setInt32("what", kWhatStreamsChanged);
970    notify->setInt32("changedMask", changedMask);
971
972    msg->setWhat(kWhatChangeConfiguration3);
973    msg->setTarget(id());
974
975    notify->setMessage("reply", msg);
976    notify->post();
977}
978
979void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
980    // All remaining fetchers are still suspended, the player has shutdown
981    // any decoders that needed it.
982
983    uint32_t streamMask;
984    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
985
986    for (size_t i = 0; i < kMaxStreams; ++i) {
987        if (streamMask & indexToType(i)) {
988            CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
989        }
990    }
991
992    int64_t timeUs;
993    CHECK(msg->findInt64("timeUs", &timeUs));
994
995    if (timeUs < 0ll) {
996        timeUs = mLastDequeuedTimeUs;
997    }
998    mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
999
1000    mStreamMask = streamMask;
1001
1002    // Resume all existing fetchers and assign them packet sources.
1003    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1004        const AString &uri = mFetcherInfos.keyAt(i);
1005
1006        uint32_t resumeMask = 0;
1007
1008        sp<AnotherPacketSource> sources[kMaxStreams];
1009        for (size_t j = 0; j < kMaxStreams; ++j) {
1010            if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) {
1011                sources[j] = mPacketSources.valueFor(indexToType(j));
1012                resumeMask |= indexToType(j);
1013            }
1014        }
1015
1016        CHECK_NE(resumeMask, 0u);
1017
1018        ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1019
1020        streamMask &= ~resumeMask;
1021
1022        mFetcherInfos.valueAt(i).mFetcher->startAsync(
1023                sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
1024    }
1025
1026    // streamMask now only contains the types that need a new fetcher created.
1027
1028    if (streamMask != 0) {
1029        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1030    }
1031
1032    for (size_t i = 0; i < kMaxStreams; i++) {
1033        if (!(indexToType(i) & streamMask)) {
1034            continue;
1035        }
1036
1037        AString uri;
1038        uri = mStreams[i].mUri;
1039
1040        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1041        CHECK(fetcher != NULL);
1042
1043        sp<AnotherPacketSource> sources[kMaxStreams];
1044        // TRICKY: looping from i as earlier streams are already removed from streamMask
1045        for (size_t j = i; j < kMaxStreams; ++j) {
1046            if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) {
1047                sources[j] = mPacketSources.valueFor(indexToType(j));
1048                sources[j]->clear();
1049
1050                streamMask &= ~indexToType(j);
1051            }
1052        }
1053
1054        fetcher->startAsync(
1055                sources[kAudioIndex],
1056                sources[kVideoIndex],
1057                sources[kSubtitleIndex],
1058                timeUs);
1059    }
1060
1061    // All fetchers have now been started, the configuration change
1062    // has completed.
1063
1064    scheduleCheckBandwidthEvent();
1065
1066    ALOGV("XXX configuration change completed.");
1067
1068    mReconfigurationInProgress = false;
1069
1070    if (mDisconnectReplyID != 0) {
1071        finishDisconnect();
1072    }
1073}
1074
1075void LiveSession::scheduleCheckBandwidthEvent() {
1076    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1077    msg->setInt32("generation", mCheckBandwidthGeneration);
1078    msg->post(10000000ll);
1079}
1080
1081void LiveSession::cancelCheckBandwidthEvent() {
1082    ++mCheckBandwidthGeneration;
1083}
1084
1085void LiveSession::onCheckBandwidth() {
1086    if (mReconfigurationInProgress) {
1087        scheduleCheckBandwidthEvent();
1088        return;
1089    }
1090
1091    size_t bandwidthIndex = getBandwidthIndex();
1092    if (mPrevBandwidthIndex < 0
1093            || bandwidthIndex != (size_t)mPrevBandwidthIndex) {
1094        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1095    }
1096
1097    // Handling the kWhatCheckBandwidth even here does _not_ automatically
1098    // schedule another one on return, only an explicit call to
1099    // scheduleCheckBandwidthEvent will do that.
1100    // This ensures that only one configuration change is ongoing at any
1101    // one time, once that completes it'll schedule another check bandwidth
1102    // event.
1103}
1104
1105void LiveSession::postPrepared(status_t err) {
1106    CHECK(mInPreparationPhase);
1107
1108    sp<AMessage> notify = mNotify->dup();
1109    if (err == OK || err == ERROR_END_OF_STREAM) {
1110        notify->setInt32("what", kWhatPrepared);
1111    } else {
1112        notify->setInt32("what", kWhatPreparationFailed);
1113        notify->setInt32("err", err);
1114    }
1115
1116    notify->post();
1117
1118    mInPreparationPhase = false;
1119}
1120
1121}  // namespace android
1122
1123