RTSPSource.cpp revision 6d3cd2e22c73e5b554a2c4e34d51616f8737e571
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "RTSPSource"
19#include <utils/Log.h>
20
21#include "RTSPSource.h"
22
23#include "AnotherPacketSource.h"
24#include "MyHandler.h"
25#include "SDPLoader.h"
26
27#include <media/IMediaHTTPService.h>
28#include <media/stagefright/MediaDefs.h>
29#include <media/stagefright/MetaData.h>
30
31namespace android {
32
33const int64_t kNearEOSTimeoutUs = 2000000ll; // 2 secs
34
35NuPlayer::RTSPSource::RTSPSource(
36        const sp<AMessage> &notify,
37        const sp<IMediaHTTPService> &httpService,
38        const char *url,
39        const KeyedVector<String8, String8> *headers,
40        bool uidValid,
41        uid_t uid,
42        bool isSDP)
43    : Source(notify),
44      mHTTPService(httpService),
45      mURL(url),
46      mUIDValid(uidValid),
47      mUID(uid),
48      mFlags(0),
49      mIsSDP(isSDP),
50      mState(DISCONNECTED),
51      mFinalResult(OK),
52      mDisconnectReplyID(0),
53      mBuffering(false),
54      mSeekGeneration(0),
55      mEOSTimeoutAudio(0),
56      mEOSTimeoutVideo(0) {
57    if (headers) {
58        mExtraHeaders = *headers;
59
60        ssize_t index =
61            mExtraHeaders.indexOfKey(String8("x-hide-urls-from-log"));
62
63        if (index >= 0) {
64            mFlags |= kFlagIncognito;
65
66            mExtraHeaders.removeItemsAt(index);
67        }
68    }
69}
70
71NuPlayer::RTSPSource::~RTSPSource() {
72    if (mLooper != NULL) {
73        mLooper->unregisterHandler(id());
74        mLooper->stop();
75    }
76}
77
78void NuPlayer::RTSPSource::prepareAsync() {
79    if (mLooper == NULL) {
80        mLooper = new ALooper;
81        mLooper->setName("rtsp");
82        mLooper->start();
83
84        mLooper->registerHandler(this);
85    }
86
87    CHECK(mHandler == NULL);
88    CHECK(mSDPLoader == NULL);
89
90    sp<AMessage> notify = new AMessage(kWhatNotify, this);
91
92    CHECK_EQ(mState, (int)DISCONNECTED);
93    mState = CONNECTING;
94
95    if (mIsSDP) {
96        mSDPLoader = new SDPLoader(notify,
97                (mFlags & kFlagIncognito) ? SDPLoader::kFlagIncognito : 0,
98                mHTTPService);
99
100        mSDPLoader->load(
101                mURL.c_str(), mExtraHeaders.isEmpty() ? NULL : &mExtraHeaders);
102    } else {
103        mHandler = new MyHandler(mURL.c_str(), notify, mUIDValid, mUID);
104        mLooper->registerHandler(mHandler);
105
106        mHandler->connect();
107    }
108
109    startBufferingIfNecessary();
110}
111
112void NuPlayer::RTSPSource::start() {
113}
114
115void NuPlayer::RTSPSource::stop() {
116    if (mLooper == NULL) {
117        return;
118    }
119    sp<AMessage> msg = new AMessage(kWhatDisconnect, this);
120
121    sp<AMessage> dummy;
122    msg->postAndAwaitResponse(&dummy);
123}
124
125void NuPlayer::RTSPSource::pause() {
126    int64_t mediaDurationUs = 0;
127    getDuration(&mediaDurationUs);
128    for (size_t index = 0; index < mTracks.size(); index++) {
129        TrackInfo *info = &mTracks.editItemAt(index);
130        sp<AnotherPacketSource> source = info->mSource;
131
132        // Check if EOS or ERROR is received
133        if (source != NULL && source->isFinished(mediaDurationUs)) {
134            return;
135        }
136    }
137    if (mHandler != NULL) {
138        mHandler->pause();
139    }
140}
141
142void NuPlayer::RTSPSource::resume() {
143    if (mHandler != NULL) {
144        mHandler->resume();
145    }
146}
147
148status_t NuPlayer::RTSPSource::feedMoreTSData() {
149    Mutex::Autolock _l(mBufferingLock);
150    return mFinalResult;
151}
152
153sp<MetaData> NuPlayer::RTSPSource::getFormatMeta(bool audio) {
154    sp<AnotherPacketSource> source = getSource(audio);
155
156    if (source == NULL) {
157        return NULL;
158    }
159
160    return source->getFormat();
161}
162
163bool NuPlayer::RTSPSource::haveSufficientDataOnAllTracks() {
164    // We're going to buffer at least 2 secs worth data on all tracks before
165    // starting playback (both at startup and after a seek).
166
167    static const int64_t kMinDurationUs = 2000000ll;
168
169    int64_t mediaDurationUs = 0;
170    getDuration(&mediaDurationUs);
171    if ((mAudioTrack != NULL && mAudioTrack->isFinished(mediaDurationUs))
172            || (mVideoTrack != NULL && mVideoTrack->isFinished(mediaDurationUs))) {
173        return true;
174    }
175
176    status_t err;
177    int64_t durationUs;
178    if (mAudioTrack != NULL
179            && (durationUs = mAudioTrack->getBufferedDurationUs(&err))
180                    < kMinDurationUs
181            && err == OK) {
182        ALOGV("audio track doesn't have enough data yet. (%.2f secs buffered)",
183              durationUs / 1E6);
184        return false;
185    }
186
187    if (mVideoTrack != NULL
188            && (durationUs = mVideoTrack->getBufferedDurationUs(&err))
189                    < kMinDurationUs
190            && err == OK) {
191        ALOGV("video track doesn't have enough data yet. (%.2f secs buffered)",
192              durationUs / 1E6);
193        return false;
194    }
195
196    return true;
197}
198
199status_t NuPlayer::RTSPSource::dequeueAccessUnit(
200        bool audio, sp<ABuffer> *accessUnit) {
201    if (!stopBufferingIfNecessary()) {
202        return -EWOULDBLOCK;
203    }
204
205    sp<AnotherPacketSource> source = getSource(audio);
206
207    if (source == NULL) {
208        return -EWOULDBLOCK;
209    }
210
211    status_t finalResult;
212    if (!source->hasBufferAvailable(&finalResult)) {
213        if (finalResult == OK) {
214            int64_t mediaDurationUs = 0;
215            getDuration(&mediaDurationUs);
216            sp<AnotherPacketSource> otherSource = getSource(!audio);
217            status_t otherFinalResult;
218
219            // If other source already signaled EOS, this source should also signal EOS
220            if (otherSource != NULL &&
221                    !otherSource->hasBufferAvailable(&otherFinalResult) &&
222                    otherFinalResult == ERROR_END_OF_STREAM) {
223                source->signalEOS(ERROR_END_OF_STREAM);
224                return ERROR_END_OF_STREAM;
225            }
226
227            // If this source has detected near end, give it some time to retrieve more
228            // data before signaling EOS
229            if (source->isFinished(mediaDurationUs)) {
230                int64_t eosTimeout = audio ? mEOSTimeoutAudio : mEOSTimeoutVideo;
231                if (eosTimeout == 0) {
232                    setEOSTimeout(audio, ALooper::GetNowUs());
233                } else if ((ALooper::GetNowUs() - eosTimeout) > kNearEOSTimeoutUs) {
234                    setEOSTimeout(audio, 0);
235                    source->signalEOS(ERROR_END_OF_STREAM);
236                    return ERROR_END_OF_STREAM;
237                }
238                return -EWOULDBLOCK;
239            }
240
241            if (!(otherSource != NULL && otherSource->isFinished(mediaDurationUs))) {
242                // We should not enter buffering mode
243                // if any of the sources already have detected EOS.
244                startBufferingIfNecessary();
245            }
246
247            return -EWOULDBLOCK;
248        }
249        return finalResult;
250    }
251
252    setEOSTimeout(audio, 0);
253
254    return source->dequeueAccessUnit(accessUnit);
255}
256
257sp<AnotherPacketSource> NuPlayer::RTSPSource::getSource(bool audio) {
258    if (mTSParser != NULL) {
259        sp<MediaSource> source = mTSParser->getSource(
260                audio ? ATSParser::AUDIO : ATSParser::VIDEO);
261
262        return static_cast<AnotherPacketSource *>(source.get());
263    }
264
265    return audio ? mAudioTrack : mVideoTrack;
266}
267
268void NuPlayer::RTSPSource::setEOSTimeout(bool audio, int64_t timeout) {
269    if (audio) {
270        mEOSTimeoutAudio = timeout;
271    } else {
272        mEOSTimeoutVideo = timeout;
273    }
274}
275
276status_t NuPlayer::RTSPSource::getDuration(int64_t *durationUs) {
277    *durationUs = 0ll;
278
279    int64_t audioDurationUs;
280    if (mAudioTrack != NULL
281            && mAudioTrack->getFormat()->findInt64(
282                kKeyDuration, &audioDurationUs)
283            && audioDurationUs > *durationUs) {
284        *durationUs = audioDurationUs;
285    }
286
287    int64_t videoDurationUs;
288    if (mVideoTrack != NULL
289            && mVideoTrack->getFormat()->findInt64(
290                kKeyDuration, &videoDurationUs)
291            && videoDurationUs > *durationUs) {
292        *durationUs = videoDurationUs;
293    }
294
295    return OK;
296}
297
298status_t NuPlayer::RTSPSource::seekTo(int64_t seekTimeUs) {
299    sp<AMessage> msg = new AMessage(kWhatPerformSeek, this);
300    msg->setInt32("generation", ++mSeekGeneration);
301    msg->setInt64("timeUs", seekTimeUs);
302
303    sp<AMessage> response;
304    status_t err = msg->postAndAwaitResponse(&response);
305    if (err == OK && response != NULL) {
306        CHECK(response->findInt32("err", &err));
307    }
308
309    return err;
310}
311
312void NuPlayer::RTSPSource::performSeek(int64_t seekTimeUs) {
313    if (mState != CONNECTED) {
314        finishSeek(INVALID_OPERATION);
315        return;
316    }
317
318    mState = SEEKING;
319    mHandler->seek(seekTimeUs);
320}
321
322void NuPlayer::RTSPSource::onMessageReceived(const sp<AMessage> &msg) {
323    if (msg->what() == kWhatDisconnect) {
324        sp<AReplyToken> replyID;
325        CHECK(msg->senderAwaitsResponse(&replyID));
326
327        mDisconnectReplyID = replyID;
328        finishDisconnectIfPossible();
329        return;
330    } else if (msg->what() == kWhatPerformSeek) {
331        int32_t generation;
332        CHECK(msg->findInt32("generation", &generation));
333        CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
334
335        if (generation != mSeekGeneration) {
336            // obsolete.
337            finishSeek(OK);
338            return;
339        }
340
341        int64_t seekTimeUs;
342        CHECK(msg->findInt64("timeUs", &seekTimeUs));
343
344        performSeek(seekTimeUs);
345        return;
346    }
347
348    CHECK_EQ(msg->what(), (int)kWhatNotify);
349
350    int32_t what;
351    CHECK(msg->findInt32("what", &what));
352
353    switch (what) {
354        case MyHandler::kWhatConnected:
355        {
356            onConnected();
357
358            notifyVideoSizeChanged();
359
360            uint32_t flags = 0;
361
362            if (mHandler->isSeekable()) {
363                flags = FLAG_CAN_PAUSE
364                        | FLAG_CAN_SEEK
365                        | FLAG_CAN_SEEK_BACKWARD
366                        | FLAG_CAN_SEEK_FORWARD;
367            }
368
369            notifyFlagsChanged(flags);
370            notifyPrepared();
371            break;
372        }
373
374        case MyHandler::kWhatDisconnected:
375        {
376            onDisconnected(msg);
377            break;
378        }
379
380        case MyHandler::kWhatSeekDone:
381        {
382            mState = CONNECTED;
383            if (mSeekReplyID != NULL) {
384                // Unblock seekTo here in case we attempted to seek in a live stream
385                finishSeek(OK);
386            }
387            break;
388        }
389
390        case MyHandler::kWhatSeekPaused:
391        {
392            sp<AnotherPacketSource> source = getSource(true /* audio */);
393            if (source != NULL) {
394                source->queueDiscontinuity(ATSParser::DISCONTINUITY_NONE,
395                        /* extra */ NULL,
396                        /* discard */ true);
397            }
398            source = getSource(false /* video */);
399            if (source != NULL) {
400                source->queueDiscontinuity(ATSParser::DISCONTINUITY_NONE,
401                        /* extra */ NULL,
402                        /* discard */ true);
403            };
404
405            status_t err = OK;
406            msg->findInt32("err", &err);
407            finishSeek(err);
408
409            if (err == OK) {
410                int64_t timeUs;
411                CHECK(msg->findInt64("time", &timeUs));
412                mHandler->continueSeekAfterPause(timeUs);
413            }
414            break;
415        }
416
417        case MyHandler::kWhatAccessUnit:
418        {
419            size_t trackIndex;
420            CHECK(msg->findSize("trackIndex", &trackIndex));
421
422            if (mTSParser == NULL) {
423                CHECK_LT(trackIndex, mTracks.size());
424            } else {
425                CHECK_EQ(trackIndex, 0u);
426            }
427
428            sp<ABuffer> accessUnit;
429            CHECK(msg->findBuffer("accessUnit", &accessUnit));
430
431            int32_t damaged;
432            if (accessUnit->meta()->findInt32("damaged", &damaged)
433                    && damaged) {
434                ALOGI("dropping damaged access unit.");
435                break;
436            }
437
438            if (mTSParser != NULL) {
439                size_t offset = 0;
440                status_t err = OK;
441                while (offset + 188 <= accessUnit->size()) {
442                    err = mTSParser->feedTSPacket(
443                            accessUnit->data() + offset, 188);
444                    if (err != OK) {
445                        break;
446                    }
447
448                    offset += 188;
449                }
450
451                if (offset < accessUnit->size()) {
452                    err = ERROR_MALFORMED;
453                }
454
455                if (err != OK) {
456                    sp<AnotherPacketSource> source = getSource(false /* audio */);
457                    if (source != NULL) {
458                        source->signalEOS(err);
459                    }
460
461                    source = getSource(true /* audio */);
462                    if (source != NULL) {
463                        source->signalEOS(err);
464                    }
465                }
466                break;
467            }
468
469            TrackInfo *info = &mTracks.editItemAt(trackIndex);
470
471            sp<AnotherPacketSource> source = info->mSource;
472            if (source != NULL) {
473                uint32_t rtpTime;
474                CHECK(accessUnit->meta()->findInt32("rtp-time", (int32_t *)&rtpTime));
475
476                if (!info->mNPTMappingValid) {
477                    // This is a live stream, we didn't receive any normal
478                    // playtime mapping. We won't map to npt time.
479                    source->queueAccessUnit(accessUnit);
480                    break;
481                }
482
483                int64_t nptUs =
484                    ((double)rtpTime - (double)info->mRTPTime)
485                        / info->mTimeScale
486                        * 1000000ll
487                        + info->mNormalPlaytimeUs;
488
489                accessUnit->meta()->setInt64("timeUs", nptUs);
490
491                source->queueAccessUnit(accessUnit);
492            }
493            break;
494        }
495
496        case MyHandler::kWhatEOS:
497        {
498            int32_t finalResult;
499            CHECK(msg->findInt32("finalResult", &finalResult));
500            CHECK_NE(finalResult, (status_t)OK);
501
502            if (mTSParser != NULL) {
503                sp<AnotherPacketSource> source = getSource(false /* audio */);
504                if (source != NULL) {
505                    source->signalEOS(finalResult);
506                }
507
508                source = getSource(true /* audio */);
509                if (source != NULL) {
510                    source->signalEOS(finalResult);
511                }
512
513                return;
514            }
515
516            size_t trackIndex;
517            CHECK(msg->findSize("trackIndex", &trackIndex));
518            CHECK_LT(trackIndex, mTracks.size());
519
520            TrackInfo *info = &mTracks.editItemAt(trackIndex);
521            sp<AnotherPacketSource> source = info->mSource;
522            if (source != NULL) {
523                source->signalEOS(finalResult);
524            }
525
526            break;
527        }
528
529        case MyHandler::kWhatSeekDiscontinuity:
530        {
531            size_t trackIndex;
532            CHECK(msg->findSize("trackIndex", &trackIndex));
533            CHECK_LT(trackIndex, mTracks.size());
534
535            TrackInfo *info = &mTracks.editItemAt(trackIndex);
536            sp<AnotherPacketSource> source = info->mSource;
537            if (source != NULL) {
538                source->queueDiscontinuity(
539                        ATSParser::DISCONTINUITY_TIME,
540                        NULL,
541                        true /* discard */);
542            }
543
544            break;
545        }
546
547        case MyHandler::kWhatNormalPlayTimeMapping:
548        {
549            size_t trackIndex;
550            CHECK(msg->findSize("trackIndex", &trackIndex));
551            CHECK_LT(trackIndex, mTracks.size());
552
553            uint32_t rtpTime;
554            CHECK(msg->findInt32("rtpTime", (int32_t *)&rtpTime));
555
556            int64_t nptUs;
557            CHECK(msg->findInt64("nptUs", &nptUs));
558
559            TrackInfo *info = &mTracks.editItemAt(trackIndex);
560            info->mRTPTime = rtpTime;
561            info->mNormalPlaytimeUs = nptUs;
562            info->mNPTMappingValid = true;
563            break;
564        }
565
566        case SDPLoader::kWhatSDPLoaded:
567        {
568            onSDPLoaded(msg);
569            break;
570        }
571
572        default:
573            TRESPASS();
574    }
575}
576
577void NuPlayer::RTSPSource::onConnected() {
578    CHECK(mAudioTrack == NULL);
579    CHECK(mVideoTrack == NULL);
580
581    size_t numTracks = mHandler->countTracks();
582    for (size_t i = 0; i < numTracks; ++i) {
583        int32_t timeScale;
584        sp<MetaData> format = mHandler->getTrackFormat(i, &timeScale);
585
586        const char *mime;
587        CHECK(format->findCString(kKeyMIMEType, &mime));
588
589        if (!strcasecmp(mime, MEDIA_MIMETYPE_CONTAINER_MPEG2TS)) {
590            // Very special case for MPEG2 Transport Streams.
591            CHECK_EQ(numTracks, 1u);
592
593            mTSParser = new ATSParser;
594            return;
595        }
596
597        bool isAudio = !strncasecmp(mime, "audio/", 6);
598        bool isVideo = !strncasecmp(mime, "video/", 6);
599
600        TrackInfo info;
601        info.mTimeScale = timeScale;
602        info.mRTPTime = 0;
603        info.mNormalPlaytimeUs = 0ll;
604        info.mNPTMappingValid = false;
605
606        if ((isAudio && mAudioTrack == NULL)
607                || (isVideo && mVideoTrack == NULL)) {
608            sp<AnotherPacketSource> source = new AnotherPacketSource(format);
609
610            if (isAudio) {
611                mAudioTrack = source;
612            } else {
613                mVideoTrack = source;
614            }
615
616            info.mSource = source;
617        }
618
619        mTracks.push(info);
620    }
621
622    mState = CONNECTED;
623}
624
625void NuPlayer::RTSPSource::onSDPLoaded(const sp<AMessage> &msg) {
626    status_t err;
627    CHECK(msg->findInt32("result", &err));
628
629    mSDPLoader.clear();
630
631    if (mDisconnectReplyID != 0) {
632        err = UNKNOWN_ERROR;
633    }
634
635    if (err == OK) {
636        sp<ASessionDescription> desc;
637        sp<RefBase> obj;
638        CHECK(msg->findObject("description", &obj));
639        desc = static_cast<ASessionDescription *>(obj.get());
640
641        AString rtspUri;
642        if (!desc->findAttribute(0, "a=control", &rtspUri)) {
643            ALOGE("Unable to find url in SDP");
644            err = UNKNOWN_ERROR;
645        } else {
646            sp<AMessage> notify = new AMessage(kWhatNotify, this);
647
648            mHandler = new MyHandler(rtspUri.c_str(), notify, mUIDValid, mUID);
649            mLooper->registerHandler(mHandler);
650
651            mHandler->loadSDP(desc);
652        }
653    }
654
655    if (err != OK) {
656        if (mState == CONNECTING) {
657            // We're still in the preparation phase, signal that it
658            // failed.
659            notifyPrepared(err);
660        }
661
662        mState = DISCONNECTED;
663        setError(err);
664
665        if (mDisconnectReplyID != 0) {
666            finishDisconnectIfPossible();
667        }
668    }
669}
670
671void NuPlayer::RTSPSource::onDisconnected(const sp<AMessage> &msg) {
672    if (mState == DISCONNECTED) {
673        return;
674    }
675
676    status_t err;
677    CHECK(msg->findInt32("result", &err));
678    CHECK_NE(err, (status_t)OK);
679
680    mLooper->unregisterHandler(mHandler->id());
681    mHandler.clear();
682
683    if (mState == CONNECTING) {
684        // We're still in the preparation phase, signal that it
685        // failed.
686        notifyPrepared(err);
687    }
688
689    mState = DISCONNECTED;
690    setError(err);
691
692    if (mDisconnectReplyID != 0) {
693        finishDisconnectIfPossible();
694    }
695}
696
697void NuPlayer::RTSPSource::finishDisconnectIfPossible() {
698    if (mState != DISCONNECTED) {
699        if (mHandler != NULL) {
700            mHandler->disconnect();
701        } else if (mSDPLoader != NULL) {
702            mSDPLoader->cancel();
703        }
704        return;
705    }
706
707    (new AMessage)->postReply(mDisconnectReplyID);
708    mDisconnectReplyID = 0;
709}
710
711void NuPlayer::RTSPSource::setError(status_t err) {
712    Mutex::Autolock _l(mBufferingLock);
713    mFinalResult = err;
714}
715
716void NuPlayer::RTSPSource::startBufferingIfNecessary() {
717    Mutex::Autolock _l(mBufferingLock);
718
719    if (!mBuffering) {
720        mBuffering = true;
721
722        sp<AMessage> notify = dupNotify();
723        notify->setInt32("what", kWhatBufferingStart);
724        notify->post();
725    }
726}
727
728bool NuPlayer::RTSPSource::stopBufferingIfNecessary() {
729    Mutex::Autolock _l(mBufferingLock);
730
731    if (mBuffering) {
732        if (!haveSufficientDataOnAllTracks()) {
733            return false;
734        }
735
736        mBuffering = false;
737
738        sp<AMessage> notify = dupNotify();
739        notify->setInt32("what", kWhatBufferingEnd);
740        notify->post();
741    }
742
743    return true;
744}
745
746void NuPlayer::RTSPSource::finishSeek(status_t err) {
747    CHECK(mSeekReplyID != NULL);
748    sp<AMessage> seekReply = new AMessage;
749    seekReply->setInt32("err", err);
750    seekReply->postReply(mSeekReplyID);
751    mSeekReplyID = NULL;
752}
753
754}  // namespace android
755