RTSPSource.cpp revision 8d237a5ce1e3c1dbc1d538f47e68cff2cc52d799
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "RTSPSource" 19#include <utils/Log.h> 20 21#include "RTSPSource.h" 22 23#include "AnotherPacketSource.h" 24#include "MyHandler.h" 25#include "SDPLoader.h" 26 27#include <media/IMediaHTTPService.h> 28#include <media/stagefright/MediaDefs.h> 29#include <media/stagefright/MetaData.h> 30 31namespace android { 32 33const int64_t kNearEOSTimeoutUs = 2000000ll; // 2 secs 34 35NuPlayer::RTSPSource::RTSPSource( 36 const sp<AMessage> ¬ify, 37 const sp<IMediaHTTPService> &httpService, 38 const char *url, 39 const KeyedVector<String8, String8> *headers, 40 bool uidValid, 41 uid_t uid, 42 bool isSDP) 43 : Source(notify), 44 mHTTPService(httpService), 45 mURL(url), 46 mUIDValid(uidValid), 47 mUID(uid), 48 mFlags(0), 49 mIsSDP(isSDP), 50 mState(DISCONNECTED), 51 mFinalResult(OK), 52 mDisconnectReplyID(0), 53 mBuffering(false), 54 mSeekGeneration(0), 55 mEOSTimeoutAudio(0), 56 mEOSTimeoutVideo(0) { 57 if (headers) { 58 mExtraHeaders = *headers; 59 60 ssize_t index = 61 mExtraHeaders.indexOfKey(String8("x-hide-urls-from-log")); 62 63 if (index >= 0) { 64 mFlags |= kFlagIncognito; 65 66 mExtraHeaders.removeItemsAt(index); 67 } 68 } 69} 70 71NuPlayer::RTSPSource::~RTSPSource() { 72 if (mLooper != NULL) { 73 mLooper->unregisterHandler(id()); 74 mLooper->stop(); 75 } 76} 77 78void NuPlayer::RTSPSource::prepareAsync() { 79 if (mLooper == NULL) { 80 mLooper = new ALooper; 81 mLooper->setName("rtsp"); 82 mLooper->start(); 83 84 mLooper->registerHandler(this); 85 } 86 87 CHECK(mHandler == NULL); 88 CHECK(mSDPLoader == NULL); 89 90 sp<AMessage> notify = new AMessage(kWhatNotify, this); 91 92 CHECK_EQ(mState, (int)DISCONNECTED); 93 mState = CONNECTING; 94 95 if (mIsSDP) { 96 mSDPLoader = new SDPLoader(notify, 97 (mFlags & kFlagIncognito) ? SDPLoader::kFlagIncognito : 0, 98 mHTTPService); 99 100 mSDPLoader->load( 101 mURL.c_str(), mExtraHeaders.isEmpty() ? NULL : &mExtraHeaders); 102 } else { 103 mHandler = new MyHandler(mURL.c_str(), notify, mUIDValid, mUID); 104 mLooper->registerHandler(mHandler); 105 106 mHandler->connect(); 107 } 108 109 startBufferingIfNecessary(); 110} 111 112void NuPlayer::RTSPSource::start() { 113} 114 115void NuPlayer::RTSPSource::stop() { 116 if (mLooper == NULL) { 117 return; 118 } 119 sp<AMessage> msg = new AMessage(kWhatDisconnect, this); 120 121 sp<AMessage> dummy; 122 msg->postAndAwaitResponse(&dummy); 123} 124 125void NuPlayer::RTSPSource::pause() { 126 int64_t mediaDurationUs = 0; 127 getDuration(&mediaDurationUs); 128 for (size_t index = 0; index < mTracks.size(); index++) { 129 TrackInfo *info = &mTracks.editItemAt(index); 130 sp<AnotherPacketSource> source = info->mSource; 131 132 // Check if EOS or ERROR is received 133 if (source != NULL && source->isFinished(mediaDurationUs)) { 134 return; 135 } 136 } 137 mHandler->pause(); 138} 139 140void NuPlayer::RTSPSource::resume() { 141 if (mHandler != NULL) { 142 mHandler->resume(); 143 } 144} 145 146status_t NuPlayer::RTSPSource::feedMoreTSData() { 147 Mutex::Autolock _l(mBufferingLock); 148 return mFinalResult; 149} 150 151sp<MetaData> NuPlayer::RTSPSource::getFormatMeta(bool audio) { 152 sp<AnotherPacketSource> source = getSource(audio); 153 154 if (source == NULL) { 155 return NULL; 156 } 157 158 return source->getFormat(); 159} 160 161bool NuPlayer::RTSPSource::haveSufficientDataOnAllTracks() { 162 // We're going to buffer at least 2 secs worth data on all tracks before 163 // starting playback (both at startup and after a seek). 164 165 static const int64_t kMinDurationUs = 2000000ll; 166 167 int64_t mediaDurationUs = 0; 168 getDuration(&mediaDurationUs); 169 if ((mAudioTrack != NULL && mAudioTrack->isFinished(mediaDurationUs)) 170 || (mVideoTrack != NULL && mVideoTrack->isFinished(mediaDurationUs))) { 171 return true; 172 } 173 174 status_t err; 175 int64_t durationUs; 176 if (mAudioTrack != NULL 177 && (durationUs = mAudioTrack->getBufferedDurationUs(&err)) 178 < kMinDurationUs 179 && err == OK) { 180 ALOGV("audio track doesn't have enough data yet. (%.2f secs buffered)", 181 durationUs / 1E6); 182 return false; 183 } 184 185 if (mVideoTrack != NULL 186 && (durationUs = mVideoTrack->getBufferedDurationUs(&err)) 187 < kMinDurationUs 188 && err == OK) { 189 ALOGV("video track doesn't have enough data yet. (%.2f secs buffered)", 190 durationUs / 1E6); 191 return false; 192 } 193 194 return true; 195} 196 197status_t NuPlayer::RTSPSource::dequeueAccessUnit( 198 bool audio, sp<ABuffer> *accessUnit) { 199 if (!stopBufferingIfNecessary()) { 200 return -EWOULDBLOCK; 201 } 202 203 sp<AnotherPacketSource> source = getSource(audio); 204 205 if (source == NULL) { 206 return -EWOULDBLOCK; 207 } 208 209 status_t finalResult; 210 if (!source->hasBufferAvailable(&finalResult)) { 211 if (finalResult == OK) { 212 int64_t mediaDurationUs = 0; 213 getDuration(&mediaDurationUs); 214 sp<AnotherPacketSource> otherSource = getSource(!audio); 215 status_t otherFinalResult; 216 217 // If other source already signaled EOS, this source should also signal EOS 218 if (otherSource != NULL && 219 !otherSource->hasBufferAvailable(&otherFinalResult) && 220 otherFinalResult == ERROR_END_OF_STREAM) { 221 source->signalEOS(ERROR_END_OF_STREAM); 222 return ERROR_END_OF_STREAM; 223 } 224 225 // If this source has detected near end, give it some time to retrieve more 226 // data before signaling EOS 227 if (source->isFinished(mediaDurationUs)) { 228 int64_t eosTimeout = audio ? mEOSTimeoutAudio : mEOSTimeoutVideo; 229 if (eosTimeout == 0) { 230 setEOSTimeout(audio, ALooper::GetNowUs()); 231 } else if ((ALooper::GetNowUs() - eosTimeout) > kNearEOSTimeoutUs) { 232 setEOSTimeout(audio, 0); 233 source->signalEOS(ERROR_END_OF_STREAM); 234 return ERROR_END_OF_STREAM; 235 } 236 return -EWOULDBLOCK; 237 } 238 239 if (!(otherSource != NULL && otherSource->isFinished(mediaDurationUs))) { 240 // We should not enter buffering mode 241 // if any of the sources already have detected EOS. 242 startBufferingIfNecessary(); 243 } 244 245 return -EWOULDBLOCK; 246 } 247 return finalResult; 248 } 249 250 setEOSTimeout(audio, 0); 251 252 return source->dequeueAccessUnit(accessUnit); 253} 254 255sp<AnotherPacketSource> NuPlayer::RTSPSource::getSource(bool audio) { 256 if (mTSParser != NULL) { 257 sp<MediaSource> source = mTSParser->getSource( 258 audio ? ATSParser::AUDIO : ATSParser::VIDEO); 259 260 return static_cast<AnotherPacketSource *>(source.get()); 261 } 262 263 return audio ? mAudioTrack : mVideoTrack; 264} 265 266void NuPlayer::RTSPSource::setEOSTimeout(bool audio, int64_t timeout) { 267 if (audio) { 268 mEOSTimeoutAudio = timeout; 269 } else { 270 mEOSTimeoutVideo = timeout; 271 } 272} 273 274status_t NuPlayer::RTSPSource::getDuration(int64_t *durationUs) { 275 *durationUs = 0ll; 276 277 int64_t audioDurationUs; 278 if (mAudioTrack != NULL 279 && mAudioTrack->getFormat()->findInt64( 280 kKeyDuration, &audioDurationUs) 281 && audioDurationUs > *durationUs) { 282 *durationUs = audioDurationUs; 283 } 284 285 int64_t videoDurationUs; 286 if (mVideoTrack != NULL 287 && mVideoTrack->getFormat()->findInt64( 288 kKeyDuration, &videoDurationUs) 289 && videoDurationUs > *durationUs) { 290 *durationUs = videoDurationUs; 291 } 292 293 return OK; 294} 295 296status_t NuPlayer::RTSPSource::seekTo(int64_t seekTimeUs) { 297 sp<AMessage> msg = new AMessage(kWhatPerformSeek, this); 298 msg->setInt32("generation", ++mSeekGeneration); 299 msg->setInt64("timeUs", seekTimeUs); 300 301 sp<AMessage> response; 302 status_t err = msg->postAndAwaitResponse(&response); 303 if (err == OK && response != NULL) { 304 CHECK(response->findInt32("err", &err)); 305 } 306 307 return err; 308} 309 310void NuPlayer::RTSPSource::performSeek(int64_t seekTimeUs) { 311 if (mState != CONNECTED) { 312 finishSeek(INVALID_OPERATION); 313 return; 314 } 315 316 mState = SEEKING; 317 mHandler->seek(seekTimeUs); 318} 319 320void NuPlayer::RTSPSource::onMessageReceived(const sp<AMessage> &msg) { 321 if (msg->what() == kWhatDisconnect) { 322 sp<AReplyToken> replyID; 323 CHECK(msg->senderAwaitsResponse(&replyID)); 324 325 mDisconnectReplyID = replyID; 326 finishDisconnectIfPossible(); 327 return; 328 } else if (msg->what() == kWhatPerformSeek) { 329 int32_t generation; 330 CHECK(msg->findInt32("generation", &generation)); 331 CHECK(msg->senderAwaitsResponse(&mSeekReplyID)); 332 333 if (generation != mSeekGeneration) { 334 // obsolete. 335 finishSeek(OK); 336 return; 337 } 338 339 int64_t seekTimeUs; 340 CHECK(msg->findInt64("timeUs", &seekTimeUs)); 341 342 performSeek(seekTimeUs); 343 return; 344 } 345 346 CHECK_EQ(msg->what(), (int)kWhatNotify); 347 348 int32_t what; 349 CHECK(msg->findInt32("what", &what)); 350 351 switch (what) { 352 case MyHandler::kWhatConnected: 353 { 354 onConnected(); 355 356 notifyVideoSizeChanged(); 357 358 uint32_t flags = 0; 359 360 if (mHandler->isSeekable()) { 361 flags = FLAG_CAN_PAUSE 362 | FLAG_CAN_SEEK 363 | FLAG_CAN_SEEK_BACKWARD 364 | FLAG_CAN_SEEK_FORWARD; 365 } 366 367 notifyFlagsChanged(flags); 368 notifyPrepared(); 369 break; 370 } 371 372 case MyHandler::kWhatDisconnected: 373 { 374 onDisconnected(msg); 375 break; 376 } 377 378 case MyHandler::kWhatSeekDone: 379 { 380 mState = CONNECTED; 381 if (mSeekReplyID != NULL) { 382 // Unblock seekTo here in case we attempted to seek in a live stream 383 finishSeek(OK); 384 } 385 break; 386 } 387 388 case MyHandler::kWhatSeekPaused: 389 { 390 sp<AnotherPacketSource> source = getSource(true /* audio */); 391 if (source != NULL) { 392 source->queueDiscontinuity(ATSParser::DISCONTINUITY_NONE, 393 /* extra */ NULL, 394 /* discard */ true); 395 } 396 source = getSource(false /* video */); 397 if (source != NULL) { 398 source->queueDiscontinuity(ATSParser::DISCONTINUITY_NONE, 399 /* extra */ NULL, 400 /* discard */ true); 401 }; 402 403 status_t err = OK; 404 msg->findInt32("err", &err); 405 finishSeek(err); 406 407 if (err == OK) { 408 int64_t timeUs; 409 CHECK(msg->findInt64("time", &timeUs)); 410 mHandler->continueSeekAfterPause(timeUs); 411 } 412 break; 413 } 414 415 case MyHandler::kWhatAccessUnit: 416 { 417 size_t trackIndex; 418 CHECK(msg->findSize("trackIndex", &trackIndex)); 419 420 if (mTSParser == NULL) { 421 CHECK_LT(trackIndex, mTracks.size()); 422 } else { 423 CHECK_EQ(trackIndex, 0u); 424 } 425 426 sp<ABuffer> accessUnit; 427 CHECK(msg->findBuffer("accessUnit", &accessUnit)); 428 429 int32_t damaged; 430 if (accessUnit->meta()->findInt32("damaged", &damaged) 431 && damaged) { 432 ALOGI("dropping damaged access unit."); 433 break; 434 } 435 436 if (mTSParser != NULL) { 437 size_t offset = 0; 438 status_t err = OK; 439 while (offset + 188 <= accessUnit->size()) { 440 err = mTSParser->feedTSPacket( 441 accessUnit->data() + offset, 188); 442 if (err != OK) { 443 break; 444 } 445 446 offset += 188; 447 } 448 449 if (offset < accessUnit->size()) { 450 err = ERROR_MALFORMED; 451 } 452 453 if (err != OK) { 454 sp<AnotherPacketSource> source = getSource(false /* audio */); 455 if (source != NULL) { 456 source->signalEOS(err); 457 } 458 459 source = getSource(true /* audio */); 460 if (source != NULL) { 461 source->signalEOS(err); 462 } 463 } 464 break; 465 } 466 467 TrackInfo *info = &mTracks.editItemAt(trackIndex); 468 469 sp<AnotherPacketSource> source = info->mSource; 470 if (source != NULL) { 471 uint32_t rtpTime; 472 CHECK(accessUnit->meta()->findInt32("rtp-time", (int32_t *)&rtpTime)); 473 474 if (!info->mNPTMappingValid) { 475 // This is a live stream, we didn't receive any normal 476 // playtime mapping. We won't map to npt time. 477 source->queueAccessUnit(accessUnit); 478 break; 479 } 480 481 int64_t nptUs = 482 ((double)rtpTime - (double)info->mRTPTime) 483 / info->mTimeScale 484 * 1000000ll 485 + info->mNormalPlaytimeUs; 486 487 accessUnit->meta()->setInt64("timeUs", nptUs); 488 489 source->queueAccessUnit(accessUnit); 490 } 491 break; 492 } 493 494 case MyHandler::kWhatEOS: 495 { 496 int32_t finalResult; 497 CHECK(msg->findInt32("finalResult", &finalResult)); 498 CHECK_NE(finalResult, (status_t)OK); 499 500 if (mTSParser != NULL) { 501 sp<AnotherPacketSource> source = getSource(false /* audio */); 502 if (source != NULL) { 503 source->signalEOS(finalResult); 504 } 505 506 source = getSource(true /* audio */); 507 if (source != NULL) { 508 source->signalEOS(finalResult); 509 } 510 511 return; 512 } 513 514 size_t trackIndex; 515 CHECK(msg->findSize("trackIndex", &trackIndex)); 516 CHECK_LT(trackIndex, mTracks.size()); 517 518 TrackInfo *info = &mTracks.editItemAt(trackIndex); 519 sp<AnotherPacketSource> source = info->mSource; 520 if (source != NULL) { 521 source->signalEOS(finalResult); 522 } 523 524 break; 525 } 526 527 case MyHandler::kWhatSeekDiscontinuity: 528 { 529 size_t trackIndex; 530 CHECK(msg->findSize("trackIndex", &trackIndex)); 531 CHECK_LT(trackIndex, mTracks.size()); 532 533 TrackInfo *info = &mTracks.editItemAt(trackIndex); 534 sp<AnotherPacketSource> source = info->mSource; 535 if (source != NULL) { 536 source->queueDiscontinuity( 537 ATSParser::DISCONTINUITY_TIME, 538 NULL, 539 true /* discard */); 540 } 541 542 break; 543 } 544 545 case MyHandler::kWhatNormalPlayTimeMapping: 546 { 547 size_t trackIndex; 548 CHECK(msg->findSize("trackIndex", &trackIndex)); 549 CHECK_LT(trackIndex, mTracks.size()); 550 551 uint32_t rtpTime; 552 CHECK(msg->findInt32("rtpTime", (int32_t *)&rtpTime)); 553 554 int64_t nptUs; 555 CHECK(msg->findInt64("nptUs", &nptUs)); 556 557 TrackInfo *info = &mTracks.editItemAt(trackIndex); 558 info->mRTPTime = rtpTime; 559 info->mNormalPlaytimeUs = nptUs; 560 info->mNPTMappingValid = true; 561 break; 562 } 563 564 case SDPLoader::kWhatSDPLoaded: 565 { 566 onSDPLoaded(msg); 567 break; 568 } 569 570 default: 571 TRESPASS(); 572 } 573} 574 575void NuPlayer::RTSPSource::onConnected() { 576 CHECK(mAudioTrack == NULL); 577 CHECK(mVideoTrack == NULL); 578 579 size_t numTracks = mHandler->countTracks(); 580 for (size_t i = 0; i < numTracks; ++i) { 581 int32_t timeScale; 582 sp<MetaData> format = mHandler->getTrackFormat(i, &timeScale); 583 584 const char *mime; 585 CHECK(format->findCString(kKeyMIMEType, &mime)); 586 587 if (!strcasecmp(mime, MEDIA_MIMETYPE_CONTAINER_MPEG2TS)) { 588 // Very special case for MPEG2 Transport Streams. 589 CHECK_EQ(numTracks, 1u); 590 591 mTSParser = new ATSParser; 592 return; 593 } 594 595 bool isAudio = !strncasecmp(mime, "audio/", 6); 596 bool isVideo = !strncasecmp(mime, "video/", 6); 597 598 TrackInfo info; 599 info.mTimeScale = timeScale; 600 info.mRTPTime = 0; 601 info.mNormalPlaytimeUs = 0ll; 602 info.mNPTMappingValid = false; 603 604 if ((isAudio && mAudioTrack == NULL) 605 || (isVideo && mVideoTrack == NULL)) { 606 sp<AnotherPacketSource> source = new AnotherPacketSource(format); 607 608 if (isAudio) { 609 mAudioTrack = source; 610 } else { 611 mVideoTrack = source; 612 } 613 614 info.mSource = source; 615 } 616 617 mTracks.push(info); 618 } 619 620 mState = CONNECTED; 621} 622 623void NuPlayer::RTSPSource::onSDPLoaded(const sp<AMessage> &msg) { 624 status_t err; 625 CHECK(msg->findInt32("result", &err)); 626 627 mSDPLoader.clear(); 628 629 if (mDisconnectReplyID != 0) { 630 err = UNKNOWN_ERROR; 631 } 632 633 if (err == OK) { 634 sp<ASessionDescription> desc; 635 sp<RefBase> obj; 636 CHECK(msg->findObject("description", &obj)); 637 desc = static_cast<ASessionDescription *>(obj.get()); 638 639 AString rtspUri; 640 if (!desc->findAttribute(0, "a=control", &rtspUri)) { 641 ALOGE("Unable to find url in SDP"); 642 err = UNKNOWN_ERROR; 643 } else { 644 sp<AMessage> notify = new AMessage(kWhatNotify, this); 645 646 mHandler = new MyHandler(rtspUri.c_str(), notify, mUIDValid, mUID); 647 mLooper->registerHandler(mHandler); 648 649 mHandler->loadSDP(desc); 650 } 651 } 652 653 if (err != OK) { 654 if (mState == CONNECTING) { 655 // We're still in the preparation phase, signal that it 656 // failed. 657 notifyPrepared(err); 658 } 659 660 mState = DISCONNECTED; 661 setError(err); 662 663 if (mDisconnectReplyID != 0) { 664 finishDisconnectIfPossible(); 665 } 666 } 667} 668 669void NuPlayer::RTSPSource::onDisconnected(const sp<AMessage> &msg) { 670 if (mState == DISCONNECTED) { 671 return; 672 } 673 674 status_t err; 675 CHECK(msg->findInt32("result", &err)); 676 CHECK_NE(err, (status_t)OK); 677 678 mLooper->unregisterHandler(mHandler->id()); 679 mHandler.clear(); 680 681 if (mState == CONNECTING) { 682 // We're still in the preparation phase, signal that it 683 // failed. 684 notifyPrepared(err); 685 } 686 687 mState = DISCONNECTED; 688 setError(err); 689 690 if (mDisconnectReplyID != 0) { 691 finishDisconnectIfPossible(); 692 } 693} 694 695void NuPlayer::RTSPSource::finishDisconnectIfPossible() { 696 if (mState != DISCONNECTED) { 697 if (mHandler != NULL) { 698 mHandler->disconnect(); 699 } else if (mSDPLoader != NULL) { 700 mSDPLoader->cancel(); 701 } 702 return; 703 } 704 705 (new AMessage)->postReply(mDisconnectReplyID); 706 mDisconnectReplyID = 0; 707} 708 709void NuPlayer::RTSPSource::setError(status_t err) { 710 Mutex::Autolock _l(mBufferingLock); 711 mFinalResult = err; 712} 713 714void NuPlayer::RTSPSource::startBufferingIfNecessary() { 715 Mutex::Autolock _l(mBufferingLock); 716 717 if (!mBuffering) { 718 mBuffering = true; 719 720 sp<AMessage> notify = dupNotify(); 721 notify->setInt32("what", kWhatBufferingStart); 722 notify->post(); 723 } 724} 725 726bool NuPlayer::RTSPSource::stopBufferingIfNecessary() { 727 Mutex::Autolock _l(mBufferingLock); 728 729 if (mBuffering) { 730 if (!haveSufficientDataOnAllTracks()) { 731 return false; 732 } 733 734 mBuffering = false; 735 736 sp<AMessage> notify = dupNotify(); 737 notify->setInt32("what", kWhatBufferingEnd); 738 notify->post(); 739 } 740 741 return true; 742} 743 744void NuPlayer::RTSPSource::finishSeek(status_t err) { 745 CHECK(mSeekReplyID != NULL); 746 sp<AMessage> seekReply = new AMessage; 747 seekReply->setInt32("err", err); 748 seekReply->postReply(mSeekReplyID); 749 mSeekReplyID = NULL; 750} 751 752} // namespace android 753