LiveSession.cpp revision e4f25c280a8f1655c31a745978e0fcbc61f91dee
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/IMediaHTTPConnection.h>
31#include <media/IMediaHTTPService.h>
32#include <media/stagefright/foundation/hexdump.h>
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/DataSource.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaErrors.h>
39#include <media/stagefright/MediaHTTP.h>
40#include <media/stagefright/MetaData.h>
41#include <media/stagefright/Utils.h>
42
43#include <utils/Mutex.h>
44
45#include <ctype.h>
46#include <openssl/aes.h>
47#include <openssl/md5.h>
48
49namespace android {
50
51LiveSession::LiveSession(
52        const sp<AMessage> &notify, uint32_t flags,
53        const sp<IMediaHTTPService> &httpService)
54    : mNotify(notify),
55      mFlags(flags),
56      mHTTPService(httpService),
57      mInPreparationPhase(true),
58      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
59      mPrevBandwidthIndex(-1),
60      mStreamMask(0),
61      mNewStreamMask(0),
62      mSwapMask(0),
63      mCheckBandwidthGeneration(0),
64      mSwitchGeneration(0),
65      mLastDequeuedTimeUs(0ll),
66      mRealTimeBaseUs(0ll),
67      mReconfigurationInProgress(false),
68      mSwitchInProgress(false),
69      mDisconnectReplyID(0),
70      mSeekReplyID(0) {
71
72    mStreams[kAudioIndex] = StreamItem("audio");
73    mStreams[kVideoIndex] = StreamItem("video");
74    mStreams[kSubtitleIndex] = StreamItem("subtitles");
75
76    for (size_t i = 0; i < kMaxStreams; ++i) {
77        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
78        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
79    }
80}
81
82LiveSession::~LiveSession() {
83}
84
85sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
86    ABuffer *discontinuity = new ABuffer(0);
87    discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
88    discontinuity->meta()->setInt32("swapPacketSource", swap);
89    discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
90    discontinuity->meta()->setInt64("timeUs", -1);
91    return discontinuity;
92}
93
94void LiveSession::swapPacketSource(StreamType stream) {
95    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
96    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
97    sp<AnotherPacketSource> tmp = aps;
98    aps = aps2;
99    aps2 = tmp;
100    aps2->clear();
101}
102
103status_t LiveSession::dequeueAccessUnit(
104        StreamType stream, sp<ABuffer> *accessUnit) {
105    if (!(mStreamMask & stream)) {
106        // return -EWOULDBLOCK to avoid halting the decoder
107        // when switching between audio/video and audio only.
108        return -EWOULDBLOCK;
109    }
110
111    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
112
113    status_t finalResult;
114    if (!packetSource->hasBufferAvailable(&finalResult)) {
115        return finalResult == OK ? -EAGAIN : finalResult;
116    }
117
118    status_t err = packetSource->dequeueAccessUnit(accessUnit);
119
120    const char *streamStr;
121    switch (stream) {
122        case STREAMTYPE_AUDIO:
123            streamStr = "audio";
124            break;
125        case STREAMTYPE_VIDEO:
126            streamStr = "video";
127            break;
128        case STREAMTYPE_SUBTITLES:
129            streamStr = "subs";
130            break;
131        default:
132            TRESPASS();
133    }
134
135    if (err == INFO_DISCONTINUITY) {
136        int32_t type;
137        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
138
139        sp<AMessage> extra;
140        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
141            extra.clear();
142        }
143
144        ALOGI("[%s] read discontinuity of type %d, extra = %s",
145              streamStr,
146              type,
147              extra == NULL ? "NULL" : extra->debugString().c_str());
148
149        int32_t swap;
150        if (type == ATSParser::DISCONTINUITY_FORMATCHANGE
151                && (*accessUnit)->meta()->findInt32("swapPacketSource", &swap)
152                && swap) {
153
154            int32_t switchGeneration;
155            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
156            {
157                Mutex::Autolock lock(mSwapMutex);
158                if (switchGeneration == mSwitchGeneration) {
159                    swapPacketSource(stream);
160                    sp<AMessage> msg = new AMessage(kWhatSwapped, id());
161                    msg->setInt32("stream", stream);
162                    msg->setInt32("switchGeneration", switchGeneration);
163                    msg->post();
164                }
165            }
166        }
167    } else if (err == OK) {
168        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
169            int64_t timeUs;
170            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
171            ALOGV("[%s] read buffer at time %lld us", streamStr, timeUs);
172
173            mLastDequeuedTimeUs = timeUs;
174            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
175        } else if (stream == STREAMTYPE_SUBTITLES) {
176            (*accessUnit)->meta()->setInt32(
177                    "trackIndex", mPlaylist->getSelectedIndex());
178            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
179        }
180    } else {
181        ALOGI("[%s] encountered error %d", streamStr, err);
182    }
183
184    return err;
185}
186
187status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
188    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
189    if (!(mStreamMask & stream)) {
190        return UNKNOWN_ERROR;
191    }
192
193    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
194
195    sp<MetaData> meta = packetSource->getFormat();
196
197    if (meta == NULL) {
198        return -EAGAIN;
199    }
200
201    return convertMetaDataToMessage(meta, format);
202}
203
204void LiveSession::connectAsync(
205        const char *url, const KeyedVector<String8, String8> *headers) {
206    sp<AMessage> msg = new AMessage(kWhatConnect, id());
207    msg->setString("url", url);
208
209    if (headers != NULL) {
210        msg->setPointer(
211                "headers",
212                new KeyedVector<String8, String8>(*headers));
213    }
214
215    msg->post();
216}
217
218status_t LiveSession::disconnect() {
219    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
220
221    sp<AMessage> response;
222    status_t err = msg->postAndAwaitResponse(&response);
223
224    return err;
225}
226
227status_t LiveSession::seekTo(int64_t timeUs) {
228    sp<AMessage> msg = new AMessage(kWhatSeek, id());
229    msg->setInt64("timeUs", timeUs);
230
231    sp<AMessage> response;
232    status_t err = msg->postAndAwaitResponse(&response);
233
234    uint32_t replyID;
235    CHECK(response == mSeekReply && 0 != mSeekReplyID);
236    mSeekReply.clear();
237    mSeekReplyID = 0;
238    return err;
239}
240
241void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
242    switch (msg->what()) {
243        case kWhatConnect:
244        {
245            onConnect(msg);
246            break;
247        }
248
249        case kWhatDisconnect:
250        {
251            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
252
253            if (mReconfigurationInProgress) {
254                break;
255            }
256
257            finishDisconnect();
258            break;
259        }
260
261        case kWhatSeek:
262        {
263            CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
264
265            status_t err = onSeek(msg);
266
267            mSeekReply = new AMessage;
268            mSeekReply->setInt32("err", err);
269            break;
270        }
271
272        case kWhatFetcherNotify:
273        {
274            int32_t what;
275            CHECK(msg->findInt32("what", &what));
276
277            switch (what) {
278                case PlaylistFetcher::kWhatStarted:
279                    break;
280                case PlaylistFetcher::kWhatPaused:
281                case PlaylistFetcher::kWhatStopped:
282                {
283                    if (what == PlaylistFetcher::kWhatStopped) {
284                        AString uri;
285                        CHECK(msg->findString("uri", &uri));
286                        if (mFetcherInfos.removeItem(uri) < 0) {
287                            // ignore duplicated kWhatStopped messages.
288                            break;
289                        }
290
291                        tryToFinishBandwidthSwitch();
292                    }
293
294                    if (mContinuation != NULL) {
295                        CHECK_GT(mContinuationCounter, 0);
296                        if (--mContinuationCounter == 0) {
297                            mContinuation->post();
298
299                            if (mSeekReplyID != 0) {
300                                CHECK(mSeekReply != NULL);
301                                mSeekReply->postReply(mSeekReplyID);
302                            }
303                        }
304                    }
305                    break;
306                }
307
308                case PlaylistFetcher::kWhatDurationUpdate:
309                {
310                    AString uri;
311                    CHECK(msg->findString("uri", &uri));
312
313                    int64_t durationUs;
314                    CHECK(msg->findInt64("durationUs", &durationUs));
315
316                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
317                    info->mDurationUs = durationUs;
318                    break;
319                }
320
321                case PlaylistFetcher::kWhatError:
322                {
323                    status_t err;
324                    CHECK(msg->findInt32("err", &err));
325
326                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
327
328                    if (mInPreparationPhase) {
329                        postPrepared(err);
330                    }
331
332                    cancelBandwidthSwitch();
333
334                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
335
336                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
337
338                    mPacketSources.valueFor(
339                            STREAMTYPE_SUBTITLES)->signalEOS(err);
340
341                    sp<AMessage> notify = mNotify->dup();
342                    notify->setInt32("what", kWhatError);
343                    notify->setInt32("err", err);
344                    notify->post();
345                    break;
346                }
347
348                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
349                {
350                    AString uri;
351                    CHECK(msg->findString("uri", &uri));
352
353                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
354                    info->mIsPrepared = true;
355
356                    if (mInPreparationPhase) {
357                        bool allFetchersPrepared = true;
358                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
359                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
360                                allFetchersPrepared = false;
361                                break;
362                            }
363                        }
364
365                        if (allFetchersPrepared) {
366                            postPrepared(OK);
367                        }
368                    }
369                    break;
370                }
371
372                case PlaylistFetcher::kWhatStartedAt:
373                {
374                    int32_t switchGeneration;
375                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
376
377                    if (switchGeneration != mSwitchGeneration) {
378                        break;
379                    }
380
381                    // Resume fetcher for the original variant; the resumed fetcher should
382                    // continue until the timestamps found in msg, which is stored by the
383                    // new fetcher to indicate where the new variant has started buffering.
384                    for (size_t i = 0; i < mFetcherInfos.size(); i++) {
385                        const FetcherInfo info = mFetcherInfos.valueAt(i);
386                        if (info.mToBeRemoved) {
387                            info.mFetcher->resumeUntilAsync(msg);
388                        }
389                    }
390                    break;
391                }
392
393                default:
394                    TRESPASS();
395            }
396
397            break;
398        }
399
400        case kWhatCheckBandwidth:
401        {
402            int32_t generation;
403            CHECK(msg->findInt32("generation", &generation));
404
405            if (generation != mCheckBandwidthGeneration) {
406                break;
407            }
408
409            onCheckBandwidth();
410            break;
411        }
412
413        case kWhatChangeConfiguration:
414        {
415            onChangeConfiguration(msg);
416            break;
417        }
418
419        case kWhatChangeConfiguration2:
420        {
421            onChangeConfiguration2(msg);
422            break;
423        }
424
425        case kWhatChangeConfiguration3:
426        {
427            onChangeConfiguration3(msg);
428            break;
429        }
430
431        case kWhatFinishDisconnect2:
432        {
433            onFinishDisconnect2();
434            break;
435        }
436
437        case kWhatSwapped:
438        {
439            onSwapped(msg);
440            break;
441        }
442        default:
443            TRESPASS();
444            break;
445    }
446}
447
448// static
449int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
450    if (a->mBandwidth < b->mBandwidth) {
451        return -1;
452    } else if (a->mBandwidth == b->mBandwidth) {
453        return 0;
454    }
455
456    return 1;
457}
458
459// static
460LiveSession::StreamType LiveSession::indexToType(int idx) {
461    CHECK(idx >= 0 && idx < kMaxStreams);
462    return (StreamType)(1 << idx);
463}
464
465void LiveSession::onConnect(const sp<AMessage> &msg) {
466    AString url;
467    CHECK(msg->findString("url", &url));
468
469    KeyedVector<String8, String8> *headers = NULL;
470    if (!msg->findPointer("headers", (void **)&headers)) {
471        mExtraHeaders.clear();
472    } else {
473        mExtraHeaders = *headers;
474
475        delete headers;
476        headers = NULL;
477    }
478
479#if 1
480    ALOGI("onConnect <URL suppressed>");
481#else
482    ALOGI("onConnect %s", url.c_str());
483#endif
484
485    mMasterURL = url;
486
487    bool dummy;
488    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
489
490    if (mPlaylist == NULL) {
491        ALOGE("unable to fetch master playlist '%s'.", url.c_str());
492
493        postPrepared(ERROR_IO);
494        return;
495    }
496
497    // We trust the content provider to make a reasonable choice of preferred
498    // initial bandwidth by listing it first in the variant playlist.
499    // At startup we really don't have a good estimate on the available
500    // network bandwidth since we haven't tranferred any data yet. Once
501    // we have we can make a better informed choice.
502    size_t initialBandwidth = 0;
503    size_t initialBandwidthIndex = 0;
504
505    if (mPlaylist->isVariantPlaylist()) {
506        for (size_t i = 0; i < mPlaylist->size(); ++i) {
507            BandwidthItem item;
508
509            item.mPlaylistIndex = i;
510
511            sp<AMessage> meta;
512            AString uri;
513            mPlaylist->itemAt(i, &uri, &meta);
514
515            unsigned long bandwidth;
516            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
517
518            if (initialBandwidth == 0) {
519                initialBandwidth = item.mBandwidth;
520            }
521
522            mBandwidthItems.push(item);
523        }
524
525        CHECK_GT(mBandwidthItems.size(), 0u);
526
527        mBandwidthItems.sort(SortByBandwidth);
528
529        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
530            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
531                initialBandwidthIndex = i;
532                break;
533            }
534        }
535    } else {
536        // dummy item.
537        BandwidthItem item;
538        item.mPlaylistIndex = 0;
539        item.mBandwidth = 0;
540        mBandwidthItems.push(item);
541    }
542
543    changeConfiguration(
544            0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */);
545}
546
547void LiveSession::finishDisconnect() {
548    // No reconfiguration is currently pending, make sure none will trigger
549    // during disconnection either.
550    cancelCheckBandwidthEvent();
551
552    // Protect mPacketSources from a swapPacketSource race condition through disconnect.
553    // (finishDisconnect, onFinishDisconnect2)
554    cancelBandwidthSwitch();
555
556    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
557        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
558    }
559
560    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
561
562    mContinuationCounter = mFetcherInfos.size();
563    mContinuation = msg;
564
565    if (mContinuationCounter == 0) {
566        msg->post();
567    }
568}
569
570void LiveSession::onFinishDisconnect2() {
571    mContinuation.clear();
572
573    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
574    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
575
576    mPacketSources.valueFor(
577            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
578
579    sp<AMessage> response = new AMessage;
580    response->setInt32("err", OK);
581
582    response->postReply(mDisconnectReplyID);
583    mDisconnectReplyID = 0;
584}
585
586sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
587    ssize_t index = mFetcherInfos.indexOfKey(uri);
588
589    if (index >= 0) {
590        return NULL;
591    }
592
593    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
594    notify->setString("uri", uri);
595    notify->setInt32("switchGeneration", mSwitchGeneration);
596
597    FetcherInfo info;
598    info.mFetcher = new PlaylistFetcher(notify, this, uri);
599    info.mDurationUs = -1ll;
600    info.mIsPrepared = false;
601    info.mToBeRemoved = false;
602    looper()->registerHandler(info.mFetcher);
603
604    mFetcherInfos.add(uri, info);
605
606    return info.mFetcher;
607}
608
609/*
610 * Illustration of parameters:
611 *
612 * 0      `range_offset`
613 * +------------+-------------------------------------------------------+--+--+
614 * |            |                                 | next block to fetch |  |  |
615 * |            | `source` handle => `out` buffer |                     |  |  |
616 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
617 * |            |<----------- `range_length` / buffer capacity ----------->|  |
618 * |<------------------------------ file_size ------------------------------->|
619 *
620 * Special parameter values:
621 * - range_length == -1 means entire file
622 * - block_size == 0 means entire range
623 *
624 */
625status_t LiveSession::fetchFile(
626        const char *url, sp<ABuffer> *out,
627        int64_t range_offset, int64_t range_length,
628        uint32_t block_size, /* download block size */
629        sp<DataSource> *source, /* to return and reuse source */
630        String8 *actualUrl) {
631    off64_t size;
632    sp<DataSource> temp_source;
633    if (source == NULL) {
634        source = &temp_source;
635    }
636
637    if (*source == NULL) {
638        if (!strncasecmp(url, "file://", 7)) {
639            *source = new FileSource(url + 7);
640        } else if (strncasecmp(url, "http://", 7)
641                && strncasecmp(url, "https://", 8)) {
642            return ERROR_UNSUPPORTED;
643        } else {
644            KeyedVector<String8, String8> headers = mExtraHeaders;
645            if (range_offset > 0 || range_length >= 0) {
646                headers.add(
647                        String8("Range"),
648                        String8(
649                            StringPrintf(
650                                "bytes=%lld-%s",
651                                range_offset,
652                                range_length < 0
653                                    ? "" : StringPrintf("%lld",
654                                            range_offset + range_length - 1).c_str()).c_str()));
655            }
656            status_t err = mHTTPDataSource->connect(url, &headers);
657
658            if (err != OK) {
659                return err;
660            }
661
662            *source = mHTTPDataSource;
663        }
664    }
665
666    status_t getSizeErr = (*source)->getSize(&size);
667    if (getSizeErr != OK) {
668        size = 65536;
669    }
670
671    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
672    if (*out == NULL) {
673        buffer->setRange(0, 0);
674    }
675
676    // adjust range_length if only reading partial block
677    if (block_size > 0 && (range_length == -1 || buffer->size() + block_size < range_length)) {
678        range_length = buffer->size() + block_size;
679    }
680    for (;;) {
681        // Only resize when we don't know the size.
682        size_t bufferRemaining = buffer->capacity() - buffer->size();
683        if (bufferRemaining == 0 && getSizeErr != OK) {
684            bufferRemaining = 32768;
685
686            ALOGV("increasing download buffer to %d bytes",
687                 buffer->size() + bufferRemaining);
688
689            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
690            memcpy(copy->data(), buffer->data(), buffer->size());
691            copy->setRange(0, buffer->size());
692
693            buffer = copy;
694        }
695
696        size_t maxBytesToRead = bufferRemaining;
697        if (range_length >= 0) {
698            int64_t bytesLeftInRange = range_length - buffer->size();
699            if (bytesLeftInRange < maxBytesToRead) {
700                maxBytesToRead = bytesLeftInRange;
701
702                if (bytesLeftInRange == 0) {
703                    break;
704                }
705            }
706        }
707
708        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
709        // to help us break out of the loop.
710        ssize_t n = (*source)->readAt(
711                buffer->size(), buffer->data() + buffer->size(),
712                maxBytesToRead);
713
714        if (n < 0) {
715            return n;
716        }
717
718        if (n == 0) {
719            break;
720        }
721
722        buffer->setRange(0, buffer->size() + (size_t)n);
723    }
724
725    *out = buffer;
726    if (actualUrl != NULL) {
727        *actualUrl = (*source)->getUri();
728        if (actualUrl->isEmpty()) {
729            *actualUrl = url;
730        }
731    }
732
733    return OK;
734}
735
736sp<M3UParser> LiveSession::fetchPlaylist(
737        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
738    ALOGV("fetchPlaylist '%s'", url);
739
740    *unchanged = false;
741
742    sp<ABuffer> buffer;
743    String8 actualUrl;
744    status_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
745
746    if (err != OK) {
747        return NULL;
748    }
749
750    // MD5 functionality is not available on the simulator, treat all
751    // playlists as changed.
752
753#if defined(HAVE_ANDROID_OS)
754    uint8_t hash[16];
755
756    MD5_CTX m;
757    MD5_Init(&m);
758    MD5_Update(&m, buffer->data(), buffer->size());
759
760    MD5_Final(hash, &m);
761
762    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
763        // playlist unchanged
764        *unchanged = true;
765
766        return NULL;
767    }
768
769    if (curPlaylistHash != NULL) {
770        memcpy(curPlaylistHash, hash, sizeof(hash));
771    }
772#endif
773
774    sp<M3UParser> playlist =
775        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
776
777    if (playlist->initCheck() != OK) {
778        ALOGE("failed to parse .m3u8 playlist");
779
780        return NULL;
781    }
782
783    return playlist;
784}
785
786static double uniformRand() {
787    return (double)rand() / RAND_MAX;
788}
789
790size_t LiveSession::getBandwidthIndex() {
791    if (mBandwidthItems.size() == 0) {
792        return 0;
793    }
794
795#if 1
796    char value[PROPERTY_VALUE_MAX];
797    ssize_t index = -1;
798    if (property_get("media.httplive.bw-index", value, NULL)) {
799        char *end;
800        index = strtol(value, &end, 10);
801        CHECK(end > value && *end == '\0');
802
803        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
804            index = mBandwidthItems.size() - 1;
805        }
806    }
807
808    if (index < 0) {
809        int32_t bandwidthBps;
810        if (mHTTPDataSource != NULL
811                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
812            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
813        } else {
814            ALOGV("no bandwidth estimate.");
815            return 0;  // Pick the lowest bandwidth stream by default.
816        }
817
818        char value[PROPERTY_VALUE_MAX];
819        if (property_get("media.httplive.max-bw", value, NULL)) {
820            char *end;
821            long maxBw = strtoul(value, &end, 10);
822            if (end > value && *end == '\0') {
823                if (maxBw > 0 && bandwidthBps > maxBw) {
824                    ALOGV("bandwidth capped to %ld bps", maxBw);
825                    bandwidthBps = maxBw;
826                }
827            }
828        }
829
830        // Consider only 80% of the available bandwidth usable.
831        bandwidthBps = (bandwidthBps * 8) / 10;
832
833        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
834
835        index = mBandwidthItems.size() - 1;
836        while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth
837                                > (size_t)bandwidthBps) {
838            --index;
839        }
840    }
841#elif 0
842    // Change bandwidth at random()
843    size_t index = uniformRand() * mBandwidthItems.size();
844#elif 0
845    // There's a 50% chance to stay on the current bandwidth and
846    // a 50% chance to switch to the next higher bandwidth (wrapping around
847    // to lowest)
848    const size_t kMinIndex = 0;
849
850    static ssize_t mPrevBandwidthIndex = -1;
851
852    size_t index;
853    if (mPrevBandwidthIndex < 0) {
854        index = kMinIndex;
855    } else if (uniformRand() < 0.5) {
856        index = (size_t)mPrevBandwidthIndex;
857    } else {
858        index = mPrevBandwidthIndex + 1;
859        if (index == mBandwidthItems.size()) {
860            index = kMinIndex;
861        }
862    }
863    mPrevBandwidthIndex = index;
864#elif 0
865    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
866
867    size_t index = mBandwidthItems.size() - 1;
868    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
869        --index;
870    }
871#elif 1
872    char value[PROPERTY_VALUE_MAX];
873    size_t index;
874    if (property_get("media.httplive.bw-index", value, NULL)) {
875        char *end;
876        index = strtoul(value, &end, 10);
877        CHECK(end > value && *end == '\0');
878
879        if (index >= mBandwidthItems.size()) {
880            index = mBandwidthItems.size() - 1;
881        }
882    } else {
883        index = 0;
884    }
885#else
886    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
887#endif
888
889    CHECK_GE(index, 0);
890
891    return index;
892}
893
894status_t LiveSession::onSeek(const sp<AMessage> &msg) {
895    int64_t timeUs;
896    CHECK(msg->findInt64("timeUs", &timeUs));
897
898    if (!mReconfigurationInProgress) {
899        changeConfiguration(timeUs, getBandwidthIndex());
900    }
901
902    return OK;
903}
904
905status_t LiveSession::getDuration(int64_t *durationUs) const {
906    int64_t maxDurationUs = 0ll;
907    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
908        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
909
910        if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) {
911            maxDurationUs = fetcherDurationUs;
912        }
913    }
914
915    *durationUs = maxDurationUs;
916
917    return OK;
918}
919
920bool LiveSession::isSeekable() const {
921    int64_t durationUs;
922    return getDuration(&durationUs) == OK && durationUs >= 0;
923}
924
925bool LiveSession::hasDynamicDuration() const {
926    return false;
927}
928
929status_t LiveSession::getTrackInfo(Parcel *reply) const {
930    return mPlaylist->getTrackInfo(reply);
931}
932
933status_t LiveSession::selectTrack(size_t index, bool select) {
934    status_t err = mPlaylist->selectTrack(index, select);
935    if (err == OK) {
936        (new AMessage(kWhatChangeConfiguration, id()))->post();
937    }
938    return err;
939}
940
941bool LiveSession::canSwitchUp() {
942    // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
943    status_t err = OK;
944    for (size_t i = 0; i < mPacketSources.size(); ++i) {
945        sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
946        int64_t dur = source->getBufferedDurationUs(&err);
947        if (err == OK && dur > 10000000) {
948            return true;
949        }
950    }
951    return false;
952}
953
954void LiveSession::changeConfiguration(
955        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
956    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
957    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
958    cancelBandwidthSwitch();
959
960    CHECK(!mReconfigurationInProgress);
961    mReconfigurationInProgress = true;
962
963    mPrevBandwidthIndex = bandwidthIndex;
964
965    ALOGV("changeConfiguration => timeUs:%lld us, bwIndex:%d, pickTrack:%d",
966          timeUs, bandwidthIndex, pickTrack);
967
968    if (pickTrack) {
969        mPlaylist->pickRandomMediaItems();
970    }
971
972    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
973    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
974
975    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
976    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
977
978    AString URIs[kMaxStreams];
979    for (size_t i = 0; i < kMaxStreams; ++i) {
980        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
981            streamMask |= indexToType(i);
982        }
983    }
984
985    // Step 1, stop and discard fetchers that are no longer needed.
986    // Pause those that we'll reuse.
987    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
988        const AString &uri = mFetcherInfos.keyAt(i);
989
990        bool discardFetcher = true;
991
992        // If we're seeking all current fetchers are discarded.
993        if (timeUs < 0ll) {
994            // delay fetcher removal
995            discardFetcher = false;
996
997            for (size_t j = 0; j < kMaxStreams; ++j) {
998                StreamType type = indexToType(j);
999                if ((streamMask & type) && uri == URIs[j]) {
1000                    resumeMask |= type;
1001                    streamMask &= ~type;
1002                }
1003            }
1004        }
1005
1006        if (discardFetcher) {
1007            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1008        } else {
1009            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
1010        }
1011    }
1012
1013    sp<AMessage> msg;
1014    if (timeUs < 0ll) {
1015        // skip onChangeConfiguration2 (decoder destruction) if switching.
1016        msg = new AMessage(kWhatChangeConfiguration3, id());
1017    } else {
1018        msg = new AMessage(kWhatChangeConfiguration2, id());
1019    }
1020    msg->setInt32("streamMask", streamMask);
1021    msg->setInt32("resumeMask", resumeMask);
1022    msg->setInt64("timeUs", timeUs);
1023    for (size_t i = 0; i < kMaxStreams; ++i) {
1024        if (streamMask & indexToType(i)) {
1025            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1026        }
1027    }
1028
1029    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1030    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1031    // fetchers have completed their asynchronous operation, we'll post
1032    // mContinuation, which then is handled below in onChangeConfiguration2.
1033    mContinuationCounter = mFetcherInfos.size();
1034    mContinuation = msg;
1035
1036    if (mContinuationCounter == 0) {
1037        msg->post();
1038
1039        if (mSeekReplyID != 0) {
1040            CHECK(mSeekReply != NULL);
1041            mSeekReply->postReply(mSeekReplyID);
1042        }
1043    }
1044}
1045
1046void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1047    if (!mReconfigurationInProgress) {
1048        changeConfiguration(-1ll /* timeUs */, getBandwidthIndex());
1049    } else {
1050        msg->post(1000000ll); // retry in 1 sec
1051    }
1052}
1053
1054void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1055    mContinuation.clear();
1056
1057    // All fetchers are either suspended or have been removed now.
1058
1059    uint32_t streamMask;
1060    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1061
1062    AString URIs[kMaxStreams];
1063    for (size_t i = 0; i < kMaxStreams; ++i) {
1064        if (streamMask & indexToType(i)) {
1065            const AString &uriKey = mStreams[i].uriKey();
1066            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1067            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1068        }
1069    }
1070
1071    // Determine which decoders to shutdown on the player side,
1072    // a decoder has to be shutdown if either
1073    // 1) its streamtype was active before but now longer isn't.
1074    // or
1075    // 2) its streamtype was already active and still is but the URI
1076    //    has changed.
1077    uint32_t changedMask = 0;
1078    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1079        if (((mStreamMask & streamMask & indexToType(i))
1080                && !(URIs[i] == mStreams[i].mUri))
1081                || (mStreamMask & ~streamMask & indexToType(i))) {
1082            changedMask |= indexToType(i);
1083        }
1084    }
1085
1086    if (changedMask == 0) {
1087        // If nothing changed as far as the audio/video decoders
1088        // are concerned we can proceed.
1089        onChangeConfiguration3(msg);
1090        return;
1091    }
1092
1093    // Something changed, inform the player which will shutdown the
1094    // corresponding decoders and will post the reply once that's done.
1095    // Handling the reply will continue executing below in
1096    // onChangeConfiguration3.
1097    sp<AMessage> notify = mNotify->dup();
1098    notify->setInt32("what", kWhatStreamsChanged);
1099    notify->setInt32("changedMask", changedMask);
1100
1101    msg->setWhat(kWhatChangeConfiguration3);
1102    msg->setTarget(id());
1103
1104    notify->setMessage("reply", msg);
1105    notify->post();
1106}
1107
1108void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1109    mContinuation.clear();
1110    // All remaining fetchers are still suspended, the player has shutdown
1111    // any decoders that needed it.
1112
1113    uint32_t streamMask, resumeMask;
1114    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1115    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1116
1117    for (size_t i = 0; i < kMaxStreams; ++i) {
1118        if (streamMask & indexToType(i)) {
1119            CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1120        }
1121    }
1122
1123    int64_t timeUs;
1124    bool switching = false;
1125    CHECK(msg->findInt64("timeUs", &timeUs));
1126
1127    if (timeUs < 0ll) {
1128        timeUs = mLastDequeuedTimeUs;
1129        switching = true;
1130    }
1131    mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1132
1133    mNewStreamMask = streamMask;
1134
1135    // Of all existing fetchers:
1136    // * Resume fetchers that are still needed and assign them original packet sources.
1137    // * Mark otherwise unneeded fetchers for removal.
1138    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1139    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1140        const AString &uri = mFetcherInfos.keyAt(i);
1141
1142        sp<AnotherPacketSource> sources[kMaxStreams];
1143        for (size_t j = 0; j < kMaxStreams; ++j) {
1144            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
1145                sources[j] = mPacketSources.valueFor(indexToType(j));
1146            }
1147        }
1148
1149        FetcherInfo &info = mFetcherInfos.editValueAt(i);
1150        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
1151                || sources[kSubtitleIndex] != NULL) {
1152            info.mFetcher->startAsync(
1153                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
1154        } else {
1155            info.mToBeRemoved = true;
1156        }
1157    }
1158
1159    // streamMask now only contains the types that need a new fetcher created.
1160
1161    if (streamMask != 0) {
1162        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1163    }
1164
1165    // Find out when the original fetchers have buffered up to and start the new fetchers
1166    // at a later timestamp.
1167    for (size_t i = 0; i < kMaxStreams; i++) {
1168        if (!(indexToType(i) & streamMask)) {
1169            continue;
1170        }
1171
1172        AString uri;
1173        uri = mStreams[i].mUri;
1174
1175        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1176        CHECK(fetcher != NULL);
1177
1178        int32_t latestSeq = -1;
1179        int64_t latestTimeUs = 0ll;
1180        sp<AnotherPacketSource> sources[kMaxStreams];
1181
1182        // TRICKY: looping from i as earlier streams are already removed from streamMask
1183        for (size_t j = i; j < kMaxStreams; ++j) {
1184            if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) {
1185                sources[j] = mPacketSources.valueFor(indexToType(j));
1186
1187                if (!switching) {
1188                    sources[j]->clear();
1189                } else {
1190                    int32_t type, seq;
1191                    int64_t srcTimeUs;
1192                    sp<AMessage> meta = sources[j]->getLatestMeta();
1193
1194                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
1195                        CHECK(meta->findInt32("seq", &seq));
1196                        if (seq > latestSeq) {
1197                            latestSeq = seq;
1198                        }
1199                        CHECK(meta->findInt64("timeUs", &srcTimeUs));
1200                        if (srcTimeUs > latestTimeUs) {
1201                            latestTimeUs = srcTimeUs;
1202                        }
1203                    }
1204
1205                    sources[j] = mPacketSources2.valueFor(indexToType(j));
1206                    sources[j]->clear();
1207                    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1208                    if (extraStreams & indexToType(j)) {
1209                        sources[j]->queueAccessUnit(createFormatChangeBuffer(/* swap = */ false));
1210                    }
1211                }
1212
1213                streamMask &= ~indexToType(j);
1214            }
1215        }
1216
1217        fetcher->startAsync(
1218                sources[kAudioIndex],
1219                sources[kVideoIndex],
1220                sources[kSubtitleIndex],
1221                timeUs,
1222                latestTimeUs /* min start time(us) */,
1223                latestSeq >= 0 ? latestSeq + 1 : -1 /* starting sequence number hint */ );
1224    }
1225
1226    // All fetchers have now been started, the configuration change
1227    // has completed.
1228
1229    scheduleCheckBandwidthEvent();
1230
1231    ALOGV("XXX configuration change completed.");
1232    mReconfigurationInProgress = false;
1233    if (switching) {
1234        mSwitchInProgress = true;
1235    } else {
1236        mStreamMask = mNewStreamMask;
1237    }
1238
1239    if (mDisconnectReplyID != 0) {
1240        finishDisconnect();
1241    }
1242}
1243
1244void LiveSession::onSwapped(const sp<AMessage> &msg) {
1245    int32_t switchGeneration;
1246    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
1247    if (switchGeneration != mSwitchGeneration) {
1248        return;
1249    }
1250
1251    int32_t stream;
1252    CHECK(msg->findInt32("stream", &stream));
1253    mSwapMask |= stream;
1254    if (mSwapMask != mStreamMask) {
1255        return;
1256    }
1257
1258    // Check if new variant contains extra streams.
1259    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1260    while (extraStreams) {
1261        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
1262        swapPacketSource(extraStream);
1263        extraStreams &= ~extraStream;
1264    }
1265
1266    tryToFinishBandwidthSwitch();
1267}
1268
1269// Mark switch done when:
1270//   1. all old buffers are swapped out, AND
1271//   2. all old fetchers are removed.
1272void LiveSession::tryToFinishBandwidthSwitch() {
1273    bool needToRemoveFetchers = false;
1274    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1275        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
1276            needToRemoveFetchers = true;
1277            break;
1278        }
1279    }
1280    if (!needToRemoveFetchers && mSwapMask == mStreamMask) {
1281        mStreamMask = mNewStreamMask;
1282        mSwitchInProgress = false;
1283        mSwapMask = 0;
1284    }
1285}
1286
1287void LiveSession::scheduleCheckBandwidthEvent() {
1288    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1289    msg->setInt32("generation", mCheckBandwidthGeneration);
1290    msg->post(10000000ll);
1291}
1292
1293void LiveSession::cancelCheckBandwidthEvent() {
1294    ++mCheckBandwidthGeneration;
1295}
1296
1297void LiveSession::cancelBandwidthSwitch() {
1298    Mutex::Autolock lock(mSwapMutex);
1299    mSwitchGeneration++;
1300    mSwitchInProgress = false;
1301    mSwapMask = 0;
1302}
1303
1304bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
1305    if (mReconfigurationInProgress || mSwitchInProgress) {
1306        return false;
1307    }
1308
1309    if (mPrevBandwidthIndex < 0) {
1310        return true;
1311    }
1312
1313    if (bandwidthIndex == (size_t)mPrevBandwidthIndex) {
1314        return false;
1315    } else if (bandwidthIndex > (size_t)mPrevBandwidthIndex) {
1316        return canSwitchUp();
1317    } else {
1318        return true;
1319    }
1320}
1321
1322void LiveSession::onCheckBandwidth() {
1323    size_t bandwidthIndex = getBandwidthIndex();
1324    if (canSwitchBandwidthTo(bandwidthIndex)) {
1325        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1326    } else {
1327        scheduleCheckBandwidthEvent();
1328    }
1329
1330    // Handling the kWhatCheckBandwidth even here does _not_ automatically
1331    // schedule another one on return, only an explicit call to
1332    // scheduleCheckBandwidthEvent will do that.
1333    // This ensures that only one configuration change is ongoing at any
1334    // one time, once that completes it'll schedule another check bandwidth
1335    // event.
1336}
1337
1338void LiveSession::postPrepared(status_t err) {
1339    CHECK(mInPreparationPhase);
1340
1341    sp<AMessage> notify = mNotify->dup();
1342    if (err == OK || err == ERROR_END_OF_STREAM) {
1343        notify->setInt32("what", kWhatPrepared);
1344    } else {
1345        notify->setInt32("what", kWhatPreparationFailed);
1346        notify->setInt32("err", err);
1347    }
1348
1349    notify->post();
1350
1351    mInPreparationPhase = false;
1352}
1353
1354}  // namespace android
1355
1356