LiveSession.cpp revision ed8d14f6a934072cd012992c4ef16990a54baa9a
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "include/LiveSession.h"
22
23#include "LiveDataSource.h"
24
25#include "include/M3UParser.h"
26#include "include/NuHTTPDataSource.h"
27
28#include <cutils/properties.h>
29#include <media/stagefright/foundation/hexdump.h>
30#include <media/stagefright/foundation/ABuffer.h>
31#include <media/stagefright/foundation/ADebug.h>
32#include <media/stagefright/foundation/AMessage.h>
33#include <media/stagefright/DataSource.h>
34#include <media/stagefright/FileSource.h>
35#include <media/stagefright/MediaErrors.h>
36
37#include <ctype.h>
38#include <openssl/aes.h>
39
40namespace android {
41
42const int64_t LiveSession::kMaxPlaylistAgeUs = 15000000ll;
43
44LiveSession::LiveSession()
45    : mDataSource(new LiveDataSource),
46      mHTTPDataSource(new NuHTTPDataSource),
47      mPrevBandwidthIndex(-1),
48      mLastPlaylistFetchTimeUs(-1),
49      mSeqNumber(-1),
50      mSeekTimeUs(-1),
51      mNumRetries(0),
52      mDurationUs(-1),
53      mSeekDone(false),
54      mDisconnectPending(false),
55      mMonitorQueueGeneration(0) {
56}
57
58LiveSession::~LiveSession() {
59}
60
61sp<DataSource> LiveSession::getDataSource() {
62    return mDataSource;
63}
64
65void LiveSession::connect(const char *url) {
66    sp<AMessage> msg = new AMessage(kWhatConnect, id());
67    msg->setString("url", url);
68    msg->post();
69}
70
71void LiveSession::disconnect() {
72    Mutex::Autolock autoLock(mLock);
73    mDisconnectPending = true;
74
75    mHTTPDataSource->disconnect();
76
77    (new AMessage(kWhatDisconnect, id()))->post();
78}
79
80void LiveSession::seekTo(int64_t timeUs) {
81    Mutex::Autolock autoLock(mLock);
82    mSeekDone = false;
83
84    sp<AMessage> msg = new AMessage(kWhatSeek, id());
85    msg->setInt64("timeUs", timeUs);
86    msg->post();
87
88    while (!mSeekDone) {
89        mCondition.wait(mLock);
90    }
91}
92
93void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
94    switch (msg->what()) {
95        case kWhatConnect:
96            onConnect(msg);
97            break;
98
99        case kWhatDisconnect:
100            onDisconnect();
101            break;
102
103        case kWhatMonitorQueue:
104        {
105            int32_t generation;
106            CHECK(msg->findInt32("generation", &generation));
107
108            if (generation != mMonitorQueueGeneration) {
109                // Stale event
110                break;
111            }
112
113            onMonitorQueue();
114            break;
115        }
116
117        case kWhatSeek:
118            onSeek(msg);
119            break;
120
121        default:
122            TRESPASS();
123            break;
124    }
125}
126
127// static
128int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
129    if (a->mBandwidth < b->mBandwidth) {
130        return -1;
131    } else if (a->mBandwidth == b->mBandwidth) {
132        return 0;
133    }
134
135    return 1;
136}
137
138void LiveSession::onConnect(const sp<AMessage> &msg) {
139    AString url;
140    CHECK(msg->findString("url", &url));
141
142    LOGI("onConnect '%s'", url.c_str());
143
144    mMasterURL = url;
145
146    sp<M3UParser> playlist = fetchPlaylist(url.c_str());
147
148    if (playlist == NULL) {
149        LOGE("unable to fetch master playlist '%s'.", url.c_str());
150
151        mDataSource->queueEOS(ERROR_IO);
152        return;
153    }
154
155    if (playlist->isVariantPlaylist()) {
156        for (size_t i = 0; i < playlist->size(); ++i) {
157            BandwidthItem item;
158
159            sp<AMessage> meta;
160            playlist->itemAt(i, &item.mURI, &meta);
161
162            unsigned long bandwidth;
163            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
164
165            mBandwidthItems.push(item);
166        }
167
168        CHECK_GT(mBandwidthItems.size(), 0u);
169
170        mBandwidthItems.sort(SortByBandwidth);
171    }
172
173    postMonitorQueue();
174}
175
176void LiveSession::onDisconnect() {
177    LOGI("onDisconnect");
178
179    mDataSource->queueEOS(ERROR_END_OF_STREAM);
180
181    Mutex::Autolock autoLock(mLock);
182    mDisconnectPending = false;
183}
184
185status_t LiveSession::fetchFile(const char *url, sp<ABuffer> *out) {
186    *out = NULL;
187
188    sp<DataSource> source;
189
190    if (!strncasecmp(url, "file://", 7)) {
191        source = new FileSource(url + 7);
192    } else if (strncasecmp(url, "http://", 7)) {
193        return ERROR_UNSUPPORTED;
194    } else {
195        {
196            Mutex::Autolock autoLock(mLock);
197
198            if (mDisconnectPending) {
199                return ERROR_IO;
200            }
201        }
202
203        status_t err = mHTTPDataSource->connect(url);
204
205        if (err != OK) {
206            return err;
207        }
208
209        source = mHTTPDataSource;
210    }
211
212    off64_t size;
213    status_t err = source->getSize(&size);
214
215    if (err != OK) {
216        size = 65536;
217    }
218
219    sp<ABuffer> buffer = new ABuffer(size);
220    buffer->setRange(0, 0);
221
222    for (;;) {
223        size_t bufferRemaining = buffer->capacity() - buffer->size();
224
225        if (bufferRemaining == 0) {
226            bufferRemaining = 32768;
227
228            LOGV("increasing download buffer to %d bytes",
229                 buffer->size() + bufferRemaining);
230
231            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
232            memcpy(copy->data(), buffer->data(), buffer->size());
233            copy->setRange(0, buffer->size());
234
235            buffer = copy;
236        }
237
238        ssize_t n = source->readAt(
239                buffer->size(), buffer->data() + buffer->size(),
240                bufferRemaining);
241
242        if (n < 0) {
243            return n;
244        }
245
246        if (n == 0) {
247            break;
248        }
249
250        buffer->setRange(0, buffer->size() + (size_t)n);
251    }
252
253    *out = buffer;
254
255    return OK;
256}
257
258sp<M3UParser> LiveSession::fetchPlaylist(const char *url) {
259    sp<ABuffer> buffer;
260    status_t err = fetchFile(url, &buffer);
261
262    if (err != OK) {
263        return NULL;
264    }
265
266    sp<M3UParser> playlist =
267        new M3UParser(url, buffer->data(), buffer->size());
268
269    if (playlist->initCheck() != OK) {
270        return NULL;
271    }
272
273    return playlist;
274}
275
// Returns the next rand() sample scaled into the range [0, 1].
static double uniformRand() {
    const double sample = static_cast<double>(rand());

    return sample / RAND_MAX;
}
279
280size_t LiveSession::getBandwidthIndex() {
281    if (mBandwidthItems.size() == 0) {
282        return 0;
283    }
284
285#if 1
286    int32_t bandwidthBps;
287    if (mHTTPDataSource != NULL
288            && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
289        LOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
290    } else {
291        LOGV("no bandwidth estimate.");
292        return 0;  // Pick the lowest bandwidth stream by default.
293    }
294
295    char value[PROPERTY_VALUE_MAX];
296    if (property_get("media.httplive.max-bw", value, NULL)) {
297        char *end;
298        long maxBw = strtoul(value, &end, 10);
299        if (end > value && *end == '\0') {
300            if (maxBw > 0 && bandwidthBps > maxBw) {
301                LOGV("bandwidth capped to %ld bps", maxBw);
302                bandwidthBps = maxBw;
303            }
304        }
305    }
306
307    // Consider only 80% of the available bandwidth usable.
308    bandwidthBps = (bandwidthBps * 8) / 10;
309
310    // Pick the highest bandwidth stream below or equal to estimated bandwidth.
311
312    size_t index = mBandwidthItems.size() - 1;
313    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth
314                            > (size_t)bandwidthBps) {
315        --index;
316    }
317#elif 0
318    // Change bandwidth at random()
319    size_t index = uniformRand() * mBandwidthItems.size();
320#elif 0
321    // There's a 50% chance to stay on the current bandwidth and
322    // a 50% chance to switch to the next higher bandwidth (wrapping around
323    // to lowest)
324    const size_t kMinIndex = 0;
325
326    size_t index;
327    if (mPrevBandwidthIndex < 0) {
328        index = kMinIndex;
329    } else if (uniformRand() < 0.5) {
330        index = (size_t)mPrevBandwidthIndex;
331    } else {
332        index = mPrevBandwidthIndex + 1;
333        if (index == mBandwidthItems.size()) {
334            index = kMinIndex;
335        }
336    }
337#elif 0
338    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
339
340    size_t index = mBandwidthItems.size() - 1;
341    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
342        --index;
343    }
344#else
345    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
346#endif
347
348    return index;
349}
350
351void LiveSession::onDownloadNext() {
352    size_t bandwidthIndex = getBandwidthIndex();
353
354rinse_repeat:
355    int64_t nowUs = ALooper::GetNowUs();
356
357    if (mLastPlaylistFetchTimeUs < 0
358            || (ssize_t)bandwidthIndex != mPrevBandwidthIndex
359            || (!mPlaylist->isComplete()
360                && mLastPlaylistFetchTimeUs + kMaxPlaylistAgeUs <= nowUs)) {
361        AString url;
362        if (mBandwidthItems.size() > 0) {
363            url = mBandwidthItems.editItemAt(bandwidthIndex).mURI;
364        } else {
365            url = mMasterURL;
366        }
367
368        bool firstTime = (mPlaylist == NULL);
369
370        mPlaylist = fetchPlaylist(url.c_str());
371        if (mPlaylist == NULL) {
372            LOGE("failed to load playlist at url '%s'", url.c_str());
373            mDataSource->queueEOS(ERROR_IO);
374            return;
375        }
376
377        if (firstTime) {
378            Mutex::Autolock autoLock(mLock);
379
380            int32_t targetDuration;
381            if (!mPlaylist->isComplete()
382                    || !mPlaylist->meta()->findInt32(
383                    "target-duration", &targetDuration)) {
384                mDurationUs = -1;
385            } else {
386                mDurationUs = 1000000ll * targetDuration * mPlaylist->size();
387            }
388        }
389
390        mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
391    }
392
393    int32_t firstSeqNumberInPlaylist;
394    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
395                "media-sequence", &firstSeqNumberInPlaylist)) {
396        firstSeqNumberInPlaylist = 0;
397    }
398
399    bool explicitDiscontinuity = false;
400    bool bandwidthChanged = false;
401
402    if (mSeekTimeUs >= 0) {
403        int32_t targetDuration;
404        if (mPlaylist->isComplete() &&
405                mPlaylist->meta()->findInt32(
406                    "target-duration", &targetDuration)) {
407            int64_t seekTimeSecs = (mSeekTimeUs + 500000ll) / 1000000ll;
408            int64_t index = seekTimeSecs / targetDuration;
409
410            if (index >= 0 && index < mPlaylist->size()) {
411                int32_t newSeqNumber = firstSeqNumberInPlaylist + index;
412
413                if (newSeqNumber != mSeqNumber) {
414                    LOGI("seeking to seq no %d", newSeqNumber);
415
416                    mSeqNumber = newSeqNumber;
417
418                    mDataSource->reset();
419
420                    // reseting the data source will have had the
421                    // side effect of discarding any previously queued
422                    // bandwidth change discontinuity.
423                    // Therefore we'll need to treat these explicit
424                    // discontinuities as involving a bandwidth change
425                    // even if they aren't directly.
426                    explicitDiscontinuity = true;
427                    bandwidthChanged = true;
428                }
429            }
430        }
431
432        mSeekTimeUs = -1;
433
434        Mutex::Autolock autoLock(mLock);
435        mSeekDone = true;
436        mCondition.broadcast();
437    }
438
439    if (mSeqNumber < 0) {
440        if (mPlaylist->isComplete()) {
441            mSeqNumber = firstSeqNumberInPlaylist;
442        } else {
443            mSeqNumber = firstSeqNumberInPlaylist + mPlaylist->size() / 2;
444        }
445    }
446
447    int32_t lastSeqNumberInPlaylist =
448        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
449
450    if (mSeqNumber < firstSeqNumberInPlaylist
451            || mSeqNumber > lastSeqNumberInPlaylist) {
452        if (mPrevBandwidthIndex != (ssize_t)bandwidthIndex) {
453            // Go back to the previous bandwidth.
454
455            LOGI("new bandwidth does not have the sequence number "
456                 "we're looking for, switching back to previous bandwidth");
457
458            mLastPlaylistFetchTimeUs = -1;
459            bandwidthIndex = mPrevBandwidthIndex;
460            goto rinse_repeat;
461        }
462
463        if (!mPlaylist->isComplete()
464                && mSeqNumber > lastSeqNumberInPlaylist
465                && mNumRetries < kMaxNumRetries) {
466            ++mNumRetries;
467
468            mLastPlaylistFetchTimeUs = -1;
469            postMonitorQueue(3000000ll);
470            return;
471        }
472
473        LOGE("Cannot find sequence number %d in playlist "
474             "(contains %d - %d)",
475             mSeqNumber, firstSeqNumberInPlaylist,
476             firstSeqNumberInPlaylist + mPlaylist->size() - 1);
477
478        mDataSource->queueEOS(ERROR_END_OF_STREAM);
479        return;
480    }
481
482    mNumRetries = 0;
483
484    AString uri;
485    sp<AMessage> itemMeta;
486    CHECK(mPlaylist->itemAt(
487                mSeqNumber - firstSeqNumberInPlaylist,
488                &uri,
489                &itemMeta));
490
491    int32_t val;
492    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
493        explicitDiscontinuity = true;
494    }
495
496    sp<ABuffer> buffer;
497    status_t err = fetchFile(uri.c_str(), &buffer);
498    if (err != OK) {
499        LOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
500        mDataSource->queueEOS(err);
501        return;
502    }
503
504    CHECK(buffer != NULL);
505
506    err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer);
507
508    if (err != OK) {
509        LOGE("decryptBuffer failed w/ error %d", err);
510
511        mDataSource->queueEOS(err);
512        return;
513    }
514
515    if (buffer->size() == 0 || buffer->data()[0] != 0x47) {
516        // Not a transport stream???
517
518        LOGE("This doesn't look like a transport stream...");
519
520        mBandwidthItems.removeAt(bandwidthIndex);
521
522        if (mBandwidthItems.isEmpty()) {
523            mDataSource->queueEOS(ERROR_UNSUPPORTED);
524            return;
525        }
526
527        LOGI("Retrying with a different bandwidth stream.");
528
529        mLastPlaylistFetchTimeUs = -1;
530        bandwidthIndex = getBandwidthIndex();
531        mPrevBandwidthIndex = bandwidthIndex;
532        mSeqNumber = -1;
533
534        goto rinse_repeat;
535    }
536
537    if ((size_t)mPrevBandwidthIndex != bandwidthIndex) {
538        bandwidthChanged = true;
539    }
540
541    if (mPrevBandwidthIndex < 0) {
542        // Don't signal a bandwidth change at the very beginning of
543        // playback.
544        bandwidthChanged = false;
545    }
546
547    if (explicitDiscontinuity || bandwidthChanged) {
548        // Signal discontinuity.
549
550        LOGI("queueing discontinuity (explicit=%d, bandwidthChanged=%d)",
551             explicitDiscontinuity, bandwidthChanged);
552
553        sp<ABuffer> tmp = new ABuffer(188);
554        memset(tmp->data(), 0, tmp->size());
555        tmp->data()[1] = bandwidthChanged;
556
557        mDataSource->queueBuffer(tmp);
558    }
559
560    mDataSource->queueBuffer(buffer);
561
562    mPrevBandwidthIndex = bandwidthIndex;
563    ++mSeqNumber;
564
565    postMonitorQueue();
566}
567
568void LiveSession::onMonitorQueue() {
569    if (mSeekTimeUs >= 0
570            || mDataSource->countQueuedBuffers() < kMaxNumQueuedFragments) {
571        onDownloadNext();
572    } else {
573        postMonitorQueue(1000000ll);
574    }
575}
576
577status_t LiveSession::decryptBuffer(
578        size_t playlistIndex, const sp<ABuffer> &buffer) {
579    sp<AMessage> itemMeta;
580    bool found = false;
581    AString method;
582
583    for (ssize_t i = playlistIndex; i >= 0; --i) {
584        AString uri;
585        CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
586
587        if (itemMeta->findString("cipher-method", &method)) {
588            found = true;
589            break;
590        }
591    }
592
593    if (!found) {
594        method = "NONE";
595    }
596
597    if (method == "NONE") {
598        return OK;
599    } else if (!(method == "AES-128")) {
600        LOGE("Unsupported cipher method '%s'", method.c_str());
601        return ERROR_UNSUPPORTED;
602    }
603
604    AString keyURI;
605    if (!itemMeta->findString("cipher-uri", &keyURI)) {
606        LOGE("Missing key uri");
607        return ERROR_MALFORMED;
608    }
609
610    ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
611
612    sp<ABuffer> key;
613    if (index >= 0) {
614        key = mAESKeyForURI.valueAt(index);
615    } else {
616        key = new ABuffer(16);
617
618        sp<NuHTTPDataSource> keySource = new NuHTTPDataSource;
619        status_t err = keySource->connect(keyURI.c_str());
620
621        if (err == OK) {
622            size_t offset = 0;
623            while (offset < 16) {
624                ssize_t n = keySource->readAt(
625                        offset, key->data() + offset, 16 - offset);
626                if (n <= 0) {
627                    err = ERROR_IO;
628                    break;
629                }
630
631                offset += n;
632            }
633        }
634
635        if (err != OK) {
636            LOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
637            return ERROR_IO;
638        }
639
640        mAESKeyForURI.add(keyURI, key);
641    }
642
643    AES_KEY aes_key;
644    if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
645        LOGE("failed to set AES decryption key.");
646        return UNKNOWN_ERROR;
647    }
648
649    unsigned char aes_ivec[16];
650
651    AString iv;
652    if (itemMeta->findString("cipher-iv", &iv)) {
653        if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
654                || iv.size() != 16 * 2 + 2) {
655            LOGE("malformed cipher IV '%s'.", iv.c_str());
656            return ERROR_MALFORMED;
657        }
658
659        memset(aes_ivec, 0, sizeof(aes_ivec));
660        for (size_t i = 0; i < 16; ++i) {
661            char c1 = tolower(iv.c_str()[2 + 2 * i]);
662            char c2 = tolower(iv.c_str()[3 + 2 * i]);
663            if (!isxdigit(c1) || !isxdigit(c2)) {
664                LOGE("malformed cipher IV '%s'.", iv.c_str());
665                return ERROR_MALFORMED;
666            }
667            uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
668            uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
669
670            aes_ivec[i] = nibble1 << 4 | nibble2;
671        }
672    } else {
673        memset(aes_ivec, 0, sizeof(aes_ivec));
674        aes_ivec[15] = mSeqNumber & 0xff;
675        aes_ivec[14] = (mSeqNumber >> 8) & 0xff;
676        aes_ivec[13] = (mSeqNumber >> 16) & 0xff;
677        aes_ivec[12] = (mSeqNumber >> 24) & 0xff;
678    }
679
680    AES_cbc_encrypt(
681            buffer->data(), buffer->data(), buffer->size(),
682            &aes_key, aes_ivec, AES_DECRYPT);
683
684    // hexdump(buffer->data(), buffer->size());
685
686    size_t n = buffer->size();
687    CHECK_GT(n, 0u);
688
689    size_t pad = buffer->data()[n - 1];
690
691    CHECK_GT(pad, 0u);
692    CHECK_LE(pad, 16u);
693    CHECK_GE((size_t)n, pad);
694    for (size_t i = 0; i < pad; ++i) {
695        CHECK_EQ((unsigned)buffer->data()[n - 1 - i], pad);
696    }
697
698    n -= pad;
699
700    buffer->setRange(buffer->offset(), n);
701
702    return OK;
703}
704
705void LiveSession::postMonitorQueue(int64_t delayUs) {
706    sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id());
707    msg->setInt32("generation", ++mMonitorQueueGeneration);
708    msg->post(delayUs);
709}
710
711void LiveSession::onSeek(const sp<AMessage> &msg) {
712    int64_t timeUs;
713    CHECK(msg->findInt64("timeUs", &timeUs));
714
715    mSeekTimeUs = timeUs;
716    postMonitorQueue();
717}
718
719status_t LiveSession::getDuration(int64_t *durationUs) {
720    Mutex::Autolock autoLock(mLock);
721    *durationUs = mDurationUs;
722
723    return OK;
724}
725
726bool LiveSession::isSeekable() {
727    int64_t durationUs;
728    return getDuration(&durationUs) == OK && durationUs >= 0;
729}
730
731}  // namespace android
732
733