RTSPSource.cpp revision 8d237a5ce1e3c1dbc1d538f47e68cff2cc52d799
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "RTSPSource"
19#include <utils/Log.h>
20
21#include "RTSPSource.h"
22
23#include "AnotherPacketSource.h"
24#include "MyHandler.h"
25#include "SDPLoader.h"
26
27#include <media/IMediaHTTPService.h>
28#include <media/stagefright/MediaDefs.h>
29#include <media/stagefright/MetaData.h>
30
31namespace android {
32
33const int64_t kNearEOSTimeoutUs = 2000000ll; // 2 secs
34
35NuPlayer::RTSPSource::RTSPSource(
36        const sp<AMessage> &notify,
37        const sp<IMediaHTTPService> &httpService,
38        const char *url,
39        const KeyedVector<String8, String8> *headers,
40        bool uidValid,
41        uid_t uid,
42        bool isSDP)
43    : Source(notify),
44      mHTTPService(httpService),
45      mURL(url),
46      mUIDValid(uidValid),
47      mUID(uid),
48      mFlags(0),
49      mIsSDP(isSDP),
50      mState(DISCONNECTED),
51      mFinalResult(OK),
52      mDisconnectReplyID(0),
53      mBuffering(false),
54      mSeekGeneration(0),
55      mEOSTimeoutAudio(0),
56      mEOSTimeoutVideo(0) {
57    if (headers) {
58        mExtraHeaders = *headers;
59
60        ssize_t index =
61            mExtraHeaders.indexOfKey(String8("x-hide-urls-from-log"));
62
63        if (index >= 0) {
64            mFlags |= kFlagIncognito;
65
66            mExtraHeaders.removeItemsAt(index);
67        }
68    }
69}
70
71NuPlayer::RTSPSource::~RTSPSource() {
72    if (mLooper != NULL) {
73        mLooper->unregisterHandler(id());
74        mLooper->stop();
75    }
76}
77
78void NuPlayer::RTSPSource::prepareAsync() {
79    if (mLooper == NULL) {
80        mLooper = new ALooper;
81        mLooper->setName("rtsp");
82        mLooper->start();
83
84        mLooper->registerHandler(this);
85    }
86
87    CHECK(mHandler == NULL);
88    CHECK(mSDPLoader == NULL);
89
90    sp<AMessage> notify = new AMessage(kWhatNotify, this);
91
92    CHECK_EQ(mState, (int)DISCONNECTED);
93    mState = CONNECTING;
94
95    if (mIsSDP) {
96        mSDPLoader = new SDPLoader(notify,
97                (mFlags & kFlagIncognito) ? SDPLoader::kFlagIncognito : 0,
98                mHTTPService);
99
100        mSDPLoader->load(
101                mURL.c_str(), mExtraHeaders.isEmpty() ? NULL : &mExtraHeaders);
102    } else {
103        mHandler = new MyHandler(mURL.c_str(), notify, mUIDValid, mUID);
104        mLooper->registerHandler(mHandler);
105
106        mHandler->connect();
107    }
108
109    startBufferingIfNecessary();
110}
111
112void NuPlayer::RTSPSource::start() {
113}
114
115void NuPlayer::RTSPSource::stop() {
116    if (mLooper == NULL) {
117        return;
118    }
119    sp<AMessage> msg = new AMessage(kWhatDisconnect, this);
120
121    sp<AMessage> dummy;
122    msg->postAndAwaitResponse(&dummy);
123}
124
125void NuPlayer::RTSPSource::pause() {
126    int64_t mediaDurationUs = 0;
127    getDuration(&mediaDurationUs);
128    for (size_t index = 0; index < mTracks.size(); index++) {
129        TrackInfo *info = &mTracks.editItemAt(index);
130        sp<AnotherPacketSource> source = info->mSource;
131
132        // Check if EOS or ERROR is received
133        if (source != NULL && source->isFinished(mediaDurationUs)) {
134            return;
135        }
136    }
137    mHandler->pause();
138}
139
140void NuPlayer::RTSPSource::resume() {
141    if (mHandler != NULL) {
142        mHandler->resume();
143    }
144}
145
146status_t NuPlayer::RTSPSource::feedMoreTSData() {
147    Mutex::Autolock _l(mBufferingLock);
148    return mFinalResult;
149}
150
151sp<MetaData> NuPlayer::RTSPSource::getFormatMeta(bool audio) {
152    sp<AnotherPacketSource> source = getSource(audio);
153
154    if (source == NULL) {
155        return NULL;
156    }
157
158    return source->getFormat();
159}
160
161bool NuPlayer::RTSPSource::haveSufficientDataOnAllTracks() {
162    // We're going to buffer at least 2 secs worth data on all tracks before
163    // starting playback (both at startup and after a seek).
164
165    static const int64_t kMinDurationUs = 2000000ll;
166
167    int64_t mediaDurationUs = 0;
168    getDuration(&mediaDurationUs);
169    if ((mAudioTrack != NULL && mAudioTrack->isFinished(mediaDurationUs))
170            || (mVideoTrack != NULL && mVideoTrack->isFinished(mediaDurationUs))) {
171        return true;
172    }
173
174    status_t err;
175    int64_t durationUs;
176    if (mAudioTrack != NULL
177            && (durationUs = mAudioTrack->getBufferedDurationUs(&err))
178                    < kMinDurationUs
179            && err == OK) {
180        ALOGV("audio track doesn't have enough data yet. (%.2f secs buffered)",
181              durationUs / 1E6);
182        return false;
183    }
184
185    if (mVideoTrack != NULL
186            && (durationUs = mVideoTrack->getBufferedDurationUs(&err))
187                    < kMinDurationUs
188            && err == OK) {
189        ALOGV("video track doesn't have enough data yet. (%.2f secs buffered)",
190              durationUs / 1E6);
191        return false;
192    }
193
194    return true;
195}
196
197status_t NuPlayer::RTSPSource::dequeueAccessUnit(
198        bool audio, sp<ABuffer> *accessUnit) {
199    if (!stopBufferingIfNecessary()) {
200        return -EWOULDBLOCK;
201    }
202
203    sp<AnotherPacketSource> source = getSource(audio);
204
205    if (source == NULL) {
206        return -EWOULDBLOCK;
207    }
208
209    status_t finalResult;
210    if (!source->hasBufferAvailable(&finalResult)) {
211        if (finalResult == OK) {
212            int64_t mediaDurationUs = 0;
213            getDuration(&mediaDurationUs);
214            sp<AnotherPacketSource> otherSource = getSource(!audio);
215            status_t otherFinalResult;
216
217            // If other source already signaled EOS, this source should also signal EOS
218            if (otherSource != NULL &&
219                    !otherSource->hasBufferAvailable(&otherFinalResult) &&
220                    otherFinalResult == ERROR_END_OF_STREAM) {
221                source->signalEOS(ERROR_END_OF_STREAM);
222                return ERROR_END_OF_STREAM;
223            }
224
225            // If this source has detected near end, give it some time to retrieve more
226            // data before signaling EOS
227            if (source->isFinished(mediaDurationUs)) {
228                int64_t eosTimeout = audio ? mEOSTimeoutAudio : mEOSTimeoutVideo;
229                if (eosTimeout == 0) {
230                    setEOSTimeout(audio, ALooper::GetNowUs());
231                } else if ((ALooper::GetNowUs() - eosTimeout) > kNearEOSTimeoutUs) {
232                    setEOSTimeout(audio, 0);
233                    source->signalEOS(ERROR_END_OF_STREAM);
234                    return ERROR_END_OF_STREAM;
235                }
236                return -EWOULDBLOCK;
237            }
238
239            if (!(otherSource != NULL && otherSource->isFinished(mediaDurationUs))) {
240                // We should not enter buffering mode
241                // if any of the sources already have detected EOS.
242                startBufferingIfNecessary();
243            }
244
245            return -EWOULDBLOCK;
246        }
247        return finalResult;
248    }
249
250    setEOSTimeout(audio, 0);
251
252    return source->dequeueAccessUnit(accessUnit);
253}
254
255sp<AnotherPacketSource> NuPlayer::RTSPSource::getSource(bool audio) {
256    if (mTSParser != NULL) {
257        sp<MediaSource> source = mTSParser->getSource(
258                audio ? ATSParser::AUDIO : ATSParser::VIDEO);
259
260        return static_cast<AnotherPacketSource *>(source.get());
261    }
262
263    return audio ? mAudioTrack : mVideoTrack;
264}
265
266void NuPlayer::RTSPSource::setEOSTimeout(bool audio, int64_t timeout) {
267    if (audio) {
268        mEOSTimeoutAudio = timeout;
269    } else {
270        mEOSTimeoutVideo = timeout;
271    }
272}
273
274status_t NuPlayer::RTSPSource::getDuration(int64_t *durationUs) {
275    *durationUs = 0ll;
276
277    int64_t audioDurationUs;
278    if (mAudioTrack != NULL
279            && mAudioTrack->getFormat()->findInt64(
280                kKeyDuration, &audioDurationUs)
281            && audioDurationUs > *durationUs) {
282        *durationUs = audioDurationUs;
283    }
284
285    int64_t videoDurationUs;
286    if (mVideoTrack != NULL
287            && mVideoTrack->getFormat()->findInt64(
288                kKeyDuration, &videoDurationUs)
289            && videoDurationUs > *durationUs) {
290        *durationUs = videoDurationUs;
291    }
292
293    return OK;
294}
295
296status_t NuPlayer::RTSPSource::seekTo(int64_t seekTimeUs) {
297    sp<AMessage> msg = new AMessage(kWhatPerformSeek, this);
298    msg->setInt32("generation", ++mSeekGeneration);
299    msg->setInt64("timeUs", seekTimeUs);
300
301    sp<AMessage> response;
302    status_t err = msg->postAndAwaitResponse(&response);
303    if (err == OK && response != NULL) {
304        CHECK(response->findInt32("err", &err));
305    }
306
307    return err;
308}
309
310void NuPlayer::RTSPSource::performSeek(int64_t seekTimeUs) {
311    if (mState != CONNECTED) {
312        finishSeek(INVALID_OPERATION);
313        return;
314    }
315
316    mState = SEEKING;
317    mHandler->seek(seekTimeUs);
318}
319
320void NuPlayer::RTSPSource::onMessageReceived(const sp<AMessage> &msg) {
321    if (msg->what() == kWhatDisconnect) {
322        sp<AReplyToken> replyID;
323        CHECK(msg->senderAwaitsResponse(&replyID));
324
325        mDisconnectReplyID = replyID;
326        finishDisconnectIfPossible();
327        return;
328    } else if (msg->what() == kWhatPerformSeek) {
329        int32_t generation;
330        CHECK(msg->findInt32("generation", &generation));
331        CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
332
333        if (generation != mSeekGeneration) {
334            // obsolete.
335            finishSeek(OK);
336            return;
337        }
338
339        int64_t seekTimeUs;
340        CHECK(msg->findInt64("timeUs", &seekTimeUs));
341
342        performSeek(seekTimeUs);
343        return;
344    }
345
346    CHECK_EQ(msg->what(), (int)kWhatNotify);
347
348    int32_t what;
349    CHECK(msg->findInt32("what", &what));
350
351    switch (what) {
352        case MyHandler::kWhatConnected:
353        {
354            onConnected();
355
356            notifyVideoSizeChanged();
357
358            uint32_t flags = 0;
359
360            if (mHandler->isSeekable()) {
361                flags = FLAG_CAN_PAUSE
362                        | FLAG_CAN_SEEK
363                        | FLAG_CAN_SEEK_BACKWARD
364                        | FLAG_CAN_SEEK_FORWARD;
365            }
366
367            notifyFlagsChanged(flags);
368            notifyPrepared();
369            break;
370        }
371
372        case MyHandler::kWhatDisconnected:
373        {
374            onDisconnected(msg);
375            break;
376        }
377
378        case MyHandler::kWhatSeekDone:
379        {
380            mState = CONNECTED;
381            if (mSeekReplyID != NULL) {
382                // Unblock seekTo here in case we attempted to seek in a live stream
383                finishSeek(OK);
384            }
385            break;
386        }
387
388        case MyHandler::kWhatSeekPaused:
389        {
390            sp<AnotherPacketSource> source = getSource(true /* audio */);
391            if (source != NULL) {
392                source->queueDiscontinuity(ATSParser::DISCONTINUITY_NONE,
393                        /* extra */ NULL,
394                        /* discard */ true);
395            }
396            source = getSource(false /* video */);
397            if (source != NULL) {
398                source->queueDiscontinuity(ATSParser::DISCONTINUITY_NONE,
399                        /* extra */ NULL,
400                        /* discard */ true);
401            };
402
403            status_t err = OK;
404            msg->findInt32("err", &err);
405            finishSeek(err);
406
407            if (err == OK) {
408                int64_t timeUs;
409                CHECK(msg->findInt64("time", &timeUs));
410                mHandler->continueSeekAfterPause(timeUs);
411            }
412            break;
413        }
414
415        case MyHandler::kWhatAccessUnit:
416        {
417            size_t trackIndex;
418            CHECK(msg->findSize("trackIndex", &trackIndex));
419
420            if (mTSParser == NULL) {
421                CHECK_LT(trackIndex, mTracks.size());
422            } else {
423                CHECK_EQ(trackIndex, 0u);
424            }
425
426            sp<ABuffer> accessUnit;
427            CHECK(msg->findBuffer("accessUnit", &accessUnit));
428
429            int32_t damaged;
430            if (accessUnit->meta()->findInt32("damaged", &damaged)
431                    && damaged) {
432                ALOGI("dropping damaged access unit.");
433                break;
434            }
435
436            if (mTSParser != NULL) {
437                size_t offset = 0;
438                status_t err = OK;
439                while (offset + 188 <= accessUnit->size()) {
440                    err = mTSParser->feedTSPacket(
441                            accessUnit->data() + offset, 188);
442                    if (err != OK) {
443                        break;
444                    }
445
446                    offset += 188;
447                }
448
449                if (offset < accessUnit->size()) {
450                    err = ERROR_MALFORMED;
451                }
452
453                if (err != OK) {
454                    sp<AnotherPacketSource> source = getSource(false /* audio */);
455                    if (source != NULL) {
456                        source->signalEOS(err);
457                    }
458
459                    source = getSource(true /* audio */);
460                    if (source != NULL) {
461                        source->signalEOS(err);
462                    }
463                }
464                break;
465            }
466
467            TrackInfo *info = &mTracks.editItemAt(trackIndex);
468
469            sp<AnotherPacketSource> source = info->mSource;
470            if (source != NULL) {
471                uint32_t rtpTime;
472                CHECK(accessUnit->meta()->findInt32("rtp-time", (int32_t *)&rtpTime));
473
474                if (!info->mNPTMappingValid) {
475                    // This is a live stream, we didn't receive any normal
476                    // playtime mapping. We won't map to npt time.
477                    source->queueAccessUnit(accessUnit);
478                    break;
479                }
480
481                int64_t nptUs =
482                    ((double)rtpTime - (double)info->mRTPTime)
483                        / info->mTimeScale
484                        * 1000000ll
485                        + info->mNormalPlaytimeUs;
486
487                accessUnit->meta()->setInt64("timeUs", nptUs);
488
489                source->queueAccessUnit(accessUnit);
490            }
491            break;
492        }
493
494        case MyHandler::kWhatEOS:
495        {
496            int32_t finalResult;
497            CHECK(msg->findInt32("finalResult", &finalResult));
498            CHECK_NE(finalResult, (status_t)OK);
499
500            if (mTSParser != NULL) {
501                sp<AnotherPacketSource> source = getSource(false /* audio */);
502                if (source != NULL) {
503                    source->signalEOS(finalResult);
504                }
505
506                source = getSource(true /* audio */);
507                if (source != NULL) {
508                    source->signalEOS(finalResult);
509                }
510
511                return;
512            }
513
514            size_t trackIndex;
515            CHECK(msg->findSize("trackIndex", &trackIndex));
516            CHECK_LT(trackIndex, mTracks.size());
517
518            TrackInfo *info = &mTracks.editItemAt(trackIndex);
519            sp<AnotherPacketSource> source = info->mSource;
520            if (source != NULL) {
521                source->signalEOS(finalResult);
522            }
523
524            break;
525        }
526
527        case MyHandler::kWhatSeekDiscontinuity:
528        {
529            size_t trackIndex;
530            CHECK(msg->findSize("trackIndex", &trackIndex));
531            CHECK_LT(trackIndex, mTracks.size());
532
533            TrackInfo *info = &mTracks.editItemAt(trackIndex);
534            sp<AnotherPacketSource> source = info->mSource;
535            if (source != NULL) {
536                source->queueDiscontinuity(
537                        ATSParser::DISCONTINUITY_TIME,
538                        NULL,
539                        true /* discard */);
540            }
541
542            break;
543        }
544
545        case MyHandler::kWhatNormalPlayTimeMapping:
546        {
547            size_t trackIndex;
548            CHECK(msg->findSize("trackIndex", &trackIndex));
549            CHECK_LT(trackIndex, mTracks.size());
550
551            uint32_t rtpTime;
552            CHECK(msg->findInt32("rtpTime", (int32_t *)&rtpTime));
553
554            int64_t nptUs;
555            CHECK(msg->findInt64("nptUs", &nptUs));
556
557            TrackInfo *info = &mTracks.editItemAt(trackIndex);
558            info->mRTPTime = rtpTime;
559            info->mNormalPlaytimeUs = nptUs;
560            info->mNPTMappingValid = true;
561            break;
562        }
563
564        case SDPLoader::kWhatSDPLoaded:
565        {
566            onSDPLoaded(msg);
567            break;
568        }
569
570        default:
571            TRESPASS();
572    }
573}
574
575void NuPlayer::RTSPSource::onConnected() {
576    CHECK(mAudioTrack == NULL);
577    CHECK(mVideoTrack == NULL);
578
579    size_t numTracks = mHandler->countTracks();
580    for (size_t i = 0; i < numTracks; ++i) {
581        int32_t timeScale;
582        sp<MetaData> format = mHandler->getTrackFormat(i, &timeScale);
583
584        const char *mime;
585        CHECK(format->findCString(kKeyMIMEType, &mime));
586
587        if (!strcasecmp(mime, MEDIA_MIMETYPE_CONTAINER_MPEG2TS)) {
588            // Very special case for MPEG2 Transport Streams.
589            CHECK_EQ(numTracks, 1u);
590
591            mTSParser = new ATSParser;
592            return;
593        }
594
595        bool isAudio = !strncasecmp(mime, "audio/", 6);
596        bool isVideo = !strncasecmp(mime, "video/", 6);
597
598        TrackInfo info;
599        info.mTimeScale = timeScale;
600        info.mRTPTime = 0;
601        info.mNormalPlaytimeUs = 0ll;
602        info.mNPTMappingValid = false;
603
604        if ((isAudio && mAudioTrack == NULL)
605                || (isVideo && mVideoTrack == NULL)) {
606            sp<AnotherPacketSource> source = new AnotherPacketSource(format);
607
608            if (isAudio) {
609                mAudioTrack = source;
610            } else {
611                mVideoTrack = source;
612            }
613
614            info.mSource = source;
615        }
616
617        mTracks.push(info);
618    }
619
620    mState = CONNECTED;
621}
622
623void NuPlayer::RTSPSource::onSDPLoaded(const sp<AMessage> &msg) {
624    status_t err;
625    CHECK(msg->findInt32("result", &err));
626
627    mSDPLoader.clear();
628
629    if (mDisconnectReplyID != 0) {
630        err = UNKNOWN_ERROR;
631    }
632
633    if (err == OK) {
634        sp<ASessionDescription> desc;
635        sp<RefBase> obj;
636        CHECK(msg->findObject("description", &obj));
637        desc = static_cast<ASessionDescription *>(obj.get());
638
639        AString rtspUri;
640        if (!desc->findAttribute(0, "a=control", &rtspUri)) {
641            ALOGE("Unable to find url in SDP");
642            err = UNKNOWN_ERROR;
643        } else {
644            sp<AMessage> notify = new AMessage(kWhatNotify, this);
645
646            mHandler = new MyHandler(rtspUri.c_str(), notify, mUIDValid, mUID);
647            mLooper->registerHandler(mHandler);
648
649            mHandler->loadSDP(desc);
650        }
651    }
652
653    if (err != OK) {
654        if (mState == CONNECTING) {
655            // We're still in the preparation phase, signal that it
656            // failed.
657            notifyPrepared(err);
658        }
659
660        mState = DISCONNECTED;
661        setError(err);
662
663        if (mDisconnectReplyID != 0) {
664            finishDisconnectIfPossible();
665        }
666    }
667}
668
669void NuPlayer::RTSPSource::onDisconnected(const sp<AMessage> &msg) {
670    if (mState == DISCONNECTED) {
671        return;
672    }
673
674    status_t err;
675    CHECK(msg->findInt32("result", &err));
676    CHECK_NE(err, (status_t)OK);
677
678    mLooper->unregisterHandler(mHandler->id());
679    mHandler.clear();
680
681    if (mState == CONNECTING) {
682        // We're still in the preparation phase, signal that it
683        // failed.
684        notifyPrepared(err);
685    }
686
687    mState = DISCONNECTED;
688    setError(err);
689
690    if (mDisconnectReplyID != 0) {
691        finishDisconnectIfPossible();
692    }
693}
694
695void NuPlayer::RTSPSource::finishDisconnectIfPossible() {
696    if (mState != DISCONNECTED) {
697        if (mHandler != NULL) {
698            mHandler->disconnect();
699        } else if (mSDPLoader != NULL) {
700            mSDPLoader->cancel();
701        }
702        return;
703    }
704
705    (new AMessage)->postReply(mDisconnectReplyID);
706    mDisconnectReplyID = 0;
707}
708
709void NuPlayer::RTSPSource::setError(status_t err) {
710    Mutex::Autolock _l(mBufferingLock);
711    mFinalResult = err;
712}
713
714void NuPlayer::RTSPSource::startBufferingIfNecessary() {
715    Mutex::Autolock _l(mBufferingLock);
716
717    if (!mBuffering) {
718        mBuffering = true;
719
720        sp<AMessage> notify = dupNotify();
721        notify->setInt32("what", kWhatBufferingStart);
722        notify->post();
723    }
724}
725
726bool NuPlayer::RTSPSource::stopBufferingIfNecessary() {
727    Mutex::Autolock _l(mBufferingLock);
728
729    if (mBuffering) {
730        if (!haveSufficientDataOnAllTracks()) {
731            return false;
732        }
733
734        mBuffering = false;
735
736        sp<AMessage> notify = dupNotify();
737        notify->setInt32("what", kWhatBufferingEnd);
738        notify->post();
739    }
740
741    return true;
742}
743
744void NuPlayer::RTSPSource::finishSeek(status_t err) {
745    CHECK(mSeekReplyID != NULL);
746    sp<AMessage> seekReply = new AMessage;
747    seekReply->setInt32("err", err);
748    seekReply->postReply(mSeekReplyID);
749    mSeekReplyID = NULL;
750}
751
752}  // namespace android
753