LiveSession.cpp revision 0df36ec3303c2c6bf9b42c07945ac8bd234153f3
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "include/LiveSession.h"
22
23#include "LiveDataSource.h"
24
25#include "include/M3UParser.h"
26#include "include/HTTPBase.h"
27
28#include <cutils/properties.h>
29#include <media/stagefright/foundation/hexdump.h>
30#include <media/stagefright/foundation/ABuffer.h>
31#include <media/stagefright/foundation/ADebug.h>
32#include <media/stagefright/foundation/AMessage.h>
33#include <media/stagefright/DataSource.h>
34#include <media/stagefright/FileSource.h>
35#include <media/stagefright/MediaErrors.h>
36
37#include <ctype.h>
38#include <openssl/aes.h>
39#include <openssl/md5.h>
40
41namespace android {
42
43LiveSession::LiveSession(
44        const sp<AMessage> &notify, uint32_t flags, bool uidValid, uid_t uid)
45    : mNotify(notify),
46      mFlags(flags),
47      mUIDValid(uidValid),
48      mUID(uid),
49      mInPreparationPhase(true),
50      mDataSource(new LiveDataSource),
51      mHTTPDataSource(
52              HTTPBase::Create(
53                  (mFlags & kFlagIncognito)
54                    ? HTTPBase::kFlagIncognito
55                    : 0)),
56      mPrevBandwidthIndex(-1),
57      mLastPlaylistFetchTimeUs(-1),
58      mSeqNumber(-1),
59      mSeekTimeUs(-1),
60      mNumRetries(0),
61      mStartOfPlayback(true),
62      mDurationUs(-1),
63      mDurationFixed(false),
64      mSeekDone(false),
65      mDisconnectPending(false),
66      mMonitorQueueGeneration(0),
67      mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY) {
68    if (mUIDValid) {
69        mHTTPDataSource->setUID(mUID);
70    }
71}
72
73LiveSession::~LiveSession() {
74}
75
76sp<DataSource> LiveSession::getDataSource() {
77    return mDataSource;
78}
79
80void LiveSession::connect(
81        const char *url, const KeyedVector<String8, String8> *headers) {
82    sp<AMessage> msg = new AMessage(kWhatConnect, id());
83    msg->setString("url", url);
84
85    if (headers != NULL) {
86        msg->setPointer(
87                "headers",
88                new KeyedVector<String8, String8>(*headers));
89    }
90
91    msg->post();
92}
93
94void LiveSession::disconnect() {
95    Mutex::Autolock autoLock(mLock);
96    mDisconnectPending = true;
97
98    mHTTPDataSource->disconnect();
99
100    (new AMessage(kWhatDisconnect, id()))->post();
101}
102
103void LiveSession::seekTo(int64_t timeUs) {
104    Mutex::Autolock autoLock(mLock);
105    mSeekDone = false;
106
107    sp<AMessage> msg = new AMessage(kWhatSeek, id());
108    msg->setInt64("timeUs", timeUs);
109    msg->post();
110
111    while (!mSeekDone) {
112        mCondition.wait(mLock);
113    }
114}
115
116void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
117    switch (msg->what()) {
118        case kWhatConnect:
119            onConnect(msg);
120            break;
121
122        case kWhatDisconnect:
123            onDisconnect();
124            break;
125
126        case kWhatMonitorQueue:
127        {
128            int32_t generation;
129            CHECK(msg->findInt32("generation", &generation));
130
131            if (generation != mMonitorQueueGeneration) {
132                // Stale event
133                break;
134            }
135
136            onMonitorQueue();
137            break;
138        }
139
140        case kWhatSeek:
141            onSeek(msg);
142            break;
143
144        default:
145            TRESPASS();
146            break;
147    }
148}
149
150// static
151int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
152    if (a->mBandwidth < b->mBandwidth) {
153        return -1;
154    } else if (a->mBandwidth == b->mBandwidth) {
155        return 0;
156    }
157
158    return 1;
159}
160
161void LiveSession::onConnect(const sp<AMessage> &msg) {
162    AString url;
163    CHECK(msg->findString("url", &url));
164
165    KeyedVector<String8, String8> *headers = NULL;
166    if (!msg->findPointer("headers", (void **)&headers)) {
167        mExtraHeaders.clear();
168    } else {
169        mExtraHeaders = *headers;
170
171        delete headers;
172        headers = NULL;
173    }
174
175    ALOGI("onConnect <URL suppressed>");
176
177    mMasterURL = url;
178
179    bool dummy;
180    sp<M3UParser> playlist = fetchPlaylist(url.c_str(), &dummy);
181
182    if (playlist == NULL) {
183        ALOGE("unable to fetch master playlist '%s'.", url.c_str());
184
185        signalEOS(ERROR_IO);
186        return;
187    }
188
189    if (playlist->isVariantPlaylist()) {
190        for (size_t i = 0; i < playlist->size(); ++i) {
191            BandwidthItem item;
192
193            sp<AMessage> meta;
194            playlist->itemAt(i, &item.mURI, &meta);
195
196            unsigned long bandwidth;
197            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
198
199            mBandwidthItems.push(item);
200        }
201
202        CHECK_GT(mBandwidthItems.size(), 0u);
203
204        mBandwidthItems.sort(SortByBandwidth);
205    }
206
207    postMonitorQueue();
208}
209
210void LiveSession::onDisconnect() {
211    ALOGI("onDisconnect");
212
213    signalEOS(ERROR_END_OF_STREAM);
214
215    Mutex::Autolock autoLock(mLock);
216    mDisconnectPending = false;
217}
218
219status_t LiveSession::fetchFile(
220        const char *url, sp<ABuffer> *out,
221        int64_t range_offset, int64_t range_length) {
222    *out = NULL;
223
224    sp<DataSource> source;
225
226    if (!strncasecmp(url, "file://", 7)) {
227        source = new FileSource(url + 7);
228    } else if (strncasecmp(url, "http://", 7)
229            && strncasecmp(url, "https://", 8)) {
230        return ERROR_UNSUPPORTED;
231    } else {
232        {
233            Mutex::Autolock autoLock(mLock);
234
235            if (mDisconnectPending) {
236                return ERROR_IO;
237            }
238        }
239
240        KeyedVector<String8, String8> headers = mExtraHeaders;
241        if (range_offset > 0 || range_length >= 0) {
242            headers.add(
243                    String8("Range"),
244                    String8(
245                        StringPrintf(
246                            "bytes=%lld-%s",
247                            range_offset,
248                            range_length < 0
249                                ? "" : StringPrintf("%lld", range_offset + range_length - 1).c_str()).c_str()));
250        }
251        status_t err = mHTTPDataSource->connect(url, &headers);
252
253        if (err != OK) {
254            return err;
255        }
256
257        source = mHTTPDataSource;
258    }
259
260    off64_t size;
261    status_t err = source->getSize(&size);
262
263    if (err != OK) {
264        size = 65536;
265    }
266
267    sp<ABuffer> buffer = new ABuffer(size);
268    buffer->setRange(0, 0);
269
270    for (;;) {
271        size_t bufferRemaining = buffer->capacity() - buffer->size();
272
273        if (bufferRemaining == 0) {
274            bufferRemaining = 32768;
275
276            ALOGV("increasing download buffer to %d bytes",
277                 buffer->size() + bufferRemaining);
278
279            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
280            memcpy(copy->data(), buffer->data(), buffer->size());
281            copy->setRange(0, buffer->size());
282
283            buffer = copy;
284        }
285
286        size_t maxBytesToRead = bufferRemaining;
287        if (range_length >= 0) {
288            int64_t bytesLeftInRange = range_length - buffer->size();
289            if (bytesLeftInRange < maxBytesToRead) {
290                maxBytesToRead = bytesLeftInRange;
291
292                if (bytesLeftInRange == 0) {
293                    break;
294                }
295            }
296        }
297
298        ssize_t n = source->readAt(
299                buffer->size(), buffer->data() + buffer->size(),
300                maxBytesToRead);
301
302        if (n < 0) {
303            return n;
304        }
305
306        if (n == 0) {
307            break;
308        }
309
310        buffer->setRange(0, buffer->size() + (size_t)n);
311    }
312
313    *out = buffer;
314
315    return OK;
316}
317
318sp<M3UParser> LiveSession::fetchPlaylist(const char *url, bool *unchanged) {
319    ALOGV("fetchPlaylist '%s'", url);
320
321    *unchanged = false;
322
323    sp<ABuffer> buffer;
324    status_t err = fetchFile(url, &buffer);
325
326    if (err != OK) {
327        return NULL;
328    }
329
330    // MD5 functionality is not available on the simulator, treat all
331    // playlists as changed.
332
333#if defined(HAVE_ANDROID_OS)
334    uint8_t hash[16];
335
336    MD5_CTX m;
337    MD5_Init(&m);
338    MD5_Update(&m, buffer->data(), buffer->size());
339
340    MD5_Final(hash, &m);
341
342    if (mPlaylist != NULL && !memcmp(hash, mPlaylistHash, 16)) {
343        // playlist unchanged
344
345        if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
346            mRefreshState = (RefreshState)(mRefreshState + 1);
347        }
348
349        *unchanged = true;
350
351        ALOGV("Playlist unchanged, refresh state is now %d",
352             (int)mRefreshState);
353
354        return NULL;
355    }
356
357    memcpy(mPlaylistHash, hash, sizeof(hash));
358
359    mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
360#endif
361
362    sp<M3UParser> playlist =
363        new M3UParser(url, buffer->data(), buffer->size());
364
365    if (playlist->initCheck() != OK) {
366        ALOGE("failed to parse .m3u8 playlist");
367
368        return NULL;
369    }
370
371    return playlist;
372}
373
374int64_t LiveSession::getSegmentStartTimeUs(int32_t seqNumber) const {
375    CHECK(mPlaylist != NULL);
376
377    int32_t firstSeqNumberInPlaylist;
378    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
379                "media-sequence", &firstSeqNumberInPlaylist)) {
380        firstSeqNumberInPlaylist = 0;
381    }
382
383    int32_t lastSeqNumberInPlaylist =
384        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
385
386    CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
387    CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
388
389    int64_t segmentStartUs = 0ll;
390    for (int32_t index = 0;
391            index < seqNumber - firstSeqNumberInPlaylist; ++index) {
392        sp<AMessage> itemMeta;
393        CHECK(mPlaylist->itemAt(
394                    index, NULL /* uri */, &itemMeta));
395
396        int64_t itemDurationUs;
397        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
398
399        segmentStartUs += itemDurationUs;
400    }
401
402    return segmentStartUs;
403}
404
// Returns rand() scaled into the closed interval [0, 1].
static double uniformRand() {
    const double sample = static_cast<double>(rand());
    return sample / RAND_MAX;
}
408
409size_t LiveSession::getBandwidthIndex() {
410    if (mBandwidthItems.size() == 0) {
411        return 0;
412    }
413
414#if 1
415    int32_t bandwidthBps;
416    if (mHTTPDataSource != NULL
417            && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
418        ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
419    } else {
420        ALOGV("no bandwidth estimate.");
421        return 0;  // Pick the lowest bandwidth stream by default.
422    }
423
424    char value[PROPERTY_VALUE_MAX];
425    if (property_get("media.httplive.max-bw", value, NULL)) {
426        char *end;
427        long maxBw = strtoul(value, &end, 10);
428        if (end > value && *end == '\0') {
429            if (maxBw > 0 && bandwidthBps > maxBw) {
430                ALOGV("bandwidth capped to %ld bps", maxBw);
431                bandwidthBps = maxBw;
432            }
433        }
434    }
435
436    // Consider only 80% of the available bandwidth usable.
437    bandwidthBps = (bandwidthBps * 8) / 10;
438
439    // Pick the highest bandwidth stream below or equal to estimated bandwidth.
440
441    size_t index = mBandwidthItems.size() - 1;
442    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth
443                            > (size_t)bandwidthBps) {
444        --index;
445    }
446#elif 0
447    // Change bandwidth at random()
448    size_t index = uniformRand() * mBandwidthItems.size();
449#elif 0
450    // There's a 50% chance to stay on the current bandwidth and
451    // a 50% chance to switch to the next higher bandwidth (wrapping around
452    // to lowest)
453    const size_t kMinIndex = 0;
454
455    size_t index;
456    if (mPrevBandwidthIndex < 0) {
457        index = kMinIndex;
458    } else if (uniformRand() < 0.5) {
459        index = (size_t)mPrevBandwidthIndex;
460    } else {
461        index = mPrevBandwidthIndex + 1;
462        if (index == mBandwidthItems.size()) {
463            index = kMinIndex;
464        }
465    }
466#elif 0
467    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
468
469    size_t index = mBandwidthItems.size() - 1;
470    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
471        --index;
472    }
473#else
474    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
475#endif
476
477    return index;
478}
479
480bool LiveSession::timeToRefreshPlaylist(int64_t nowUs) const {
481    if (mPlaylist == NULL) {
482        CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
483        return true;
484    }
485
486    int32_t targetDurationSecs;
487    CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
488
489    int64_t targetDurationUs = targetDurationSecs * 1000000ll;
490
491    int64_t minPlaylistAgeUs;
492
493    switch (mRefreshState) {
494        case INITIAL_MINIMUM_RELOAD_DELAY:
495        {
496            size_t n = mPlaylist->size();
497            if (n > 0) {
498                sp<AMessage> itemMeta;
499                CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
500
501                int64_t itemDurationUs;
502                CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
503
504                minPlaylistAgeUs = itemDurationUs;
505                break;
506            }
507
508            // fall through
509        }
510
511        case FIRST_UNCHANGED_RELOAD_ATTEMPT:
512        {
513            minPlaylistAgeUs = targetDurationUs / 2;
514            break;
515        }
516
517        case SECOND_UNCHANGED_RELOAD_ATTEMPT:
518        {
519            minPlaylistAgeUs = (targetDurationUs * 3) / 2;
520            break;
521        }
522
523        case THIRD_UNCHANGED_RELOAD_ATTEMPT:
524        {
525            minPlaylistAgeUs = targetDurationUs * 3;
526            break;
527        }
528
529        default:
530            TRESPASS();
531            break;
532    }
533
534    return mLastPlaylistFetchTimeUs + minPlaylistAgeUs <= nowUs;
535}
536
537void LiveSession::onDownloadNext() {
538    size_t bandwidthIndex = getBandwidthIndex();
539
540rinse_repeat:
541    int64_t nowUs = ALooper::GetNowUs();
542
543    if (mLastPlaylistFetchTimeUs < 0
544            || (ssize_t)bandwidthIndex != mPrevBandwidthIndex
545            || (!mPlaylist->isComplete() && timeToRefreshPlaylist(nowUs))) {
546        AString url;
547        if (mBandwidthItems.size() > 0) {
548            url = mBandwidthItems.editItemAt(bandwidthIndex).mURI;
549        } else {
550            url = mMasterURL;
551        }
552
553        if ((ssize_t)bandwidthIndex != mPrevBandwidthIndex) {
554            // If we switch bandwidths, do not pay any heed to whether
555            // playlists changed since the last time...
556            mPlaylist.clear();
557        }
558
559        bool unchanged;
560        sp<M3UParser> playlist = fetchPlaylist(url.c_str(), &unchanged);
561        if (playlist == NULL) {
562            if (unchanged) {
563                // We succeeded in fetching the playlist, but it was
564                // unchanged from the last time we tried.
565            } else {
566                ALOGE("failed to load playlist at url '%s'", url.c_str());
567                signalEOS(ERROR_IO);
568
569                return;
570            }
571        } else {
572            mPlaylist = playlist;
573        }
574
575        if (!mDurationFixed) {
576            Mutex::Autolock autoLock(mLock);
577
578            if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
579                mDurationUs = -1;
580                mDurationFixed = true;
581            } else {
582                mDurationUs = 0;
583                for (size_t i = 0; i < mPlaylist->size(); ++i) {
584                    sp<AMessage> itemMeta;
585                    CHECK(mPlaylist->itemAt(
586                                i, NULL /* uri */, &itemMeta));
587
588                    int64_t itemDurationUs;
589                    CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
590
591                    mDurationUs += itemDurationUs;
592                }
593
594                mDurationFixed = mPlaylist->isComplete();
595            }
596        }
597
598        mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
599    }
600
601    int32_t firstSeqNumberInPlaylist;
602    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
603                "media-sequence", &firstSeqNumberInPlaylist)) {
604        firstSeqNumberInPlaylist = 0;
605    }
606
607    bool seekDiscontinuity = false;
608    bool explicitDiscontinuity = false;
609    bool bandwidthChanged = false;
610
611    if (mSeekTimeUs >= 0) {
612        if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
613            size_t index = 0;
614            int64_t segmentStartUs = 0;
615            while (index < mPlaylist->size()) {
616                sp<AMessage> itemMeta;
617                CHECK(mPlaylist->itemAt(
618                            index, NULL /* uri */, &itemMeta));
619
620                int64_t itemDurationUs;
621                CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
622
623                if (mSeekTimeUs < segmentStartUs + itemDurationUs) {
624                    break;
625                }
626
627                segmentStartUs += itemDurationUs;
628                ++index;
629            }
630
631            if (index < mPlaylist->size()) {
632                int32_t newSeqNumber = firstSeqNumberInPlaylist + index;
633
634                if (newSeqNumber != mSeqNumber) {
635                    ALOGI("seeking to seq no %d", newSeqNumber);
636
637                    mSeqNumber = newSeqNumber;
638
639                    mDataSource->reset();
640
641                    // reseting the data source will have had the
642                    // side effect of discarding any previously queued
643                    // bandwidth change discontinuity.
644                    // Therefore we'll need to treat these seek
645                    // discontinuities as involving a bandwidth change
646                    // even if they aren't directly.
647                    seekDiscontinuity = true;
648                    bandwidthChanged = true;
649                }
650            }
651        }
652
653        mSeekTimeUs = -1;
654
655        Mutex::Autolock autoLock(mLock);
656        mSeekDone = true;
657        mCondition.broadcast();
658    }
659
660    const int32_t lastSeqNumberInPlaylist =
661        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
662
663    if (mSeqNumber < 0) {
664        if (mPlaylist->isComplete()) {
665            mSeqNumber = firstSeqNumberInPlaylist;
666        } else {
667            // If this is a live session, start 3 segments from the end.
668            mSeqNumber = lastSeqNumberInPlaylist - 3;
669            if (mSeqNumber < firstSeqNumberInPlaylist) {
670                mSeqNumber = firstSeqNumberInPlaylist;
671            }
672        }
673    }
674
675    if (mSeqNumber < firstSeqNumberInPlaylist
676            || mSeqNumber > lastSeqNumberInPlaylist) {
677        if (mPrevBandwidthIndex != (ssize_t)bandwidthIndex) {
678            // Go back to the previous bandwidth.
679
680            ALOGI("new bandwidth does not have the sequence number "
681                 "we're looking for, switching back to previous bandwidth");
682
683            mLastPlaylistFetchTimeUs = -1;
684            bandwidthIndex = mPrevBandwidthIndex;
685            goto rinse_repeat;
686        }
687
688        if (!mPlaylist->isComplete() && mNumRetries < kMaxNumRetries) {
689            ++mNumRetries;
690
691            if (mSeqNumber > lastSeqNumberInPlaylist) {
692                mLastPlaylistFetchTimeUs = -1;
693                postMonitorQueue(3000000ll);
694                return;
695            }
696
697            // we've missed the boat, let's start from the lowest sequence
698            // number available and signal a discontinuity.
699
700            ALOGI("We've missed the boat, restarting playback.");
701            mSeqNumber = lastSeqNumberInPlaylist;
702            explicitDiscontinuity = true;
703
704            // fall through
705        } else {
706            ALOGE("Cannot find sequence number %d in playlist "
707                 "(contains %d - %d)",
708                 mSeqNumber, firstSeqNumberInPlaylist,
709                 firstSeqNumberInPlaylist + mPlaylist->size() - 1);
710
711            signalEOS(ERROR_END_OF_STREAM);
712            return;
713        }
714    }
715
716    mNumRetries = 0;
717
718    AString uri;
719    sp<AMessage> itemMeta;
720    CHECK(mPlaylist->itemAt(
721                mSeqNumber - firstSeqNumberInPlaylist,
722                &uri,
723                &itemMeta));
724
725    int32_t val;
726    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
727        explicitDiscontinuity = true;
728    }
729
730    int64_t range_offset, range_length;
731    if (!itemMeta->findInt64("range-offset", &range_offset)
732            || !itemMeta->findInt64("range-length", &range_length)) {
733        range_offset = 0;
734        range_length = -1;
735    }
736
737    ALOGV("fetching segment %d from (%d .. %d)",
738          mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
739
740    sp<ABuffer> buffer;
741    status_t err = fetchFile(uri.c_str(), &buffer, range_offset, range_length);
742    if (err != OK) {
743        ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
744        signalEOS(err);
745        return;
746    }
747
748    CHECK(buffer != NULL);
749
750    err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer);
751
752    if (err != OK) {
753        ALOGE("decryptBuffer failed w/ error %d", err);
754
755        signalEOS(err);
756        return;
757    }
758
759    if (buffer->size() == 0 || buffer->data()[0] != 0x47) {
760        // Not a transport stream???
761
762        ALOGE("This doesn't look like a transport stream...");
763
764        mBandwidthItems.removeAt(bandwidthIndex);
765
766        if (mBandwidthItems.isEmpty()) {
767            signalEOS(ERROR_UNSUPPORTED);
768            return;
769        }
770
771        ALOGI("Retrying with a different bandwidth stream.");
772
773        mLastPlaylistFetchTimeUs = -1;
774        bandwidthIndex = getBandwidthIndex();
775        mPrevBandwidthIndex = bandwidthIndex;
776        mSeqNumber = -1;
777
778        goto rinse_repeat;
779    }
780
781    if ((size_t)mPrevBandwidthIndex != bandwidthIndex) {
782        bandwidthChanged = true;
783    }
784
785    if (mPrevBandwidthIndex < 0) {
786        // Don't signal a bandwidth change at the very beginning of
787        // playback.
788        bandwidthChanged = false;
789    }
790
791    if (mStartOfPlayback) {
792        seekDiscontinuity = true;
793        mStartOfPlayback = false;
794    }
795
796    if (seekDiscontinuity || explicitDiscontinuity || bandwidthChanged) {
797        // Signal discontinuity.
798
799        ALOGI("queueing discontinuity (seek=%d, explicit=%d, bandwidthChanged=%d)",
800             seekDiscontinuity, explicitDiscontinuity, bandwidthChanged);
801
802        sp<ABuffer> tmp = new ABuffer(188);
803        memset(tmp->data(), 0, tmp->size());
804
805        // signal a 'hard' discontinuity for explicit or bandwidthChanged.
806        uint8_t type = (explicitDiscontinuity || bandwidthChanged) ? 1 : 0;
807
808        if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
809            // If this was a live event this made no sense since
810            // we don't have access to all the segment before the current
811            // one.
812            int64_t segmentStartTimeUs = getSegmentStartTimeUs(mSeqNumber);
813            memcpy(tmp->data() + 2, &segmentStartTimeUs, sizeof(segmentStartTimeUs));
814
815            type |= 2;
816        }
817
818        tmp->data()[1] = type;
819
820        mDataSource->queueBuffer(tmp);
821    }
822
823    mDataSource->queueBuffer(buffer);
824
825    mPrevBandwidthIndex = bandwidthIndex;
826    ++mSeqNumber;
827
828    postMonitorQueue();
829}
830
831void LiveSession::signalEOS(status_t err) {
832    if (mInPreparationPhase && mNotify != NULL) {
833        sp<AMessage> notify = mNotify->dup();
834
835        notify->setInt32(
836                "what",
837                err == ERROR_END_OF_STREAM
838                    ? kWhatPrepared : kWhatPreparationFailed);
839
840        if (err != ERROR_END_OF_STREAM) {
841            notify->setInt32("err", err);
842        }
843
844        notify->post();
845
846        mInPreparationPhase = false;
847    }
848
849    mDataSource->queueEOS(err);
850}
851
852void LiveSession::onMonitorQueue() {
853    if (mSeekTimeUs >= 0
854            || mDataSource->countQueuedBuffers() < kMaxNumQueuedFragments) {
855        onDownloadNext();
856    } else {
857        if (mInPreparationPhase) {
858            if (mNotify != NULL) {
859                sp<AMessage> notify = mNotify->dup();
860                notify->setInt32("what", kWhatPrepared);
861                notify->post();
862            }
863
864            mInPreparationPhase = false;
865        }
866
867        postMonitorQueue(1000000ll);
868    }
869}
870
871status_t LiveSession::decryptBuffer(
872        size_t playlistIndex, const sp<ABuffer> &buffer) {
873    sp<AMessage> itemMeta;
874    bool found = false;
875    AString method;
876
877    for (ssize_t i = playlistIndex; i >= 0; --i) {
878        AString uri;
879        CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
880
881        if (itemMeta->findString("cipher-method", &method)) {
882            found = true;
883            break;
884        }
885    }
886
887    if (!found) {
888        method = "NONE";
889    }
890
891    if (method == "NONE") {
892        return OK;
893    } else if (!(method == "AES-128")) {
894        ALOGE("Unsupported cipher method '%s'", method.c_str());
895        return ERROR_UNSUPPORTED;
896    }
897
898    AString keyURI;
899    if (!itemMeta->findString("cipher-uri", &keyURI)) {
900        ALOGE("Missing key uri");
901        return ERROR_MALFORMED;
902    }
903
904    ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
905
906    sp<ABuffer> key;
907    if (index >= 0) {
908        key = mAESKeyForURI.valueAt(index);
909    } else {
910        key = new ABuffer(16);
911
912        sp<HTTPBase> keySource =
913              HTTPBase::Create(
914                  (mFlags & kFlagIncognito)
915                    ? HTTPBase::kFlagIncognito
916                    : 0);
917
918        if (mUIDValid) {
919            keySource->setUID(mUID);
920        }
921
922        status_t err =
923            keySource->connect(
924                    keyURI.c_str(),
925                    mExtraHeaders.isEmpty() ? NULL : &mExtraHeaders);
926
927        if (err == OK) {
928            size_t offset = 0;
929            while (offset < 16) {
930                ssize_t n = keySource->readAt(
931                        offset, key->data() + offset, 16 - offset);
932                if (n <= 0) {
933                    err = ERROR_IO;
934                    break;
935                }
936
937                offset += n;
938            }
939        }
940
941        if (err != OK) {
942            ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
943            return ERROR_IO;
944        }
945
946        mAESKeyForURI.add(keyURI, key);
947    }
948
949    AES_KEY aes_key;
950    if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
951        ALOGE("failed to set AES decryption key.");
952        return UNKNOWN_ERROR;
953    }
954
955    unsigned char aes_ivec[16];
956
957    AString iv;
958    if (itemMeta->findString("cipher-iv", &iv)) {
959        if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
960                || iv.size() != 16 * 2 + 2) {
961            ALOGE("malformed cipher IV '%s'.", iv.c_str());
962            return ERROR_MALFORMED;
963        }
964
965        memset(aes_ivec, 0, sizeof(aes_ivec));
966        for (size_t i = 0; i < 16; ++i) {
967            char c1 = tolower(iv.c_str()[2 + 2 * i]);
968            char c2 = tolower(iv.c_str()[3 + 2 * i]);
969            if (!isxdigit(c1) || !isxdigit(c2)) {
970                ALOGE("malformed cipher IV '%s'.", iv.c_str());
971                return ERROR_MALFORMED;
972            }
973            uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
974            uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
975
976            aes_ivec[i] = nibble1 << 4 | nibble2;
977        }
978    } else {
979        memset(aes_ivec, 0, sizeof(aes_ivec));
980        aes_ivec[15] = mSeqNumber & 0xff;
981        aes_ivec[14] = (mSeqNumber >> 8) & 0xff;
982        aes_ivec[13] = (mSeqNumber >> 16) & 0xff;
983        aes_ivec[12] = (mSeqNumber >> 24) & 0xff;
984    }
985
986    AES_cbc_encrypt(
987            buffer->data(), buffer->data(), buffer->size(),
988            &aes_key, aes_ivec, AES_DECRYPT);
989
990    // hexdump(buffer->data(), buffer->size());
991
992    size_t n = buffer->size();
993    CHECK_GT(n, 0u);
994
995    size_t pad = buffer->data()[n - 1];
996
997    CHECK_GT(pad, 0u);
998    CHECK_LE(pad, 16u);
999    CHECK_GE((size_t)n, pad);
1000    for (size_t i = 0; i < pad; ++i) {
1001        CHECK_EQ((unsigned)buffer->data()[n - 1 - i], pad);
1002    }
1003
1004    n -= pad;
1005
1006    buffer->setRange(buffer->offset(), n);
1007
1008    return OK;
1009}
1010
1011void LiveSession::postMonitorQueue(int64_t delayUs) {
1012    sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id());
1013    msg->setInt32("generation", ++mMonitorQueueGeneration);
1014    msg->post(delayUs);
1015}
1016
1017void LiveSession::onSeek(const sp<AMessage> &msg) {
1018    int64_t timeUs;
1019    CHECK(msg->findInt64("timeUs", &timeUs));
1020
1021    mSeekTimeUs = timeUs;
1022    postMonitorQueue();
1023}
1024
1025status_t LiveSession::getDuration(int64_t *durationUs) const {
1026    Mutex::Autolock autoLock(mLock);
1027    *durationUs = mDurationUs;
1028
1029    return OK;
1030}
1031
1032bool LiveSession::isSeekable() const {
1033    int64_t durationUs;
1034    return getDuration(&durationUs) == OK && durationUs >= 0;
1035}
1036
1037bool LiveSession::hasDynamicDuration() const {
1038    return !mDurationFixed;
1039}
1040
1041}  // namespace android
1042
1043