LiveSession.cpp revision a0b94395dc82c90ca437bb6fed7aa01fcbbffffe
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/stagefright/foundation/hexdump.h>
31#include <media/stagefright/foundation/ABuffer.h>
32#include <media/stagefright/foundation/ADebug.h>
33#include <media/stagefright/foundation/AMessage.h>
34#include <media/stagefright/DataSource.h>
35#include <media/stagefright/FileSource.h>
36#include <media/stagefright/MediaErrors.h>
37#include <media/stagefright/MetaData.h>
38#include <media/stagefright/Utils.h>
39
40#include <utils/Mutex.h>
41
42#include <ctype.h>
43#include <openssl/aes.h>
44#include <openssl/md5.h>
45
46namespace android {
47
48LiveSession::LiveSession(
49        const sp<AMessage> &notify, uint32_t flags, bool uidValid, uid_t uid)
50    : mNotify(notify),
51      mFlags(flags),
52      mUIDValid(uidValid),
53      mUID(uid),
54      mInPreparationPhase(true),
55      mHTTPDataSource(
56              HTTPBase::Create(
57                  (mFlags & kFlagIncognito)
58                    ? HTTPBase::kFlagIncognito
59                    : 0)),
60      mPrevBandwidthIndex(-1),
61      mStreamMask(0),
62      mNewStreamMask(0),
63      mSwapMask(0),
64      mCheckBandwidthGeneration(0),
65      mSwitchGeneration(0),
66      mLastDequeuedTimeUs(0ll),
67      mRealTimeBaseUs(0ll),
68      mReconfigurationInProgress(false),
69      mSwitchInProgress(false),
70      mDisconnectReplyID(0),
71      mSeekReplyID(0) {
72    if (mUIDValid) {
73        mHTTPDataSource->setUID(mUID);
74    }
75
76    mStreams[kAudioIndex] = StreamItem("audio");
77    mStreams[kVideoIndex] = StreamItem("video");
78    mStreams[kSubtitleIndex] = StreamItem("subtitle");
79
80    for (size_t i = 0; i < kMaxStreams; ++i) {
81        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
82        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
83    }
84}
85
86LiveSession::~LiveSession() {
87}
88
89sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
90    ABuffer *discontinuity = new ABuffer(0);
91    discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
92    discontinuity->meta()->setInt32("swapPacketSource", swap);
93    discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
94    discontinuity->meta()->setInt64("timeUs", -1);
95    return discontinuity;
96}
97
98void LiveSession::swapPacketSource(StreamType stream) {
99    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
100    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
101    sp<AnotherPacketSource> tmp = aps;
102    aps = aps2;
103    aps2 = tmp;
104    aps2->clear();
105}
106
107status_t LiveSession::dequeueAccessUnit(
108        StreamType stream, sp<ABuffer> *accessUnit) {
109    if (!(mStreamMask & stream)) {
110        // return -EWOULDBLOCK to avoid halting the decoder
111        // when switching between audio/video and audio only.
112        return -EWOULDBLOCK;
113    }
114
115    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
116
117    status_t finalResult;
118    if (!packetSource->hasBufferAvailable(&finalResult)) {
119        return finalResult == OK ? -EAGAIN : finalResult;
120    }
121
122    status_t err = packetSource->dequeueAccessUnit(accessUnit);
123
124    const char *streamStr;
125    switch (stream) {
126        case STREAMTYPE_AUDIO:
127            streamStr = "audio";
128            break;
129        case STREAMTYPE_VIDEO:
130            streamStr = "video";
131            break;
132        case STREAMTYPE_SUBTITLES:
133            streamStr = "subs";
134            break;
135        default:
136            TRESPASS();
137    }
138
139    if (err == INFO_DISCONTINUITY) {
140        int32_t type;
141        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
142
143        sp<AMessage> extra;
144        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
145            extra.clear();
146        }
147
148        ALOGI("[%s] read discontinuity of type %d, extra = %s",
149              streamStr,
150              type,
151              extra == NULL ? "NULL" : extra->debugString().c_str());
152
153        int32_t swap;
154        if (type == ATSParser::DISCONTINUITY_FORMATCHANGE
155                && (*accessUnit)->meta()->findInt32("swapPacketSource", &swap)
156                && swap) {
157
158            int32_t switchGeneration;
159            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
160            {
161                Mutex::Autolock lock(mSwapMutex);
162                if (switchGeneration == mSwitchGeneration) {
163                    swapPacketSource(stream);
164                    sp<AMessage> msg = new AMessage(kWhatSwapped, id());
165                    msg->setInt32("stream", stream);
166                    msg->setInt32("switchGeneration", switchGeneration);
167                    msg->post();
168                }
169            }
170        }
171    } else if (err == OK) {
172        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
173            int64_t timeUs;
174            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
175            ALOGV("[%s] read buffer at time %lld us", streamStr, timeUs);
176
177            mLastDequeuedTimeUs = timeUs;
178            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
179        } else if (stream == STREAMTYPE_SUBTITLES) {
180            (*accessUnit)->meta()->setInt32(
181                    "trackIndex", mPlaylist->getSelectedIndex());
182            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
183        }
184    } else {
185        ALOGI("[%s] encountered error %d", streamStr, err);
186    }
187
188    return err;
189}
190
191status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
192    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
193    if (!(mStreamMask & stream)) {
194        return UNKNOWN_ERROR;
195    }
196
197    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
198
199    sp<MetaData> meta = packetSource->getFormat();
200
201    if (meta == NULL) {
202        return -EAGAIN;
203    }
204
205    return convertMetaDataToMessage(meta, format);
206}
207
208void LiveSession::connectAsync(
209        const char *url, const KeyedVector<String8, String8> *headers) {
210    sp<AMessage> msg = new AMessage(kWhatConnect, id());
211    msg->setString("url", url);
212
213    if (headers != NULL) {
214        msg->setPointer(
215                "headers",
216                new KeyedVector<String8, String8>(*headers));
217    }
218
219    msg->post();
220}
221
222status_t LiveSession::disconnect() {
223    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
224
225    sp<AMessage> response;
226    status_t err = msg->postAndAwaitResponse(&response);
227
228    return err;
229}
230
231status_t LiveSession::seekTo(int64_t timeUs) {
232    sp<AMessage> msg = new AMessage(kWhatSeek, id());
233    msg->setInt64("timeUs", timeUs);
234
235    sp<AMessage> response;
236    status_t err = msg->postAndAwaitResponse(&response);
237
238    uint32_t replyID;
239    CHECK(response == mSeekReply && 0 != mSeekReplyID);
240    mSeekReply.clear();
241    mSeekReplyID = 0;
242    return err;
243}
244
245void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
246    switch (msg->what()) {
247        case kWhatConnect:
248        {
249            onConnect(msg);
250            break;
251        }
252
253        case kWhatDisconnect:
254        {
255            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
256
257            if (mReconfigurationInProgress) {
258                break;
259            }
260
261            finishDisconnect();
262            break;
263        }
264
265        case kWhatSeek:
266        {
267            CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
268
269            status_t err = onSeek(msg);
270
271            mSeekReply = new AMessage;
272            mSeekReply->setInt32("err", err);
273            break;
274        }
275
276        case kWhatFetcherNotify:
277        {
278            int32_t what;
279            CHECK(msg->findInt32("what", &what));
280
281            switch (what) {
282                case PlaylistFetcher::kWhatStarted:
283                    break;
284                case PlaylistFetcher::kWhatPaused:
285                case PlaylistFetcher::kWhatStopped:
286                {
287                    if (what == PlaylistFetcher::kWhatStopped) {
288                        AString uri;
289                        CHECK(msg->findString("uri", &uri));
290                        if (mFetcherInfos.removeItem(uri) < 0) {
291                            // ignore duplicated kWhatStopped messages.
292                            break;
293                        }
294
295                        tryToFinishBandwidthSwitch();
296                    }
297
298                    if (mContinuation != NULL) {
299                        CHECK_GT(mContinuationCounter, 0);
300                        if (--mContinuationCounter == 0) {
301                            mContinuation->post();
302
303                            if (mSeekReplyID != 0) {
304                                CHECK(mSeekReply != NULL);
305                                mSeekReply->postReply(mSeekReplyID);
306                            }
307                        }
308                    }
309                    break;
310                }
311
312                case PlaylistFetcher::kWhatDurationUpdate:
313                {
314                    AString uri;
315                    CHECK(msg->findString("uri", &uri));
316
317                    int64_t durationUs;
318                    CHECK(msg->findInt64("durationUs", &durationUs));
319
320                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
321                    info->mDurationUs = durationUs;
322                    break;
323                }
324
325                case PlaylistFetcher::kWhatError:
326                {
327                    status_t err;
328                    CHECK(msg->findInt32("err", &err));
329
330                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
331
332                    if (mInPreparationPhase) {
333                        postPrepared(err);
334                    }
335
336                    cancelBandwidthSwitch();
337
338                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
339
340                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
341
342                    mPacketSources.valueFor(
343                            STREAMTYPE_SUBTITLES)->signalEOS(err);
344
345                    sp<AMessage> notify = mNotify->dup();
346                    notify->setInt32("what", kWhatError);
347                    notify->setInt32("err", err);
348                    notify->post();
349                    break;
350                }
351
352                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
353                {
354                    AString uri;
355                    CHECK(msg->findString("uri", &uri));
356
357                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
358                    info->mIsPrepared = true;
359
360                    if (mInPreparationPhase) {
361                        bool allFetchersPrepared = true;
362                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
363                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
364                                allFetchersPrepared = false;
365                                break;
366                            }
367                        }
368
369                        if (allFetchersPrepared) {
370                            postPrepared(OK);
371                        }
372                    }
373                    break;
374                }
375
376                case PlaylistFetcher::kWhatStartedAt:
377                {
378                    int32_t switchGeneration;
379                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
380
381                    if (switchGeneration != mSwitchGeneration) {
382                        break;
383                    }
384
385                    // Resume fetcher for the original variant; the resumed fetcher should
386                    // continue until the timestamps found in msg, which is stored by the
387                    // new fetcher to indicate where the new variant has started buffering.
388                    for (size_t i = 0; i < mFetcherInfos.size(); i++) {
389                        const FetcherInfo info = mFetcherInfos.valueAt(i);
390                        if (info.mToBeRemoved) {
391                            info.mFetcher->resumeUntilAsync(msg);
392                        }
393                    }
394                    break;
395                }
396
397                default:
398                    TRESPASS();
399            }
400
401            break;
402        }
403
404        case kWhatCheckBandwidth:
405        {
406            int32_t generation;
407            CHECK(msg->findInt32("generation", &generation));
408
409            if (generation != mCheckBandwidthGeneration) {
410                break;
411            }
412
413            onCheckBandwidth();
414            break;
415        }
416
417        case kWhatChangeConfiguration:
418        {
419            onChangeConfiguration(msg);
420            break;
421        }
422
423        case kWhatChangeConfiguration2:
424        {
425            onChangeConfiguration2(msg);
426            break;
427        }
428
429        case kWhatChangeConfiguration3:
430        {
431            onChangeConfiguration3(msg);
432            break;
433        }
434
435        case kWhatFinishDisconnect2:
436        {
437            onFinishDisconnect2();
438            break;
439        }
440
441        case kWhatSwapped:
442        {
443            onSwapped(msg);
444            break;
445        }
446        default:
447            TRESPASS();
448            break;
449    }
450}
451
452// static
453int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
454    if (a->mBandwidth < b->mBandwidth) {
455        return -1;
456    } else if (a->mBandwidth == b->mBandwidth) {
457        return 0;
458    }
459
460    return 1;
461}
462
463// static
464LiveSession::StreamType LiveSession::indexToType(int idx) {
465    CHECK(idx >= 0 && idx < kMaxStreams);
466    return (StreamType)(1 << idx);
467}
468
469void LiveSession::onConnect(const sp<AMessage> &msg) {
470    AString url;
471    CHECK(msg->findString("url", &url));
472
473    KeyedVector<String8, String8> *headers = NULL;
474    if (!msg->findPointer("headers", (void **)&headers)) {
475        mExtraHeaders.clear();
476    } else {
477        mExtraHeaders = *headers;
478
479        delete headers;
480        headers = NULL;
481    }
482
483#if 1
484    ALOGI("onConnect <URL suppressed>");
485#else
486    ALOGI("onConnect %s", url.c_str());
487#endif
488
489    mMasterURL = url;
490
491    bool dummy;
492    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
493
494    if (mPlaylist == NULL) {
495        ALOGE("unable to fetch master playlist <URL suppressed>.");
496
497        postPrepared(ERROR_IO);
498        return;
499    }
500
501    // We trust the content provider to make a reasonable choice of preferred
502    // initial bandwidth by listing it first in the variant playlist.
503    // At startup we really don't have a good estimate on the available
504    // network bandwidth since we haven't tranferred any data yet. Once
505    // we have we can make a better informed choice.
506    size_t initialBandwidth = 0;
507    size_t initialBandwidthIndex = 0;
508
509    if (mPlaylist->isVariantPlaylist()) {
510        for (size_t i = 0; i < mPlaylist->size(); ++i) {
511            BandwidthItem item;
512
513            item.mPlaylistIndex = i;
514
515            sp<AMessage> meta;
516            AString uri;
517            mPlaylist->itemAt(i, &uri, &meta);
518
519            unsigned long bandwidth;
520            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
521
522            if (initialBandwidth == 0) {
523                initialBandwidth = item.mBandwidth;
524            }
525
526            mBandwidthItems.push(item);
527        }
528
529        CHECK_GT(mBandwidthItems.size(), 0u);
530
531        mBandwidthItems.sort(SortByBandwidth);
532
533        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
534            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
535                initialBandwidthIndex = i;
536                break;
537            }
538        }
539    } else {
540        // dummy item.
541        BandwidthItem item;
542        item.mPlaylistIndex = 0;
543        item.mBandwidth = 0;
544        mBandwidthItems.push(item);
545    }
546
547    changeConfiguration(
548            0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */);
549}
550
551void LiveSession::finishDisconnect() {
552    // No reconfiguration is currently pending, make sure none will trigger
553    // during disconnection either.
554    cancelCheckBandwidthEvent();
555
556    // Protect mPacketSources from a swapPacketSource race condition through disconnect.
557    // (finishDisconnect, onFinishDisconnect2)
558    cancelBandwidthSwitch();
559
560    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
561        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
562    }
563
564    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
565
566    mContinuationCounter = mFetcherInfos.size();
567    mContinuation = msg;
568
569    if (mContinuationCounter == 0) {
570        msg->post();
571    }
572}
573
574void LiveSession::onFinishDisconnect2() {
575    mContinuation.clear();
576
577    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
578    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
579
580    mPacketSources.valueFor(
581            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
582
583    sp<AMessage> response = new AMessage;
584    response->setInt32("err", OK);
585
586    response->postReply(mDisconnectReplyID);
587    mDisconnectReplyID = 0;
588}
589
590sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
591    ssize_t index = mFetcherInfos.indexOfKey(uri);
592
593    if (index >= 0) {
594        return NULL;
595    }
596
597    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
598    notify->setString("uri", uri);
599    notify->setInt32("switchGeneration", mSwitchGeneration);
600
601    FetcherInfo info;
602    info.mFetcher = new PlaylistFetcher(notify, this, uri);
603    info.mDurationUs = -1ll;
604    info.mIsPrepared = false;
605    info.mToBeRemoved = false;
606    looper()->registerHandler(info.mFetcher);
607
608    mFetcherInfos.add(uri, info);
609
610    return info.mFetcher;
611}
612
613/*
614 * Illustration of parameters:
615 *
616 * 0      `range_offset`
617 * +------------+-------------------------------------------------------+--+--+
618 * |            |                                 | next block to fetch |  |  |
619 * |            | `source` handle => `out` buffer |                     |  |  |
620 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
621 * |            |<----------- `range_length` / buffer capacity ----------->|  |
622 * |<------------------------------ file_size ------------------------------->|
623 *
624 * Special parameter values:
625 * - range_length == -1 means entire file
626 * - block_size == 0 means entire range
627 *
628 */
629status_t LiveSession::fetchFile(
630        const char *url, sp<ABuffer> *out,
631        int64_t range_offset, int64_t range_length,
632        uint32_t block_size, /* download block size */
633        sp<DataSource> *source, /* to return and reuse source */
634        String8 *actualUrl) {
635    off64_t size;
636    sp<DataSource> temp_source;
637    if (source == NULL) {
638        source = &temp_source;
639    }
640
641    if (*source == NULL) {
642        if (!strncasecmp(url, "file://", 7)) {
643            *source = new FileSource(url + 7);
644        } else if (strncasecmp(url, "http://", 7)
645                && strncasecmp(url, "https://", 8)) {
646            return ERROR_UNSUPPORTED;
647        } else {
648            KeyedVector<String8, String8> headers = mExtraHeaders;
649            if (range_offset > 0 || range_length >= 0) {
650                headers.add(
651                        String8("Range"),
652                        String8(
653                            StringPrintf(
654                                "bytes=%lld-%s",
655                                range_offset,
656                                range_length < 0
657                                    ? "" : StringPrintf("%lld",
658                                            range_offset + range_length - 1).c_str()).c_str()));
659            }
660            status_t err = mHTTPDataSource->connect(url, &headers);
661
662            if (err != OK) {
663                return err;
664            }
665
666            *source = mHTTPDataSource;
667        }
668    }
669
670    status_t getSizeErr = (*source)->getSize(&size);
671    if (getSizeErr != OK) {
672        size = 65536;
673    }
674
675    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
676    if (*out == NULL) {
677        buffer->setRange(0, 0);
678    }
679
680    // adjust range_length if only reading partial block
681    if (block_size > 0 && (range_length == -1 || buffer->size() + block_size < range_length)) {
682        range_length = buffer->size() + block_size;
683    }
684    for (;;) {
685        // Only resize when we don't know the size.
686        size_t bufferRemaining = buffer->capacity() - buffer->size();
687        if (bufferRemaining == 0 && getSizeErr != OK) {
688            bufferRemaining = 32768;
689
690            ALOGV("increasing download buffer to %d bytes",
691                 buffer->size() + bufferRemaining);
692
693            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
694            memcpy(copy->data(), buffer->data(), buffer->size());
695            copy->setRange(0, buffer->size());
696
697            buffer = copy;
698        }
699
700        size_t maxBytesToRead = bufferRemaining;
701        if (range_length >= 0) {
702            int64_t bytesLeftInRange = range_length - buffer->size();
703            if (bytesLeftInRange < maxBytesToRead) {
704                maxBytesToRead = bytesLeftInRange;
705
706                if (bytesLeftInRange == 0) {
707                    break;
708                }
709            }
710        }
711
712        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
713        // to help us break out of the loop.
714        ssize_t n = (*source)->readAt(
715                buffer->size(), buffer->data() + buffer->size(),
716                maxBytesToRead);
717
718        if (n < 0) {
719            return n;
720        }
721
722        if (n == 0) {
723            break;
724        }
725
726        buffer->setRange(0, buffer->size() + (size_t)n);
727    }
728
729    *out = buffer;
730    if (actualUrl != NULL) {
731        *actualUrl = (*source)->getUri();
732        if (actualUrl->isEmpty()) {
733            *actualUrl = url;
734        }
735    }
736
737    return OK;
738}
739
740sp<M3UParser> LiveSession::fetchPlaylist(
741        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
742    ALOGV("fetchPlaylist '%s'", url);
743
744    *unchanged = false;
745
746    sp<ABuffer> buffer;
747    String8 actualUrl;
748    status_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
749
750    if (err != OK) {
751        return NULL;
752    }
753
754    // MD5 functionality is not available on the simulator, treat all
755    // playlists as changed.
756
757#if defined(HAVE_ANDROID_OS)
758    uint8_t hash[16];
759
760    MD5_CTX m;
761    MD5_Init(&m);
762    MD5_Update(&m, buffer->data(), buffer->size());
763
764    MD5_Final(hash, &m);
765
766    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
767        // playlist unchanged
768        *unchanged = true;
769
770        return NULL;
771    }
772
773    if (curPlaylistHash != NULL) {
774        memcpy(curPlaylistHash, hash, sizeof(hash));
775    }
776#endif
777
778    sp<M3UParser> playlist =
779        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
780
781    if (playlist->initCheck() != OK) {
782        ALOGE("failed to parse .m3u8 playlist");
783
784        return NULL;
785    }
786
787    return playlist;
788}
789
// Returns the next rand() sample scaled by RAND_MAX, i.e. a value in [0, 1].
static double uniformRand() {
    const int sample = rand();
    return static_cast<double>(sample) / RAND_MAX;
}
793
794size_t LiveSession::getBandwidthIndex() {
795    if (mBandwidthItems.size() == 0) {
796        return 0;
797    }
798
799#if 1
800    char value[PROPERTY_VALUE_MAX];
801    ssize_t index = -1;
802    if (property_get("media.httplive.bw-index", value, NULL)) {
803        char *end;
804        index = strtol(value, &end, 10);
805        CHECK(end > value && *end == '\0');
806
807        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
808            index = mBandwidthItems.size() - 1;
809        }
810    }
811
812    if (index < 0) {
813        int32_t bandwidthBps;
814        if (mHTTPDataSource != NULL
815                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
816            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
817        } else {
818            ALOGV("no bandwidth estimate.");
819            return 0;  // Pick the lowest bandwidth stream by default.
820        }
821
822        char value[PROPERTY_VALUE_MAX];
823        if (property_get("media.httplive.max-bw", value, NULL)) {
824            char *end;
825            long maxBw = strtoul(value, &end, 10);
826            if (end > value && *end == '\0') {
827                if (maxBw > 0 && bandwidthBps > maxBw) {
828                    ALOGV("bandwidth capped to %ld bps", maxBw);
829                    bandwidthBps = maxBw;
830                }
831            }
832        }
833
834        // Consider only 80% of the available bandwidth usable.
835        bandwidthBps = (bandwidthBps * 8) / 10;
836
837        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
838
839        index = mBandwidthItems.size() - 1;
840        while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth
841                                > (size_t)bandwidthBps) {
842            --index;
843        }
844    }
845#elif 0
846    // Change bandwidth at random()
847    size_t index = uniformRand() * mBandwidthItems.size();
848#elif 0
849    // There's a 50% chance to stay on the current bandwidth and
850    // a 50% chance to switch to the next higher bandwidth (wrapping around
851    // to lowest)
852    const size_t kMinIndex = 0;
853
854    static ssize_t mPrevBandwidthIndex = -1;
855
856    size_t index;
857    if (mPrevBandwidthIndex < 0) {
858        index = kMinIndex;
859    } else if (uniformRand() < 0.5) {
860        index = (size_t)mPrevBandwidthIndex;
861    } else {
862        index = mPrevBandwidthIndex + 1;
863        if (index == mBandwidthItems.size()) {
864            index = kMinIndex;
865        }
866    }
867    mPrevBandwidthIndex = index;
868#elif 0
869    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
870
871    size_t index = mBandwidthItems.size() - 1;
872    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
873        --index;
874    }
875#elif 1
876    char value[PROPERTY_VALUE_MAX];
877    size_t index;
878    if (property_get("media.httplive.bw-index", value, NULL)) {
879        char *end;
880        index = strtoul(value, &end, 10);
881        CHECK(end > value && *end == '\0');
882
883        if (index >= mBandwidthItems.size()) {
884            index = mBandwidthItems.size() - 1;
885        }
886    } else {
887        index = 0;
888    }
889#else
890    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
891#endif
892
893    CHECK_GE(index, 0);
894
895    return index;
896}
897
898status_t LiveSession::onSeek(const sp<AMessage> &msg) {
899    int64_t timeUs;
900    CHECK(msg->findInt64("timeUs", &timeUs));
901
902    if (!mReconfigurationInProgress) {
903        changeConfiguration(timeUs, getBandwidthIndex());
904    }
905
906    return OK;
907}
908
909status_t LiveSession::getDuration(int64_t *durationUs) const {
910    int64_t maxDurationUs = 0ll;
911    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
912        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
913
914        if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) {
915            maxDurationUs = fetcherDurationUs;
916        }
917    }
918
919    *durationUs = maxDurationUs;
920
921    return OK;
922}
923
924bool LiveSession::isSeekable() const {
925    int64_t durationUs;
926    return getDuration(&durationUs) == OK && durationUs >= 0;
927}
928
929bool LiveSession::hasDynamicDuration() const {
930    return false;
931}
932
933status_t LiveSession::getTrackInfo(Parcel *reply) const {
934    return mPlaylist->getTrackInfo(reply);
935}
936
937status_t LiveSession::selectTrack(size_t index, bool select) {
938    status_t err = mPlaylist->selectTrack(index, select);
939    if (err == OK) {
940        (new AMessage(kWhatChangeConfiguration, id()))->post();
941    }
942    return err;
943}
944
945bool LiveSession::canSwitchUp() {
946    // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
947    status_t err = OK;
948    for (size_t i = 0; i < mPacketSources.size(); ++i) {
949        sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
950        int64_t dur = source->getBufferedDurationUs(&err);
951        if (err == OK && dur > 10000000) {
952            return true;
953        }
954    }
955    return false;
956}
957
958void LiveSession::changeConfiguration(
959        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
960    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
961    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
962    cancelBandwidthSwitch();
963
964    CHECK(!mReconfigurationInProgress);
965    mReconfigurationInProgress = true;
966
967    mPrevBandwidthIndex = bandwidthIndex;
968
969    ALOGV("changeConfiguration => timeUs:%lld us, bwIndex:%d, pickTrack:%d",
970          timeUs, bandwidthIndex, pickTrack);
971
972    if (pickTrack) {
973        mPlaylist->pickRandomMediaItems();
974    }
975
976    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
977    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
978
979    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
980    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
981
982    AString URIs[kMaxStreams];
983    for (size_t i = 0; i < kMaxStreams; ++i) {
984        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
985            streamMask |= indexToType(i);
986        }
987    }
988
989    // Step 1, stop and discard fetchers that are no longer needed.
990    // Pause those that we'll reuse.
991    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
992        const AString &uri = mFetcherInfos.keyAt(i);
993
994        bool discardFetcher = true;
995
996        // If we're seeking all current fetchers are discarded.
997        if (timeUs < 0ll) {
998            // delay fetcher removal
999            discardFetcher = false;
1000
1001            for (size_t j = 0; j < kMaxStreams; ++j) {
1002                StreamType type = indexToType(j);
1003                if ((streamMask & type) && uri == URIs[j]) {
1004                    resumeMask |= type;
1005                    streamMask &= ~type;
1006                }
1007            }
1008        }
1009
1010        if (discardFetcher) {
1011            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1012        } else {
1013            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
1014        }
1015    }
1016
1017    sp<AMessage> msg;
1018    if (timeUs < 0ll) {
1019        // skip onChangeConfiguration2 (decoder destruction) if switching.
1020        msg = new AMessage(kWhatChangeConfiguration3, id());
1021    } else {
1022        msg = new AMessage(kWhatChangeConfiguration2, id());
1023    }
1024    msg->setInt32("streamMask", streamMask);
1025    msg->setInt32("resumeMask", resumeMask);
1026    msg->setInt64("timeUs", timeUs);
1027    for (size_t i = 0; i < kMaxStreams; ++i) {
1028        if (streamMask & indexToType(i)) {
1029            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1030        }
1031    }
1032
1033    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1034    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1035    // fetchers have completed their asynchronous operation, we'll post
1036    // mContinuation, which then is handled below in onChangeConfiguration2.
1037    mContinuationCounter = mFetcherInfos.size();
1038    mContinuation = msg;
1039
1040    if (mContinuationCounter == 0) {
1041        msg->post();
1042
1043        if (mSeekReplyID != 0) {
1044            CHECK(mSeekReply != NULL);
1045            mSeekReply->postReply(mSeekReplyID);
1046        }
1047    }
1048}
1049
1050void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1051    if (!mReconfigurationInProgress) {
1052        changeConfiguration(-1ll /* timeUs */, getBandwidthIndex());
1053    } else {
1054        msg->post(1000000ll); // retry in 1 sec
1055    }
1056}
1057
1058void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1059    mContinuation.clear();
1060
1061    // All fetchers are either suspended or have been removed now.
1062
1063    uint32_t streamMask;
1064    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1065
1066    AString URIs[kMaxStreams];
1067    for (size_t i = 0; i < kMaxStreams; ++i) {
1068        if (streamMask & indexToType(i)) {
1069            const AString &uriKey = mStreams[i].uriKey();
1070            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1071            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1072        }
1073    }
1074
1075    // Determine which decoders to shutdown on the player side,
1076    // a decoder has to be shutdown if either
1077    // 1) its streamtype was active before but now longer isn't.
1078    // or
1079    // 2) its streamtype was already active and still is but the URI
1080    //    has changed.
1081    uint32_t changedMask = 0;
1082    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1083        if (((mStreamMask & streamMask & indexToType(i))
1084                && !(URIs[i] == mStreams[i].mUri))
1085                || (mStreamMask & ~streamMask & indexToType(i))) {
1086            changedMask |= indexToType(i);
1087        }
1088    }
1089
1090    if (changedMask == 0) {
1091        // If nothing changed as far as the audio/video decoders
1092        // are concerned we can proceed.
1093        onChangeConfiguration3(msg);
1094        return;
1095    }
1096
1097    // Something changed, inform the player which will shutdown the
1098    // corresponding decoders and will post the reply once that's done.
1099    // Handling the reply will continue executing below in
1100    // onChangeConfiguration3.
1101    sp<AMessage> notify = mNotify->dup();
1102    notify->setInt32("what", kWhatStreamsChanged);
1103    notify->setInt32("changedMask", changedMask);
1104
1105    msg->setWhat(kWhatChangeConfiguration3);
1106    msg->setTarget(id());
1107
1108    notify->setMessage("reply", msg);
1109    notify->post();
1110}
1111
1112void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1113    mContinuation.clear();
1114    // All remaining fetchers are still suspended, the player has shutdown
1115    // any decoders that needed it.
1116
1117    uint32_t streamMask, resumeMask;
1118    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1119    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1120
1121    for (size_t i = 0; i < kMaxStreams; ++i) {
1122        if (streamMask & indexToType(i)) {
1123            CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1124        }
1125    }
1126
1127    int64_t timeUs;
1128    bool switching = false;
1129    CHECK(msg->findInt64("timeUs", &timeUs));
1130
1131    if (timeUs < 0ll) {
1132        timeUs = mLastDequeuedTimeUs;
1133        switching = true;
1134    }
1135    mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1136
1137    mNewStreamMask = streamMask;
1138
1139    // Of all existing fetchers:
1140    // * Resume fetchers that are still needed and assign them original packet sources.
1141    // * Mark otherwise unneeded fetchers for removal.
1142    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1143    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1144        const AString &uri = mFetcherInfos.keyAt(i);
1145
1146        sp<AnotherPacketSource> sources[kMaxStreams];
1147        for (size_t j = 0; j < kMaxStreams; ++j) {
1148            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
1149                sources[j] = mPacketSources.valueFor(indexToType(j));
1150            }
1151        }
1152
1153        FetcherInfo &info = mFetcherInfos.editValueAt(i);
1154        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
1155                || sources[kSubtitleIndex] != NULL) {
1156            info.mFetcher->startAsync(
1157                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
1158        } else {
1159            info.mToBeRemoved = true;
1160        }
1161    }
1162
1163    // streamMask now only contains the types that need a new fetcher created.
1164
1165    if (streamMask != 0) {
1166        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1167    }
1168
1169    // Find out when the original fetchers have buffered up to and start the new fetchers
1170    // at a later timestamp.
1171    for (size_t i = 0; i < kMaxStreams; i++) {
1172        if (!(indexToType(i) & streamMask)) {
1173            continue;
1174        }
1175
1176        AString uri;
1177        uri = mStreams[i].mUri;
1178
1179        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1180        CHECK(fetcher != NULL);
1181
1182        int32_t latestSeq = -1;
1183        int64_t latestTimeUs = 0ll;
1184        sp<AnotherPacketSource> sources[kMaxStreams];
1185
1186        // TRICKY: looping from i as earlier streams are already removed from streamMask
1187        for (size_t j = i; j < kMaxStreams; ++j) {
1188            if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) {
1189                sources[j] = mPacketSources.valueFor(indexToType(j));
1190
1191                if (!switching) {
1192                    sources[j]->clear();
1193                } else {
1194                    int32_t type, seq;
1195                    int64_t srcTimeUs;
1196                    sp<AMessage> meta = sources[j]->getLatestMeta();
1197
1198                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
1199                        CHECK(meta->findInt32("seq", &seq));
1200                        if (seq > latestSeq) {
1201                            latestSeq = seq;
1202                        }
1203                        CHECK(meta->findInt64("timeUs", &srcTimeUs));
1204                        if (srcTimeUs > latestTimeUs) {
1205                            latestTimeUs = srcTimeUs;
1206                        }
1207                    }
1208
1209                    sources[j] = mPacketSources2.valueFor(indexToType(j));
1210                    sources[j]->clear();
1211                    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1212                    if (extraStreams & indexToType(j)) {
1213                        sources[j]->queueAccessUnit(createFormatChangeBuffer(/* swap = */ false));
1214                    }
1215                }
1216
1217                streamMask &= ~indexToType(j);
1218            }
1219        }
1220
1221        fetcher->startAsync(
1222                sources[kAudioIndex],
1223                sources[kVideoIndex],
1224                sources[kSubtitleIndex],
1225                timeUs,
1226                latestTimeUs /* min start time(us) */,
1227                latestSeq >= 0 ? latestSeq + 1 : -1 /* starting sequence number hint */ );
1228    }
1229
1230    // All fetchers have now been started, the configuration change
1231    // has completed.
1232
1233    scheduleCheckBandwidthEvent();
1234
1235    ALOGV("XXX configuration change completed.");
1236    mReconfigurationInProgress = false;
1237    if (switching) {
1238        mSwitchInProgress = true;
1239    } else {
1240        mStreamMask = mNewStreamMask;
1241    }
1242
1243    if (mDisconnectReplyID != 0) {
1244        finishDisconnect();
1245    }
1246}
1247
1248void LiveSession::onSwapped(const sp<AMessage> &msg) {
1249    int32_t switchGeneration;
1250    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
1251    if (switchGeneration != mSwitchGeneration) {
1252        return;
1253    }
1254
1255    int32_t stream;
1256    CHECK(msg->findInt32("stream", &stream));
1257    mSwapMask |= stream;
1258    if (mSwapMask != mStreamMask) {
1259        return;
1260    }
1261
1262    // Check if new variant contains extra streams.
1263    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1264    while (extraStreams) {
1265        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
1266        swapPacketSource(extraStream);
1267        extraStreams &= ~extraStream;
1268    }
1269
1270    tryToFinishBandwidthSwitch();
1271}
1272
1273// Mark switch done when:
1274//   1. all old buffers are swapped out, AND
1275//   2. all old fetchers are removed.
1276void LiveSession::tryToFinishBandwidthSwitch() {
1277    bool needToRemoveFetchers = false;
1278    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1279        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
1280            needToRemoveFetchers = true;
1281            break;
1282        }
1283    }
1284    if (!needToRemoveFetchers && mSwapMask == mStreamMask) {
1285        mStreamMask = mNewStreamMask;
1286        mSwitchInProgress = false;
1287        mSwapMask = 0;
1288    }
1289}
1290
1291void LiveSession::scheduleCheckBandwidthEvent() {
1292    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1293    msg->setInt32("generation", mCheckBandwidthGeneration);
1294    msg->post(10000000ll);
1295}
1296
1297void LiveSession::cancelCheckBandwidthEvent() {
1298    ++mCheckBandwidthGeneration;
1299}
1300
1301void LiveSession::cancelBandwidthSwitch() {
1302    Mutex::Autolock lock(mSwapMutex);
1303    mSwitchGeneration++;
1304    mSwitchInProgress = false;
1305    mSwapMask = 0;
1306}
1307
1308bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
1309    if (mReconfigurationInProgress || mSwitchInProgress) {
1310        return false;
1311    }
1312
1313    if (mPrevBandwidthIndex < 0) {
1314        return true;
1315    }
1316
1317    if (bandwidthIndex == (size_t)mPrevBandwidthIndex) {
1318        return false;
1319    } else if (bandwidthIndex > (size_t)mPrevBandwidthIndex) {
1320        return canSwitchUp();
1321    } else {
1322        return true;
1323    }
1324}
1325
1326void LiveSession::onCheckBandwidth() {
1327    size_t bandwidthIndex = getBandwidthIndex();
1328    if (canSwitchBandwidthTo(bandwidthIndex)) {
1329        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1330    } else {
1331        scheduleCheckBandwidthEvent();
1332    }
1333
1334    // Handling the kWhatCheckBandwidth even here does _not_ automatically
1335    // schedule another one on return, only an explicit call to
1336    // scheduleCheckBandwidthEvent will do that.
1337    // This ensures that only one configuration change is ongoing at any
1338    // one time, once that completes it'll schedule another check bandwidth
1339    // event.
1340}
1341
1342void LiveSession::postPrepared(status_t err) {
1343    CHECK(mInPreparationPhase);
1344
1345    sp<AMessage> notify = mNotify->dup();
1346    if (err == OK || err == ERROR_END_OF_STREAM) {
1347        notify->setInt32("what", kWhatPrepared);
1348    } else {
1349        notify->setInt32("what", kWhatPreparationFailed);
1350        notify->setInt32("err", err);
1351    }
1352
1353    notify->post();
1354
1355    mInPreparationPhase = false;
1356}
1357
1358}  // namespace android
1359
1360