LiveSession.cpp revision ceeabe15f4e7bc73efdfcafed917202de9d515cb
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/stagefright/foundation/hexdump.h>
31#include <media/stagefright/foundation/ABuffer.h>
32#include <media/stagefright/foundation/ADebug.h>
33#include <media/stagefright/foundation/AMessage.h>
34#include <media/stagefright/DataSource.h>
35#include <media/stagefright/FileSource.h>
36#include <media/stagefright/MediaErrors.h>
37#include <media/stagefright/MetaData.h>
38#include <media/stagefright/Utils.h>
39
40#include <utils/Mutex.h>
41
42#include <ctype.h>
43#include <openssl/aes.h>
44#include <openssl/md5.h>
45
46namespace android {
47
48LiveSession::LiveSession(
49        const sp<AMessage> &notify, uint32_t flags, bool uidValid, uid_t uid)
50    : mNotify(notify),
51      mFlags(flags),
52      mUIDValid(uidValid),
53      mUID(uid),
54      mInPreparationPhase(true),
55      mHTTPDataSource(
56              HTTPBase::Create(
57                  (mFlags & kFlagIncognito)
58                    ? HTTPBase::kFlagIncognito
59                    : 0)),
60      mPrevBandwidthIndex(-1),
61      mStreamMask(0),
62      mNewStreamMask(0),
63      mSwapMask(0),
64      mCheckBandwidthGeneration(0),
65      mSwitchGeneration(0),
66      mLastDequeuedTimeUs(0ll),
67      mRealTimeBaseUs(0ll),
68      mReconfigurationInProgress(false),
69      mSwitchInProgress(false),
70      mDisconnectReplyID(0),
71      mSeekReplyID(0) {
72    if (mUIDValid) {
73        mHTTPDataSource->setUID(mUID);
74    }
75
76    mStreams[kAudioIndex] = StreamItem("audio");
77    mStreams[kVideoIndex] = StreamItem("video");
78    mStreams[kSubtitleIndex] = StreamItem("subtitle");
79
80    for (size_t i = 0; i < kMaxStreams; ++i) {
81        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
82        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
83    }
84}
85
86LiveSession::~LiveSession() {
87}
88
89sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
90    ABuffer *discontinuity = new ABuffer(0);
91    discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
92    discontinuity->meta()->setInt32("swapPacketSource", swap);
93    discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
94    discontinuity->meta()->setInt64("timeUs", -1);
95    return discontinuity;
96}
97
98void LiveSession::swapPacketSource(StreamType stream) {
99    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
100    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
101    sp<AnotherPacketSource> tmp = aps;
102    aps = aps2;
103    aps2 = tmp;
104    aps2->clear();
105}
106
107status_t LiveSession::dequeueAccessUnit(
108        StreamType stream, sp<ABuffer> *accessUnit) {
109    if (!(mStreamMask & stream)) {
110        // return -EWOULDBLOCK to avoid halting the decoder
111        // when switching between audio/video and audio only.
112        return -EWOULDBLOCK;
113    }
114
115    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
116
117    status_t finalResult;
118    if (!packetSource->hasBufferAvailable(&finalResult)) {
119        return finalResult == OK ? -EAGAIN : finalResult;
120    }
121
122    status_t err = packetSource->dequeueAccessUnit(accessUnit);
123
124    const char *streamStr;
125    switch (stream) {
126        case STREAMTYPE_AUDIO:
127            streamStr = "audio";
128            break;
129        case STREAMTYPE_VIDEO:
130            streamStr = "video";
131            break;
132        case STREAMTYPE_SUBTITLES:
133            streamStr = "subs";
134            break;
135        default:
136            TRESPASS();
137    }
138
139    if (err == INFO_DISCONTINUITY) {
140        int32_t type;
141        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
142
143        sp<AMessage> extra;
144        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
145            extra.clear();
146        }
147
148        ALOGI("[%s] read discontinuity of type %d, extra = %s",
149              streamStr,
150              type,
151              extra == NULL ? "NULL" : extra->debugString().c_str());
152
153        int32_t swap;
154        if (type == ATSParser::DISCONTINUITY_FORMATCHANGE
155                && (*accessUnit)->meta()->findInt32("swapPacketSource", &swap)
156                && swap) {
157
158            int32_t switchGeneration;
159            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
160            {
161                Mutex::Autolock lock(mSwapMutex);
162                if (switchGeneration == mSwitchGeneration) {
163                    swapPacketSource(stream);
164                    sp<AMessage> msg = new AMessage(kWhatSwapped, id());
165                    msg->setInt32("stream", stream);
166                    msg->setInt32("switchGeneration", switchGeneration);
167                    msg->post();
168                }
169            }
170        }
171    } else if (err == OK) {
172        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
173            int64_t timeUs;
174            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
175            ALOGV("[%s] read buffer at time %lld us", streamStr, timeUs);
176
177            mLastDequeuedTimeUs = timeUs;
178            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
179        } else if (stream == STREAMTYPE_SUBTITLES) {
180            (*accessUnit)->meta()->setInt32(
181                    "trackIndex", mPlaylist->getSelectedIndex());
182            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
183        }
184    } else {
185        ALOGI("[%s] encountered error %d", streamStr, err);
186    }
187
188    return err;
189}
190
191status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
192    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
193    if (!(mStreamMask & stream)) {
194        return UNKNOWN_ERROR;
195    }
196
197    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
198
199    sp<MetaData> meta = packetSource->getFormat();
200
201    if (meta == NULL) {
202        return -EAGAIN;
203    }
204
205    return convertMetaDataToMessage(meta, format);
206}
207
208void LiveSession::connectAsync(
209        const char *url, const KeyedVector<String8, String8> *headers) {
210    sp<AMessage> msg = new AMessage(kWhatConnect, id());
211    msg->setString("url", url);
212
213    if (headers != NULL) {
214        msg->setPointer(
215                "headers",
216                new KeyedVector<String8, String8>(*headers));
217    }
218
219    msg->post();
220}
221
222status_t LiveSession::disconnect() {
223    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
224
225    sp<AMessage> response;
226    status_t err = msg->postAndAwaitResponse(&response);
227
228    return err;
229}
230
231status_t LiveSession::seekTo(int64_t timeUs) {
232    sp<AMessage> msg = new AMessage(kWhatSeek, id());
233    msg->setInt64("timeUs", timeUs);
234
235    sp<AMessage> response;
236    status_t err = msg->postAndAwaitResponse(&response);
237
238    uint32_t replyID;
239    CHECK(response == mSeekReply && 0 != mSeekReplyID);
240    mSeekReply.clear();
241    mSeekReplyID = 0;
242    return err;
243}
244
245void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
246    switch (msg->what()) {
247        case kWhatConnect:
248        {
249            onConnect(msg);
250            break;
251        }
252
253        case kWhatDisconnect:
254        {
255            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
256
257            if (mReconfigurationInProgress) {
258                break;
259            }
260
261            finishDisconnect();
262            break;
263        }
264
265        case kWhatSeek:
266        {
267            CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
268
269            status_t err = onSeek(msg);
270
271            mSeekReply = new AMessage;
272            mSeekReply->setInt32("err", err);
273            break;
274        }
275
276        case kWhatFetcherNotify:
277        {
278            int32_t what;
279            CHECK(msg->findInt32("what", &what));
280
281            switch (what) {
282                case PlaylistFetcher::kWhatStarted:
283                    break;
284                case PlaylistFetcher::kWhatPaused:
285                case PlaylistFetcher::kWhatStopped:
286                {
287                    if (what == PlaylistFetcher::kWhatStopped) {
288                        AString uri;
289                        CHECK(msg->findString("uri", &uri));
290                        if (mFetcherInfos.removeItem(uri) < 0) {
291                            // ignore duplicated kWhatStopped messages.
292                            break;
293                        }
294
295                        tryToFinishBandwidthSwitch();
296                    }
297
298                    if (mContinuation != NULL) {
299                        CHECK_GT(mContinuationCounter, 0);
300                        if (--mContinuationCounter == 0) {
301                            mContinuation->post();
302
303                            if (mSeekReplyID != 0) {
304                                CHECK(mSeekReply != NULL);
305                                mSeekReply->postReply(mSeekReplyID);
306                            }
307                        }
308                    }
309                    break;
310                }
311
312                case PlaylistFetcher::kWhatDurationUpdate:
313                {
314                    AString uri;
315                    CHECK(msg->findString("uri", &uri));
316
317                    int64_t durationUs;
318                    CHECK(msg->findInt64("durationUs", &durationUs));
319
320                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
321                    info->mDurationUs = durationUs;
322                    break;
323                }
324
325                case PlaylistFetcher::kWhatError:
326                {
327                    status_t err;
328                    CHECK(msg->findInt32("err", &err));
329
330                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
331
332                    if (mInPreparationPhase) {
333                        postPrepared(err);
334                    }
335
336                    cancelBandwidthSwitch();
337
338                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
339
340                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
341
342                    mPacketSources.valueFor(
343                            STREAMTYPE_SUBTITLES)->signalEOS(err);
344
345                    sp<AMessage> notify = mNotify->dup();
346                    notify->setInt32("what", kWhatError);
347                    notify->setInt32("err", err);
348                    notify->post();
349                    break;
350                }
351
352                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
353                {
354                    AString uri;
355                    CHECK(msg->findString("uri", &uri));
356
357                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
358                    info->mIsPrepared = true;
359
360                    if (mInPreparationPhase) {
361                        bool allFetchersPrepared = true;
362                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
363                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
364                                allFetchersPrepared = false;
365                                break;
366                            }
367                        }
368
369                        if (allFetchersPrepared) {
370                            postPrepared(OK);
371                        }
372                    }
373                    break;
374                }
375
376                case PlaylistFetcher::kWhatStartedAt:
377                {
378                    int32_t switchGeneration;
379                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
380
381                    if (switchGeneration != mSwitchGeneration) {
382                        break;
383                    }
384
385                    // Resume fetcher for the original variant; the resumed fetcher should
386                    // continue until the timestamps found in msg, which is stored by the
387                    // new fetcher to indicate where the new variant has started buffering.
388                    for (size_t i = 0; i < mFetcherInfos.size(); i++) {
389                        const FetcherInfo info = mFetcherInfos.valueAt(i);
390                        if (info.mToBeRemoved) {
391                            info.mFetcher->resumeUntilAsync(msg);
392                        }
393                    }
394                    break;
395                }
396
397                default:
398                    TRESPASS();
399            }
400
401            break;
402        }
403
404        case kWhatCheckBandwidth:
405        {
406            int32_t generation;
407            CHECK(msg->findInt32("generation", &generation));
408
409            if (generation != mCheckBandwidthGeneration) {
410                break;
411            }
412
413            onCheckBandwidth();
414            break;
415        }
416
417        case kWhatChangeConfiguration:
418        {
419            onChangeConfiguration(msg);
420            break;
421        }
422
423        case kWhatChangeConfiguration2:
424        {
425            onChangeConfiguration2(msg);
426            break;
427        }
428
429        case kWhatChangeConfiguration3:
430        {
431            onChangeConfiguration3(msg);
432            break;
433        }
434
435        case kWhatFinishDisconnect2:
436        {
437            onFinishDisconnect2();
438            break;
439        }
440
441        case kWhatSwapped:
442        {
443            onSwapped(msg);
444            break;
445        }
446        default:
447            TRESPASS();
448            break;
449    }
450}
451
452// static
453int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
454    if (a->mBandwidth < b->mBandwidth) {
455        return -1;
456    } else if (a->mBandwidth == b->mBandwidth) {
457        return 0;
458    }
459
460    return 1;
461}
462
463// static
464LiveSession::StreamType LiveSession::indexToType(int idx) {
465    CHECK(idx >= 0 && idx < kMaxStreams);
466    return (StreamType)(1 << idx);
467}
468
469void LiveSession::onConnect(const sp<AMessage> &msg) {
470    AString url;
471    CHECK(msg->findString("url", &url));
472
473    KeyedVector<String8, String8> *headers = NULL;
474    if (!msg->findPointer("headers", (void **)&headers)) {
475        mExtraHeaders.clear();
476    } else {
477        mExtraHeaders = *headers;
478
479        delete headers;
480        headers = NULL;
481    }
482
483#if 1
484    ALOGI("onConnect <URL suppressed>");
485#else
486    ALOGI("onConnect %s", url.c_str());
487#endif
488
489    mMasterURL = url;
490
491    bool dummy;
492    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
493
494    if (mPlaylist == NULL) {
495        ALOGE("unable to fetch master playlist '%s'.", url.c_str());
496
497        postPrepared(ERROR_IO);
498        return;
499    }
500
501    // We trust the content provider to make a reasonable choice of preferred
502    // initial bandwidth by listing it first in the variant playlist.
503    // At startup we really don't have a good estimate on the available
504    // network bandwidth since we haven't tranferred any data yet. Once
505    // we have we can make a better informed choice.
506    size_t initialBandwidth = 0;
507    size_t initialBandwidthIndex = 0;
508
509    if (mPlaylist->isVariantPlaylist()) {
510        for (size_t i = 0; i < mPlaylist->size(); ++i) {
511            BandwidthItem item;
512
513            item.mPlaylistIndex = i;
514
515            sp<AMessage> meta;
516            AString uri;
517            mPlaylist->itemAt(i, &uri, &meta);
518
519            unsigned long bandwidth;
520            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
521
522            if (initialBandwidth == 0) {
523                initialBandwidth = item.mBandwidth;
524            }
525
526            mBandwidthItems.push(item);
527        }
528
529        CHECK_GT(mBandwidthItems.size(), 0u);
530
531        mBandwidthItems.sort(SortByBandwidth);
532
533        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
534            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
535                initialBandwidthIndex = i;
536                break;
537            }
538        }
539    } else {
540        // dummy item.
541        BandwidthItem item;
542        item.mPlaylistIndex = 0;
543        item.mBandwidth = 0;
544        mBandwidthItems.push(item);
545    }
546
547    changeConfiguration(
548            0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */);
549}
550
551void LiveSession::finishDisconnect() {
552    // No reconfiguration is currently pending, make sure none will trigger
553    // during disconnection either.
554    cancelCheckBandwidthEvent();
555
556    // Protect mPacketSources from a swapPacketSource race condition through disconnect.
557    // (finishDisconnect, onFinishDisconnect2)
558    cancelBandwidthSwitch();
559
560    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
561        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
562    }
563
564    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
565
566    mContinuationCounter = mFetcherInfos.size();
567    mContinuation = msg;
568
569    if (mContinuationCounter == 0) {
570        msg->post();
571    }
572}
573
574void LiveSession::onFinishDisconnect2() {
575    mContinuation.clear();
576
577    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
578    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
579
580    mPacketSources.valueFor(
581            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
582
583    sp<AMessage> response = new AMessage;
584    response->setInt32("err", OK);
585
586    response->postReply(mDisconnectReplyID);
587    mDisconnectReplyID = 0;
588}
589
590sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
591    ssize_t index = mFetcherInfos.indexOfKey(uri);
592
593    if (index >= 0) {
594        return NULL;
595    }
596
597    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
598    notify->setString("uri", uri);
599    notify->setInt32("switchGeneration", mSwitchGeneration);
600
601    FetcherInfo info;
602    info.mFetcher = new PlaylistFetcher(notify, this, uri);
603    info.mDurationUs = -1ll;
604    info.mIsPrepared = false;
605    info.mToBeRemoved = false;
606    looper()->registerHandler(info.mFetcher);
607
608    mFetcherInfos.add(uri, info);
609
610    return info.mFetcher;
611}
612
/*
 * Downloads (part of) `url` into `*out`.
 *
 * Illustration of parameters:
 *
 * 0      `range_offset`
 * +------------+-------------------------------------------------------+--+--+
 * |            |                                 | next block to fetch |  |  |
 * |            | `source` handle => `out` buffer |                     |  |  |
 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
 * |            |<----------- `range_length` / buffer capacity ----------->|  |
 * |<------------------------------ file_size ------------------------------->|
 *
 * Special parameter values:
 * - range_length == -1 means entire file
 * - block_size == 0 means entire range
 *
 * Parameters:
 * - url:          "file://", "http://" or "https://" URL (scheme matched
 *                 case-insensitively); anything else yields ERROR_UNSUPPORTED
 *                 when a new source has to be opened.
 * - out:          in/out. If *out is NULL a new buffer is allocated; otherwise
 *                 data is appended after the bytes *out already holds. *out is
 *                 only written on success.
 * - source:       optional in/out. If NULL or *source is NULL, a source is
 *                 opened here (and returned through *source when non-NULL);
 *                 otherwise the given source is reused as-is.
 * - actualUrl:    optional; receives the source's getUri(), or `url` if that
 *                 is empty.
 *
 * Returns OK on success, ERROR_UNSUPPORTED for an unsupported scheme, the
 * connect() error if the HTTP connection fails, or the negative value
 * returned by readAt().
 */
