LiveSession.cpp revision ab8a0badb8fb1e294dacf2eb6a891439f348aff9
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "LiveSession"
19#include <utils/Log.h>
20
21#include "include/LiveSession.h"
22
23#include "LiveDataSource.h"
24
25#include "include/M3UParser.h"
26#include "include/NuHTTPDataSource.h"
27
28#include <cutils/properties.h>
29#include <media/stagefright/foundation/hexdump.h>
30#include <media/stagefright/foundation/ABuffer.h>
31#include <media/stagefright/foundation/ADebug.h>
32#include <media/stagefright/foundation/AMessage.h>
33#include <media/stagefright/DataSource.h>
34#include <media/stagefright/FileSource.h>
35#include <media/stagefright/MediaErrors.h>
36
37#include <ctype.h>
38#include <openssl/aes.h>
39
40namespace android {
41
42const int64_t LiveSession::kMaxPlaylistAgeUs = 15000000ll;
43
44LiveSession::LiveSession()
45    : mDataSource(new LiveDataSource),
46      mHTTPDataSource(new NuHTTPDataSource),
47      mPrevBandwidthIndex(-1),
48      mLastPlaylistFetchTimeUs(-1),
49      mSeqNumber(-1),
50      mSeekTimeUs(-1),
51      mNumRetries(0),
52      mDurationUs(-1),
53      mSeekDone(false),
54      mDisconnectPending(false),
55      mMonitorQueueGeneration(0) {
56}
57
58LiveSession::~LiveSession() {
59}
60
61sp<DataSource> LiveSession::getDataSource() {
62    return mDataSource;
63}
64
65void LiveSession::connect(const char *url) {
66    sp<AMessage> msg = new AMessage(kWhatConnect, id());
67    msg->setString("url", url);
68    msg->post();
69}
70
71void LiveSession::disconnect() {
72    Mutex::Autolock autoLock(mLock);
73    mDisconnectPending = true;
74
75    mHTTPDataSource->disconnect();
76
77    (new AMessage(kWhatDisconnect, id()))->post();
78}
79
80void LiveSession::seekTo(int64_t timeUs) {
81    Mutex::Autolock autoLock(mLock);
82    mSeekDone = false;
83
84    sp<AMessage> msg = new AMessage(kWhatSeek, id());
85    msg->setInt64("timeUs", timeUs);
86    msg->post();
87
88    while (!mSeekDone) {
89        mCondition.wait(mLock);
90    }
91}
92
93void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
94    switch (msg->what()) {
95        case kWhatConnect:
96            onConnect(msg);
97            break;
98
99        case kWhatDisconnect:
100            onDisconnect();
101            break;
102
103        case kWhatMonitorQueue:
104        {
105            int32_t generation;
106            CHECK(msg->findInt32("generation", &generation));
107
108            if (generation != mMonitorQueueGeneration) {
109                // Stale event
110                break;
111            }
112
113            onMonitorQueue();
114            break;
115        }
116
117        case kWhatSeek:
118            onSeek(msg);
119            break;
120
121        default:
122            TRESPASS();
123            break;
124    }
125}
126
127// static
128int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
129    if (a->mBandwidth < b->mBandwidth) {
130        return -1;
131    } else if (a->mBandwidth == b->mBandwidth) {
132        return 0;
133    }
134
135    return 1;
136}
137
138void LiveSession::onConnect(const sp<AMessage> &msg) {
139    AString url;
140    CHECK(msg->findString("url", &url));
141
142    LOGI("onConnect '%s'", url.c_str());
143
144    mMasterURL = url;
145
146    sp<M3UParser> playlist = fetchPlaylist(url.c_str());
147
148    if (playlist == NULL) {
149        LOGE("unable to fetch master playlist '%s'.", url.c_str());
150
151        mDataSource->queueEOS(ERROR_IO);
152        return;
153    }
154
155    if (playlist->isVariantPlaylist()) {
156        for (size_t i = 0; i < playlist->size(); ++i) {
157            BandwidthItem item;
158
159            sp<AMessage> meta;
160            playlist->itemAt(i, &item.mURI, &meta);
161
162            unsigned long bandwidth;
163            CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
164
165            mBandwidthItems.push(item);
166        }
167
168        CHECK_GT(mBandwidthItems.size(), 0u);
169
170        mBandwidthItems.sort(SortByBandwidth);
171
172        char value[PROPERTY_VALUE_MAX];
173        if (property_get("media.httplive.disable-nuplayer", value, NULL)
174                && (!strcasecmp(value, "true") || !strcmp(value, "1"))) {
175            // The "legacy" player cannot deal with audio format changes,
176            // some streams use different audio encoding parameters for
177            // their lowest bandwidth stream.
178            if (mBandwidthItems.size() > 1) {
179                // XXX Remove the lowest bitrate stream for now...
180                mBandwidthItems.removeAt(0);
181            }
182        }
183    }
184
185    postMonitorQueue();
186}
187
188void LiveSession::onDisconnect() {
189    LOGI("onDisconnect");
190
191    mDataSource->queueEOS(ERROR_END_OF_STREAM);
192
193    Mutex::Autolock autoLock(mLock);
194    mDisconnectPending = false;
195}
196
197status_t LiveSession::fetchFile(const char *url, sp<ABuffer> *out) {
198    *out = NULL;
199
200    sp<DataSource> source;
201
202    if (!strncasecmp(url, "file://", 7)) {
203        source = new FileSource(url + 7);
204    } else if (strncasecmp(url, "http://", 7)) {
205        return ERROR_UNSUPPORTED;
206    } else {
207        {
208            Mutex::Autolock autoLock(mLock);
209
210            if (mDisconnectPending) {
211                return ERROR_IO;
212            }
213        }
214
215        status_t err = mHTTPDataSource->connect(url);
216
217        if (err != OK) {
218            return err;
219        }
220
221        source = mHTTPDataSource;
222    }
223
224    off64_t size;
225    status_t err = source->getSize(&size);
226
227    if (err != OK) {
228        size = 65536;
229    }
230
231    sp<ABuffer> buffer = new ABuffer(size);
232    buffer->setRange(0, 0);
233
234    for (;;) {
235        size_t bufferRemaining = buffer->capacity() - buffer->size();
236
237        if (bufferRemaining == 0) {
238            bufferRemaining = 32768;
239
240            LOGV("increasing download buffer to %d bytes",
241                 buffer->size() + bufferRemaining);
242
243            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
244            memcpy(copy->data(), buffer->data(), buffer->size());
245            copy->setRange(0, buffer->size());
246
247            buffer = copy;
248        }
249
250        ssize_t n = source->readAt(
251                buffer->size(), buffer->data() + buffer->size(),
252                bufferRemaining);
253
254        if (n < 0) {
255            return n;
256        }
257
258        if (n == 0) {
259            break;
260        }
261
262        buffer->setRange(0, buffer->size() + (size_t)n);
263    }
264
265    *out = buffer;
266
267    return OK;
268}
269
270sp<M3UParser> LiveSession::fetchPlaylist(const char *url) {
271    sp<ABuffer> buffer;
272    status_t err = fetchFile(url, &buffer);
273
274    if (err != OK) {
275        return NULL;
276    }
277
278    sp<M3UParser> playlist =
279        new M3UParser(url, buffer->data(), buffer->size());
280
281    if (playlist->initCheck() != OK) {
282        return NULL;
283    }
284
285    return playlist;
286}
287
// Returns the next rand() sample scaled into the closed range [0.0, 1.0].
static double uniformRand() {
    const double sample = rand();
    return sample / RAND_MAX;
}
291
292size_t LiveSession::getBandwidthIndex() {
293    if (mBandwidthItems.size() == 0) {
294        return 0;
295    }
296
297#if 1
298    int32_t bandwidthBps;
299    if (mHTTPDataSource != NULL
300            && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
301        LOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
302    } else {
303        LOGV("no bandwidth estimate.");
304        return 0;  // Pick the lowest bandwidth stream by default.
305    }
306
307    char value[PROPERTY_VALUE_MAX];
308    if (property_get("media.httplive.max-bw", value, NULL)) {
309        char *end;
310        long maxBw = strtoul(value, &end, 10);
311        if (end > value && *end == '\0') {
312            if (maxBw > 0 && bandwidthBps > maxBw) {
313                LOGV("bandwidth capped to %ld bps", maxBw);
314                bandwidthBps = maxBw;
315            }
316        }
317    }
318
319    // Consider only 80% of the available bandwidth usable.
320    bandwidthBps = (bandwidthBps * 8) / 10;
321
322    // Pick the highest bandwidth stream below or equal to estimated bandwidth.
323
324    size_t index = mBandwidthItems.size() - 1;
325    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth
326                            > (size_t)bandwidthBps) {
327        --index;
328    }
329#elif 0
330    // Change bandwidth at random()
331    size_t index = uniformRand() * mBandwidthItems.size();
332#elif 0
333    // There's a 50% chance to stay on the current bandwidth and
334    // a 50% chance to switch to the next higher bandwidth (wrapping around
335    // to lowest)
336    const size_t kMinIndex = 0;
337
338    size_t index;
339    if (mPrevBandwidthIndex < 0) {
340        index = kMinIndex;
341    } else if (uniformRand() < 0.5) {
342        index = (size_t)mPrevBandwidthIndex;
343    } else {
344        index = mPrevBandwidthIndex + 1;
345        if (index == mBandwidthItems.size()) {
346            index = kMinIndex;
347        }
348    }
349#elif 0
350    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
351
352    size_t index = mBandwidthItems.size() - 1;
353    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
354        --index;
355    }
356#else
357    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
358#endif
359
360    return index;
361}
362
363void LiveSession::onDownloadNext() {
364    size_t bandwidthIndex = getBandwidthIndex();
365
366rinse_repeat:
367    int64_t nowUs = ALooper::GetNowUs();
368
369    if (mLastPlaylistFetchTimeUs < 0
370            || (ssize_t)bandwidthIndex != mPrevBandwidthIndex
371            || (!mPlaylist->isComplete()
372                && mLastPlaylistFetchTimeUs + kMaxPlaylistAgeUs <= nowUs)) {
373        AString url;
374        if (mBandwidthItems.size() > 0) {
375            url = mBandwidthItems.editItemAt(bandwidthIndex).mURI;
376        } else {
377            url = mMasterURL;
378        }
379
380        bool firstTime = (mPlaylist == NULL);
381
382        mPlaylist = fetchPlaylist(url.c_str());
383        if (mPlaylist == NULL) {
384            LOGE("failed to load playlist at url '%s'", url.c_str());
385            mDataSource->queueEOS(ERROR_IO);
386            return;
387        }
388
389        if (firstTime) {
390            Mutex::Autolock autoLock(mLock);
391
392            int32_t targetDuration;
393            if (!mPlaylist->isComplete()
394                    || !mPlaylist->meta()->findInt32(
395                    "target-duration", &targetDuration)) {
396                mDurationUs = -1;
397            } else {
398                mDurationUs = 1000000ll * targetDuration * mPlaylist->size();
399            }
400        }
401
402        mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
403    }
404
405    int32_t firstSeqNumberInPlaylist;
406    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
407                "media-sequence", &firstSeqNumberInPlaylist)) {
408        firstSeqNumberInPlaylist = 0;
409    }
410
411    bool explicitDiscontinuity = false;
412    bool bandwidthChanged = false;
413
414    if (mSeekTimeUs >= 0) {
415        int32_t targetDuration;
416        if (mPlaylist->isComplete() &&
417                mPlaylist->meta()->findInt32(
418                    "target-duration", &targetDuration)) {
419            int64_t seekTimeSecs = (mSeekTimeUs + 500000ll) / 1000000ll;
420            int64_t index = seekTimeSecs / targetDuration;
421
422            if (index >= 0 && index < mPlaylist->size()) {
423                int32_t newSeqNumber = firstSeqNumberInPlaylist + index;
424
425                if (newSeqNumber != mSeqNumber) {
426                    LOGI("seeking to seq no %d", newSeqNumber);
427
428                    mSeqNumber = newSeqNumber;
429
430                    mDataSource->reset();
431
432                    // reseting the data source will have had the
433                    // side effect of discarding any previously queued
434                    // bandwidth change discontinuity.
435                    // Therefore we'll need to treat these explicit
436                    // discontinuities as involving a bandwidth change
437                    // even if they aren't directly.
438                    explicitDiscontinuity = true;
439                    bandwidthChanged = true;
440                }
441            }
442        }
443
444        mSeekTimeUs = -1;
445
446        Mutex::Autolock autoLock(mLock);
447        mSeekDone = true;
448        mCondition.broadcast();
449    }
450
451    if (mSeqNumber < 0) {
452        if (mPlaylist->isComplete()) {
453            mSeqNumber = firstSeqNumberInPlaylist;
454        } else {
455            mSeqNumber = firstSeqNumberInPlaylist + mPlaylist->size() / 2;
456        }
457    }
458
459    int32_t lastSeqNumberInPlaylist =
460        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
461
462    if (mSeqNumber < firstSeqNumberInPlaylist
463            || mSeqNumber > lastSeqNumberInPlaylist) {
464        if (mPrevBandwidthIndex != (ssize_t)bandwidthIndex) {
465            // Go back to the previous bandwidth.
466
467            LOGI("new bandwidth does not have the sequence number "
468                 "we're looking for, switching back to previous bandwidth");
469
470            mLastPlaylistFetchTimeUs = -1;
471            bandwidthIndex = mPrevBandwidthIndex;
472            goto rinse_repeat;
473        }
474
475        if (!mPlaylist->isComplete()
476                && mSeqNumber > lastSeqNumberInPlaylist
477                && mNumRetries < kMaxNumRetries) {
478            ++mNumRetries;
479
480            mLastPlaylistFetchTimeUs = -1;
481            postMonitorQueue(3000000ll);
482            return;
483        }
484
485        LOGE("Cannot find sequence number %d in playlist "
486             "(contains %d - %d)",
487             mSeqNumber, firstSeqNumberInPlaylist,
488             firstSeqNumberInPlaylist + mPlaylist->size() - 1);
489
490        mDataSource->queueEOS(ERROR_END_OF_STREAM);
491        return;
492    }
493
494    mNumRetries = 0;
495
496    AString uri;
497    sp<AMessage> itemMeta;
498    CHECK(mPlaylist->itemAt(
499                mSeqNumber - firstSeqNumberInPlaylist,
500                &uri,
501                &itemMeta));
502
503    int32_t val;
504    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
505        explicitDiscontinuity = true;
506    }
507
508    sp<ABuffer> buffer;
509    status_t err = fetchFile(uri.c_str(), &buffer);
510    if (err != OK) {
511        LOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
512        mDataSource->queueEOS(err);
513        return;
514    }
515
516    CHECK(buffer != NULL);
517
518    err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer);
519
520    if (err != OK) {
521        LOGE("decryptBuffer failed w/ error %d", err);
522
523        mDataSource->queueEOS(err);
524        return;
525    }
526
527    if (buffer->size() == 0 || buffer->data()[0] != 0x47) {
528        // Not a transport stream???
529
530        LOGE("This doesn't look like a transport stream...");
531
532        mBandwidthItems.removeAt(bandwidthIndex);
533
534        if (mBandwidthItems.isEmpty()) {
535            mDataSource->queueEOS(ERROR_UNSUPPORTED);
536            return;
537        }
538
539        LOGI("Retrying with a different bandwidth stream.");
540
541        mLastPlaylistFetchTimeUs = -1;
542        bandwidthIndex = getBandwidthIndex();
543        mPrevBandwidthIndex = bandwidthIndex;
544        mSeqNumber = -1;
545
546        goto rinse_repeat;
547    }
548
549    if ((size_t)mPrevBandwidthIndex != bandwidthIndex) {
550        bandwidthChanged = true;
551    }
552
553    if (mPrevBandwidthIndex < 0) {
554        // Don't signal a bandwidth change at the very beginning of
555        // playback.
556        bandwidthChanged = false;
557    }
558
559    if (explicitDiscontinuity || bandwidthChanged) {
560        // Signal discontinuity.
561
562        LOGI("queueing discontinuity (explicit=%d, bandwidthChanged=%d)",
563             explicitDiscontinuity, bandwidthChanged);
564
565        sp<ABuffer> tmp = new ABuffer(188);
566        memset(tmp->data(), 0, tmp->size());
567        tmp->data()[1] = bandwidthChanged;
568
569        mDataSource->queueBuffer(tmp);
570    }
571
572    mDataSource->queueBuffer(buffer);
573
574    mPrevBandwidthIndex = bandwidthIndex;
575    ++mSeqNumber;
576
577    postMonitorQueue();
578}
579
580void LiveSession::onMonitorQueue() {
581    if (mSeekTimeUs >= 0
582            || mDataSource->countQueuedBuffers() < kMaxNumQueuedFragments) {
583        onDownloadNext();
584    } else {
585        postMonitorQueue(1000000ll);
586    }
587}
588
589status_t LiveSession::decryptBuffer(
590        size_t playlistIndex, const sp<ABuffer> &buffer) {
591    sp<AMessage> itemMeta;
592    bool found = false;
593    AString method;
594
595    for (ssize_t i = playlistIndex; i >= 0; --i) {
596        AString uri;
597        CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
598
599        if (itemMeta->findString("cipher-method", &method)) {
600            found = true;
601            break;
602        }
603    }
604
605    if (!found) {
606        method = "NONE";
607    }
608
609    if (method == "NONE") {
610        return OK;
611    } else if (!(method == "AES-128")) {
612        LOGE("Unsupported cipher method '%s'", method.c_str());
613        return ERROR_UNSUPPORTED;
614    }
615
616    AString keyURI;
617    if (!itemMeta->findString("cipher-uri", &keyURI)) {
618        LOGE("Missing key uri");
619        return ERROR_MALFORMED;
620    }
621
622    ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
623
624    sp<ABuffer> key;
625    if (index >= 0) {
626        key = mAESKeyForURI.valueAt(index);
627    } else {
628        key = new ABuffer(16);
629
630        sp<NuHTTPDataSource> keySource = new NuHTTPDataSource;
631        status_t err = keySource->connect(keyURI.c_str());
632
633        if (err == OK) {
634            size_t offset = 0;
635            while (offset < 16) {
636                ssize_t n = keySource->readAt(
637                        offset, key->data() + offset, 16 - offset);
638                if (n <= 0) {
639                    err = ERROR_IO;
640                    break;
641                }
642
643                offset += n;
644            }
645        }
646
647        if (err != OK) {
648            LOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
649            return ERROR_IO;
650        }
651
652        mAESKeyForURI.add(keyURI, key);
653    }
654
655    AES_KEY aes_key;
656    if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
657        LOGE("failed to set AES decryption key.");
658        return UNKNOWN_ERROR;
659    }
660
661    unsigned char aes_ivec[16];
662
663    AString iv;
664    if (itemMeta->findString("cipher-iv", &iv)) {
665        if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
666                || iv.size() != 16 * 2 + 2) {
667            LOGE("malformed cipher IV '%s'.", iv.c_str());
668            return ERROR_MALFORMED;
669        }
670
671        memset(aes_ivec, 0, sizeof(aes_ivec));
672        for (size_t i = 0; i < 16; ++i) {
673            char c1 = tolower(iv.c_str()[2 + 2 * i]);
674            char c2 = tolower(iv.c_str()[3 + 2 * i]);
675            if (!isxdigit(c1) || !isxdigit(c2)) {
676                LOGE("malformed cipher IV '%s'.", iv.c_str());
677                return ERROR_MALFORMED;
678            }
679            uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
680            uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
681
682            aes_ivec[i] = nibble1 << 4 | nibble2;
683        }
684    } else {
685        memset(aes_ivec, 0, sizeof(aes_ivec));
686        aes_ivec[15] = mSeqNumber & 0xff;
687        aes_ivec[14] = (mSeqNumber >> 8) & 0xff;
688        aes_ivec[13] = (mSeqNumber >> 16) & 0xff;
689        aes_ivec[12] = (mSeqNumber >> 24) & 0xff;
690    }
691
692    AES_cbc_encrypt(
693            buffer->data(), buffer->data(), buffer->size(),
694            &aes_key, aes_ivec, AES_DECRYPT);
695
696    // hexdump(buffer->data(), buffer->size());
697
698    size_t n = buffer->size();
699    CHECK_GT(n, 0u);
700
701    size_t pad = buffer->data()[n - 1];
702
703    CHECK_GT(pad, 0u);
704    CHECK_LE(pad, 16u);
705    CHECK_GE((size_t)n, pad);
706    for (size_t i = 0; i < pad; ++i) {
707        CHECK_EQ((unsigned)buffer->data()[n - 1 - i], pad);
708    }
709
710    n -= pad;
711
712    buffer->setRange(buffer->offset(), n);
713
714    return OK;
715}
716
717void LiveSession::postMonitorQueue(int64_t delayUs) {
718    sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id());
719    msg->setInt32("generation", ++mMonitorQueueGeneration);
720    msg->post(delayUs);
721}
722
723void LiveSession::onSeek(const sp<AMessage> &msg) {
724    int64_t timeUs;
725    CHECK(msg->findInt64("timeUs", &timeUs));
726
727    mSeekTimeUs = timeUs;
728    postMonitorQueue();
729}
730
731status_t LiveSession::getDuration(int64_t *durationUs) {
732    Mutex::Autolock autoLock(mLock);
733    *durationUs = mDurationUs;
734
735    return OK;
736}
737
738bool LiveSession::isSeekable() {
739    int64_t durationUs;
740    return getDuration(&durationUs) == OK && durationUs >= 0;
741}
742
743}  // namespace android
744
745