LiveSession.cpp revision 404fced9bfa8fa423ee210a271ca051ffd1bec13
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "LiveSession.h"
22
23#include "M3UParser.h"
24#include "PlaylistFetcher.h"
25
26#include "include/HTTPBase.h"
27#include "mpeg2ts/AnotherPacketSource.h"
28
29#include <cutils/properties.h>
30#include <media/IMediaHTTPConnection.h>
31#include <media/IMediaHTTPService.h>
32#include <media/stagefright/foundation/hexdump.h>
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/DataSource.h>
37#include <media/stagefright/FileSource.h>
38#include <media/stagefright/MediaErrors.h>
39#include <media/stagefright/MediaHTTP.h>
40#include <media/stagefright/MetaData.h>
41#include <media/stagefright/Utils.h>
42
43#include <utils/Mutex.h>
44
45#include <ctype.h>
46#include <inttypes.h>
47#include <openssl/aes.h>
48#include <openssl/md5.h>
49
50namespace android {
51
52LiveSession::LiveSession(
53        const sp<AMessage> &notify, uint32_t flags,
54        const sp<IMediaHTTPService> &httpService)
55    : mNotify(notify),
56      mFlags(flags),
57      mHTTPService(httpService),
58      mInPreparationPhase(true),
59      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
60      mPrevBandwidthIndex(-1),
61      mStreamMask(0),
62      mNewStreamMask(0),
63      mSwapMask(0),
64      mCheckBandwidthGeneration(0),
65      mSwitchGeneration(0),
66      mLastDequeuedTimeUs(0ll),
67      mRealTimeBaseUs(0ll),
68      mReconfigurationInProgress(false),
69      mSwitchInProgress(false),
70      mDisconnectReplyID(0),
71      mSeekReplyID(0) {
72
73    mStreams[kAudioIndex] = StreamItem("audio");
74    mStreams[kVideoIndex] = StreamItem("video");
75    mStreams[kSubtitleIndex] = StreamItem("subtitles");
76
77    for (size_t i = 0; i < kMaxStreams; ++i) {
78        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
79        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
80    }
81}
82
83LiveSession::~LiveSession() {
84}
85
86sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
87    ABuffer *discontinuity = new ABuffer(0);
88    discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
89    discontinuity->meta()->setInt32("swapPacketSource", swap);
90    discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
91    discontinuity->meta()->setInt64("timeUs", -1);
92    return discontinuity;
93}
94
95void LiveSession::swapPacketSource(StreamType stream) {
96    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
97    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
98    sp<AnotherPacketSource> tmp = aps;
99    aps = aps2;
100    aps2 = tmp;
101    aps2->clear();
102}
103
104status_t LiveSession::dequeueAccessUnit(
105        StreamType stream, sp<ABuffer> *accessUnit) {
106    if (!(mStreamMask & stream)) {
107        // return -EWOULDBLOCK to avoid halting the decoder
108        // when switching between audio/video and audio only.
109        return -EWOULDBLOCK;
110    }
111
112    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
113
114    status_t finalResult;
115    if (!packetSource->hasBufferAvailable(&finalResult)) {
116        return finalResult == OK ? -EAGAIN : finalResult;
117    }
118
119    status_t err = packetSource->dequeueAccessUnit(accessUnit);
120
121    const char *streamStr;
122    switch (stream) {
123        case STREAMTYPE_AUDIO:
124            streamStr = "audio";
125            break;
126        case STREAMTYPE_VIDEO:
127            streamStr = "video";
128            break;
129        case STREAMTYPE_SUBTITLES:
130            streamStr = "subs";
131            break;
132        default:
133            TRESPASS();
134    }
135
136    if (err == INFO_DISCONTINUITY) {
137        int32_t type;
138        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
139
140        sp<AMessage> extra;
141        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
142            extra.clear();
143        }
144
145        ALOGI("[%s] read discontinuity of type %d, extra = %s",
146              streamStr,
147              type,
148              extra == NULL ? "NULL" : extra->debugString().c_str());
149
150        int32_t swap;
151        if (type == ATSParser::DISCONTINUITY_FORMATCHANGE
152                && (*accessUnit)->meta()->findInt32("swapPacketSource", &swap)
153                && swap) {
154
155            int32_t switchGeneration;
156            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
157            {
158                Mutex::Autolock lock(mSwapMutex);
159                if (switchGeneration == mSwitchGeneration) {
160                    swapPacketSource(stream);
161                    sp<AMessage> msg = new AMessage(kWhatSwapped, id());
162                    msg->setInt32("stream", stream);
163                    msg->setInt32("switchGeneration", switchGeneration);
164                    msg->post();
165                }
166            }
167        }
168    } else if (err == OK) {
169        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
170            int64_t timeUs;
171            CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
172            ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs);
173
174            mLastDequeuedTimeUs = timeUs;
175            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
176        } else if (stream == STREAMTYPE_SUBTITLES) {
177            (*accessUnit)->meta()->setInt32(
178                    "trackIndex", mPlaylist->getSelectedIndex());
179            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
180        }
181    } else {
182        ALOGI("[%s] encountered error %d", streamStr, err);
183    }
184
185    return err;
186}
187
188status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
189    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
190    if (!(mStreamMask & stream)) {
191        return UNKNOWN_ERROR;
192    }
193
194    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
195
196    sp<MetaData> meta = packetSource->getFormat();
197
198    if (meta == NULL) {
199        return -EAGAIN;
200    }
201
202    return convertMetaDataToMessage(meta, format);
203}
204
205void LiveSession::connectAsync(
206        const char *url, const KeyedVector<String8, String8> *headers) {
207    sp<AMessage> msg = new AMessage(kWhatConnect, id());
208    msg->setString("url", url);
209
210    if (headers != NULL) {
211        msg->setPointer(
212                "headers",
213                new KeyedVector<String8, String8>(*headers));
214    }
215
216    msg->post();
217}
218
219status_t LiveSession::disconnect() {
220    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
221
222    sp<AMessage> response;
223    status_t err = msg->postAndAwaitResponse(&response);
224
225    return err;
226}
227
228status_t LiveSession::seekTo(int64_t timeUs) {
229    sp<AMessage> msg = new AMessage(kWhatSeek, id());
230    msg->setInt64("timeUs", timeUs);
231
232    sp<AMessage> response;
233    status_t err = msg->postAndAwaitResponse(&response);
234
235    uint32_t replyID;
236    CHECK(response == mSeekReply && 0 != mSeekReplyID);
237    mSeekReply.clear();
238    mSeekReplyID = 0;
239    return err;
240}
241
242void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
243    switch (msg->what()) {
244        case kWhatConnect:
245        {
246            onConnect(msg);
247            break;
248        }
249
250        case kWhatDisconnect:
251        {
252            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
253
254            if (mReconfigurationInProgress) {
255                break;
256            }
257
258            finishDisconnect();
259            break;
260        }
261
262        case kWhatSeek:
263        {
264            CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
265
266            status_t err = onSeek(msg);
267
268            mSeekReply = new AMessage;
269            mSeekReply->setInt32("err", err);
270            break;
271        }
272
273        case kWhatFetcherNotify:
274        {
275            int32_t what;
276            CHECK(msg->findInt32("what", &what));
277
278            switch (what) {
279                case PlaylistFetcher::kWhatStarted:
280                    break;
281                case PlaylistFetcher::kWhatPaused:
282                case PlaylistFetcher::kWhatStopped:
283                {
284                    if (what == PlaylistFetcher::kWhatStopped) {
285                        AString uri;
286                        CHECK(msg->findString("uri", &uri));
287                        if (mFetcherInfos.removeItem(uri) < 0) {
288                            // ignore duplicated kWhatStopped messages.
289                            break;
290                        }
291
292                        tryToFinishBandwidthSwitch();
293                    }
294
295                    if (mContinuation != NULL) {
296                        CHECK_GT(mContinuationCounter, 0);
297                        if (--mContinuationCounter == 0) {
298                            mContinuation->post();
299
300                            if (mSeekReplyID != 0) {
301                                CHECK(mSeekReply != NULL);
302                                mSeekReply->postReply(mSeekReplyID);
303                            }
304                        }
305                    }
306                    break;
307                }
308
309                case PlaylistFetcher::kWhatDurationUpdate:
310                {
311                    AString uri;
312                    CHECK(msg->findString("uri", &uri));
313
314                    int64_t durationUs;
315                    CHECK(msg->findInt64("durationUs", &durationUs));
316
317                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
318                    info->mDurationUs = durationUs;
319                    break;
320                }
321
322                case PlaylistFetcher::kWhatError:
323                {
324                    status_t err;
325                    CHECK(msg->findInt32("err", &err));
326
327                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);
328
329                    if (mInPreparationPhase) {
330                        postPrepared(err);
331                    }
332
333                    cancelBandwidthSwitch();
334
335                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
336
337                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
338
339                    mPacketSources.valueFor(
340                            STREAMTYPE_SUBTITLES)->signalEOS(err);
341
342                    sp<AMessage> notify = mNotify->dup();
343                    notify->setInt32("what", kWhatError);
344                    notify->setInt32("err", err);
345                    notify->post();
346                    break;
347                }
348
349                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
350                {
351                    AString uri;
352                    CHECK(msg->findString("uri", &uri));
353
354                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
355                    info->mIsPrepared = true;
356
357                    if (mInPreparationPhase) {
358                        bool allFetchersPrepared = true;
359                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
360                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
361                                allFetchersPrepared = false;
362                                break;
363                            }
364                        }
365
366                        if (allFetchersPrepared) {
367                            postPrepared(OK);
368                        }
369                    }
370                    break;
371                }
372
373                case PlaylistFetcher::kWhatStartedAt:
374                {
375                    int32_t switchGeneration;
376                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
377
378                    if (switchGeneration != mSwitchGeneration) {
379                        break;
380                    }
381
382                    // Resume fetcher for the original variant; the resumed fetcher should
383                    // continue until the timestamps found in msg, which is stored by the
384                    // new fetcher to indicate where the new variant has started buffering.
385                    for (size_t i = 0; i < mFetcherInfos.size(); i++) {
386                        const FetcherInfo info = mFetcherInfos.valueAt(i);
387                        if (info.mToBeRemoved) {
388                            info.mFetcher->resumeUntilAsync(msg);
389                        }
390                    }
391                    break;
392                }
393
394                default:
395                    TRESPASS();
396            }
397
398            break;
399        }
400
401        case kWhatCheckBandwidth:
402        {
403            int32_t generation;
404            CHECK(msg->findInt32("generation", &generation));
405
406            if (generation != mCheckBandwidthGeneration) {
407                break;
408            }
409
410            onCheckBandwidth();
411            break;
412        }
413
414        case kWhatChangeConfiguration:
415        {
416            onChangeConfiguration(msg);
417            break;
418        }
419
420        case kWhatChangeConfiguration2:
421        {
422            onChangeConfiguration2(msg);
423            break;
424        }
425
426        case kWhatChangeConfiguration3:
427        {
428            onChangeConfiguration3(msg);
429            break;
430        }
431
432        case kWhatFinishDisconnect2:
433        {
434            onFinishDisconnect2();
435            break;
436        }
437
438        case kWhatSwapped:
439        {
440            onSwapped(msg);
441            break;
442        }
443        default:
444            TRESPASS();
445            break;
446    }
447}
448
449// static
450int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
451    if (a->mBandwidth < b->mBandwidth) {
452        return -1;
453    } else if (a->mBandwidth == b->mBandwidth) {
454        return 0;
455    }
456
457    return 1;
458}
459
460// static
461LiveSession::StreamType LiveSession::indexToType(int idx) {
462    CHECK(idx >= 0 && idx < kMaxStreams);
463    return (StreamType)(1 << idx);
464}
465
466void LiveSession::onConnect(const sp<AMessage> &msg) {
467    AString url;
468    CHECK(msg->findString("url", &url));
469
470    KeyedVector<String8, String8> *headers = NULL;
471    if (!msg->findPointer("headers", (void **)&headers)) {
472        mExtraHeaders.clear();
473    } else {
474        mExtraHeaders = *headers;
475
476        delete headers;
477        headers = NULL;
478    }
479
480    // TODO currently we don't know if we are coming here from incognito mode
481    ALOGI("onConnect %s", uriDebugString(url).c_str());
482
483    mMasterURL = url;
484
485    bool dummy;
486    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
487
488    if (mPlaylist == NULL) {
489        ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str());
490
491        postPrepared(ERROR_IO);
492        return;
493    }
494
495    // We trust the content provider to make a reasonable choice of preferred
496    // initial bandwidth by listing it first in the variant playlist.
497    // At startup we really don't have a good estimate on the available
498    // network bandwidth since we haven't tranferred any data yet. Once
499    // we have we can make a better informed choice.
500    size_t initialBandwidth = 0;
501    size_t initialBandwidthIndex = 0;
502
503    if (mPlaylist->isVariantPlaylist()) {
504        for (size_t i = 0; i < mPlaylist->size(); ++i) {
505            BandwidthItem item;
506
507            item.mPlaylistIndex = i;
508
509            sp<AMessage> meta;
510            AString uri;
511            mPlaylist->itemAt(i, &uri, &meta);
512
513            unsigned long bandwidth;
514            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
515
516            if (initialBandwidth == 0) {
517                initialBandwidth = item.mBandwidth;
518            }
519
520            mBandwidthItems.push(item);
521        }
522
523        CHECK_GT(mBandwidthItems.size(), 0u);
524
525        mBandwidthItems.sort(SortByBandwidth);
526
527        for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
528            if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
529                initialBandwidthIndex = i;
530                break;
531            }
532        }
533    } else {
534        // dummy item.
535        BandwidthItem item;
536        item.mPlaylistIndex = 0;
537        item.mBandwidth = 0;
538        mBandwidthItems.push(item);
539    }
540
541    changeConfiguration(
542            0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */);
543}
544
545void LiveSession::finishDisconnect() {
546    // No reconfiguration is currently pending, make sure none will trigger
547    // during disconnection either.
548    cancelCheckBandwidthEvent();
549
550    // Protect mPacketSources from a swapPacketSource race condition through disconnect.
551    // (finishDisconnect, onFinishDisconnect2)
552    cancelBandwidthSwitch();
553
554    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
555        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
556    }
557
558    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
559
560    mContinuationCounter = mFetcherInfos.size();
561    mContinuation = msg;
562
563    if (mContinuationCounter == 0) {
564        msg->post();
565    }
566}
567
568void LiveSession::onFinishDisconnect2() {
569    mContinuation.clear();
570
571    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
572    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
573
574    mPacketSources.valueFor(
575            STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
576
577    sp<AMessage> response = new AMessage;
578    response->setInt32("err", OK);
579
580    response->postReply(mDisconnectReplyID);
581    mDisconnectReplyID = 0;
582}
583
584sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
585    ssize_t index = mFetcherInfos.indexOfKey(uri);
586
587    if (index >= 0) {
588        return NULL;
589    }
590
591    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
592    notify->setString("uri", uri);
593    notify->setInt32("switchGeneration", mSwitchGeneration);
594
595    FetcherInfo info;
596    info.mFetcher = new PlaylistFetcher(notify, this, uri);
597    info.mDurationUs = -1ll;
598    info.mIsPrepared = false;
599    info.mToBeRemoved = false;
600    looper()->registerHandler(info.mFetcher);
601
602    mFetcherInfos.add(uri, info);
603
604    return info.mFetcher;
605}
606
607/*
608 * Illustration of parameters:
609 *
610 * 0      `range_offset`
611 * +------------+-------------------------------------------------------+--+--+
612 * |            |                                 | next block to fetch |  |  |
613 * |            | `source` handle => `out` buffer |                     |  |  |
614 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
615 * |            |<----------- `range_length` / buffer capacity ----------->|  |
616 * |<------------------------------ file_size ------------------------------->|
617 *
618 * Special parameter values:
619 * - range_length == -1 means entire file
620 * - block_size == 0 means entire range
621 *
622 */
623ssize_t LiveSession::fetchFile(
624        const char *url, sp<ABuffer> *out,
625        int64_t range_offset, int64_t range_length,
626        uint32_t block_size, /* download block size */
627        sp<DataSource> *source, /* to return and reuse source */
628        String8 *actualUrl) {
629    off64_t size;
630    sp<DataSource> temp_source;
631    if (source == NULL) {
632        source = &temp_source;
633    }
634
635    if (*source == NULL) {
636        if (!strncasecmp(url, "file://", 7)) {
637            *source = new FileSource(url + 7);
638        } else if (strncasecmp(url, "http://", 7)
639                && strncasecmp(url, "https://", 8)) {
640            return ERROR_UNSUPPORTED;
641        } else {
642            KeyedVector<String8, String8> headers = mExtraHeaders;
643            if (range_offset > 0 || range_length >= 0) {
644                headers.add(
645                        String8("Range"),
646                        String8(
647                            StringPrintf(
648                                "bytes=%lld-%s",
649                                range_offset,
650                                range_length < 0
651                                    ? "" : StringPrintf("%lld",
652                                            range_offset + range_length - 1).c_str()).c_str()));
653            }
654            status_t err = mHTTPDataSource->connect(url, &headers);
655
656            if (err != OK) {
657                return err;
658            }
659
660            *source = mHTTPDataSource;
661        }
662    }
663
664    status_t getSizeErr = (*source)->getSize(&size);
665    if (getSizeErr != OK) {
666        size = 65536;
667    }
668
669    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
670    if (*out == NULL) {
671        buffer->setRange(0, 0);
672    }
673
674    ssize_t bytesRead = 0;
675    // adjust range_length if only reading partial block
676    if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
677        range_length = buffer->size() + block_size;
678    }
679    for (;;) {
680        // Only resize when we don't know the size.
681        size_t bufferRemaining = buffer->capacity() - buffer->size();
682        if (bufferRemaining == 0 && getSizeErr != OK) {
683            bufferRemaining = 32768;
684
685            ALOGV("increasing download buffer to %zu bytes",
686                 buffer->size() + bufferRemaining);
687
688            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
689            memcpy(copy->data(), buffer->data(), buffer->size());
690            copy->setRange(0, buffer->size());
691
692            buffer = copy;
693        }
694
695        size_t maxBytesToRead = bufferRemaining;
696        if (range_length >= 0) {
697            int64_t bytesLeftInRange = range_length - buffer->size();
698            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
699                maxBytesToRead = bytesLeftInRange;
700
701                if (bytesLeftInRange == 0) {
702                    break;
703                }
704            }
705        }
706
707        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
708        // to help us break out of the loop.
709        ssize_t n = (*source)->readAt(
710                buffer->size(), buffer->data() + buffer->size(),
711                maxBytesToRead);
712
713        if (n < 0) {
714            return n;
715        }
716
717        if (n == 0) {
718            break;
719        }
720
721        buffer->setRange(0, buffer->size() + (size_t)n);
722        bytesRead += n;
723    }
724
725    *out = buffer;
726    if (actualUrl != NULL) {
727        *actualUrl = (*source)->getUri();
728        if (actualUrl->isEmpty()) {
729            *actualUrl = url;
730        }
731    }
732
733    return bytesRead;
734}
735
736sp<M3UParser> LiveSession::fetchPlaylist(
737        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
738    ALOGV("fetchPlaylist '%s'", url);
739
740    *unchanged = false;
741
742    sp<ABuffer> buffer;
743    String8 actualUrl;
744    ssize_t  err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
745
746    if (err <= 0) {
747        return NULL;
748    }
749
750    // MD5 functionality is not available on the simulator, treat all
751    // playlists as changed.
752
753#if defined(HAVE_ANDROID_OS)
754    uint8_t hash[16];
755
756    MD5_CTX m;
757    MD5_Init(&m);
758    MD5_Update(&m, buffer->data(), buffer->size());
759
760    MD5_Final(hash, &m);
761
762    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
763        // playlist unchanged
764        *unchanged = true;
765
766        return NULL;
767    }
768
769    if (curPlaylistHash != NULL) {
770        memcpy(curPlaylistHash, hash, sizeof(hash));
771    }
772#endif
773
774    sp<M3UParser> playlist =
775        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
776
777    if (playlist->initCheck() != OK) {
778        ALOGE("failed to parse .m3u8 playlist");
779
780        return NULL;
781    }
782
783    return playlist;
784}
785
// Returns rand() scaled into the closed interval [0, 1].
static double uniformRand() {
    const double sample = (double)rand();
    return sample / RAND_MAX;
}
789
790size_t LiveSession::getBandwidthIndex() {
791    if (mBandwidthItems.size() == 0) {
792        return 0;
793    }
794
795#if 1
796    char value[PROPERTY_VALUE_MAX];
797    ssize_t index = -1;
798    if (property_get("media.httplive.bw-index", value, NULL)) {
799        char *end;
800        index = strtol(value, &end, 10);
801        CHECK(end > value && *end == '\0');
802
803        if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
804            index = mBandwidthItems.size() - 1;
805        }
806    }
807
808    if (index < 0) {
809        int32_t bandwidthBps;
810        if (mHTTPDataSource != NULL
811                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
812            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
813        } else {
814            ALOGV("no bandwidth estimate.");
815            return 0;  // Pick the lowest bandwidth stream by default.
816        }
817
818        char value[PROPERTY_VALUE_MAX];
819        if (property_get("media.httplive.max-bw", value, NULL)) {
820            char *end;
821            long maxBw = strtoul(value, &end, 10);
822            if (end > value && *end == '\0') {
823                if (maxBw > 0 && bandwidthBps > maxBw) {
824                    ALOGV("bandwidth capped to %ld bps", maxBw);
825                    bandwidthBps = maxBw;
826                }
827            }
828        }
829
830        // Consider only 80% of the available bandwidth usable.
831        bandwidthBps = (bandwidthBps * 8) / 10;
832
833        // Pick the highest bandwidth stream below or equal to estimated bandwidth.
834
835        index = mBandwidthItems.size() - 1;
836        while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth
837                                > (size_t)bandwidthBps) {
838            --index;
839        }
840    }
841#elif 0
842    // Change bandwidth at random()
843    size_t index = uniformRand() * mBandwidthItems.size();
844#elif 0
845    // There's a 50% chance to stay on the current bandwidth and
846    // a 50% chance to switch to the next higher bandwidth (wrapping around
847    // to lowest)
848    const size_t kMinIndex = 0;
849
850    static ssize_t mPrevBandwidthIndex = -1;
851
852    size_t index;
853    if (mPrevBandwidthIndex < 0) {
854        index = kMinIndex;
855    } else if (uniformRand() < 0.5) {
856        index = (size_t)mPrevBandwidthIndex;
857    } else {
858        index = mPrevBandwidthIndex + 1;
859        if (index == mBandwidthItems.size()) {
860            index = kMinIndex;
861        }
862    }
863    mPrevBandwidthIndex = index;
864#elif 0
865    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
866
867    size_t index = mBandwidthItems.size() - 1;
868    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
869        --index;
870    }
871#elif 1
872    char value[PROPERTY_VALUE_MAX];
873    size_t index;
874    if (property_get("media.httplive.bw-index", value, NULL)) {
875        char *end;
876        index = strtoul(value, &end, 10);
877        CHECK(end > value && *end == '\0');
878
879        if (index >= mBandwidthItems.size()) {
880            index = mBandwidthItems.size() - 1;
881        }
882    } else {
883        index = 0;
884    }
885#else
886    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
887#endif
888
889    CHECK_GE(index, 0);
890
891    return index;
892}
893
894status_t LiveSession::onSeek(const sp<AMessage> &msg) {
895    int64_t timeUs;
896    CHECK(msg->findInt64("timeUs", &timeUs));
897
898    if (!mReconfigurationInProgress) {
899        changeConfiguration(timeUs, getBandwidthIndex());
900    }
901
902    return OK;
903}
904
905status_t LiveSession::getDuration(int64_t *durationUs) const {
906    int64_t maxDurationUs = 0ll;
907    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
908        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
909
910        if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) {
911            maxDurationUs = fetcherDurationUs;
912        }
913    }
914
915    *durationUs = maxDurationUs;
916
917    return OK;
918}
919
920bool LiveSession::isSeekable() const {
921    int64_t durationUs;
922    return getDuration(&durationUs) == OK && durationUs >= 0;
923}
924
925bool LiveSession::hasDynamicDuration() const {
926    return false;
927}
928
929size_t LiveSession::getTrackCount() const {
930    return mPlaylist->getTrackCount();
931}
932
933sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
934    return mPlaylist->getTrackInfo(trackIndex);
935}
936
937status_t LiveSession::selectTrack(size_t index, bool select) {
938    status_t err = mPlaylist->selectTrack(index, select);
939    if (err == OK) {
940        (new AMessage(kWhatChangeConfiguration, id()))->post();
941    }
942    return err;
943}
944
945bool LiveSession::canSwitchUp() {
946    // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
947    status_t err = OK;
948    for (size_t i = 0; i < mPacketSources.size(); ++i) {
949        sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
950        int64_t dur = source->getBufferedDurationUs(&err);
951        if (err == OK && dur > 10000000) {
952            return true;
953        }
954    }
955    return false;
956}
957
958void LiveSession::changeConfiguration(
959        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
960    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
961    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
962    cancelBandwidthSwitch();
963
964    CHECK(!mReconfigurationInProgress);
965    mReconfigurationInProgress = true;
966
967    mPrevBandwidthIndex = bandwidthIndex;
968
969    ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
970          timeUs, bandwidthIndex, pickTrack);
971
972    if (pickTrack) {
973        mPlaylist->pickRandomMediaItems();
974    }
975
976    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
977    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
978
979    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
980    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
981
982    AString URIs[kMaxStreams];
983    for (size_t i = 0; i < kMaxStreams; ++i) {
984        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
985            streamMask |= indexToType(i);
986        }
987    }
988
989    // Step 1, stop and discard fetchers that are no longer needed.
990    // Pause those that we'll reuse.
991    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
992        const AString &uri = mFetcherInfos.keyAt(i);
993
994        bool discardFetcher = true;
995
996        // If we're seeking all current fetchers are discarded.
997        if (timeUs < 0ll) {
998            // delay fetcher removal
999            discardFetcher = false;
1000
1001            for (size_t j = 0; j < kMaxStreams; ++j) {
1002                StreamType type = indexToType(j);
1003                if ((streamMask & type) && uri == URIs[j]) {
1004                    resumeMask |= type;
1005                    streamMask &= ~type;
1006                }
1007            }
1008        }
1009
1010        if (discardFetcher) {
1011            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1012        } else {
1013            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
1014        }
1015    }
1016
1017    sp<AMessage> msg;
1018    if (timeUs < 0ll) {
1019        // skip onChangeConfiguration2 (decoder destruction) if switching.
1020        msg = new AMessage(kWhatChangeConfiguration3, id());
1021    } else {
1022        msg = new AMessage(kWhatChangeConfiguration2, id());
1023    }
1024    msg->setInt32("streamMask", streamMask);
1025    msg->setInt32("resumeMask", resumeMask);
1026    msg->setInt64("timeUs", timeUs);
1027    for (size_t i = 0; i < kMaxStreams; ++i) {
1028        if (streamMask & indexToType(i)) {
1029            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1030        }
1031    }
1032
1033    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1034    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1035    // fetchers have completed their asynchronous operation, we'll post
1036    // mContinuation, which then is handled below in onChangeConfiguration2.
1037    mContinuationCounter = mFetcherInfos.size();
1038    mContinuation = msg;
1039
1040    if (mContinuationCounter == 0) {
1041        msg->post();
1042
1043        if (mSeekReplyID != 0) {
1044            CHECK(mSeekReply != NULL);
1045            mSeekReply->postReply(mSeekReplyID);
1046        }
1047    }
1048}
1049
1050void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1051    if (!mReconfigurationInProgress) {
1052        changeConfiguration(-1ll /* timeUs */, getBandwidthIndex());
1053    } else {
1054        msg->post(1000000ll); // retry in 1 sec
1055    }
1056}
1057
1058void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1059    mContinuation.clear();
1060
1061    // All fetchers are either suspended or have been removed now.
1062
1063    uint32_t streamMask;
1064    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1065
1066    AString URIs[kMaxStreams];
1067    for (size_t i = 0; i < kMaxStreams; ++i) {
1068        if (streamMask & indexToType(i)) {
1069            const AString &uriKey = mStreams[i].uriKey();
1070            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1071            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1072        }
1073    }
1074
1075    // Determine which decoders to shutdown on the player side,
1076    // a decoder has to be shutdown if either
1077    // 1) its streamtype was active before but now longer isn't.
1078    // or
1079    // 2) its streamtype was already active and still is but the URI
1080    //    has changed.
1081    uint32_t changedMask = 0;
1082    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1083        if (((mStreamMask & streamMask & indexToType(i))
1084                && !(URIs[i] == mStreams[i].mUri))
1085                || (mStreamMask & ~streamMask & indexToType(i))) {
1086            changedMask |= indexToType(i);
1087        }
1088    }
1089
1090    if (changedMask == 0) {
1091        // If nothing changed as far as the audio/video decoders
1092        // are concerned we can proceed.
1093        onChangeConfiguration3(msg);
1094        return;
1095    }
1096
1097    // Something changed, inform the player which will shutdown the
1098    // corresponding decoders and will post the reply once that's done.
1099    // Handling the reply will continue executing below in
1100    // onChangeConfiguration3.
1101    sp<AMessage> notify = mNotify->dup();
1102    notify->setInt32("what", kWhatStreamsChanged);
1103    notify->setInt32("changedMask", changedMask);
1104
1105    msg->setWhat(kWhatChangeConfiguration3);
1106    msg->setTarget(id());
1107
1108    notify->setMessage("reply", msg);
1109    notify->post();
1110}
1111
1112void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1113    mContinuation.clear();
1114    // All remaining fetchers are still suspended, the player has shutdown
1115    // any decoders that needed it.
1116
1117    uint32_t streamMask, resumeMask;
1118    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1119    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1120
1121    for (size_t i = 0; i < kMaxStreams; ++i) {
1122        if (streamMask & indexToType(i)) {
1123            CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1124        }
1125    }
1126
1127    int64_t timeUs;
1128    bool switching = false;
1129    CHECK(msg->findInt64("timeUs", &timeUs));
1130
1131    if (timeUs < 0ll) {
1132        timeUs = mLastDequeuedTimeUs;
1133        switching = true;
1134    }
1135    mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1136
1137    mNewStreamMask = streamMask;
1138
1139    // Of all existing fetchers:
1140    // * Resume fetchers that are still needed and assign them original packet sources.
1141    // * Mark otherwise unneeded fetchers for removal.
1142    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1143    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1144        const AString &uri = mFetcherInfos.keyAt(i);
1145
1146        sp<AnotherPacketSource> sources[kMaxStreams];
1147        for (size_t j = 0; j < kMaxStreams; ++j) {
1148            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
1149                sources[j] = mPacketSources.valueFor(indexToType(j));
1150            }
1151        }
1152
1153        FetcherInfo &info = mFetcherInfos.editValueAt(i);
1154        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
1155                || sources[kSubtitleIndex] != NULL) {
1156            info.mFetcher->startAsync(
1157                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
1158        } else {
1159            info.mToBeRemoved = true;
1160        }
1161    }
1162
1163    // streamMask now only contains the types that need a new fetcher created.
1164
1165    if (streamMask != 0) {
1166        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1167    }
1168
1169    // Find out when the original fetchers have buffered up to and start the new fetchers
1170    // at a later timestamp.
1171    for (size_t i = 0; i < kMaxStreams; i++) {
1172        if (!(indexToType(i) & streamMask)) {
1173            continue;
1174        }
1175
1176        AString uri;
1177        uri = mStreams[i].mUri;
1178
1179        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1180        CHECK(fetcher != NULL);
1181
1182        int32_t latestSeq = -1;
1183        int64_t latestTimeUs = 0ll;
1184        sp<AnotherPacketSource> sources[kMaxStreams];
1185
1186        // TRICKY: looping from i as earlier streams are already removed from streamMask
1187        for (size_t j = i; j < kMaxStreams; ++j) {
1188            if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) {
1189                sources[j] = mPacketSources.valueFor(indexToType(j));
1190
1191                if (!switching) {
1192                    sources[j]->clear();
1193                } else {
1194                    int32_t type, seq;
1195                    int64_t srcTimeUs;
1196                    sp<AMessage> meta = sources[j]->getLatestMeta();
1197
1198                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
1199                        CHECK(meta->findInt32("seq", &seq));
1200                        if (seq > latestSeq) {
1201                            latestSeq = seq;
1202                        }
1203                        CHECK(meta->findInt64("timeUs", &srcTimeUs));
1204                        if (srcTimeUs > latestTimeUs) {
1205                            latestTimeUs = srcTimeUs;
1206                        }
1207                    }
1208
1209                    sources[j] = mPacketSources2.valueFor(indexToType(j));
1210                    sources[j]->clear();
1211                    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1212                    if (extraStreams & indexToType(j)) {
1213                        sources[j]->queueAccessUnit(createFormatChangeBuffer(/* swap = */ false));
1214                    }
1215                }
1216
1217                streamMask &= ~indexToType(j);
1218            }
1219        }
1220
1221        fetcher->startAsync(
1222                sources[kAudioIndex],
1223                sources[kVideoIndex],
1224                sources[kSubtitleIndex],
1225                timeUs,
1226                latestTimeUs /* min start time(us) */,
1227                latestSeq >= 0 ? latestSeq + 1 : -1 /* starting sequence number hint */ );
1228    }
1229
1230    // All fetchers have now been started, the configuration change
1231    // has completed.
1232
1233    scheduleCheckBandwidthEvent();
1234
1235    ALOGV("XXX configuration change completed.");
1236    mReconfigurationInProgress = false;
1237    if (switching) {
1238        mSwitchInProgress = true;
1239    } else {
1240        mStreamMask = mNewStreamMask;
1241    }
1242
1243    if (mDisconnectReplyID != 0) {
1244        finishDisconnect();
1245    }
1246}
1247
1248void LiveSession::onSwapped(const sp<AMessage> &msg) {
1249    int32_t switchGeneration;
1250    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
1251    if (switchGeneration != mSwitchGeneration) {
1252        return;
1253    }
1254
1255    int32_t stream;
1256    CHECK(msg->findInt32("stream", &stream));
1257    mSwapMask |= stream;
1258    if (mSwapMask != mStreamMask) {
1259        return;
1260    }
1261
1262    // Check if new variant contains extra streams.
1263    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
1264    while (extraStreams) {
1265        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
1266        swapPacketSource(extraStream);
1267        extraStreams &= ~extraStream;
1268    }
1269
1270    tryToFinishBandwidthSwitch();
1271}
1272
1273// Mark switch done when:
1274//   1. all old buffers are swapped out, AND
1275//   2. all old fetchers are removed.
1276void LiveSession::tryToFinishBandwidthSwitch() {
1277    bool needToRemoveFetchers = false;
1278    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1279        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
1280            needToRemoveFetchers = true;
1281            break;
1282        }
1283    }
1284    if (!needToRemoveFetchers && mSwapMask == mStreamMask) {
1285        mStreamMask = mNewStreamMask;
1286        mSwitchInProgress = false;
1287        mSwapMask = 0;
1288    }
1289}
1290
1291void LiveSession::scheduleCheckBandwidthEvent() {
1292    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
1293    msg->setInt32("generation", mCheckBandwidthGeneration);
1294    msg->post(10000000ll);
1295}
1296
1297void LiveSession::cancelCheckBandwidthEvent() {
1298    ++mCheckBandwidthGeneration;
1299}
1300
1301void LiveSession::cancelBandwidthSwitch() {
1302    Mutex::Autolock lock(mSwapMutex);
1303    mSwitchGeneration++;
1304    mSwitchInProgress = false;
1305    mSwapMask = 0;
1306}
1307
1308bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
1309    if (mReconfigurationInProgress || mSwitchInProgress) {
1310        return false;
1311    }
1312
1313    if (mPrevBandwidthIndex < 0) {
1314        return true;
1315    }
1316
1317    if (bandwidthIndex == (size_t)mPrevBandwidthIndex) {
1318        return false;
1319    } else if (bandwidthIndex > (size_t)mPrevBandwidthIndex) {
1320        return canSwitchUp();
1321    } else {
1322        return true;
1323    }
1324}
1325
1326void LiveSession::onCheckBandwidth() {
1327    size_t bandwidthIndex = getBandwidthIndex();
1328    if (canSwitchBandwidthTo(bandwidthIndex)) {
1329        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
1330    } else {
1331        scheduleCheckBandwidthEvent();
1332    }
1333
1334    // Handling the kWhatCheckBandwidth even here does _not_ automatically
1335    // schedule another one on return, only an explicit call to
1336    // scheduleCheckBandwidthEvent will do that.
1337    // This ensures that only one configuration change is ongoing at any
1338    // one time, once that completes it'll schedule another check bandwidth
1339    // event.
1340}
1341
1342void LiveSession::postPrepared(status_t err) {
1343    CHECK(mInPreparationPhase);
1344
1345    sp<AMessage> notify = mNotify->dup();
1346    if (err == OK || err == ERROR_END_OF_STREAM) {
1347        notify->setInt32("what", kWhatPrepared);
1348    } else {
1349        notify->setInt32("what", kWhatPreparationFailed);
1350        notify->setInt32("err", err);
1351    }
1352
1353    notify->post();
1354
1355    mInPreparationPhase = false;
1356}
1357
1358}  // namespace android
1359
1360