LiveSession.cpp revision bc7f5b2e56107cfeaeeab13cf8979379e3c2f139
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "include/LiveSession.h"
22
23#include "LiveDataSource.h"
24
25#include "include/M3UParser.h"
26#include "include/NuHTTPDataSource.h"
27
28#include <cutils/properties.h>
29#include <media/stagefright/foundation/hexdump.h>
30#include <media/stagefright/foundation/ABuffer.h>
31#include <media/stagefright/foundation/ADebug.h>
32#include <media/stagefright/foundation/AMessage.h>
33#include <media/stagefright/DataSource.h>
34#include <media/stagefright/FileSource.h>
35#include <media/stagefright/MediaErrors.h>
36
37#include <ctype.h>
38#include <openssl/aes.h>
39
40namespace android {
41
42const int64_t LiveSession::kMaxPlaylistAgeUs = 15000000ll;
43
44LiveSession::LiveSession()
45    : mDataSource(new LiveDataSource),
46      mHTTPDataSource(new NuHTTPDataSource),
47      mPrevBandwidthIndex(-1),
48      mLastPlaylistFetchTimeUs(-1),
49      mSeqNumber(-1),
50      mSeekTimeUs(-1),
51      mNumRetries(0),
52      mDurationUs(-1),
53      mSeekDone(false),
54      mMonitorQueueGeneration(0) {
55}
56
57LiveSession::~LiveSession() {
58}
59
60sp<DataSource> LiveSession::getDataSource() {
61    return mDataSource;
62}
63
64void LiveSession::connect(const char *url) {
65    sp<AMessage> msg = new AMessage(kWhatConnect, id());
66    msg->setString("url", url);
67    msg->post();
68}
69
70void LiveSession::disconnect() {
71    (new AMessage(kWhatDisconnect, id()))->post();
72}
73
74void LiveSession::seekTo(int64_t timeUs) {
75    Mutex::Autolock autoLock(mLock);
76    mSeekDone = false;
77
78    sp<AMessage> msg = new AMessage(kWhatSeek, id());
79    msg->setInt64("timeUs", timeUs);
80    msg->post();
81
82    while (!mSeekDone) {
83        mCondition.wait(mLock);
84    }
85}
86
87void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
88    switch (msg->what()) {
89        case kWhatConnect:
90            onConnect(msg);
91            break;
92
93        case kWhatDisconnect:
94            onDisconnect();
95            break;
96
97        case kWhatMonitorQueue:
98        {
99            int32_t generation;
100            CHECK(msg->findInt32("generation", &generation));
101
102            if (generation != mMonitorQueueGeneration) {
103                // Stale event
104                break;
105            }
106
107            onMonitorQueue();
108            break;
109        }
110
111        case kWhatSeek:
112            onSeek(msg);
113            break;
114
115        default:
116            TRESPASS();
117            break;
118    }
119}
120
121// static
122int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
123    if (a->mBandwidth < b->mBandwidth) {
124        return -1;
125    } else if (a->mBandwidth == b->mBandwidth) {
126        return 0;
127    }
128
129    return 1;
130}
131
132void LiveSession::onConnect(const sp<AMessage> &msg) {
133    AString url;
134    CHECK(msg->findString("url", &url));
135
136    LOGI("onConnect '%s'", url.c_str());
137
138    mMasterURL = url;
139
140    sp<M3UParser> playlist = fetchPlaylist(url.c_str());
141    CHECK(playlist != NULL);
142
143    if (playlist->isVariantPlaylist()) {
144        for (size_t i = 0; i < playlist->size(); ++i) {
145            BandwidthItem item;
146
147            sp<AMessage> meta;
148            playlist->itemAt(i, &item.mURI, &meta);
149
150            unsigned long bandwidth;
151            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
152
153            mBandwidthItems.push(item);
154        }
155
156        CHECK_GT(mBandwidthItems.size(), 0u);
157
158        mBandwidthItems.sort(SortByBandwidth);
159
160        char value[PROPERTY_VALUE_MAX];
161        if (property_get("media.httplive.disable-nuplayer", value, NULL)
162                && (!strcasecmp(value, "true") || !strcmp(value, "1"))) {
163            // The "legacy" player cannot deal with audio format changes,
164            // some streams use different audio encoding parameters for
165            // their lowest bandwidth stream.
166            if (mBandwidthItems.size() > 1) {
167                // XXX Remove the lowest bitrate stream for now...
168                mBandwidthItems.removeAt(0);
169            }
170        }
171    }
172
173    postMonitorQueue();
174}
175
176void LiveSession::onDisconnect() {
177    LOGI("onDisconnect");
178
179    mDataSource->queueEOS(ERROR_END_OF_STREAM);
180}
181
182status_t LiveSession::fetchFile(const char *url, sp<ABuffer> *out) {
183    *out = NULL;
184
185    sp<DataSource> source;
186
187    if (!strncasecmp(url, "file://", 7)) {
188        source = new FileSource(url + 7);
189    } else if (strncasecmp(url, "http://", 7)) {
190        return ERROR_UNSUPPORTED;
191    } else {
192        status_t err = mHTTPDataSource->connect(url);
193
194        if (err != OK) {
195            return err;
196        }
197
198        source = mHTTPDataSource;
199    }
200
201    off64_t size;
202    status_t err = source->getSize(&size);
203
204    if (err != OK) {
205        size = 65536;
206    }
207
208    sp<ABuffer> buffer = new ABuffer(size);
209    buffer->setRange(0, 0);
210
211    for (;;) {
212        size_t bufferRemaining = buffer->capacity() - buffer->size();
213
214        if (bufferRemaining == 0) {
215            bufferRemaining = 32768;
216
217            LOGV("increasing download buffer to %d bytes",
218                 buffer->size() + bufferRemaining);
219
220            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
221            memcpy(copy->data(), buffer->data(), buffer->size());
222            copy->setRange(0, buffer->size());
223
224            buffer = copy;
225        }
226
227        ssize_t n = source->readAt(
228                buffer->size(), buffer->data() + buffer->size(),
229                bufferRemaining);
230
231        if (n < 0) {
232            return n;
233        }
234
235        if (n == 0) {
236            break;
237        }
238
239        buffer->setRange(0, buffer->size() + (size_t)n);
240    }
241
242    *out = buffer;
243
244    return OK;
245}
246
247sp<M3UParser> LiveSession::fetchPlaylist(const char *url) {
248    sp<ABuffer> buffer;
249    status_t err = fetchFile(url, &buffer);
250
251    if (err != OK) {
252        return NULL;
253    }
254
255    sp<M3UParser> playlist =
256        new M3UParser(url, buffer->data(), buffer->size());
257
258    if (playlist->initCheck() != OK) {
259        return NULL;
260    }
261
262    return playlist;
263}
264
265static double uniformRand() {
266    return (double)rand() / RAND_MAX;
267}
268
269size_t LiveSession::getBandwidthIndex() {
270    if (mBandwidthItems.size() == 0) {
271        return 0;
272    }
273
274#if 1
275    int32_t bandwidthBps;
276    if (mHTTPDataSource != NULL
277            && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
278        LOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
279    } else {
280        LOGV("no bandwidth estimate.");
281        return 0;  // Pick the lowest bandwidth stream by default.
282    }
283
284    char value[PROPERTY_VALUE_MAX];
285    if (property_get("media.httplive.max-bw", value, NULL)) {
286        char *end;
287        long maxBw = strtoul(value, &end, 10);
288        if (end > value && *end == '\0') {
289            if (maxBw > 0 && bandwidthBps > maxBw) {
290                LOGV("bandwidth capped to %ld bps", maxBw);
291                bandwidthBps = maxBw;
292            }
293        }
294    }
295
296    // Consider only 80% of the available bandwidth usable.
297    bandwidthBps = (bandwidthBps * 8) / 10;
298
299    // Pick the highest bandwidth stream below or equal to estimated bandwidth.
300
301    size_t index = mBandwidthItems.size() - 1;
302    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth
303                            > (size_t)bandwidthBps) {
304        --index;
305    }
306#elif 0
307    // Change bandwidth at random()
308    size_t index = uniformRand() * mBandwidthItems.size();
309#elif 0
310    // There's a 50% chance to stay on the current bandwidth and
311    // a 50% chance to switch to the next higher bandwidth (wrapping around
312    // to lowest)
313    const size_t kMinIndex = 0;
314
315    size_t index;
316    if (mPrevBandwidthIndex < 0) {
317        index = kMinIndex;
318    } else if (uniformRand() < 0.5) {
319        index = (size_t)mPrevBandwidthIndex;
320    } else {
321        index = mPrevBandwidthIndex + 1;
322        if (index == mBandwidthItems.size()) {
323            index = kMinIndex;
324        }
325    }
326#elif 0
327    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
328
329    size_t index = mBandwidthItems.size() - 1;
330    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
331        --index;
332    }
333#else
334    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
335#endif
336
337    return index;
338}
339
340void LiveSession::onDownloadNext() {
341    size_t bandwidthIndex = getBandwidthIndex();
342
343rinse_repeat:
344    int64_t nowUs = ALooper::GetNowUs();
345
346    if (mLastPlaylistFetchTimeUs < 0
347            || (ssize_t)bandwidthIndex != mPrevBandwidthIndex
348            || (!mPlaylist->isComplete()
349                && mLastPlaylistFetchTimeUs + kMaxPlaylistAgeUs <= nowUs)) {
350        AString url;
351        if (mBandwidthItems.size() > 0) {
352            url = mBandwidthItems.editItemAt(bandwidthIndex).mURI;
353        } else {
354            url = mMasterURL;
355        }
356
357        bool firstTime = (mPlaylist == NULL);
358
359        mPlaylist = fetchPlaylist(url.c_str());
360        if (mPlaylist == NULL) {
361            LOGE("failed to load playlist at url '%s'", url.c_str());
362            mDataSource->queueEOS(ERROR_IO);
363            return;
364        }
365
366        if (firstTime) {
367            Mutex::Autolock autoLock(mLock);
368
369            int32_t targetDuration;
370            if (!mPlaylist->isComplete()
371                    || !mPlaylist->meta()->findInt32(
372                    "target-duration", &targetDuration)) {
373                mDurationUs = -1;
374            } else {
375                mDurationUs = 1000000ll * targetDuration * mPlaylist->size();
376            }
377        }
378
379        mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
380    }
381
382    int32_t firstSeqNumberInPlaylist;
383    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
384                "media-sequence", &firstSeqNumberInPlaylist)) {
385        firstSeqNumberInPlaylist = 0;
386    }
387
388    bool explicitDiscontinuity = false;
389    bool bandwidthChanged = false;
390
391    if (mSeekTimeUs >= 0) {
392        int32_t targetDuration;
393        if (mPlaylist->isComplete() &&
394                mPlaylist->meta()->findInt32(
395                    "target-duration", &targetDuration)) {
396            int64_t seekTimeSecs = (mSeekTimeUs + 500000ll) / 1000000ll;
397            int64_t index = seekTimeSecs / targetDuration;
398
399            if (index >= 0 && index < mPlaylist->size()) {
400                int32_t newSeqNumber = firstSeqNumberInPlaylist + index;
401
402                if (newSeqNumber != mSeqNumber) {
403                    LOGI("seeking to seq no %d", newSeqNumber);
404
405                    mSeqNumber = newSeqNumber;
406
407                    mDataSource->reset();
408
409                    // reseting the data source will have had the
410                    // side effect of discarding any previously queued
411                    // bandwidth change discontinuity.
412                    // Therefore we'll need to treat these explicit
413                    // discontinuities as involving a bandwidth change
414                    // even if they aren't directly.
415                    explicitDiscontinuity = true;
416                    bandwidthChanged = true;
417                }
418            }
419        }
420
421        mSeekTimeUs = -1;
422
423        Mutex::Autolock autoLock(mLock);
424        mSeekDone = true;
425        mCondition.broadcast();
426    }
427
428    if (mSeqNumber < 0) {
429        if (mPlaylist->isComplete()) {
430            mSeqNumber = firstSeqNumberInPlaylist;
431        } else {
432            mSeqNumber = firstSeqNumberInPlaylist + mPlaylist->size() / 2;
433        }
434    }
435
436    int32_t lastSeqNumberInPlaylist =
437        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
438
439    if (mSeqNumber < firstSeqNumberInPlaylist
440            || mSeqNumber > lastSeqNumberInPlaylist) {
441        if (mPrevBandwidthIndex != (ssize_t)bandwidthIndex) {
442            // Go back to the previous bandwidth.
443
444            LOGI("new bandwidth does not have the sequence number "
445                 "we're looking for, switching back to previous bandwidth");
446
447            mLastPlaylistFetchTimeUs = -1;
448            bandwidthIndex = mPrevBandwidthIndex;
449            goto rinse_repeat;
450        }
451
452        if (!mPlaylist->isComplete()
453                && mSeqNumber > lastSeqNumberInPlaylist
454                && mNumRetries < kMaxNumRetries) {
455            ++mNumRetries;
456
457            mLastPlaylistFetchTimeUs = -1;
458            postMonitorQueue(3000000ll);
459            return;
460        }
461
462        LOGE("Cannot find sequence number %d in playlist "
463             "(contains %d - %d)",
464             mSeqNumber, firstSeqNumberInPlaylist,
465             firstSeqNumberInPlaylist + mPlaylist->size() - 1);
466
467        mDataSource->queueEOS(ERROR_END_OF_STREAM);
468        return;
469    }
470
471    mNumRetries = 0;
472
473    AString uri;
474    sp<AMessage> itemMeta;
475    CHECK(mPlaylist->itemAt(
476                mSeqNumber - firstSeqNumberInPlaylist,
477                &uri,
478                &itemMeta));
479
480    int32_t val;
481    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
482        explicitDiscontinuity = true;
483    }
484
485    sp<ABuffer> buffer;
486    status_t err = fetchFile(uri.c_str(), &buffer);
487    if (err != OK) {
488        LOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
489        mDataSource->queueEOS(err);
490        return;
491    }
492
493    CHECK(buffer != NULL);
494
495    err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer);
496
497    if (err != OK) {
498        LOGE("decryptBuffer failed w/ error %d", err);
499
500        mDataSource->queueEOS(err);
501        return;
502    }
503
504    if (buffer->size() == 0 || buffer->data()[0] != 0x47) {
505        // Not a transport stream???
506
507        LOGE("This doesn't look like a transport stream...");
508
509        mBandwidthItems.removeAt(bandwidthIndex);
510
511        if (mBandwidthItems.isEmpty()) {
512            mDataSource->queueEOS(ERROR_UNSUPPORTED);
513            return;
514        }
515
516        LOGI("Retrying with a different bandwidth stream.");
517
518        mLastPlaylistFetchTimeUs = -1;
519        bandwidthIndex = getBandwidthIndex();
520        mPrevBandwidthIndex = bandwidthIndex;
521        mSeqNumber = -1;
522
523        goto rinse_repeat;
524    }
525
526    if ((size_t)mPrevBandwidthIndex != bandwidthIndex) {
527        bandwidthChanged = true;
528    }
529
530    if (mPrevBandwidthIndex < 0) {
531        // Don't signal a bandwidth change at the very beginning of
532        // playback.
533        bandwidthChanged = false;
534    }
535
536    if (explicitDiscontinuity || bandwidthChanged) {
537        // Signal discontinuity.
538
539        LOGI("queueing discontinuity (explicit=%d, bandwidthChanged=%d)",
540             explicitDiscontinuity, bandwidthChanged);
541
542        sp<ABuffer> tmp = new ABuffer(188);
543        memset(tmp->data(), 0, tmp->size());
544        tmp->data()[1] = bandwidthChanged;
545
546        mDataSource->queueBuffer(tmp);
547    }
548
549    mDataSource->queueBuffer(buffer);
550
551    mPrevBandwidthIndex = bandwidthIndex;
552    ++mSeqNumber;
553
554    postMonitorQueue();
555}
556
557void LiveSession::onMonitorQueue() {
558    if (mSeekTimeUs >= 0
559            || mDataSource->countQueuedBuffers() < kMaxNumQueuedFragments) {
560        onDownloadNext();
561    } else {
562        postMonitorQueue(1000000ll);
563    }
564}
565
566status_t LiveSession::decryptBuffer(
567        size_t playlistIndex, const sp<ABuffer> &buffer) {
568    sp<AMessage> itemMeta;
569    bool found = false;
570    AString method;
571
572    for (ssize_t i = playlistIndex; i >= 0; --i) {
573        AString uri;
574        CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
575
576        if (itemMeta->findString("cipher-method", &method)) {
577            found = true;
578            break;
579        }
580    }
581
582    if (!found) {
583        method = "NONE";
584    }
585
586    if (method == "NONE") {
587        return OK;
588    } else if (!(method == "AES-128")) {
589        LOGE("Unsupported cipher method '%s'", method.c_str());
590        return ERROR_UNSUPPORTED;
591    }
592
593    AString keyURI;
594    if (!itemMeta->findString("cipher-uri", &keyURI)) {
595        LOGE("Missing key uri");
596        return ERROR_MALFORMED;
597    }
598
599    ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
600
601    sp<ABuffer> key;
602    if (index >= 0) {
603        key = mAESKeyForURI.valueAt(index);
604    } else {
605        key = new ABuffer(16);
606
607        sp<NuHTTPDataSource> keySource = new NuHTTPDataSource;
608        status_t err = keySource->connect(keyURI.c_str());
609
610        if (err == OK) {
611            size_t offset = 0;
612            while (offset < 16) {
613                ssize_t n = keySource->readAt(
614                        offset, key->data() + offset, 16 - offset);
615                if (n <= 0) {
616                    err = ERROR_IO;
617                    break;
618                }
619
620                offset += n;
621            }
622        }
623
624        if (err != OK) {
625            LOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
626            return ERROR_IO;
627        }
628
629        mAESKeyForURI.add(keyURI, key);
630    }
631
632    AES_KEY aes_key;
633    if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
634        LOGE("failed to set AES decryption key.");
635        return UNKNOWN_ERROR;
636    }
637
638    unsigned char aes_ivec[16];
639
640    AString iv;
641    if (itemMeta->findString("cipher-iv", &iv)) {
642        if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
643                || iv.size() != 16 * 2 + 2) {
644            LOGE("malformed cipher IV '%s'.", iv.c_str());
645            return ERROR_MALFORMED;
646        }
647
648        memset(aes_ivec, 0, sizeof(aes_ivec));
649        for (size_t i = 0; i < 16; ++i) {
650            char c1 = tolower(iv.c_str()[2 + 2 * i]);
651            char c2 = tolower(iv.c_str()[3 + 2 * i]);
652            if (!isxdigit(c1) || !isxdigit(c2)) {
653                LOGE("malformed cipher IV '%s'.", iv.c_str());
654                return ERROR_MALFORMED;
655            }
656            uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
657            uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
658
659            aes_ivec[i] = nibble1 << 4 | nibble2;
660        }
661    } else {
662        memset(aes_ivec, 0, sizeof(aes_ivec));
663        aes_ivec[15] = mSeqNumber & 0xff;
664        aes_ivec[14] = (mSeqNumber >> 8) & 0xff;
665        aes_ivec[13] = (mSeqNumber >> 16) & 0xff;
666        aes_ivec[12] = (mSeqNumber >> 24) & 0xff;
667    }
668
669    AES_cbc_encrypt(
670            buffer->data(), buffer->data(), buffer->size(),
671            &aes_key, aes_ivec, AES_DECRYPT);
672
673    // hexdump(buffer->data(), buffer->size());
674
675    size_t n = buffer->size();
676    CHECK_GT(n, 0u);
677
678    size_t pad = buffer->data()[n - 1];
679
680    CHECK_GT(pad, 0u);
681    CHECK_LE(pad, 16u);
682    CHECK_GE((size_t)n, pad);
683    for (size_t i = 0; i < pad; ++i) {
684        CHECK_EQ((unsigned)buffer->data()[n - 1 - i], pad);
685    }
686
687    n -= pad;
688
689    buffer->setRange(buffer->offset(), n);
690
691    return OK;
692}
693
694void LiveSession::postMonitorQueue(int64_t delayUs) {
695    sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id());
696    msg->setInt32("generation", ++mMonitorQueueGeneration);
697    msg->post(delayUs);
698}
699
700void LiveSession::onSeek(const sp<AMessage> &msg) {
701    int64_t timeUs;
702    CHECK(msg->findInt64("timeUs", &timeUs));
703
704    mSeekTimeUs = timeUs;
705    postMonitorQueue();
706}
707
708status_t LiveSession::getDuration(int64_t *durationUs) {
709    Mutex::Autolock autoLock(mLock);
710    *durationUs = mDurationUs;
711
712    return OK;
713}
714
715bool LiveSession::isSeekable() {
716    int64_t durationUs;
717    return getDuration(&durationUs) == OK && durationUs >= 0;
718}
719
720}  // namespace android
721
722