LiveSession.cpp revision 997594088164cfb33c1cb8c376884346fbf1e7ae
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "include/LiveSession.h"
22
23#include "LiveDataSource.h"
24
25#include "include/M3UParser.h"
26#include "include/HTTPBase.h"
27
28#include <cutils/properties.h>
29#include <media/stagefright/foundation/hexdump.h>
30#include <media/stagefright/foundation/ABuffer.h>
31#include <media/stagefright/foundation/ADebug.h>
32#include <media/stagefright/foundation/AMessage.h>
33#include <media/stagefright/DataSource.h>
34#include <media/stagefright/FileSource.h>
35#include <media/stagefright/MediaErrors.h>
36
37#include <ctype.h>
38#include <openssl/aes.h>
39#include <openssl/md5.h>
40
41namespace android {
42
43LiveSession::LiveSession(
44        const sp<AMessage> &notify, uint32_t flags, bool uidValid, uid_t uid)
45    : mNotify(notify),
46      mFlags(flags),
47      mUIDValid(uidValid),
48      mUID(uid),
49      mInPreparationPhase(true),
50      mDataSource(new LiveDataSource),
51      mHTTPDataSource(
52              HTTPBase::Create(
53                  (mFlags & kFlagIncognito)
54                    ? HTTPBase::kFlagIncognito
55                    : 0)),
56      mPrevBandwidthIndex(-1),
57      mLastPlaylistFetchTimeUs(-1),
58      mSeqNumber(-1),
59      mSeekTimeUs(-1),
60      mNumRetries(0),
61      mStartOfPlayback(true),
62      mDurationUs(-1),
63      mDurationFixed(false),
64      mSeekDone(false),
65      mDisconnectPending(false),
66      mMonitorQueueGeneration(0),
67      mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY) {
68    if (mUIDValid) {
69        mHTTPDataSource->setUID(mUID);
70    }
71}
72
73LiveSession::~LiveSession() {
74}
75
76sp<DataSource> LiveSession::getDataSource() {
77    return mDataSource;
78}
79
80void LiveSession::connect(
81        const char *url, const KeyedVector<String8, String8> *headers) {
82    sp<AMessage> msg = new AMessage(kWhatConnect, id());
83    msg->setString("url", url);
84
85    if (headers != NULL) {
86        msg->setPointer(
87                "headers",
88                new KeyedVector<String8, String8>(*headers));
89    }
90
91    msg->post();
92}
93
94void LiveSession::disconnect() {
95    Mutex::Autolock autoLock(mLock);
96    mDisconnectPending = true;
97
98    mHTTPDataSource->disconnect();
99
100    (new AMessage(kWhatDisconnect, id()))->post();
101}
102
103void LiveSession::seekTo(int64_t timeUs) {
104    Mutex::Autolock autoLock(mLock);
105    mSeekDone = false;
106
107    sp<AMessage> msg = new AMessage(kWhatSeek, id());
108    msg->setInt64("timeUs", timeUs);
109    msg->post();
110
111    while (!mSeekDone) {
112        mCondition.wait(mLock);
113    }
114}
115
116void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
117    switch (msg->what()) {
118        case kWhatConnect:
119            onConnect(msg);
120            break;
121
122        case kWhatDisconnect:
123            onDisconnect();
124            break;
125
126        case kWhatMonitorQueue:
127        {
128            int32_t generation;
129            CHECK(msg->findInt32("generation", &generation));
130
131            if (generation != mMonitorQueueGeneration) {
132                // Stale event
133                break;
134            }
135
136            onMonitorQueue();
137            break;
138        }
139
140        case kWhatSeek:
141            onSeek(msg);
142            break;
143
144        default:
145            TRESPASS();
146            break;
147    }
148}
149
150// static
151int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
152    if (a->mBandwidth < b->mBandwidth) {
153        return -1;
154    } else if (a->mBandwidth == b->mBandwidth) {
155        return 0;
156    }
157
158    return 1;
159}
160
161void LiveSession::onConnect(const sp<AMessage> &msg) {
162    AString url;
163    CHECK(msg->findString("url", &url));
164
165    KeyedVector<String8, String8> *headers = NULL;
166    if (!msg->findPointer("headers", (void **)&headers)) {
167        mExtraHeaders.clear();
168    } else {
169        mExtraHeaders = *headers;
170
171        delete headers;
172        headers = NULL;
173    }
174
175    ALOGI("onConnect <URL suppressed>");
176
177    mMasterURL = url;
178
179    bool dummy;
180    sp<M3UParser> playlist = fetchPlaylist(url.c_str(), &dummy);
181
182    if (playlist == NULL) {
183        ALOGE("unable to fetch master playlist '%s'.", url.c_str());
184
185        signalEOS(ERROR_IO);
186        return;
187    }
188
189    if (playlist->isVariantPlaylist()) {
190        for (size_t i = 0; i < playlist->size(); ++i) {
191            BandwidthItem item;
192
193            sp<AMessage> meta;
194            playlist->itemAt(i, &item.mURI, &meta);
195
196            unsigned long bandwidth;
197            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
198
199            mBandwidthItems.push(item);
200        }
201
202        CHECK_GT(mBandwidthItems.size(), 0u);
203
204        mBandwidthItems.sort(SortByBandwidth);
205    }
206
207    postMonitorQueue();
208}
209
210void LiveSession::onDisconnect() {
211    ALOGI("onDisconnect");
212
213    signalEOS(ERROR_END_OF_STREAM);
214
215    Mutex::Autolock autoLock(mLock);
216    mDisconnectPending = false;
217}
218
219status_t LiveSession::fetchFile(
220        const char *url, sp<ABuffer> *out,
221        int64_t range_offset, int64_t range_length) {
222    *out = NULL;
223
224    sp<DataSource> source;
225
226    if (!strncasecmp(url, "file://", 7)) {
227        source = new FileSource(url + 7);
228    } else if (strncasecmp(url, "http://", 7)
229            && strncasecmp(url, "https://", 8)) {
230        return ERROR_UNSUPPORTED;
231    } else {
232        {
233            Mutex::Autolock autoLock(mLock);
234
235            if (mDisconnectPending) {
236                return ERROR_IO;
237            }
238        }
239
240        KeyedVector<String8, String8> headers = mExtraHeaders;
241        if (range_offset > 0 || range_length >= 0) {
242            headers.add(
243                    String8("Range"),
244                    String8(
245                        StringPrintf(
246                            "bytes=%lld-%s",
247                            range_offset,
248                            range_length < 0
249                                ? "" : StringPrintf("%lld", range_offset + range_length - 1).c_str()).c_str()));
250        }
251        status_t err = mHTTPDataSource->connect(url, &headers);
252
253        if (err != OK) {
254            return err;
255        }
256
257        source = mHTTPDataSource;
258    }
259
260    off64_t size;
261    status_t err = source->getSize(&size);
262
263    if (err != OK) {
264        size = 65536;
265    }
266
267    sp<ABuffer> buffer = new ABuffer(size);
268    buffer->setRange(0, 0);
269
270    for (;;) {
271        size_t bufferRemaining = buffer->capacity() - buffer->size();
272
273        if (bufferRemaining == 0) {
274            bufferRemaining = 32768;
275
276            ALOGV("increasing download buffer to %d bytes",
277                 buffer->size() + bufferRemaining);
278
279            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
280            memcpy(copy->data(), buffer->data(), buffer->size());
281            copy->setRange(0, buffer->size());
282
283            buffer = copy;
284        }
285
286        size_t maxBytesToRead = bufferRemaining;
287        if (range_length >= 0) {
288            int64_t bytesLeftInRange = range_length - buffer->size();
289            if (bytesLeftInRange < maxBytesToRead) {
290                maxBytesToRead = bytesLeftInRange;
291
292                if (bytesLeftInRange == 0) {
293                    break;
294                }
295            }
296        }
297
298        ssize_t n = source->readAt(
299                buffer->size(), buffer->data() + buffer->size(),
300                maxBytesToRead);
301
302        if (n < 0) {
303            return n;
304        }
305
306        if (n == 0) {
307            break;
308        }
309
310        buffer->setRange(0, buffer->size() + (size_t)n);
311    }
312
313    *out = buffer;
314
315    return OK;
316}
317
318sp<M3UParser> LiveSession::fetchPlaylist(const char *url, bool *unchanged) {
319    ALOGV("fetchPlaylist '%s'", url);
320
321    *unchanged = false;
322
323    sp<ABuffer> buffer;
324    status_t err = fetchFile(url, &buffer);
325
326    if (err != OK) {
327        return NULL;
328    }
329
330    // MD5 functionality is not available on the simulator, treat all
331    // playlists as changed.
332
333#if defined(HAVE_ANDROID_OS)
334    uint8_t hash[16];
335
336    MD5_CTX m;
337    MD5_Init(&m);
338    MD5_Update(&m, buffer->data(), buffer->size());
339
340    MD5_Final(hash, &m);
341
342    if (mPlaylist != NULL && !memcmp(hash, mPlaylistHash, 16)) {
343        // playlist unchanged
344
345        if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
346            mRefreshState = (RefreshState)(mRefreshState + 1);
347        }
348
349        *unchanged = true;
350
351        ALOGV("Playlist unchanged, refresh state is now %d",
352             (int)mRefreshState);
353
354        return NULL;
355    }
356
357    memcpy(mPlaylistHash, hash, sizeof(hash));
358
359    mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
360#endif
361
362    sp<M3UParser> playlist =
363        new M3UParser(url, buffer->data(), buffer->size());
364
365    if (playlist->initCheck() != OK) {
366        ALOGE("failed to parse .m3u8 playlist");
367
368        return NULL;
369    }
370
371    return playlist;
372}
373
374int64_t LiveSession::getSegmentStartTimeUs(int32_t seqNumber) const {
375    CHECK(mPlaylist != NULL);
376
377    int32_t firstSeqNumberInPlaylist;
378    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
379                "media-sequence", &firstSeqNumberInPlaylist)) {
380        firstSeqNumberInPlaylist = 0;
381    }
382
383    int32_t lastSeqNumberInPlaylist =
384        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
385
386    CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
387    CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
388
389    int64_t segmentStartUs = 0ll;
390    for (int32_t index = 0;
391            index < seqNumber - firstSeqNumberInPlaylist; ++index) {
392        sp<AMessage> itemMeta;
393        CHECK(mPlaylist->itemAt(
394                    index, NULL /* uri */, &itemMeta));
395
396        int64_t itemDurationUs;
397        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
398
399        segmentStartUs += itemDurationUs;
400    }
401
402    return segmentStartUs;
403}
404
405static double uniformRand() {
406    return (double)rand() / RAND_MAX;
407}
408
409size_t LiveSession::getBandwidthIndex() {
410    if (mBandwidthItems.size() == 0) {
411        return 0;
412    }
413
414#if 1
415    int32_t bandwidthBps;
416    if (mHTTPDataSource != NULL
417            && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
418        ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
419    } else {
420        ALOGV("no bandwidth estimate.");
421        return 0;  // Pick the lowest bandwidth stream by default.
422    }
423
424    char value[PROPERTY_VALUE_MAX];
425    if (property_get("media.httplive.max-bw", value, NULL)) {
426        char *end;
427        long maxBw = strtoul(value, &end, 10);
428        if (end > value && *end == '\0') {
429            if (maxBw > 0 && bandwidthBps > maxBw) {
430                ALOGV("bandwidth capped to %ld bps", maxBw);
431                bandwidthBps = maxBw;
432            }
433        }
434    }
435
436    // Consider only 80% of the available bandwidth usable.
437    bandwidthBps = (bandwidthBps * 8) / 10;
438
439    // Pick the highest bandwidth stream below or equal to estimated bandwidth.
440
441    size_t index = mBandwidthItems.size() - 1;
442    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth
443                            > (size_t)bandwidthBps) {
444        --index;
445    }
446#elif 0
447    // Change bandwidth at random()
448    size_t index = uniformRand() * mBandwidthItems.size();
449#elif 0
450    // There's a 50% chance to stay on the current bandwidth and
451    // a 50% chance to switch to the next higher bandwidth (wrapping around
452    // to lowest)
453    const size_t kMinIndex = 0;
454
455    size_t index;
456    if (mPrevBandwidthIndex < 0) {
457        index = kMinIndex;
458    } else if (uniformRand() < 0.5) {
459        index = (size_t)mPrevBandwidthIndex;
460    } else {
461        index = mPrevBandwidthIndex + 1;
462        if (index == mBandwidthItems.size()) {
463            index = kMinIndex;
464        }
465    }
466#elif 0
467    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
468
469    size_t index = mBandwidthItems.size() - 1;
470    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
471        --index;
472    }
473#else
474    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
475#endif
476
477    return index;
478}
479
480bool LiveSession::timeToRefreshPlaylist(int64_t nowUs) const {
481    if (mPlaylist == NULL) {
482        CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
483        return true;
484    }
485
486    int32_t targetDurationSecs;
487    CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
488
489    int64_t targetDurationUs = targetDurationSecs * 1000000ll;
490
491    int64_t minPlaylistAgeUs;
492
493    switch (mRefreshState) {
494        case INITIAL_MINIMUM_RELOAD_DELAY:
495        {
496            size_t n = mPlaylist->size();
497            if (n > 0) {
498                sp<AMessage> itemMeta;
499                CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
500
501                int64_t itemDurationUs;
502                CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
503
504                minPlaylistAgeUs = itemDurationUs;
505                break;
506            }
507
508            // fall through
509        }
510
511        case FIRST_UNCHANGED_RELOAD_ATTEMPT:
512        {
513            minPlaylistAgeUs = targetDurationUs / 2;
514            break;
515        }
516
517        case SECOND_UNCHANGED_RELOAD_ATTEMPT:
518        {
519            minPlaylistAgeUs = (targetDurationUs * 3) / 2;
520            break;
521        }
522
523        case THIRD_UNCHANGED_RELOAD_ATTEMPT:
524        {
525            minPlaylistAgeUs = targetDurationUs * 3;
526            break;
527        }
528
529        default:
530            TRESPASS();
531            break;
532    }
533
534    return mLastPlaylistFetchTimeUs + minPlaylistAgeUs <= nowUs;
535}
536
537void LiveSession::onDownloadNext() {
538    size_t bandwidthIndex = getBandwidthIndex();
539
540rinse_repeat:
541    int64_t nowUs = ALooper::GetNowUs();
542
543    if (mLastPlaylistFetchTimeUs < 0
544            || (ssize_t)bandwidthIndex != mPrevBandwidthIndex
545            || (!mPlaylist->isComplete() && timeToRefreshPlaylist(nowUs))) {
546        AString url;
547        if (mBandwidthItems.size() > 0) {
548            url = mBandwidthItems.editItemAt(bandwidthIndex).mURI;
549        } else {
550            url = mMasterURL;
551        }
552
553        if ((ssize_t)bandwidthIndex != mPrevBandwidthIndex) {
554            // If we switch bandwidths, do not pay any heed to whether
555            // playlists changed since the last time...
556            mPlaylist.clear();
557        }
558
559        bool unchanged;
560        sp<M3UParser> playlist = fetchPlaylist(url.c_str(), &unchanged);
561        if (playlist == NULL) {
562            if (unchanged) {
563                // We succeeded in fetching the playlist, but it was
564                // unchanged from the last time we tried.
565            } else {
566                ALOGE("failed to load playlist at url '%s'", url.c_str());
567                signalEOS(ERROR_IO);
568
569                return;
570            }
571        } else {
572            mPlaylist = playlist;
573        }
574
575        if (!mDurationFixed) {
576            Mutex::Autolock autoLock(mLock);
577
578            if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
579                mDurationUs = -1;
580                mDurationFixed = true;
581            } else {
582                mDurationUs = 0;
583                for (size_t i = 0; i < mPlaylist->size(); ++i) {
584                    sp<AMessage> itemMeta;
585                    CHECK(mPlaylist->itemAt(
586                                i, NULL /* uri */, &itemMeta));
587
588                    int64_t itemDurationUs;
589                    CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
590
591                    mDurationUs += itemDurationUs;
592                }
593
594                mDurationFixed = mPlaylist->isComplete();
595            }
596        }
597
598        mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
599    }
600
601    int32_t firstSeqNumberInPlaylist;
602    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
603                "media-sequence", &firstSeqNumberInPlaylist)) {
604        firstSeqNumberInPlaylist = 0;
605    }
606
607    bool seekDiscontinuity = false;
608    bool explicitDiscontinuity = false;
609    bool bandwidthChanged = false;
610
611    if (mSeekTimeUs >= 0) {
612        if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
613            size_t index = 0;
614            int64_t segmentStartUs = 0;
615            while (index < mPlaylist->size()) {
616                sp<AMessage> itemMeta;
617                CHECK(mPlaylist->itemAt(
618                            index, NULL /* uri */, &itemMeta));
619
620                int64_t itemDurationUs;
621                CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
622
623                if (mSeekTimeUs < segmentStartUs + itemDurationUs) {
624                    break;
625                }
626
627                segmentStartUs += itemDurationUs;
628                ++index;
629            }
630
631            if (index < mPlaylist->size()) {
632                int32_t newSeqNumber = firstSeqNumberInPlaylist + index;
633
634                ALOGI("seeking to seq no %d", newSeqNumber);
635
636                mSeqNumber = newSeqNumber;
637
638                mDataSource->reset();
639
640                // reseting the data source will have had the
641                // side effect of discarding any previously queued
642                // bandwidth change discontinuity.
643                // Therefore we'll need to treat these seek
644                // discontinuities as involving a bandwidth change
645                // even if they aren't directly.
646                seekDiscontinuity = true;
647                bandwidthChanged = true;
648            }
649        }
650
651        mSeekTimeUs = -1;
652
653        Mutex::Autolock autoLock(mLock);
654        mSeekDone = true;
655        mCondition.broadcast();
656    }
657
658    const int32_t lastSeqNumberInPlaylist =
659        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
660
661    if (mSeqNumber < 0) {
662        if (mPlaylist->isComplete()) {
663            mSeqNumber = firstSeqNumberInPlaylist;
664        } else {
665            // If this is a live session, start 3 segments from the end.
666            mSeqNumber = lastSeqNumberInPlaylist - 3;
667            if (mSeqNumber < firstSeqNumberInPlaylist) {
668                mSeqNumber = firstSeqNumberInPlaylist;
669            }
670        }
671    }
672
673    if (mSeqNumber < firstSeqNumberInPlaylist
674            || mSeqNumber > lastSeqNumberInPlaylist) {
675        if (mPrevBandwidthIndex != (ssize_t)bandwidthIndex) {
676            // Go back to the previous bandwidth.
677
678            ALOGI("new bandwidth does not have the sequence number "
679                 "we're looking for, switching back to previous bandwidth");
680
681            mLastPlaylistFetchTimeUs = -1;
682            bandwidthIndex = mPrevBandwidthIndex;
683            goto rinse_repeat;
684        }
685
686        if (!mPlaylist->isComplete() && mNumRetries < kMaxNumRetries) {
687            ++mNumRetries;
688
689            if (mSeqNumber > lastSeqNumberInPlaylist) {
690                mLastPlaylistFetchTimeUs = -1;
691                postMonitorQueue(3000000ll);
692                return;
693            }
694
695            // we've missed the boat, let's start from the lowest sequence
696            // number available and signal a discontinuity.
697
698            ALOGI("We've missed the boat, restarting playback.");
699            mSeqNumber = lastSeqNumberInPlaylist;
700            explicitDiscontinuity = true;
701
702            // fall through
703        } else {
704            ALOGE("Cannot find sequence number %d in playlist "
705                 "(contains %d - %d)",
706                 mSeqNumber, firstSeqNumberInPlaylist,
707                 firstSeqNumberInPlaylist + mPlaylist->size() - 1);
708
709            signalEOS(ERROR_END_OF_STREAM);
710            return;
711        }
712    }
713
714    mNumRetries = 0;
715
716    AString uri;
717    sp<AMessage> itemMeta;
718    CHECK(mPlaylist->itemAt(
719                mSeqNumber - firstSeqNumberInPlaylist,
720                &uri,
721                &itemMeta));
722
723    int32_t val;
724    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
725        explicitDiscontinuity = true;
726    }
727
728    int64_t range_offset, range_length;
729    if (!itemMeta->findInt64("range-offset", &range_offset)
730            || !itemMeta->findInt64("range-length", &range_length)) {
731        range_offset = 0;
732        range_length = -1;
733    }
734
735    ALOGV("fetching segment %d from (%d .. %d)",
736          mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
737
738    sp<ABuffer> buffer;
739    status_t err = fetchFile(uri.c_str(), &buffer, range_offset, range_length);
740    if (err != OK) {
741        ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
742        signalEOS(err);
743        return;
744    }
745
746    CHECK(buffer != NULL);
747
748    err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer);
749
750    if (err != OK) {
751        ALOGE("decryptBuffer failed w/ error %d", err);
752
753        signalEOS(err);
754        return;
755    }
756
757    if (buffer->size() == 0 || buffer->data()[0] != 0x47) {
758        // Not a transport stream???
759
760        ALOGE("This doesn't look like a transport stream...");
761
762        mBandwidthItems.removeAt(bandwidthIndex);
763
764        if (mBandwidthItems.isEmpty()) {
765            signalEOS(ERROR_UNSUPPORTED);
766            return;
767        }
768
769        ALOGI("Retrying with a different bandwidth stream.");
770
771        mLastPlaylistFetchTimeUs = -1;
772        bandwidthIndex = getBandwidthIndex();
773        mPrevBandwidthIndex = bandwidthIndex;
774        mSeqNumber = -1;
775
776        goto rinse_repeat;
777    }
778
779    if ((size_t)mPrevBandwidthIndex != bandwidthIndex) {
780        bandwidthChanged = true;
781    }
782
783    if (mPrevBandwidthIndex < 0) {
784        // Don't signal a bandwidth change at the very beginning of
785        // playback.
786        bandwidthChanged = false;
787    }
788
789    if (mStartOfPlayback) {
790        seekDiscontinuity = true;
791        mStartOfPlayback = false;
792    }
793
794    if (seekDiscontinuity || explicitDiscontinuity || bandwidthChanged) {
795        // Signal discontinuity.
796
797        ALOGI("queueing discontinuity (seek=%d, explicit=%d, bandwidthChanged=%d)",
798             seekDiscontinuity, explicitDiscontinuity, bandwidthChanged);
799
800        sp<ABuffer> tmp = new ABuffer(188);
801        memset(tmp->data(), 0, tmp->size());
802
803        // signal a 'hard' discontinuity for explicit or bandwidthChanged.
804        uint8_t type = (explicitDiscontinuity || bandwidthChanged) ? 1 : 0;
805
806        if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
807            // If this was a live event this made no sense since
808            // we don't have access to all the segment before the current
809            // one.
810            int64_t segmentStartTimeUs = getSegmentStartTimeUs(mSeqNumber);
811            memcpy(tmp->data() + 2, &segmentStartTimeUs, sizeof(segmentStartTimeUs));
812
813            type |= 2;
814        }
815
816        tmp->data()[1] = type;
817
818        mDataSource->queueBuffer(tmp);
819    }
820
821    mDataSource->queueBuffer(buffer);
822
823    mPrevBandwidthIndex = bandwidthIndex;
824    ++mSeqNumber;
825
826    postMonitorQueue();
827}
828
829void LiveSession::signalEOS(status_t err) {
830    if (mInPreparationPhase && mNotify != NULL) {
831        sp<AMessage> notify = mNotify->dup();
832
833        notify->setInt32(
834                "what",
835                err == ERROR_END_OF_STREAM
836                    ? kWhatPrepared : kWhatPreparationFailed);
837
838        if (err != ERROR_END_OF_STREAM) {
839            notify->setInt32("err", err);
840        }
841
842        notify->post();
843
844        mInPreparationPhase = false;
845    }
846
847    mDataSource->queueEOS(err);
848}
849
850void LiveSession::onMonitorQueue() {
851    if (mSeekTimeUs >= 0
852            || mDataSource->countQueuedBuffers() < kMaxNumQueuedFragments) {
853        onDownloadNext();
854    } else {
855        if (mInPreparationPhase) {
856            if (mNotify != NULL) {
857                sp<AMessage> notify = mNotify->dup();
858                notify->setInt32("what", kWhatPrepared);
859                notify->post();
860            }
861
862            mInPreparationPhase = false;
863        }
864
865        postMonitorQueue(1000000ll);
866    }
867}
868
869status_t LiveSession::decryptBuffer(
870        size_t playlistIndex, const sp<ABuffer> &buffer) {
871    sp<AMessage> itemMeta;
872    bool found = false;
873    AString method;
874
875    for (ssize_t i = playlistIndex; i >= 0; --i) {
876        AString uri;
877        CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
878
879        if (itemMeta->findString("cipher-method", &method)) {
880            found = true;
881            break;
882        }
883    }
884
885    if (!found) {
886        method = "NONE";
887    }
888
889    if (method == "NONE") {
890        return OK;
891    } else if (!(method == "AES-128")) {
892        ALOGE("Unsupported cipher method '%s'", method.c_str());
893        return ERROR_UNSUPPORTED;
894    }
895
896    AString keyURI;
897    if (!itemMeta->findString("cipher-uri", &keyURI)) {
898        ALOGE("Missing key uri");
899        return ERROR_MALFORMED;
900    }
901
902    ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
903
904    sp<ABuffer> key;
905    if (index >= 0) {
906        key = mAESKeyForURI.valueAt(index);
907    } else {
908        key = new ABuffer(16);
909
910        sp<HTTPBase> keySource =
911              HTTPBase::Create(
912                  (mFlags & kFlagIncognito)
913                    ? HTTPBase::kFlagIncognito
914                    : 0);
915
916        if (mUIDValid) {
917            keySource->setUID(mUID);
918        }
919
920        status_t err =
921            keySource->connect(
922                    keyURI.c_str(),
923                    mExtraHeaders.isEmpty() ? NULL : &mExtraHeaders);
924
925        if (err == OK) {
926            size_t offset = 0;
927            while (offset < 16) {
928                ssize_t n = keySource->readAt(
929                        offset, key->data() + offset, 16 - offset);
930                if (n <= 0) {
931                    err = ERROR_IO;
932                    break;
933                }
934
935                offset += n;
936            }
937        }
938
939        if (err != OK) {
940            ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
941            return ERROR_IO;
942        }
943
944        mAESKeyForURI.add(keyURI, key);
945    }
946
947    AES_KEY aes_key;
948    if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
949        ALOGE("failed to set AES decryption key.");
950        return UNKNOWN_ERROR;
951    }
952
953    unsigned char aes_ivec[16];
954
955    AString iv;
956    if (itemMeta->findString("cipher-iv", &iv)) {
957        if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
958                || iv.size() != 16 * 2 + 2) {
959            ALOGE("malformed cipher IV '%s'.", iv.c_str());
960            return ERROR_MALFORMED;
961        }
962
963        memset(aes_ivec, 0, sizeof(aes_ivec));
964        for (size_t i = 0; i < 16; ++i) {
965            char c1 = tolower(iv.c_str()[2 + 2 * i]);
966            char c2 = tolower(iv.c_str()[3 + 2 * i]);
967            if (!isxdigit(c1) || !isxdigit(c2)) {
968                ALOGE("malformed cipher IV '%s'.", iv.c_str());
969                return ERROR_MALFORMED;
970            }
971            uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
972            uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
973
974            aes_ivec[i] = nibble1 << 4 | nibble2;
975        }
976    } else {
977        memset(aes_ivec, 0, sizeof(aes_ivec));
978        aes_ivec[15] = mSeqNumber & 0xff;
979        aes_ivec[14] = (mSeqNumber >> 8) & 0xff;
980        aes_ivec[13] = (mSeqNumber >> 16) & 0xff;
981        aes_ivec[12] = (mSeqNumber >> 24) & 0xff;
982    }
983
984    AES_cbc_encrypt(
985            buffer->data(), buffer->data(), buffer->size(),
986            &aes_key, aes_ivec, AES_DECRYPT);
987
988    // hexdump(buffer->data(), buffer->size());
989
990    size_t n = buffer->size();
991    CHECK_GT(n, 0u);
992
993    size_t pad = buffer->data()[n - 1];
994
995    CHECK_GT(pad, 0u);
996    CHECK_LE(pad, 16u);
997    CHECK_GE((size_t)n, pad);
998    for (size_t i = 0; i < pad; ++i) {
999        CHECK_EQ((unsigned)buffer->data()[n - 1 - i], pad);
1000    }
1001
1002    n -= pad;
1003
1004    buffer->setRange(buffer->offset(), n);
1005
1006    return OK;
1007}
1008
1009void LiveSession::postMonitorQueue(int64_t delayUs) {
1010    sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id());
1011    msg->setInt32("generation", ++mMonitorQueueGeneration);
1012    msg->post(delayUs);
1013}
1014
1015void LiveSession::onSeek(const sp<AMessage> &msg) {
1016    int64_t timeUs;
1017    CHECK(msg->findInt64("timeUs", &timeUs));
1018
1019    mSeekTimeUs = timeUs;
1020    postMonitorQueue();
1021}
1022
1023status_t LiveSession::getDuration(int64_t *durationUs) const {
1024    Mutex::Autolock autoLock(mLock);
1025    *durationUs = mDurationUs;
1026
1027    return OK;
1028}
1029
1030bool LiveSession::isSeekable() const {
1031    int64_t durationUs;
1032    return getDuration(&durationUs) == OK && durationUs >= 0;
1033}
1034
1035bool LiveSession::hasDynamicDuration() const {
1036    return !mDurationFixed;
1037}
1038
1039}  // namespace android
1040
1041