LiveSession.cpp revision b4a7a2df4c28c3f32b5d877b54831d2cc5d78f81
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/IMediaHTTPConnection.h>
31#include <media/IMediaHTTPService.h>
32#include <media/stagefright/foundation/hexdump.h>
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/DataSource.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaErrors.h>
39#include <media/stagefright/MediaHTTP.h>
40#include <media/stagefright/MetaData.h>
41#include <media/stagefright/Utils.h>
42
43#include <utils/Mutex.h>
44
45#include <ctype.h>
46#include <inttypes.h>
47#include <openssl/aes.h>
48#include <openssl/md5.h>
49
50namespace android {
51
52LiveSession::LiveSession(
53        const sp<AMessage> &notify, uint32_t flags,
54        const sp<IMediaHTTPService> &httpService)
55    : mNotify(notify),
56      mFlags(flags),
57      mHTTPService(httpService),
58      mInPreparationPhase(true),
59      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
60      mPrevBandwidthIndex(-1),
61      mStreamMask(0),
62      mNewStreamMask(0),
63      mSwapMask(0),
64      mCheckBandwidthGeneration(0),
65      mSwitchGeneration(0),
66      mLastDequeuedTimeUs(0ll),
67      mRealTimeBaseUs(0ll),
68      mReconfigurationInProgress(false),
69      mSwitchInProgress(false),
70      mDisconnectReplyID(0),
71      mSeekReplyID(0) {
72
73    mStreams[kAudioIndex] = StreamItem("audio");
74    mStreams[kVideoIndex] = StreamItem("video");
75    mStreams[kSubtitleIndex] = StreamItem("subtitles");
76
77    for (size_t i = 0; i < kMaxStreams; ++i) {
78        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
79        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
80    }
81}
82
83LiveSession::~LiveSession() {
84}
85
86sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
87    ABuffer *discontinuity = new ABuffer(0);
88    discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
89    discontinuity->meta()->setInt32("swapPacketSource", swap);
90    discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
91    discontinuity->meta()->setInt64("timeUs", -1);
92    return discontinuity;
93}
94
95void LiveSession::swapPacketSource(StreamType stream) {
96    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
97    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
98    sp<AnotherPacketSource> tmp = aps;
99    aps = aps2;
100    aps2 = tmp;
101    aps2->clear();
102}
103
104status_t LiveSession::dequeueAccessUnit(
105        StreamType stream, sp<ABuffer> *accessUnit) {
106    if (!(mStreamMask & stream)) {
107        // return -EWOULDBLOCK to avoid halting the decoder
108        // when switching between audio/video and audio only.
109        return -EWOULDBLOCK;
110    }
111
112    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
113
114    status_t finalResult;
115    if (!packetSource->hasBufferAvailable(&finalResult)) {
116        return finalResult == OK ? -EAGAIN : finalResult;
117    }
118
119    status_t err = packetSource->dequeueAccessUnit(accessUnit);
120
121    const char *streamStr;
122    switch (stream) {
123        case STREAMTYPE_AUDIO:
124            streamStr = "audio";
125            break;
126        case STREAMTYPE_VIDEO:
127            streamStr = "video";
128            break;
129        case STREAMTYPE_SUBTITLES:
130            streamStr = "subs";
131            break;
132        default:
133            TRESPASS();
134    }
135
136    if (err == INFO_DISCONTINUITY) {
137        int32_t type;
138        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
139
140        sp<AMessage> extra;
141        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
142            extra.clear();
143        }
144
145        ALOGI("[%s] read discontinuity of type %d, extra = %s",
146              streamStr,
147              type,
148              extra == NULL ? "NULL" : extra->debugString().c_str());
149
150        int32_t swap;
151        if (type == ATSParser::DISCONTINUITY_FORMATCHANGE
152                && (*accessUnit)->meta()->findInt32("swapPacketSource", &swap)
153                && swap) {
154
155            int32_t switchGeneration;
156            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
157            {
158                Mutex::Autolock lock(mSwapMutex);
159                if (switchGeneration == mSwitchGeneration) {
160                    swapPacketSource(stream);
161                    sp<AMessage> msg = new AMessage(kWhatSwapped, id());
162                    msg->setInt32("stream", stream);
163                    msg->setInt32("switchGeneration", switchGeneration);
164                    msg->post();
165                }
166            }
167        }
168    } else if (err == OK) {
169        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
170            int64_t timeUs;
171            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
172            ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs);
173
174            mLastDequeuedTimeUs = timeUs;
175            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
176        } else if (stream == STREAMTYPE_SUBTITLES) {
177            (*accessUnit)->meta()->setInt32(
178                    "trackIndex", mPlaylist->getSelectedIndex());
179            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
180        }
181    } else {
182        ALOGI("[%s] encountered error %d", streamStr, err);
183    }
184
185    return err;
186}
187
188status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
189    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
190    if (!(mStreamMask & stream)) {
191        return UNKNOWN_ERROR;
192    }
193
194    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
195
196    sp<MetaData> meta = packetSource->getFormat();
197
198    if (meta == NULL) {
199        return -EAGAIN;
200    }
201
202    return convertMetaDataToMessage(meta, format);
203}
204
205void LiveSession::connectAsync(
206        const char *url, const KeyedVector<String8, String8> *headers) {
207    sp<AMessage> msg = new AMessage(kWhatConnect, id());
208    msg->setString("url", url);
209
210    if (headers != NULL) {
211        msg->setPointer(
212                "headers",
213                new KeyedVector<String8, String8>(*headers));
214    }
215
216    msg->post();
217}
218
219status_t LiveSession::disconnect() {
220    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
221
222    sp<AMessage> response;
223    status_t err = msg->postAndAwaitResponse(&response);
224
225    return err;
226}
227
228status_t LiveSession::seekTo(int64_t timeUs) {
229    sp<AMessage> msg = new AMessage(kWhatSeek, id());
230    msg->setInt64("timeUs", timeUs);
231
232    sp<AMessage> response;
233    status_t err = msg->postAndAwaitResponse(&response);
234
235    uint32_t replyID;
236    CHECK(response == mSeekReply && 0 != mSeekReplyID);
237    mSeekReply.clear();
238    mSeekReplyID = 0;
239    return err;
240}
241
242void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
243    switch (msg->what()) {
244        case kWhatConnect:
245        {
246            onConnect(msg);
247            break;
248        }
249
250        case kWhatDisconnect:
251        {
252            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
253
254            if (mReconfigurationInProgress) {
255                break;
256            }
257
258            finishDisconnect();
259            break;
260        }
261
262        case kWhatSeek:
263        {
264            CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
265
266            status_t err = onSeek(msg);
267
268            mSeekReply = new AMessage;
269            mSeekReply->setInt32("err", err);
270            break;
271        }
272
273        case kWhatFetcherNotify:
274        {
275            int32_t what;
276            CHECK(msg->findInt32("what", &what));
277
278            switch (what) {
279                case PlaylistFetcher::kWhatStarted:
280                    break;
281                case PlaylistFetcher::kWhatPaused:
282                case PlaylistFetcher::kWhatStopped:
283                {
284                    if (what == PlaylistFetcher::kWhatStopped) {
285                        AString uri;
286                        CHECK(msg->findString("uri", &uri));
287                        if (mFetcherInfos.removeItem(uri) < 0) {
288                            // ignore duplicated kWhatStopped messages.
289                            break;
290                        }
291
292                        tryToFinishBandwidthSwitch();
293                    }
294
295                    if (mContinuation != NULL) {
296                        CHECK_GT(mContinuationCounter, 0);
297                        if (--mContinuationCounter == 0) {
298                            mContinuation->post();
299
300                            if (mSeekReplyID != 0) {
301                                CHECK(mSeekReply != NULL);
302                                mSeekReply->postReply(mSeekReplyID);
303                            }
304                        }
305                    }
306                    break;
307                }
308
309                case PlaylistFetcher::kWhatDurationUpdate:
310                {
311                    AString uri;
312                    CHECK(msg->findString("uri", &uri));
313
314                    int64_t durationUs;
315                    CHECK(msg->findInt64("durationUs", &durationUs));
316
317                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
318                    info->mDurationUs = durationUs;
319                    break;
320                }
321
322                case PlaylistFetcher::kWhatError:
323                {
324                    status_t err;
325                    CHECK(msg->findInt32("err", &err));
326
327                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
328
329                    if (mInPreparationPhase) {
330                        postPrepared(err);
331                    }
332
333                    cancelBandwidthSwitch();
334
335                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
336
337                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
338
339                    mPacketSources.valueFor(
340                            STREAMTYPE_SUBTITLES)->signalEOS(err);
341
342                    sp<AMessage> notify = mNotify->dup();
343                    notify->setInt32("what", kWhatError);
344                    notify->setInt32("err", err);
345                    notify->post();
346                    break;
347                }
348
349                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
350                {
351                    AString uri;
352                    CHECK(msg->findString("uri", &uri));
353
354                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
355                    info->mIsPrepared = true;
356
357                    if (mInPreparationPhase) {
358                        bool allFetchersPrepared = true;
359                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
360                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
361                                allFetchersPrepared = false;
362                                break;
363                            }
364                        }
365
366                        if (allFetchersPrepared) {
367                            postPrepared(OK);
368                        }
369                    }
370                    break;
371                }
372
373                case PlaylistFetcher::kWhatStartedAt:
374                {
375                    int32_t switchGeneration;
376                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
377
378                    if (switchGeneration != mSwitchGeneration) {
379                        break;
380                    }
381
382                    // Resume fetcher for the original variant; the resumed fetcher should
383                    // continue until the timestamps found in msg, which is stored by the
384                    // new fetcher to indicate where the new variant has started buffering.
385                    for (size_t i = 0; i < mFetcherInfos.size(); i++) {
386                        const FetcherInfo info = mFetcherInfos.valueAt(i);
387                        if (info.mToBeRemoved) {
388                            info.mFetcher->resumeUntilAsync(msg);
389                        }
390                    }
391                    break;
392                }
393
394                default:
395                    TRESPASS();
396            }
397
398            break;
399        }
400
401        case kWhatCheckBandwidth:
402        {
403            int32_t generation;
404            CHECK(msg->findInt32("generation", &generation));
405
406            if (generation != mCheckBandwidthGeneration) {
407                break;
408            }
409
410            onCheckBandwidth();
411            break;
412        }
413
414        case kWhatChangeConfiguration:
415        {
416            onChangeConfiguration(msg);
417            break;
418        }
419
420        case kWhatChangeConfiguration2:
421        {
422            onChangeConfiguration2(msg);
423            break;
424        }
425
426        case kWhatChangeConfiguration3:
427        {
428            onChangeConfiguration3(msg);
429            break;
430        }
431
432        case kWhatFinishDisconnect2:
433        {
434            onFinishDisconnect2();
435            break;
436        }
437
438        case kWhatSwapped:
439        {
440            onSwapped(msg);
441            break;
442        }
443        default:
444            TRESPASS();
445            break;
446    }
447}
448
449// static
450int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
451    if (a->mBandwidth < b->mBandwidth) {
452        return -1;
453    } else if (a->mBandwidth == b->mBandwidth) {
454        return 0;
455    }
456
457    return 1;
458}
459
460// static
461LiveSession::StreamType LiveSession::indexToType(int idx) {
462    CHECK(idx >= 0 && idx < kMaxStreams);
463    return (StreamType)(1 << idx);
464}
465
466void LiveSession::onConnect(const sp<AMessage> &msg) {
467    AString url;
468    CHECK(msg->findString("url", &url));
469
470    KeyedVector<String8, String8> *headers = NULL;
471    if (!msg->findPointer("headers", (void **)&headers)) {
472        mExtraHeaders.clear();
473    } else {
474        mExtraHeaders = *headers;
475
476        delete headers;
477        headers = NULL;
478    }
479
480#if 1
481    ALOGI("onConnect <URL suppressed>");
482#else
483    ALOGI("onConnect %s", url.c_str());
484#endif
485
486    mMasterURL = url;
487
488    bool dummy;
489    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
490
491    if (mPlaylist == NULL) {
492        ALOGE("unable to fetch master playlist <URL suppressed>.");
493
494        postPrepared(ERROR_IO);
495        return;
496    }
497
498    // We trust the content provider to make a reasonable choice of preferred
499    // initial bandwidth by listing it first in the variant playlist.
500    // At startup we really don't have a good estimate on the available
501    // network bandwidth since we haven't tranferred any data yet. Once
502    // we have we can make a better informed choice.
503    size_t initialBandwidth = 0;
504    size_t initialBandwidthIndex = 0;
505
506    if (mPlaylist->isVariantPlaylist()) {
507        for (size_t i = 0; i < mPlaylist->size(); ++i) {
508            BandwidthItem item;
509
510            item.mPlaylistIndex = i;
511
512            sp<AMessage> meta;
513            AString uri;
514            mPlaylist->itemAt(i, &uri, &meta);
515
516            unsigned long bandwidth;
517            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
518
519            if (initialBandwidth == 0) {
520                initialBandwidth = item.mBandwidth;
521            }
522
523            mBandwidthItems.push(item);
524        }
525
526        CHECK_GT(mBandwidthItems.size(), 0u);
527
528        mBandwidthItems.sort(SortByBandwidth);
529
530        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
531            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
532                initialBandwidthIndex = i;
533                break;
534            }
535        }
536    } else {
537        // dummy item.
538        BandwidthItem item;
539        item.mPlaylistIndex = 0;
540        item.mBandwidth = 0;
541        mBandwidthItems.push(item);
542    }
543
544    changeConfiguration(
545            0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */);
546}
547
548void LiveSession::finishDisconnect() {
549    // No reconfiguration is currently pending, make sure none will trigger
550    // during disconnection either.
551    cancelCheckBandwidthEvent();
552
553    // Protect mPacketSources from a swapPacketSource race condition through disconnect.
554    // (finishDisconnect, onFinishDisconnect2)
555    cancelBandwidthSwitch();
556
557    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
558        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
559    }
560
561    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
562
563    mContinuationCounter = mFetcherInfos.size();
564    mContinuation = msg;
565
566    if (mContinuationCounter == 0) {
567        msg->post();
568    }
569}
570
571void LiveSession::onFinishDisconnect2() {
572    mContinuation.clear();
573
574    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
575    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
576
577    mPacketSources.valueFor(
578            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
579
580    sp<AMessage> response = new AMessage;
581    response->setInt32("err", OK);
582
583    response->postReply(mDisconnectReplyID);
584    mDisconnectReplyID = 0;
585}
586
587sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
588    ssize_t index = mFetcherInfos.indexOfKey(uri);
589
590    if (index >= 0) {
591        return NULL;
592    }
593
594    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
595    notify->setString("uri", uri);
596    notify->setInt32("switchGeneration", mSwitchGeneration);
597
598    FetcherInfo info;
599    info.mFetcher = new PlaylistFetcher(notify, this, uri);
600    info.mDurationUs = -1ll;
601    info.mIsPrepared = false;
602    info.mToBeRemoved = false;
603    looper()->registerHandler(info.mFetcher);
604
605    mFetcherInfos.add(uri, info);
606
607    return info.mFetcher;
608}
609
610/*
611 * Illustration of parameters:
612 *
613 * 0      `range_offset`
614 * +------------+-------------------------------------------------------+--+--+
615 * |            |                                 | next block to fetch |  |  |
616 * |            | `source` handle => `out` buffer |                     |  |  |
617 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
618 * |            |<----------- `range_length` / buffer capacity ----------->|  |
619 * |<------------------------------ file_size ------------------------------->|
620 *
621 * Special parameter values:
622 * - range_length == -1 means entire file
623 * - block_size == 0 means entire range
624 *
625 */
626ssize_t LiveSession::fetchFile(
627        const char *url, sp<ABuffer> *out,
628        int64_t range_offset, int64_t range_length,
629        uint32_t block_size, /* download block size */
630        sp<DataSource> *source, /* to return and reuse source */
631        String8 *actualUrl) {
632    off64_t size;
633    sp<DataSource> temp_source;
634    if (source == NULL) {
635        source = &temp_source;
636    }
637
638    if (*source == NULL) {
639        if (!strncasecmp(url, "file://", 7)) {
640            *source = new FileSource(url + 7);
641        } else if (strncasecmp(url, "http://", 7)
642                && strncasecmp(url, "https://", 8)) {
643            return ERROR_UNSUPPORTED;
644        } else {
645            KeyedVector<String8, String8> headers = mExtraHeaders;
646            if (range_offset > 0 || range_length >= 0) {
647                headers.add(
648                        String8("Range"),
649                        String8(
650                            StringPrintf(
651                                "bytes=%lld-%s",
652                                range_offset,
653                                range_length < 0
654                                    ? "" : StringPrintf("%lld",
655                                            range_offset + range_length - 1).c_str()).c_str()));
656            }
657            status_t err = mHTTPDataSource->connect(url, &headers);
658
659            if (err != OK) {
660                return err;
661            }
662
663            *source = mHTTPDataSource;
664        }
665    }
666
667    status_t getSizeErr = (*source)->getSize(&size);
668    if (getSizeErr != OK) {
669        size = 65536;
670    }
671
672    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
673    if (*out == NULL) {
674        buffer->setRange(0, 0);
675    }
676
677    ssize_t bytesRead = 0;
678    // adjust range_length if only reading partial block
679    if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
680        range_length = buffer->size() + block_size;
681    }
682    for (;;) {
683        // Only resize when we don't know the size.
684        size_t bufferRemaining = buffer->capacity() - buffer->size();
685        if (bufferRemaining == 0 && getSizeErr != OK) {
686            bufferRemaining = 32768;
687
688            ALOGV("increasing download buffer to %zu bytes",
689                 buffer->size() + bufferRemaining);
690
691            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
692            memcpy(copy->data(), buffer->data(), buffer->size());
693            copy->setRange(0, buffer->size());
694
695            buffer = copy;
696        }
697
698        size_t maxBytesToRead = bufferRemaining;
699        if (range_length >= 0) {
700            int64_t bytesLeftInRange = range_length - buffer->size();
701            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
702                maxBytesToRead = bytesLeftInRange;
703
704                if (bytesLeftInRange == 0) {
705                    break;
706                }
707            }
708        }
709
710        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
711        // to help us break out of the loop.
712        ssize_t n = (*source)->readAt(
713                buffer->size(), buffer->data() + buffer->size(),
714                maxBytesToRead);
715
716        if (n < 0) {
717            return n;
718        }
719
720        if (n == 0) {
721            break;
722        }
723
724        buffer->setRange(0, buffer->size() + (size_t)n);
725        bytesRead += n;
726    }
727
728    *out = buffer;
729    if (actualUrl != NULL) {
730        *actualUrl = (*source)->getUri();
731        if (actualUrl->isEmpty()) {
732            *actualUrl = url;
733        }
734    }
735
736    return bytesRead;
737}
738
739sp<M3UParser> LiveSession::fetchPlaylist(
740        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
741    ALOGV("fetchPlaylist '%s'", url);
742
743    *unchanged = false;
744
745    sp<ABuffer> buffer;
746    String8 actualUrl;
747    ssize_t  err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
748
749    if (err <= 0) {
750        return NULL;
751    }
752
753    // MD5 functionality is not available on the simulator, treat all
754    // playlists as changed.
755
756#if defined(HAVE_ANDROID_OS)
757    uint8_t hash[16];
758
759    MD5_CTX m;
760    MD5_Init(&m);
761    MD5_Update(&m, buffer->data(), buffer->size());
762
763    MD5_Final(hash, &m);
764
765    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
766        // playlist unchanged
767        *unchanged = true;
768
769        return NULL;
770    }
771
772    if (curPlaylistHash != NULL) {
773        memcpy(curPlaylistHash, hash, sizeof(hash));
774    }
775#endif
776
777    sp<M3UParser> playlist =
778        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
779
780    if (playlist->initCheck() != OK) {
781        ALOGE("failed to parse .m3u8 playlist");
782
783        return NULL;
784    }
785
786    return playlist;
787}
788
// Returns a pseudo-random value in [0, 1] derived from rand().
static double uniformRand() {
    const double sample = static_cast<double>(rand());
    return sample / RAND_MAX;
}
792
793size_t LiveSession::getBandwidthIndex() {
794    if (mBandwidthItems.size() == 0) {
795        return 0;
796    }
797
798#if 1
799    char value[PROPERTY_VALUE_MAX];
800    ssize_t index = -1;
801    if (property_get("media.httplive.bw-index", value, NULL)) {
802        char *end;
803        index = strtol(value, &end, 10);
804        CHECK(end > value && *end == '\0');
805
806        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
807            index = mBandwidthItems.size() - 1;
808        }
809    }
810
811    if (index < 0) {
812        int32_t bandwidthBps;
813        if (mHTTPDataSource != NULL
814                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
815            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
816        } else {
817            ALOGV("no bandwidth estimate.");
818            return 0;  // Pick the lowest bandwidth stream by default.
819        }
820
821        char value[PROPERTY_VALUE_MAX];
822        if (property_get("media.httplive.max-bw", value, NULL)) {
823            char *end;
824            long maxBw = strtoul(value, &end, 10);
825            if (end > value && *end == '\0') {
826                if (maxBw > 0 && bandwidthBps > maxBw) {
827                    ALOGV("bandwidth capped to %ld bps", maxBw);
828                    bandwidthBps = maxBw;
829                }
830            }
831        }
832
833        // Consider only 80% of the available bandwidth usable.
834        bandwidthBps = (bandwidthBps * 8) / 10;
835
836        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
837
838        index = mBandwidthItems.size() - 1;
839        while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth
840                                > (size_t)bandwidthBps) {
841            --index;
842        }
843    }
844#elif 0
845    // Change bandwidth at random()
846    size_t index = uniformRand() * mBandwidthItems.size();
847#elif 0
848    // There's a 50% chance to stay on the current bandwidth and
849    // a 50% chance to switch to the next higher bandwidth (wrapping around
850    // to lowest)
851    const size_t kMinIndex = 0;
852
853    static ssize_t mPrevBandwidthIndex = -1;
854
855    size_t index;
856    if (mPrevBandwidthIndex < 0) {
857        index = kMinIndex;
858    } else if (uniformRand() < 0.5) {
859        index = (size_t)mPrevBandwidthIndex;
860    } else {
861        index = mPrevBandwidthIndex + 1;
862        if (index == mBandwidthItems.size()) {
863            index = kMinIndex;
864        }
865    }
866    mPrevBandwidthIndex = index;
867#elif 0
868    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
869
870    size_t index = mBandwidthItems.size() - 1;
871    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
872        --index;
873    }
874#elif 1
875    char value[PROPERTY_VALUE_MAX];
876    size_t index;
877    if (property_get("media.httplive.bw-index", value, NULL)) {
878        char *end;
879        index = strtoul(value, &end, 10);
880        CHECK(end > value && *end == '\0');
881
882        if (index >= mBandwidthItems.size()) {
883            index = mBandwidthItems.size() - 1;
884        }
885    } else {
886        index = 0;
887    }
888#else
889    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
890#endif
891
892    CHECK_GE(index, 0);
893
894    return index;
895}
896
897status_t LiveSession::onSeek(const sp<AMessage> &msg) {
898    int64_t timeUs;
899    CHECK(msg->findInt64("timeUs", &timeUs));
900
901    if (!mReconfigurationInProgress) {
902        changeConfiguration(timeUs, getBandwidthIndex());
903    }
904
905    return OK;
906}
907
908status_t LiveSession::getDuration(int64_t *durationUs) const {
909    int64_t maxDurationUs = 0ll;
910    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
911        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
912
913        if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) {
914            maxDurationUs = fetcherDurationUs;
915        }
916    }
917
918    *durationUs = maxDurationUs;
919
920    return OK;
921}
922
923bool LiveSession::isSeekable() const {
924    int64_t durationUs;
925    return getDuration(&durationUs) == OK && durationUs >= 0;
926}
927
928bool LiveSession::hasDynamicDuration() const {
929    return false;
930}
931
932status_t LiveSession::getTrackInfo(Parcel *reply) const {
933    return mPlaylist->getTrackInfo(reply);
934}
935
936status_t LiveSession::selectTrack(size_t index, bool select) {
937    status_t err = mPlaylist->selectTrack(index, select);
938    if (err == OK) {
939        (new AMessage(kWhatChangeConfiguration, id()))->post();
940    }
941    return err;
942}
943
944bool LiveSession::canSwitchUp() {
945    // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
946    status_t err = OK;
947    for (size_t i = 0; i < mPacketSources.size(); ++i) {
948        sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
949        int64_t dur = source->getBufferedDurationUs(&err);
950        if (err == OK && dur > 10000000) {
951            return true;
952        }
953    }
954    return false;
955}
956
957void LiveSession::changeConfiguration(
958        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
959    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
960    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
961    cancelBandwidthSwitch();
962
963    CHECK(!mReconfigurationInProgress);
964    mReconfigurationInProgress = true;
965
966    mPrevBandwidthIndex = bandwidthIndex;
967
968    ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
969          timeUs, bandwidthIndex, pickTrack);
970
971    if (pickTrack) {
972        mPlaylist->pickRandomMediaItems();
973    }
974
975    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
976    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
977
978    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
979    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
980
981    AString URIs[kMaxStreams];
982    for (size_t i = 0; i < kMaxStreams; ++i) {
983        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
984            streamMask |= indexToType(i);
985        }
986    }
987
988    // Step 1, stop and discard fetchers that are no longer needed.
989    // Pause those that we'll reuse.
990    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
991        const AString &uri = mFetcherInfos.keyAt(i);
992
993        bool discardFetcher = true;
994
995        // If we're seeking all current fetchers are discarded.
996        if (timeUs < 0ll) {
997            // delay fetcher removal
998            discardFetcher = false;
999
1000            for (size_t j = 0; j < kMaxStreams; ++j) {
1001                StreamType type = indexToType(j);
1002                if ((streamMask & type) && uri == URIs[j]) {
1003                    resumeMask |= type;
1004                    streamMask &= ~type;
1005                }
1006            }
1007        }
1008
1009        if (discardFetcher) {
1010            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1011        } else {
1012            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
1013        }
1014    }
1015
1016    sp<AMessage> msg;
1017    if (timeUs < 0ll) {
1018        // skip onChangeConfiguration2 (decoder destruction) if switching.
1019        msg = new AMessage(kWhatChangeConfiguration3, id());
1020    } else {
1021        msg = new AMessage(kWhatChangeConfiguration2, id());
1022    }
1023    msg->setInt32("streamMask", streamMask);
1024    msg->setInt32("resumeMask", resumeMask);
1025    msg->setInt64("timeUs", timeUs);
1026    for (size_t i = 0; i < kMaxStreams; ++i) {
1027        if (streamMask & indexToType(i)) {
1028            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1029        }
1030    }
1031
1032    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1033    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1034    // fetchers have completed their asynchronous operation, we'll post
1035    // mContinuation, which then is handled below in onChangeConfiguration2.
1036    mContinuationCounter = mFetcherInfos.size();
1037    mContinuation = msg;
1038
1039    if (mContinuationCounter == 0) {
1040        msg->post();
1041
1042        if (mSeekReplyID != 0) {
1043            CHECK(mSeekReply != NULL);
1044            mSeekReply->postReply(mSeekReplyID);
1045        }
1046    }
1047}
1048
1049void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1050    if (!mReconfigurationInProgress) {
1051        changeConfiguration(-1ll /* timeUs */, getBandwidthIndex());
1052    } else {
1053        msg->post(1000000ll); // retry in 1 sec
1054    }
1055}
1056
1057void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1058    mContinuation.clear();
1059
1060    // All fetchers are either suspended or have been removed now.
1061
1062    uint32_t streamMask;
1063    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1064
1065    AString URIs[kMaxStreams];
1066    for (size_t i = 0; i < kMaxStreams; ++i) {
1067        if (streamMask & indexToType(i)) {
1068            const AString &uriKey = mStreams[i].uriKey();
1069            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1070            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1071        }
1072    }
1073
1074    // Determine which decoders to shutdown on the player side,
1075    // a decoder has to be shutdown if either
1076    // 1) its streamtype was active before but now longer isn't.
1077    // or
1078    // 2) its streamtype was already active and still is but the URI
1079    //    has changed.
1080    uint32_t changedMask = 0;
1081    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1082        if (((mStreamMask & streamMask & indexToType(i))
1083                && !(URIs[i] == mStreams[i].mUri))
1084                || (mStreamMask & ~streamMask & indexToType(i))) {
1085            changedMask |= indexToType(i);
1086        }
1087    }
1088
1089    if (changedMask == 0) {
1090        // If nothing changed as far as the audio/video decoders
1091        // are concerned we can proceed.
1092        onChangeConfiguration3(msg);
1093        return;
1094    }
1095
1096    // Something changed, inform the player which will shutdown the
1097    // corresponding decoders and will post the reply once that's done.
1098    // Handling the reply will continue executing below in
1099    // onChangeConfiguration3.
1100    sp<AMessage> notify = mNotify->dup();
1101    notify->setInt32("what", kWhatStreamsChanged);
1102    notify->setInt32("changedMask", changedMask);
1103
1104    msg->setWhat(kWhatChangeConfiguration3);
1105    msg->setTarget(id());
1106
1107    notify->setMessage("reply", msg);
1108    notify->post();
1109}
1110
1111void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1112    mContinuation.clear();
1113    // All remaining fetchers are still suspended, the player has shutdown
1114    // any decoders that needed it.
1115
1116    uint32_t streamMask, resumeMask;
1117    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1118    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1119
1120    for (size_t i = 0; i < kMaxStreams; ++i) {
1121        if (streamMask & indexToType(i)) {
1122            CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1123        }
1124    }
1125
1126    int64_t timeUs;
1127    bool switching = false;
1128    CHECK(msg->findInt64("timeUs", &timeUs));
1129
1130    if (timeUs < 0ll) {
1131        timeUs = mLastDequeuedTimeUs;
1132        switching = true;
1133    }
1134    mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1135
1136    mNewStreamMask = streamMask;
1137
1138    // Of all existing fetchers:
1139    // * Resume fetchers that are still needed and assign them original packet sources.
1140    // * Mark otherwise unneeded fetchers for removal.
1141    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1142    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1143        const AString &uri = mFetcherInfos.keyAt(i);
1144
1145        sp<AnotherPacketSource> sources[kMaxStreams];
1146        for (size_t j = 0; j < kMaxStreams; ++j) {
1147            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
1148                sources[j] = mPacketSources.valueFor(indexToType(j));
1149            }
1150        }
1151
1152        FetcherInfo &info = mFetcherInfos.editValueAt(i);
1153        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
1154                || sources[kSubtitleIndex] != NULL) {
1155            info.mFetcher->startAsync(
1156                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
1157        } else {
1158            info.mToBeRemoved = true;
1159        }
1160    }
1161
1162    // streamMask now only contains the types that need a new fetcher created.
1163
1164    if (streamMask != 0) {
1165        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1166    }
1167
1168    // Find out when the original fetchers have buffered up to and start the new fetchers
1169    // at a later timestamp.
1170    for (size_t i = 0; i < kMaxStreams; i++) {
1171        if (!(indexToType(i) & streamMask)) {
1172            continue;
1173        }
1174
1175        AString uri;
1176        uri = mStreams[i].mUri;
1177
1178        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1179        CHECK(fetcher != NULL);
1180
1181        int32_t latestSeq = -1;
1182        int64_t latestTimeUs = 0ll;
1183        sp<AnotherPacketSource> sources[kMaxStreams];
1184
1185        // TRICKY: looping from i as earlier streams are already removed from streamMask
1186        for (size_t j = i; j < kMaxStreams; ++j) {
1187            if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) {
1188                sources[j] = mPacketSources.valueFor(indexToType(j));
1189
1190                if (!switching) {
1191                    sources[j]->clear();
1192                } else {
1193                    int32_t type, seq;
1194                    int64_t srcTimeUs;
1195                    sp<AMessage> meta = sources[j]->getLatestMeta();
1196
1197                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
1198                        CHECK(meta->findInt32("seq", &seq));
1199                        if (seq > latestSeq) {
1200                            latestSeq = seq;
1201                        }
1202                        CHECK(meta->findInt64("timeUs", &srcTimeUs));
1203                        if (srcTimeUs > latestTimeUs) {
1204                            latestTimeUs = srcTimeUs;
1205                        }
1206                    }
1207
1208                    sources[j] = mPacketSources2.valueFor(indexToType(j));
1209                    sources[j]->clear();
1210                    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1211                    if (extraStreams & indexToType(j)) {
1212                        sources[j]->queueAccessUnit(createFormatChangeBuffer(/* swap = */ false));
1213                    }
1214                }
1215
1216                streamMask &= ~indexToType(j);
1217            }
1218        }
1219
1220        fetcher->startAsync(
1221                sources[kAudioIndex],
1222                sources[kVideoIndex],
1223                sources[kSubtitleIndex],
1224                timeUs,
1225                latestTimeUs /* min start time(us) */,
1226                latestSeq >= 0 ? latestSeq + 1 : -1 /* starting sequence number hint */ );
1227    }
1228
1229    // All fetchers have now been started, the configuration change
1230    // has completed.
1231
1232    scheduleCheckBandwidthEvent();
1233
1234    ALOGV("XXX configuration change completed.");
1235    mReconfigurationInProgress = false;
1236    if (switching) {
1237        mSwitchInProgress = true;
1238    } else {
1239        mStreamMask = mNewStreamMask;
1240    }
1241
1242    if (mDisconnectReplyID != 0) {
1243        finishDisconnect();
1244    }
1245}
1246
1247void LiveSession::onSwapped(const sp<AMessage> &msg) {
1248    int32_t switchGeneration;
1249    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
1250    if (switchGeneration != mSwitchGeneration) {
1251        return;
1252    }
1253
1254    int32_t stream;
1255    CHECK(msg->findInt32("stream", &stream));
1256    mSwapMask |= stream;
1257    if (mSwapMask != mStreamMask) {
1258        return;
1259    }
1260
1261    // Check if new variant contains extra streams.
1262    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1263    while (extraStreams) {
1264        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
1265        swapPacketSource(extraStream);
1266        extraStreams &= ~extraStream;
1267    }
1268
1269    tryToFinishBandwidthSwitch();
1270}
1271
1272// Mark switch done when:
1273//   1. all old buffers are swapped out, AND
1274//   2. all old fetchers are removed.
1275void LiveSession::tryToFinishBandwidthSwitch() {
1276    bool needToRemoveFetchers = false;
1277    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1278        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
1279            needToRemoveFetchers = true;
1280            break;
1281        }
1282    }
1283    if (!needToRemoveFetchers && mSwapMask == mStreamMask) {
1284        mStreamMask = mNewStreamMask;
1285        mSwitchInProgress = false;
1286        mSwapMask = 0;
1287    }
1288}
1289
1290void LiveSession::scheduleCheckBandwidthEvent() {
1291    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1292    msg->setInt32("generation", mCheckBandwidthGeneration);
1293    msg->post(10000000ll);
1294}
1295
1296void LiveSession::cancelCheckBandwidthEvent() {
1297    ++mCheckBandwidthGeneration;
1298}
1299
1300void LiveSession::cancelBandwidthSwitch() {
1301    Mutex::Autolock lock(mSwapMutex);
1302    mSwitchGeneration++;
1303    mSwitchInProgress = false;
1304    mSwapMask = 0;
1305}
1306
1307bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
1308    if (mReconfigurationInProgress || mSwitchInProgress) {
1309        return false;
1310    }
1311
1312    if (mPrevBandwidthIndex < 0) {
1313        return true;
1314    }
1315
1316    if (bandwidthIndex == (size_t)mPrevBandwidthIndex) {
1317        return false;
1318    } else if (bandwidthIndex > (size_t)mPrevBandwidthIndex) {
1319        return canSwitchUp();
1320    } else {
1321        return true;
1322    }
1323}
1324
1325void LiveSession::onCheckBandwidth() {
1326    size_t bandwidthIndex = getBandwidthIndex();
1327    if (canSwitchBandwidthTo(bandwidthIndex)) {
1328        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1329    } else {
1330        scheduleCheckBandwidthEvent();
1331    }
1332
1333    // Handling the kWhatCheckBandwidth even here does _not_ automatically
1334    // schedule another one on return, only an explicit call to
1335    // scheduleCheckBandwidthEvent will do that.
1336    // This ensures that only one configuration change is ongoing at any
1337    // one time, once that completes it'll schedule another check bandwidth
1338    // event.
1339}
1340
1341void LiveSession::postPrepared(status_t err) {
1342    CHECK(mInPreparationPhase);
1343
1344    sp<AMessage> notify = mNotify->dup();
1345    if (err == OK || err == ERROR_END_OF_STREAM) {
1346        notify->setInt32("what", kWhatPrepared);
1347    } else {
1348        notify->setInt32("what", kWhatPreparationFailed);
1349        notify->setInt32("err", err);
1350    }
1351
1352    notify->post();
1353
1354    mInPreparationPhase = false;
1355}
1356
1357}  // namespace android
1358
1359