status_t LiveSession::fetchFile(
        const char *url, sp<ABuffer> *out,
        int64_t range_offset, int64_t range_length,
        uint32_t block_size, /* download block size */
        sp<DataSource> *source, /* to return and reuse source */
        String8 *actualUrl) {
    off64_t size;
    // Callers that do not care about the source still get a local slot so the
    // code below can treat `source` uniformly.
    sp<DataSource> temp_source;
    if (source == NULL) {
        source = &temp_source;
    }

    if (*source == NULL) {
        if (!strncasecmp(url, "file://", 7)) {
            // Strip the "file://" prefix to get the local path.
            *source = new FileSource(url + 7);
        } else if (strncasecmp(url, "http://", 7)
                && strncasecmp(url, "https://", 8)) {
            return ERROR_UNSUPPORTED;
        } else {
            KeyedVector<String8, String8> headers = mExtraHeaders;
            // Only add a Range header when a sub-range was actually requested.
            // The end of the range is left open when range_length is negative.
            if (range_offset > 0 || range_length >= 0) {
                headers.add(
                        String8("Range"),
                        String8(
                            StringPrintf(
                                "bytes=%lld-%s",
                                range_offset,
                                range_length < 0
                                    ? "" : StringPrintf("%lld",
                                            range_offset + range_length - 1).c_str()).c_str()));
            }
            status_t err = mHTTPDataSource->connect(url, &headers);

            if (err != OK) {
                return err;
            }

            *source = mHTTPDataSource;
        }
    }

    // If the source cannot report its size, start with a 64 KiB guess; the
    // loop below grows the buffer as needed in that case.
    status_t getSizeErr = (*source)->getSize(&size);
    if (getSizeErr != OK) {
        size = 65536;
    }

    // Reuse the caller's buffer (appending to it) or start a fresh, empty one.
    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
    if (*out == NULL) {
        buffer->setRange(0, 0);
    }

    // adjust range_length if only reading partial block
    if (block_size > 0 && (range_length == -1 || buffer->size() + block_size < range_length)) {
        range_length = buffer->size() + block_size;
    }
    for (;;) {
        // Only resize when we don't know the size.
        size_t bufferRemaining = buffer->capacity() - buffer->size();
        if (bufferRemaining == 0 && getSizeErr != OK) {
            bufferRemaining = 32768;

            ALOGV("increasing download buffer to %d bytes",
                 buffer->size() + bufferRemaining);

            // Grow by copying into a larger buffer, preserving the bytes
            // read so far.
            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
            memcpy(copy->data(), buffer->data(), buffer->size());
            copy->setRange(0, buffer->size());

            buffer = copy;
        }

        // Never read past the end of the requested range.
        // NOTE(review): bytesLeftInRange is int64_t while maxBytesToRead is
        // size_t; where size_t is 64 bits wide the comparison is done unsigned,
        // so a negative bytesLeftInRange (buffer already larger than
        // range_length) would not clamp the read. Confirm callers never pass
        // such a buffer.
        size_t maxBytesToRead = bufferRemaining;
        if (range_length >= 0) {
            int64_t bytesLeftInRange = range_length - buffer->size();
            if (bytesLeftInRange < maxBytesToRead) {
                maxBytesToRead = bytesLeftInRange;

                if (bytesLeftInRange == 0) {
                    break;
                }
            }
        }

        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
        // to help us break out of the loop.
        // The read offset is the number of bytes already held in the buffer.
        ssize_t n = (*source)->readAt(
                buffer->size(), buffer->data() + buffer->size(),
                maxBytesToRead);

        if (n < 0) {
            return n;
        }

        if (n == 0) {
            break;
        }

        buffer->setRange(0, buffer->size() + (size_t)n);
    }

    *out = buffer;
    if (actualUrl != NULL) {
        // Fall back to the requested URL when the source reports no URI.
        *actualUrl = (*source)->getUri();
        if (actualUrl->isEmpty()) {
            *actualUrl = url;
        }
    }

    return OK;
}
739
740sp<M3UParser> LiveSession::fetchPlaylist(
741        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
742    ALOGV("fetchPlaylist '%s'", url);
743
744    *unchanged = false;
745
746    sp<ABuffer> buffer;
747    String8 actualUrl;
748    status_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
749
750    if (err != OK) {
751        return NULL;
752    }
753
754    // MD5 functionality is not available on the simulator, treat all
755    // playlists as changed.
756
757#if defined(HAVE_ANDROID_OS)
758    uint8_t hash[16];
759
760    MD5_CTX m;
761    MD5_Init(&m);
762    MD5_Update(&m, buffer->data(), buffer->size());
763
764    MD5_Final(hash, &m);
765
766    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
767        // playlist unchanged
768        *unchanged = true;
769
770        ALOGV("Playlist unchanged, refresh state is now %d",
771             (int)mRefreshState);
772
773        return NULL;
774    }
775
776    if (curPlaylistHash != NULL) {
777        memcpy(curPlaylistHash, hash, sizeof(hash));
778    }
779#endif
780
781    sp<M3UParser> playlist =
782        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
783
784    if (playlist->initCheck() != OK) {
785        ALOGE("failed to parse .m3u8 playlist");
786
787        return NULL;
788    }
789
790    return playlist;
791}
792
// Returns rand() scaled by RAND_MAX, i.e. a value in the closed range
// [0.0, 1.0].
static double uniformRand() {
    const double sample = static_cast<double>(rand());
    return sample / RAND_MAX;
}
796
797size_t LiveSession::getBandwidthIndex() {
798    if (mBandwidthItems.size() == 0) {
799        return 0;
800    }
801
802#if 1
803    char value[PROPERTY_VALUE_MAX];
804    ssize_t index = -1;
805    if (property_get("media.httplive.bw-index", value, NULL)) {
806        char *end;
807        index = strtol(value, &end, 10);
808        CHECK(end > value && *end == '\0');
809
810        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
811            index = mBandwidthItems.size() - 1;
812        }
813    }
814
815    if (index < 0) {
816        int32_t bandwidthBps;
817        if (mHTTPDataSource != NULL
818                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
819            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
820        } else {
821            ALOGV("no bandwidth estimate.");
822            return 0;  // Pick the lowest bandwidth stream by default.
823        }
824
825        char value[PROPERTY_VALUE_MAX];
826        if (property_get("media.httplive.max-bw", value, NULL)) {
827            char *end;
828            long maxBw = strtoul(value, &end, 10);
829            if (end > value && *end == '\0') {
830                if (maxBw > 0 && bandwidthBps > maxBw) {
831                    ALOGV("bandwidth capped to %ld bps", maxBw);
832                    bandwidthBps = maxBw;
833                }
834            }
835        }
836
837        // Consider only 80% of the available bandwidth usable.
838        bandwidthBps = (bandwidthBps * 8) / 10;
839
840        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
841
842        index = mBandwidthItems.size() - 1;
843        while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth
844                                > (size_t)bandwidthBps) {
845            --index;
846        }
847    }
848#elif 0
849    // Change bandwidth at random()
850    size_t index = uniformRand() * mBandwidthItems.size();
851#elif 0
852    // There's a 50% chance to stay on the current bandwidth and
853    // a 50% chance to switch to the next higher bandwidth (wrapping around
854    // to lowest)
855    const size_t kMinIndex = 0;
856
857    static ssize_t mPrevBandwidthIndex = -1;
858
859    size_t index;
860    if (mPrevBandwidthIndex < 0) {
861        index = kMinIndex;
862    } else if (uniformRand() < 0.5) {
863        index = (size_t)mPrevBandwidthIndex;
864    } else {
865        index = mPrevBandwidthIndex + 1;
866        if (index == mBandwidthItems.size()) {
867            index = kMinIndex;
868        }
869    }
870    mPrevBandwidthIndex = index;
871#elif 0
872    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
873
874    size_t index = mBandwidthItems.size() - 1;
875    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
876        --index;
877    }
878#elif 1
879    char value[PROPERTY_VALUE_MAX];
880    size_t index;
881    if (property_get("media.httplive.bw-index", value, NULL)) {
882        char *end;
883        index = strtoul(value, &end, 10);
884        CHECK(end > value && *end == '\0');
885
886        if (index >= mBandwidthItems.size()) {
887            index = mBandwidthItems.size() - 1;
888        }
889    } else {
890        index = 0;
891    }
892#else
893    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
894#endif
895
896    CHECK_GE(index, 0);
897
898    return index;
899}
900
901status_t LiveSession::onSeek(const sp<AMessage> &msg) {
902    int64_t timeUs;
903    CHECK(msg->findInt64("timeUs", &timeUs));
904
905    if (!mReconfigurationInProgress) {
906        changeConfiguration(timeUs, getBandwidthIndex());
907    }
908
909    return OK;
910}
911
912status_t LiveSession::getDuration(int64_t *durationUs) const {
913    int64_t maxDurationUs = 0ll;
914    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
915        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
916
917        if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) {
918            maxDurationUs = fetcherDurationUs;
919        }
920    }
921
922    *durationUs = maxDurationUs;
923
924    return OK;
925}
926
927bool LiveSession::isSeekable() const {
928    int64_t durationUs;
929    return getDuration(&durationUs) == OK && durationUs >= 0;
930}
931
932bool LiveSession::hasDynamicDuration() const {
933    return false;
934}
935
936status_t LiveSession::getTrackInfo(Parcel *reply) const {
937    return mPlaylist->getTrackInfo(reply);
938}
939
940status_t LiveSession::selectTrack(size_t index, bool select) {
941    status_t err = mPlaylist->selectTrack(index, select);
942    if (err == OK) {
943        (new AMessage(kWhatChangeConfiguration, id()))->post();
944    }
945    return err;
946}
947
948bool LiveSession::canSwitchUp() {
949    // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
950    status_t err = OK;
951    for (size_t i = 0; i < mPacketSources.size(); ++i) {
952        sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
953        int64_t dur = source->getBufferedDurationUs(&err);
954        if (err == OK && dur > 10000000) {
955            return true;
956        }
957    }
958    return false;
959}
960
961void LiveSession::changeConfiguration(
962        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
963    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
964    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
965    cancelBandwidthSwitch();
966
967    CHECK(!mReconfigurationInProgress);
968    mReconfigurationInProgress = true;
969
970    mPrevBandwidthIndex = bandwidthIndex;
971
972    ALOGV("changeConfiguration => timeUs:%lld us, bwIndex:%d, pickTrack:%d",
973          timeUs, bandwidthIndex, pickTrack);
974
975    if (pickTrack) {
976        mPlaylist->pickRandomMediaItems();
977    }
978
979    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
980    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
981
982    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
983    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
984
985    AString URIs[kMaxStreams];
986    for (size_t i = 0; i < kMaxStreams; ++i) {
987        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
988            streamMask |= indexToType(i);
989        }
990    }
991
992    // Step 1, stop and discard fetchers that are no longer needed.
993    // Pause those that we'll reuse.
994    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
995        const AString &uri = mFetcherInfos.keyAt(i);
996
997        bool discardFetcher = true;
998
999        // If we're seeking all current fetchers are discarded.
1000        if (timeUs < 0ll) {
1001            // delay fetcher removal
1002            discardFetcher = false;
1003
1004            for (size_t j = 0; j < kMaxStreams; ++j) {
1005                StreamType type = indexToType(j);
1006                if ((streamMask & type) && uri == URIs[j]) {
1007                    resumeMask |= type;
1008                    streamMask &= ~type;
1009                }
1010            }
1011        }
1012
1013        if (discardFetcher) {
1014            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1015        } else {
1016            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
1017        }
1018    }
1019
1020    sp<AMessage> msg;
1021    if (timeUs < 0ll) {
1022        // skip onChangeConfiguration2 (decoder destruction) if switching.
1023        msg = new AMessage(kWhatChangeConfiguration3, id());
1024    } else {
1025        msg = new AMessage(kWhatChangeConfiguration2, id());
1026    }
1027    msg->setInt32("streamMask", streamMask);
1028    msg->setInt32("resumeMask", resumeMask);
1029    msg->setInt64("timeUs", timeUs);
1030    for (size_t i = 0; i < kMaxStreams; ++i) {
1031        if (streamMask & indexToType(i)) {
1032            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1033        }
1034    }
1035
1036    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1037    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1038    // fetchers have completed their asynchronous operation, we'll post
1039    // mContinuation, which then is handled below in onChangeConfiguration2.
1040    mContinuationCounter = mFetcherInfos.size();
1041    mContinuation = msg;
1042
1043    if (mContinuationCounter == 0) {
1044        msg->post();
1045
1046        if (mSeekReplyID != 0) {
1047            CHECK(mSeekReply != NULL);
1048            mSeekReply->postReply(mSeekReplyID);
1049        }
1050    }
1051}
1052
1053void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1054    if (!mReconfigurationInProgress) {
1055        changeConfiguration(-1ll /* timeUs */, getBandwidthIndex());
1056    } else {
1057        msg->post(1000000ll); // retry in 1 sec
1058    }
1059}
1060
1061void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1062    mContinuation.clear();
1063
1064    // All fetchers are either suspended or have been removed now.
1065
1066    uint32_t streamMask;
1067    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1068
1069    AString URIs[kMaxStreams];
1070    for (size_t i = 0; i < kMaxStreams; ++i) {
1071        if (streamMask & indexToType(i)) {
1072            const AString &uriKey = mStreams[i].uriKey();
1073            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1074            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1075        }
1076    }
1077
1078    // Determine which decoders to shutdown on the player side,
1079    // a decoder has to be shutdown if either
1080    // 1) its streamtype was active before but now longer isn't.
1081    // or
1082    // 2) its streamtype was already active and still is but the URI
1083    //    has changed.
1084    uint32_t changedMask = 0;
1085    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1086        if (((mStreamMask & streamMask & indexToType(i))
1087                && !(URIs[i] == mStreams[i].mUri))
1088                || (mStreamMask & ~streamMask & indexToType(i))) {
1089            changedMask |= indexToType(i);
1090        }
1091    }
1092
1093    if (changedMask == 0) {
1094        // If nothing changed as far as the audio/video decoders
1095        // are concerned we can proceed.
1096        onChangeConfiguration3(msg);
1097        return;
1098    }
1099
1100    // Something changed, inform the player which will shutdown the
1101    // corresponding decoders and will post the reply once that's done.
1102    // Handling the reply will continue executing below in
1103    // onChangeConfiguration3.
1104    sp<AMessage> notify = mNotify->dup();
1105    notify->setInt32("what", kWhatStreamsChanged);
1106    notify->setInt32("changedMask", changedMask);
1107
1108    msg->setWhat(kWhatChangeConfiguration3);
1109    msg->setTarget(id());
1110
1111    notify->setMessage("reply", msg);
1112    notify->post();
1113}
1114
1115void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1116    mContinuation.clear();
1117    // All remaining fetchers are still suspended, the player has shutdown
1118    // any decoders that needed it.
1119
1120    uint32_t streamMask, resumeMask;
1121    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1122    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1123
1124    for (size_t i = 0; i < kMaxStreams; ++i) {
1125        if (streamMask & indexToType(i)) {
1126            CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1127        }
1128    }
1129
1130    int64_t timeUs;
1131    bool switching = false;
1132    CHECK(msg->findInt64("timeUs", &timeUs));
1133
1134    if (timeUs < 0ll) {
1135        timeUs = mLastDequeuedTimeUs;
1136        switching = true;
1137    }
1138    mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1139
1140    mNewStreamMask = streamMask;
1141
1142    // Of all existing fetchers:
1143    // * Resume fetchers that are still needed and assign them original packet sources.
1144    // * Mark otherwise unneeded fetchers for removal.
1145    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1146    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1147        const AString &uri = mFetcherInfos.keyAt(i);
1148
1149        sp<AnotherPacketSource> sources[kMaxStreams];
1150        for (size_t j = 0; j < kMaxStreams; ++j) {
1151            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
1152                sources[j] = mPacketSources.valueFor(indexToType(j));
1153            }
1154        }
1155
1156        FetcherInfo &info = mFetcherInfos.editValueAt(i);
1157        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
1158                || sources[kSubtitleIndex] != NULL) {
1159            info.mFetcher->startAsync(
1160                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
1161        } else {
1162            info.mToBeRemoved = true;
1163        }
1164    }
1165
1166    // streamMask now only contains the types that need a new fetcher created.
1167
1168    if (streamMask != 0) {
1169        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1170    }
1171
1172    // Find out when the original fetchers have buffered up to and start the new fetchers
1173    // at a later timestamp.
1174    for (size_t i = 0; i < kMaxStreams; i++) {
1175        if (!(indexToType(i) & streamMask)) {
1176            continue;
1177        }
1178
1179        AString uri;
1180        uri = mStreams[i].mUri;
1181
1182        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1183        CHECK(fetcher != NULL);
1184
1185        int32_t latestSeq = -1;
1186        int64_t latestTimeUs = 0ll;
1187        sp<AnotherPacketSource> sources[kMaxStreams];
1188
1189        // TRICKY: looping from i as earlier streams are already removed from streamMask
1190        for (size_t j = i; j < kMaxStreams; ++j) {
1191            if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) {
1192                sources[j] = mPacketSources.valueFor(indexToType(j));
1193
1194                if (!switching) {
1195                    sources[j]->clear();
1196                } else {
1197                    int32_t type, seq;
1198                    int64_t srcTimeUs;
1199                    sp<AMessage> meta = sources[j]->getLatestMeta();
1200
1201                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
1202                        CHECK(meta->findInt32("seq", &seq));
1203                        if (seq > latestSeq) {
1204                            latestSeq = seq;
1205                        }
1206                        CHECK(meta->findInt64("timeUs", &srcTimeUs));
1207                        if (srcTimeUs > latestTimeUs) {
1208                            latestTimeUs = srcTimeUs;
1209                        }
1210                    }
1211
1212                    sources[j] = mPacketSources2.valueFor(indexToType(j));
1213                    sources[j]->clear();
1214                    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1215                    if (extraStreams & indexToType(j)) {
1216                        sources[j]->queueAccessUnit(createFormatChangeBuffer(/* swap = */ false));
1217                    }
1218                }
1219
1220                streamMask &= ~indexToType(j);
1221            }
1222        }
1223
1224        fetcher->startAsync(
1225                sources[kAudioIndex],
1226                sources[kVideoIndex],
1227                sources[kSubtitleIndex],
1228                timeUs,
1229                latestTimeUs /* min start time(us) */,
1230                latestSeq >= 0 ? latestSeq + 1 : -1 /* starting sequence number hint */ );
1231    }
1232
1233    // All fetchers have now been started, the configuration change
1234    // has completed.
1235
1236    scheduleCheckBandwidthEvent();
1237
1238    ALOGV("XXX configuration change completed.");
1239    mReconfigurationInProgress = false;
1240    if (switching) {
1241        mSwitchInProgress = true;
1242    } else {
1243        mStreamMask = mNewStreamMask;
1244    }
1245
1246    if (mDisconnectReplyID != 0) {
1247        finishDisconnect();
1248    }
1249}
1250
1251void LiveSession::onSwapped(const sp<AMessage> &msg) {
1252    int32_t switchGeneration;
1253    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
1254    if (switchGeneration != mSwitchGeneration) {
1255        return;
1256    }
1257
1258    int32_t stream;
1259    CHECK(msg->findInt32("stream", &stream));
1260    mSwapMask |= stream;
1261    if (mSwapMask != mStreamMask) {
1262        return;
1263    }
1264
1265    // Check if new variant contains extra streams.
1266    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1267    while (extraStreams) {
1268        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
1269        swapPacketSource(extraStream);
1270        extraStreams &= ~extraStream;
1271    }
1272
1273    tryToFinishBandwidthSwitch();
1274}
1275
1276// Mark switch done when:
1277//   1. all old buffers are swapped out, AND
1278//   2. all old fetchers are removed.
1279void LiveSession::tryToFinishBandwidthSwitch() {
1280    bool needToRemoveFetchers = false;
1281    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1282        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
1283            needToRemoveFetchers = true;
1284            break;
1285        }
1286    }
1287    if (!needToRemoveFetchers && mSwapMask == mStreamMask) {
1288        mStreamMask = mNewStreamMask;
1289        mSwitchInProgress = false;
1290        mSwapMask = 0;
1291    }
1292}
1293
1294void LiveSession::scheduleCheckBandwidthEvent() {
1295    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1296    msg->setInt32("generation", mCheckBandwidthGeneration);
1297    msg->post(10000000ll);
1298}
1299
1300void LiveSession::cancelCheckBandwidthEvent() {
1301    ++mCheckBandwidthGeneration;
1302}
1303
1304void LiveSession::cancelBandwidthSwitch() {
1305    Mutex::Autolock lock(mSwapMutex);
1306    mSwitchGeneration++;
1307    mSwitchInProgress = false;
1308    mSwapMask = 0;
1309}
1310
1311bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
1312    if (mReconfigurationInProgress || mSwitchInProgress) {
1313        return false;
1314    }
1315
1316    if (mPrevBandwidthIndex < 0) {
1317        return true;
1318    }
1319
1320    if (bandwidthIndex == (size_t)mPrevBandwidthIndex) {
1321        return false;
1322    } else if (bandwidthIndex > (size_t)mPrevBandwidthIndex) {
1323        return canSwitchUp();
1324    } else {
1325        return true;
1326    }
1327}
1328
1329void LiveSession::onCheckBandwidth() {
1330    size_t bandwidthIndex = getBandwidthIndex();
1331    if (canSwitchBandwidthTo(bandwidthIndex)) {
1332        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1333    } else {
1334        scheduleCheckBandwidthEvent();
1335    }
1336
1337    // Handling the kWhatCheckBandwidth even here does _not_ automatically
1338    // schedule another one on return, only an explicit call to
1339    // scheduleCheckBandwidthEvent will do that.
1340    // This ensures that only one configuration change is ongoing at any
1341    // one time, once that completes it'll schedule another check bandwidth
1342    // event.
1343}
1344
1345void LiveSession::postPrepared(status_t err) {
1346    CHECK(mInPreparationPhase);
1347
1348    sp<AMessage> notify = mNotify->dup();
1349    if (err == OK || err == ERROR_END_OF_STREAM) {
1350        notify->setInt32("what", kWhatPrepared);
1351    } else {
1352        notify->setInt32("what", kWhatPreparationFailed);
1353        notify->setInt32("err", err);
1354    }
1355
1356    notify->post();
1357
1358    mInPreparationPhase = false;
1359}
1360
1361}  // namespace android
1362
1363