LiveSession.cpp revision df42f949c8bd05b81d94633767514fff88f52062
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "include/LiveSession.h"
22
23#include "LiveDataSource.h"
24
25#include "include/M3UParser.h"
26#include "include/NuHTTPDataSource.h"
27
28#include <cutils/properties.h>
29#include <media/stagefright/foundation/hexdump.h>
30#include <media/stagefright/foundation/ABuffer.h>
31#include <media/stagefright/foundation/ADebug.h>
32#include <media/stagefright/foundation/AMessage.h>
33#include <media/stagefright/DataSource.h>
34#include <media/stagefright/FileSource.h>
35#include <media/stagefright/MediaErrors.h>
36
37#include <ctype.h>
38#include <openssl/aes.h>
39
40namespace android {
41
42const int64_t LiveSession::kMaxPlaylistAgeUs = 15000000ll;
43
44LiveSession::LiveSession()
45    : mDataSource(new LiveDataSource),
46      mHTTPDataSource(new NuHTTPDataSource),
47      mPrevBandwidthIndex(-1),
48      mLastPlaylistFetchTimeUs(-1),
49      mSeqNumber(-1),
50      mSeekTimeUs(-1),
51      mNumRetries(0),
52      mDurationUs(-1),
53      mSeekDone(false),
54      mMonitorQueueGeneration(0) {
55}
56
57LiveSession::~LiveSession() {
58}
59
60sp<DataSource> LiveSession::getDataSource() {
61    return mDataSource;
62}
63
64void LiveSession::connect(const char *url) {
65    sp<AMessage> msg = new AMessage(kWhatConnect, id());
66    msg->setString("url", url);
67    msg->post();
68}
69
70void LiveSession::disconnect() {
71    (new AMessage(kWhatDisconnect, id()))->post();
72}
73
74void LiveSession::seekTo(int64_t timeUs) {
75    Mutex::Autolock autoLock(mLock);
76    mSeekDone = false;
77
78    sp<AMessage> msg = new AMessage(kWhatSeek, id());
79    msg->setInt64("timeUs", timeUs);
80    msg->post();
81
82    while (!mSeekDone) {
83        mCondition.wait(mLock);
84    }
85}
86
87void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
88    switch (msg->what()) {
89        case kWhatConnect:
90            onConnect(msg);
91            break;
92
93        case kWhatDisconnect:
94            onDisconnect();
95            break;
96
97        case kWhatMonitorQueue:
98        {
99            int32_t generation;
100            CHECK(msg->findInt32("generation", &generation));
101
102            if (generation != mMonitorQueueGeneration) {
103                // Stale event
104                break;
105            }
106
107            onMonitorQueue();
108            break;
109        }
110
111        case kWhatSeek:
112            onSeek(msg);
113            break;
114
115        default:
116            TRESPASS();
117            break;
118    }
119}
120
121// static
122int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
123    if (a->mBandwidth < b->mBandwidth) {
124        return -1;
125    } else if (a->mBandwidth == b->mBandwidth) {
126        return 0;
127    }
128
129    return 1;
130}
131
132void LiveSession::onConnect(const sp<AMessage> &msg) {
133    AString url;
134    CHECK(msg->findString("url", &url));
135
136    LOGI("onConnect '%s'", url.c_str());
137
138    mMasterURL = url;
139
140    sp<M3UParser> playlist = fetchPlaylist(url.c_str());
141    CHECK(playlist != NULL);
142
143    if (playlist->isVariantPlaylist()) {
144        for (size_t i = 0; i < playlist->size(); ++i) {
145            BandwidthItem item;
146
147            sp<AMessage> meta;
148            playlist->itemAt(i, &item.mURI, &meta);
149
150            unsigned long bandwidth;
151            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
152
153            mBandwidthItems.push(item);
154        }
155
156        CHECK_GT(mBandwidthItems.size(), 0u);
157
158        mBandwidthItems.sort(SortByBandwidth);
159
160        if (mBandwidthItems.size() > 1) {
161            // XXX Remove the lowest bitrate stream for now...
162            mBandwidthItems.removeAt(0);
163        }
164    }
165
166    postMonitorQueue();
167}
168
169void LiveSession::onDisconnect() {
170    LOGI("onDisconnect");
171
172    mDataSource->queueEOS(ERROR_END_OF_STREAM);
173}
174
175status_t LiveSession::fetchFile(const char *url, sp<ABuffer> *out) {
176    *out = NULL;
177
178    sp<DataSource> source;
179
180    if (!strncasecmp(url, "file://", 7)) {
181        source = new FileSource(url + 7);
182    } else if (strncasecmp(url, "http://", 7)) {
183        return ERROR_UNSUPPORTED;
184    } else {
185        status_t err = mHTTPDataSource->connect(url);
186
187        if (err != OK) {
188            return err;
189        }
190
191        source = mHTTPDataSource;
192    }
193
194    off64_t size;
195    status_t err = source->getSize(&size);
196
197    if (err != OK) {
198        size = 65536;
199    }
200
201    sp<ABuffer> buffer = new ABuffer(size);
202    buffer->setRange(0, 0);
203
204    for (;;) {
205        size_t bufferRemaining = buffer->capacity() - buffer->size();
206
207        if (bufferRemaining == 0) {
208            bufferRemaining = 32768;
209
210            LOGV("increasing download buffer to %d bytes",
211                 buffer->size() + bufferRemaining);
212
213            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
214            memcpy(copy->data(), buffer->data(), buffer->size());
215            copy->setRange(0, buffer->size());
216
217            buffer = copy;
218        }
219
220        ssize_t n = source->readAt(
221                buffer->size(), buffer->data() + buffer->size(),
222                bufferRemaining);
223
224        if (n < 0) {
225            return err;
226        }
227
228        if (n == 0) {
229            break;
230        }
231
232        buffer->setRange(0, buffer->size() + (size_t)n);
233    }
234
235    *out = buffer;
236
237    return OK;
238}
239
240sp<M3UParser> LiveSession::fetchPlaylist(const char *url) {
241    sp<ABuffer> buffer;
242    status_t err = fetchFile(url, &buffer);
243
244    if (err != OK) {
245        return NULL;
246    }
247
248    sp<M3UParser> playlist =
249        new M3UParser(url, buffer->data(), buffer->size());
250
251    if (playlist->initCheck() != OK) {
252        return NULL;
253    }
254
255    return playlist;
256}
257
258static double uniformRand() {
259    return (double)rand() / RAND_MAX;
260}
261
262size_t LiveSession::getBandwidthIndex() {
263    if (mBandwidthItems.size() == 0) {
264        return 0;
265    }
266
267#if 1
268    int32_t bandwidthBps;
269    if (mHTTPDataSource != NULL
270            && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
271        LOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
272    } else {
273        LOGV("no bandwidth estimate.");
274        return 0;  // Pick the lowest bandwidth stream by default.
275    }
276
277    char value[PROPERTY_VALUE_MAX];
278    if (property_get("media.httplive.max-bw", value, NULL)) {
279        char *end;
280        long maxBw = strtoul(value, &end, 10);
281        if (end > value && *end == '\0') {
282            if (maxBw > 0 && bandwidthBps > maxBw) {
283                LOGV("bandwidth capped to %ld bps", maxBw);
284                bandwidthBps = maxBw;
285            }
286        }
287    }
288
289    // Consider only 80% of the available bandwidth usable.
290    bandwidthBps = (bandwidthBps * 8) / 10;
291
292    // Pick the highest bandwidth stream below or equal to estimated bandwidth.
293
294    size_t index = mBandwidthItems.size() - 1;
295    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth
296                            > (size_t)bandwidthBps) {
297        --index;
298    }
299#elif 0
300    // Change bandwidth at random()
301    size_t index = uniformRand() * mBandwidthItems.size();
302#elif 0
303    // There's a 50% chance to stay on the current bandwidth and
304    // a 50% chance to switch to the next higher bandwidth (wrapping around
305    // to lowest)
306    const size_t kMinIndex = 0;
307
308    size_t index;
309    if (mPrevBandwidthIndex < 0) {
310        index = kMinIndex;
311    } else if (uniformRand() < 0.5) {
312        index = (size_t)mPrevBandwidthIndex;
313    } else {
314        index = mPrevBandwidthIndex + 1;
315        if (index == mBandwidthItems.size()) {
316            index = kMinIndex;
317        }
318    }
319#elif 0
320    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
321
322    size_t index = mBandwidthItems.size() - 1;
323    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
324        --index;
325    }
326#else
327    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
328#endif
329
330    return index;
331}
332
333void LiveSession::onDownloadNext() {
334    size_t bandwidthIndex = getBandwidthIndex();
335
336    int64_t nowUs = ALooper::GetNowUs();
337
338    if (mLastPlaylistFetchTimeUs < 0
339            || (ssize_t)bandwidthIndex != mPrevBandwidthIndex
340            || (!mPlaylist->isComplete()
341                && mLastPlaylistFetchTimeUs + kMaxPlaylistAgeUs <= nowUs)) {
342        AString url;
343        if (mBandwidthItems.size() > 0) {
344            url = mBandwidthItems.editItemAt(bandwidthIndex).mURI;
345        } else {
346            url = mMasterURL;
347        }
348
349        bool firstTime = (mPlaylist == NULL);
350
351        mPlaylist = fetchPlaylist(url.c_str());
352        if (mPlaylist == NULL) {
353            LOGE("failed to load playlist at url '%s'", url.c_str());
354            mDataSource->queueEOS(ERROR_IO);
355            return;
356        }
357
358        if (firstTime) {
359            Mutex::Autolock autoLock(mLock);
360
361            int32_t targetDuration;
362            if (!mPlaylist->isComplete()
363                    || !mPlaylist->meta()->findInt32(
364                    "target-duration", &targetDuration)) {
365                mDurationUs = -1;
366            } else {
367                mDurationUs = 1000000ll * targetDuration * mPlaylist->size();
368            }
369        }
370
371        mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
372    }
373
374    int32_t firstSeqNumberInPlaylist;
375    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
376                "media-sequence", &firstSeqNumberInPlaylist)) {
377        firstSeqNumberInPlaylist = 0;
378    }
379
380    bool explicitDiscontinuity = false;
381
382    if (mSeekTimeUs >= 0) {
383        int32_t targetDuration;
384        if (mPlaylist->isComplete() &&
385                mPlaylist->meta()->findInt32(
386                    "target-duration", &targetDuration)) {
387            int64_t seekTimeSecs = (mSeekTimeUs + 500000ll) / 1000000ll;
388            int64_t index = seekTimeSecs / targetDuration;
389
390            if (index >= 0 && index < mPlaylist->size()) {
391                mSeqNumber = firstSeqNumberInPlaylist + index;
392                mDataSource->reset();
393
394                explicitDiscontinuity = true;
395            }
396        }
397
398        mSeekTimeUs = -1;
399
400        Mutex::Autolock autoLock(mLock);
401        mSeekDone = true;
402        mCondition.broadcast();
403    }
404
405    if (mSeqNumber < 0) {
406        if (mPlaylist->isComplete()) {
407            mSeqNumber = firstSeqNumberInPlaylist;
408        } else {
409            mSeqNumber = firstSeqNumberInPlaylist + mPlaylist->size() / 2;
410        }
411    }
412
413    int32_t lastSeqNumberInPlaylist =
414        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
415
416    if (mSeqNumber < firstSeqNumberInPlaylist
417            || mSeqNumber > lastSeqNumberInPlaylist) {
418        if (!mPlaylist->isComplete()
419                && mSeqNumber > lastSeqNumberInPlaylist
420                && mNumRetries < kMaxNumRetries) {
421            ++mNumRetries;
422
423            mLastPlaylistFetchTimeUs = -1;
424            postMonitorQueue(1000000ll);
425            return;
426        }
427
428        LOGE("Cannot find sequence number %d in playlist "
429             "(contains %d - %d)",
430             mSeqNumber, firstSeqNumberInPlaylist,
431             firstSeqNumberInPlaylist + mPlaylist->size() - 1);
432
433        mDataSource->queueEOS(ERROR_END_OF_STREAM);
434        return;
435    }
436
437    mNumRetries = 0;
438
439    AString uri;
440    sp<AMessage> itemMeta;
441    CHECK(mPlaylist->itemAt(
442                mSeqNumber - firstSeqNumberInPlaylist,
443                &uri,
444                &itemMeta));
445
446    int32_t val;
447    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
448        explicitDiscontinuity = true;
449    }
450
451    sp<ABuffer> buffer;
452    status_t err = fetchFile(uri.c_str(), &buffer);
453    if (err != OK) {
454        LOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
455        mDataSource->queueEOS(err);
456        return;
457    }
458
459    CHECK_EQ((status_t)OK,
460             decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer));
461
462    if (buffer->size() == 0 || buffer->data()[0] != 0x47) {
463        // Not a transport stream???
464
465        LOGE("This doesn't look like a transport stream...");
466
467        mDataSource->queueEOS(ERROR_UNSUPPORTED);
468        return;
469    }
470
471    bool bandwidthChanged =
472        mPrevBandwidthIndex >= 0
473            && (size_t)mPrevBandwidthIndex != bandwidthIndex;
474
475    if (explicitDiscontinuity || bandwidthChanged) {
476        // Signal discontinuity.
477
478        sp<ABuffer> tmp = new ABuffer(188);
479        memset(tmp->data(), 0, tmp->size());
480        tmp->data()[1] = bandwidthChanged;
481
482        mDataSource->queueBuffer(tmp);
483    }
484
485    mDataSource->queueBuffer(buffer);
486
487    mPrevBandwidthIndex = bandwidthIndex;
488    ++mSeqNumber;
489
490    postMonitorQueue();
491}
492
493void LiveSession::onMonitorQueue() {
494    if (mSeekTimeUs >= 0
495            || mDataSource->countQueuedBuffers() < kMaxNumQueuedFragments) {
496        onDownloadNext();
497    } else {
498        postMonitorQueue(1000000ll);
499    }
500}
501
502status_t LiveSession::decryptBuffer(
503        size_t playlistIndex, const sp<ABuffer> &buffer) {
504    sp<AMessage> itemMeta;
505    bool found = false;
506    AString method;
507
508    for (ssize_t i = playlistIndex; i >= 0; --i) {
509        AString uri;
510        CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
511
512        if (itemMeta->findString("cipher-method", &method)) {
513            found = true;
514            break;
515        }
516    }
517
518    if (!found) {
519        method = "NONE";
520    }
521
522    if (method == "NONE") {
523        return OK;
524    } else if (!(method == "AES-128")) {
525        LOGE("Unsupported cipher method '%s'", method.c_str());
526        return ERROR_UNSUPPORTED;
527    }
528
529    AString keyURI;
530    if (!itemMeta->findString("cipher-uri", &keyURI)) {
531        LOGE("Missing key uri");
532        return ERROR_MALFORMED;
533    }
534
535    ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
536
537    sp<ABuffer> key;
538    if (index >= 0) {
539        key = mAESKeyForURI.valueAt(index);
540    } else {
541        key = new ABuffer(16);
542
543        sp<NuHTTPDataSource> keySource = new NuHTTPDataSource;
544        status_t err = keySource->connect(keyURI.c_str());
545
546        if (err == OK) {
547            size_t offset = 0;
548            while (offset < 16) {
549                ssize_t n = keySource->readAt(
550                        offset, key->data() + offset, 16 - offset);
551                if (n <= 0) {
552                    err = ERROR_IO;
553                    break;
554                }
555
556                offset += n;
557            }
558        }
559
560        if (err != OK) {
561            LOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
562            return ERROR_IO;
563        }
564
565        mAESKeyForURI.add(keyURI, key);
566    }
567
568    AES_KEY aes_key;
569    if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
570        LOGE("failed to set AES decryption key.");
571        return UNKNOWN_ERROR;
572    }
573
574    unsigned char aes_ivec[16];
575
576    AString iv;
577    if (itemMeta->findString("cipher-iv", &iv)) {
578        if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
579                || iv.size() != 16 * 2 + 2) {
580            LOGE("malformed cipher IV '%s'.", iv.c_str());
581            return ERROR_MALFORMED;
582        }
583
584        memset(aes_ivec, 0, sizeof(aes_ivec));
585        for (size_t i = 0; i < 16; ++i) {
586            char c1 = tolower(iv.c_str()[2 + 2 * i]);
587            char c2 = tolower(iv.c_str()[3 + 2 * i]);
588            if (!isxdigit(c1) || !isxdigit(c2)) {
589                LOGE("malformed cipher IV '%s'.", iv.c_str());
590                return ERROR_MALFORMED;
591            }
592            uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
593            uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
594
595            aes_ivec[i] = nibble1 << 4 | nibble2;
596        }
597    } else {
598        memset(aes_ivec, 0, sizeof(aes_ivec));
599        aes_ivec[15] = mSeqNumber & 0xff;
600        aes_ivec[14] = (mSeqNumber >> 8) & 0xff;
601        aes_ivec[13] = (mSeqNumber >> 16) & 0xff;
602        aes_ivec[12] = (mSeqNumber >> 24) & 0xff;
603    }
604
605    AES_cbc_encrypt(
606            buffer->data(), buffer->data(), buffer->size(),
607            &aes_key, aes_ivec, AES_DECRYPT);
608
609    // hexdump(buffer->data(), buffer->size());
610
611    size_t n = buffer->size();
612    CHECK_GT(n, 0u);
613
614    size_t pad = buffer->data()[n - 1];
615
616    CHECK_GT(pad, 0u);
617    CHECK_LE(pad, 16u);
618    CHECK_GE((size_t)n, pad);
619    for (size_t i = 0; i < pad; ++i) {
620        CHECK_EQ((unsigned)buffer->data()[n - 1 - i], pad);
621    }
622
623    n -= pad;
624
625    buffer->setRange(buffer->offset(), n);
626
627    return OK;
628}
629
630void LiveSession::postMonitorQueue(int64_t delayUs) {
631    sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id());
632    msg->setInt32("generation", ++mMonitorQueueGeneration);
633    msg->post(delayUs);
634}
635
636void LiveSession::onSeek(const sp<AMessage> &msg) {
637    int64_t timeUs;
638    CHECK(msg->findInt64("timeUs", &timeUs));
639
640    mSeekTimeUs = timeUs;
641    postMonitorQueue();
642}
643
644status_t LiveSession::getDuration(int64_t *durationUs) {
645    Mutex::Autolock autoLock(mLock);
646    *durationUs = mDurationUs;
647
648    return OK;
649}
650
651bool LiveSession::isSeekable() {
652    int64_t durationUs;
653    return getDuration(&durationUs) == OK && durationUs >= 0;
654}
655
656}  // namespace android
657
658