1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#ifndef MY_HANDLER_H_ 18 19#define MY_HANDLER_H_ 20 21//#define LOG_NDEBUG 0 22 23#ifndef LOG_TAG 24#define LOG_TAG "MyHandler" 25#endif 26 27#include <utils/Log.h> 28#include <cutils/properties.h> // for property_get 29 30#include "APacketSource.h" 31#include "ARTPConnection.h" 32#include "ARTSPConnection.h" 33#include "ASessionDescription.h" 34 35#include <ctype.h> 36 37#include <media/stagefright/foundation/ABuffer.h> 38#include <media/stagefright/foundation/ADebug.h> 39#include <media/stagefright/foundation/ALooper.h> 40#include <media/stagefright/foundation/AMessage.h> 41#include <media/stagefright/MediaErrors.h> 42#include <media/stagefright/Utils.h> 43 44#include <arpa/inet.h> 45#include <sys/socket.h> 46#include <netdb.h> 47 48#include "HTTPBase.h" 49 50#if LOG_NDEBUG 51#define UNUSED_UNLESS_VERBOSE(x) (void)(x) 52#else 53#define UNUSED_UNLESS_VERBOSE(x) 54#endif 55 56// If no access units are received within 5 secs, assume that the rtp 57// stream has ended and signal end of stream. 58static int64_t kAccessUnitTimeoutUs = 10000000ll; 59 60// If no access units arrive for the first 10 secs after starting the 61// stream, assume none ever will and signal EOS or switch transports. 62static int64_t kStartupTimeoutUs = 10000000ll; 63 64static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll; 65 66static int64_t kPauseDelayUs = 3000000ll; 67 68// The allowed maximum number of stale access units at the beginning of 69// a new sequence. 70static int32_t kMaxAllowedStaleAccessUnits = 20; 71 72namespace android { 73 74static bool GetAttribute(const char *s, const char *key, AString *value) { 75 value->clear(); 76 77 size_t keyLen = strlen(key); 78 79 for (;;) { 80 while (isspace(*s)) { 81 ++s; 82 } 83 84 const char *colonPos = strchr(s, ';'); 85 86 size_t len = 87 (colonPos == NULL) ? strlen(s) : colonPos - s; 88 89 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { 90 value->setTo(&s[keyLen + 1], len - keyLen - 1); 91 return true; 92 } 93 94 if (colonPos == NULL) { 95 return false; 96 } 97 98 s = colonPos + 1; 99 } 100} 101 102struct MyHandler : public AHandler { 103 enum { 104 kWhatConnected = 'conn', 105 kWhatDisconnected = 'disc', 106 kWhatSeekPaused = 'spau', 107 kWhatSeekDone = 'sdon', 108 109 kWhatAccessUnit = 'accU', 110 kWhatEOS = 'eos!', 111 kWhatSeekDiscontinuity = 'seeD', 112 kWhatNormalPlayTimeMapping = 'nptM', 113 }; 114 115 MyHandler( 116 const char *url, 117 const sp<AMessage> ¬ify, 118 bool uidValid = false, uid_t uid = 0) 119 : mNotify(notify), 120 mUIDValid(uidValid), 121 mUID(uid), 122 mNetLooper(new ALooper), 123 mConn(new ARTSPConnection(mUIDValid, mUID)), 124 mRTPConn(new ARTPConnection), 125 mOriginalSessionURL(url), 126 mSessionURL(url), 127 mSetupTracksSuccessful(false), 128 mSeekPending(false), 129 mFirstAccessUnit(true), 130 mAllTracksHaveTime(false), 131 mNTPAnchorUs(-1), 132 mMediaAnchorUs(-1), 133 mLastMediaTimeUs(0), 134 mNumAccessUnitsReceived(0), 135 mCheckPending(false), 136 mCheckGeneration(0), 137 mCheckTimeoutGeneration(0), 138 mTryTCPInterleaving(false), 139 mTryFakeRTCP(false), 140 mReceivedFirstRTCPPacket(false), 141 mReceivedFirstRTPPacket(false), 142 mSeekable(true), 143 mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs), 144 mKeepAliveGeneration(0), 145 mPausing(false), 146 mPauseGeneration(0), 147 mPlayResponseParsed(false) { 148 mNetLooper->setName("rtsp net"); 149 mNetLooper->start(false /* runOnCallingThread */, 150 false /* canCallJava */, 151 PRIORITY_HIGHEST); 152 153 // Strip any authentication info from the session url, we don't 154 // want to transmit user/pass in cleartext. 155 AString host, path, user, pass; 156 unsigned port; 157 CHECK(ARTSPConnection::ParseURL( 158 mSessionURL.c_str(), &host, &port, &path, &user, &pass)); 159 160 if (user.size() > 0) { 161 mSessionURL.clear(); 162 mSessionURL.append("rtsp://"); 163 mSessionURL.append(host); 164 mSessionURL.append(":"); 165 mSessionURL.append(AStringPrintf("%u", port)); 166 mSessionURL.append(path); 167 168 ALOGV("rewritten session url: '%s'", mSessionURL.c_str()); 169 } 170 171 mSessionHost = host; 172 } 173 174 void connect() { 175 looper()->registerHandler(mConn); 176 (1 ? mNetLooper : looper())->registerHandler(mRTPConn); 177 178 sp<AMessage> notify = new AMessage('biny', this); 179 mConn->observeBinaryData(notify); 180 181 sp<AMessage> reply = new AMessage('conn', this); 182 mConn->connect(mOriginalSessionURL.c_str(), reply); 183 } 184 185 void loadSDP(const sp<ASessionDescription>& desc) { 186 looper()->registerHandler(mConn); 187 (1 ? mNetLooper : looper())->registerHandler(mRTPConn); 188 189 sp<AMessage> notify = new AMessage('biny', this); 190 mConn->observeBinaryData(notify); 191 192 sp<AMessage> reply = new AMessage('sdpl', this); 193 reply->setObject("description", desc); 194 mConn->connect(mOriginalSessionURL.c_str(), reply); 195 } 196 197 AString getControlURL() { 198 AString sessionLevelControlURL; 199 if (mSessionDesc->findAttribute( 200 0, 201 "a=control", 202 &sessionLevelControlURL)) { 203 if (sessionLevelControlURL.compare("*") == 0) { 204 return mBaseURL; 205 } else { 206 AString controlURL; 207 CHECK(MakeURL( 208 mBaseURL.c_str(), 209 sessionLevelControlURL.c_str(), 210 &controlURL)); 211 return controlURL; 212 } 213 } else { 214 return mSessionURL; 215 } 216 } 217 218 void disconnect() { 219 (new AMessage('abor', this))->post(); 220 } 221 222 void seek(int64_t timeUs) { 223 sp<AMessage> msg = new AMessage('seek', this); 224 msg->setInt64("time", timeUs); 225 mPauseGeneration++; 226 msg->post(); 227 } 228 229 void continueSeekAfterPause(int64_t timeUs) { 230 sp<AMessage> msg = new AMessage('see1', this); 231 msg->setInt64("time", timeUs); 232 msg->post(); 233 } 234 235 bool isSeekable() const { 236 return mSeekable; 237 } 238 239 void pause() { 240 sp<AMessage> msg = new AMessage('paus', this); 241 mPauseGeneration++; 242 msg->setInt32("pausecheck", mPauseGeneration); 243 msg->post(); 244 } 245 246 void resume() { 247 sp<AMessage> msg = new AMessage('resu', this); 248 mPauseGeneration++; 249 msg->post(); 250 } 251 252 static void addRR(const sp<ABuffer> &buf) { 253 uint8_t *ptr = buf->data() + buf->size(); 254 ptr[0] = 0x80 | 0; 255 ptr[1] = 201; // RR 256 ptr[2] = 0; 257 ptr[3] = 1; 258 ptr[4] = 0xde; // SSRC 259 ptr[5] = 0xad; 260 ptr[6] = 0xbe; 261 ptr[7] = 0xef; 262 263 buf->setRange(0, buf->size() + 8); 264 } 265 266 static void addSDES(int s, const sp<ABuffer> &buffer) { 267 struct sockaddr_in addr; 268 socklen_t addrSize = sizeof(addr); 269 if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) { 270 inet_aton("0.0.0.0", &(addr.sin_addr)); 271 } 272 273 uint8_t *data = buffer->data() + buffer->size(); 274 data[0] = 0x80 | 1; 275 data[1] = 202; // SDES 276 data[4] = 0xde; // SSRC 277 data[5] = 0xad; 278 data[6] = 0xbe; 279 data[7] = 0xef; 280 281 size_t offset = 8; 282 283 data[offset++] = 1; // CNAME 284 285 AString cname = "stagefright@"; 286 cname.append(inet_ntoa(addr.sin_addr)); 287 data[offset++] = cname.size(); 288 289 memcpy(&data[offset], cname.c_str(), cname.size()); 290 offset += cname.size(); 291 292 data[offset++] = 6; // TOOL 293 294 AString tool = MakeUserAgent(); 295 296 data[offset++] = tool.size(); 297 298 memcpy(&data[offset], tool.c_str(), tool.size()); 299 offset += tool.size(); 300 301 data[offset++] = 0; 302 303 if ((offset % 4) > 0) { 304 size_t count = 4 - (offset % 4); 305 switch (count) { 306 case 3: 307 data[offset++] = 0; 308 case 2: 309 data[offset++] = 0; 310 case 1: 311 data[offset++] = 0; 312 } 313 } 314 315 size_t numWords = (offset / 4) - 1; 316 data[2] = numWords >> 8; 317 data[3] = numWords & 0xff; 318 319 buffer->setRange(buffer->offset(), buffer->size() + offset); 320 } 321 322 // In case we're behind NAT, fire off two UDP packets to the remote 323 // rtp/rtcp ports to poke a hole into the firewall for future incoming 324 // packets. We're going to send an RR/SDES RTCP packet to both of them. 325 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) { 326 struct sockaddr_in addr; 327 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 328 addr.sin_family = AF_INET; 329 330 AString source; 331 AString server_port; 332 if (!GetAttribute(transport.c_str(), 333 "source", 334 &source)) { 335 ALOGW("Missing 'source' field in Transport response. Using " 336 "RTSP endpoint address."); 337 338 struct hostent *ent = gethostbyname(mSessionHost.c_str()); 339 if (ent == NULL) { 340 ALOGE("Failed to look up address of session host '%s'", 341 mSessionHost.c_str()); 342 343 return false; 344 } 345 346 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 347 } else { 348 addr.sin_addr.s_addr = inet_addr(source.c_str()); 349 } 350 351 if (!GetAttribute(transport.c_str(), 352 "server_port", 353 &server_port)) { 354 ALOGI("Missing 'server_port' field in Transport response."); 355 return false; 356 } 357 358 int rtpPort, rtcpPort; 359 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2 360 || rtpPort <= 0 || rtpPort > 65535 361 || rtcpPort <=0 || rtcpPort > 65535 362 || rtcpPort != rtpPort + 1) { 363 ALOGE("Server picked invalid RTP/RTCP port pair %s," 364 " RTP port must be even, RTCP port must be one higher.", 365 server_port.c_str()); 366 367 return false; 368 } 369 370 if (rtpPort & 1) { 371 ALOGW("Server picked an odd RTP port, it should've picked an " 372 "even one, we'll let it pass for now, but this may break " 373 "in the future."); 374 } 375 376 if (addr.sin_addr.s_addr == INADDR_NONE) { 377 return true; 378 } 379 380 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) { 381 // No firewalls to traverse on the loopback interface. 382 return true; 383 } 384 385 // Make up an RR/SDES RTCP packet. 386 sp<ABuffer> buf = new ABuffer(65536); 387 buf->setRange(0, 0); 388 addRR(buf); 389 addSDES(rtpSocket, buf); 390 391 addr.sin_port = htons(rtpPort); 392 393 ssize_t n = sendto( 394 rtpSocket, buf->data(), buf->size(), 0, 395 (const sockaddr *)&addr, sizeof(addr)); 396 397 if (n < (ssize_t)buf->size()) { 398 ALOGE("failed to poke a hole for RTP packets"); 399 return false; 400 } 401 402 addr.sin_port = htons(rtcpPort); 403 404 n = sendto( 405 rtcpSocket, buf->data(), buf->size(), 0, 406 (const sockaddr *)&addr, sizeof(addr)); 407 408 if (n < (ssize_t)buf->size()) { 409 ALOGE("failed to poke a hole for RTCP packets"); 410 return false; 411 } 412 413 ALOGV("successfully poked holes."); 414 415 return true; 416 } 417 418 static bool isLiveStream(const sp<ASessionDescription> &desc) { 419 AString attrLiveStream; 420 if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) { 421 ssize_t semicolonPos = attrLiveStream.find(";", 2); 422 423 const char* liveStreamValue; 424 if (semicolonPos < 0) { 425 liveStreamValue = attrLiveStream.c_str(); 426 } else { 427 AString valString; 428 valString.setTo(attrLiveStream, 429 semicolonPos + 1, 430 attrLiveStream.size() - semicolonPos - 1); 431 liveStreamValue = valString.c_str(); 432 } 433 434 uint32_t value = strtoul(liveStreamValue, NULL, 10); 435 if (value == 1) { 436 ALOGV("found live stream"); 437 return true; 438 } 439 } else { 440 // It is a live stream if no duration is returned 441 int64_t durationUs; 442 if (!desc->getDurationUs(&durationUs)) { 443 ALOGV("No duration found, assume live stream"); 444 return true; 445 } 446 } 447 448 return false; 449 } 450 451 virtual void onMessageReceived(const sp<AMessage> &msg) { 452 switch (msg->what()) { 453 case 'conn': 454 { 455 int32_t result; 456 CHECK(msg->findInt32("result", &result)); 457 458 ALOGI("connection request completed with result %d (%s)", 459 result, strerror(-result)); 460 461 if (result == OK) { 462 AString request; 463 request = "DESCRIBE "; 464 request.append(mSessionURL); 465 request.append(" RTSP/1.0\r\n"); 466 request.append("Accept: application/sdp\r\n"); 467 request.append("\r\n"); 468 469 sp<AMessage> reply = new AMessage('desc', this); 470 mConn->sendRequest(request.c_str(), reply); 471 } else { 472 (new AMessage('disc', this))->post(); 473 } 474 break; 475 } 476 477 case 'disc': 478 { 479 ++mKeepAliveGeneration; 480 481 int32_t reconnect; 482 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 483 sp<AMessage> reply = new AMessage('conn', this); 484 mConn->connect(mOriginalSessionURL.c_str(), reply); 485 } else { 486 (new AMessage('quit', this))->post(); 487 } 488 break; 489 } 490 491 case 'desc': 492 { 493 int32_t result; 494 CHECK(msg->findInt32("result", &result)); 495 496 ALOGI("DESCRIBE completed with result %d (%s)", 497 result, strerror(-result)); 498 499 if (result == OK) { 500 sp<RefBase> obj; 501 CHECK(msg->findObject("response", &obj)); 502 sp<ARTSPResponse> response = 503 static_cast<ARTSPResponse *>(obj.get()); 504 505 if (response->mStatusCode == 301 || response->mStatusCode == 302) { 506 ssize_t i = response->mHeaders.indexOfKey("location"); 507 CHECK_GE(i, 0); 508 509 mOriginalSessionURL = response->mHeaders.valueAt(i); 510 mSessionURL = mOriginalSessionURL; 511 512 // Strip any authentication info from the session url, we don't 513 // want to transmit user/pass in cleartext. 514 AString host, path, user, pass; 515 unsigned port; 516 if (ARTSPConnection::ParseURL( 517 mSessionURL.c_str(), &host, &port, &path, &user, &pass) 518 && user.size() > 0) { 519 mSessionURL.clear(); 520 mSessionURL.append("rtsp://"); 521 mSessionURL.append(host); 522 mSessionURL.append(":"); 523 mSessionURL.append(AStringPrintf("%u", port)); 524 mSessionURL.append(path); 525 526 ALOGI("rewritten session url: '%s'", mSessionURL.c_str()); 527 } 528 529 sp<AMessage> reply = new AMessage('conn', this); 530 mConn->connect(mOriginalSessionURL.c_str(), reply); 531 break; 532 } 533 534 if (response->mStatusCode != 200) { 535 result = UNKNOWN_ERROR; 536 } else if (response->mContent == NULL) { 537 result = ERROR_MALFORMED; 538 ALOGE("The response has no content."); 539 } else { 540 mSessionDesc = new ASessionDescription; 541 542 mSessionDesc->setTo( 543 response->mContent->data(), 544 response->mContent->size()); 545 546 if (!mSessionDesc->isValid()) { 547 ALOGE("Failed to parse session description."); 548 result = ERROR_MALFORMED; 549 } else { 550 ssize_t i = response->mHeaders.indexOfKey("content-base"); 551 if (i >= 0) { 552 mBaseURL = response->mHeaders.valueAt(i); 553 } else { 554 i = response->mHeaders.indexOfKey("content-location"); 555 if (i >= 0) { 556 mBaseURL = response->mHeaders.valueAt(i); 557 } else { 558 mBaseURL = mSessionURL; 559 } 560 } 561 562 mSeekable = !isLiveStream(mSessionDesc); 563 564 if (!mBaseURL.startsWith("rtsp://")) { 565 // Some misbehaving servers specify a relative 566 // URL in one of the locations above, combine 567 // it with the absolute session URL to get 568 // something usable... 569 570 ALOGW("Server specified a non-absolute base URL" 571 ", combining it with the session URL to " 572 "get something usable..."); 573 574 AString tmp; 575 CHECK(MakeURL( 576 mSessionURL.c_str(), 577 mBaseURL.c_str(), 578 &tmp)); 579 580 mBaseURL = tmp; 581 } 582 583 mControlURL = getControlURL(); 584 585 if (mSessionDesc->countTracks() < 2) { 586 // There's no actual tracks in this session. 587 // The first "track" is merely session meta 588 // data. 589 590 ALOGW("Session doesn't contain any playable " 591 "tracks. Aborting."); 592 result = ERROR_UNSUPPORTED; 593 } else { 594 setupTrack(1); 595 } 596 } 597 } 598 } 599 600 if (result != OK) { 601 sp<AMessage> reply = new AMessage('disc', this); 602 mConn->disconnect(reply); 603 } 604 break; 605 } 606 607 case 'sdpl': 608 { 609 int32_t result; 610 CHECK(msg->findInt32("result", &result)); 611 612 ALOGI("SDP connection request completed with result %d (%s)", 613 result, strerror(-result)); 614 615 if (result == OK) { 616 sp<RefBase> obj; 617 CHECK(msg->findObject("description", &obj)); 618 mSessionDesc = 619 static_cast<ASessionDescription *>(obj.get()); 620 621 if (!mSessionDesc->isValid()) { 622 ALOGE("Failed to parse session description."); 623 result = ERROR_MALFORMED; 624 } else { 625 mBaseURL = mSessionURL; 626 627 mSeekable = !isLiveStream(mSessionDesc); 628 629 mControlURL = getControlURL(); 630 631 if (mSessionDesc->countTracks() < 2) { 632 // There's no actual tracks in this session. 633 // The first "track" is merely session meta 634 // data. 635 636 ALOGW("Session doesn't contain any playable " 637 "tracks. Aborting."); 638 result = ERROR_UNSUPPORTED; 639 } else { 640 setupTrack(1); 641 } 642 } 643 } 644 645 if (result != OK) { 646 sp<AMessage> reply = new AMessage('disc', this); 647 mConn->disconnect(reply); 648 } 649 break; 650 } 651 652 case 'setu': 653 { 654 size_t index; 655 CHECK(msg->findSize("index", &index)); 656 657 TrackInfo *track = NULL; 658 size_t trackIndex; 659 if (msg->findSize("track-index", &trackIndex)) { 660 track = &mTracks.editItemAt(trackIndex); 661 } 662 663 int32_t result; 664 CHECK(msg->findInt32("result", &result)); 665 666 ALOGI("SETUP(%zu) completed with result %d (%s)", 667 index, result, strerror(-result)); 668 669 if (result == OK) { 670 CHECK(track != NULL); 671 672 sp<RefBase> obj; 673 CHECK(msg->findObject("response", &obj)); 674 sp<ARTSPResponse> response = 675 static_cast<ARTSPResponse *>(obj.get()); 676 677 if (response->mStatusCode != 200) { 678 result = UNKNOWN_ERROR; 679 } else { 680 ssize_t i = response->mHeaders.indexOfKey("session"); 681 CHECK_GE(i, 0); 682 683 mSessionID = response->mHeaders.valueAt(i); 684 685 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 686 AString timeoutStr; 687 if (GetAttribute( 688 mSessionID.c_str(), "timeout", &timeoutStr)) { 689 char *end; 690 unsigned long timeoutSecs = 691 strtoul(timeoutStr.c_str(), &end, 10); 692 693 if (end == timeoutStr.c_str() || *end != '\0') { 694 ALOGW("server specified malformed timeout '%s'", 695 timeoutStr.c_str()); 696 697 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 698 } else if (timeoutSecs < 15) { 699 ALOGW("server specified too short a timeout " 700 "(%lu secs), using default.", 701 timeoutSecs); 702 703 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 704 } else { 705 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll; 706 707 ALOGI("server specified timeout of %lu secs.", 708 timeoutSecs); 709 } 710 } 711 712 i = mSessionID.find(";"); 713 if (i >= 0) { 714 // Remove options, i.e. ";timeout=90" 715 mSessionID.erase(i, mSessionID.size() - i); 716 } 717 718 sp<AMessage> notify = new AMessage('accu', this); 719 notify->setSize("track-index", trackIndex); 720 721 i = response->mHeaders.indexOfKey("transport"); 722 CHECK_GE(i, 0); 723 724 if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) { 725 if (!track->mUsingInterleavedTCP) { 726 AString transport = response->mHeaders.valueAt(i); 727 728 // We are going to continue even if we were 729 // unable to poke a hole into the firewall... 730 pokeAHole( 731 track->mRTPSocket, 732 track->mRTCPSocket, 733 transport); 734 } 735 736 mRTPConn->addStream( 737 track->mRTPSocket, track->mRTCPSocket, 738 mSessionDesc, index, 739 notify, track->mUsingInterleavedTCP); 740 741 mSetupTracksSuccessful = true; 742 } else { 743 result = BAD_VALUE; 744 } 745 } 746 } 747 748 if (result != OK) { 749 if (track) { 750 if (!track->mUsingInterleavedTCP) { 751 // Clear the tag 752 if (mUIDValid) { 753 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket); 754 HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket); 755 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket); 756 HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket); 757 } 758 759 close(track->mRTPSocket); 760 close(track->mRTCPSocket); 761 } 762 763 mTracks.removeItemsAt(trackIndex); 764 } 765 } 766 767 ++index; 768 if (result == OK && index < mSessionDesc->countTracks()) { 769 setupTrack(index); 770 } else if (mSetupTracksSuccessful) { 771 ++mKeepAliveGeneration; 772 postKeepAlive(); 773 774 AString request = "PLAY "; 775 request.append(mControlURL); 776 request.append(" RTSP/1.0\r\n"); 777 778 request.append("Session: "); 779 request.append(mSessionID); 780 request.append("\r\n"); 781 782 request.append("\r\n"); 783 784 sp<AMessage> reply = new AMessage('play', this); 785 mConn->sendRequest(request.c_str(), reply); 786 } else { 787 sp<AMessage> reply = new AMessage('disc', this); 788 mConn->disconnect(reply); 789 } 790 break; 791 } 792 793 case 'play': 794 { 795 int32_t result; 796 CHECK(msg->findInt32("result", &result)); 797 798 ALOGI("PLAY completed with result %d (%s)", 799 result, strerror(-result)); 800 801 if (result == OK) { 802 sp<RefBase> obj; 803 CHECK(msg->findObject("response", &obj)); 804 sp<ARTSPResponse> response = 805 static_cast<ARTSPResponse *>(obj.get()); 806 807 if (response->mStatusCode != 200) { 808 result = UNKNOWN_ERROR; 809 } else { 810 parsePlayResponse(response); 811 postTimeout(); 812 } 813 } 814 815 if (result != OK) { 816 sp<AMessage> reply = new AMessage('disc', this); 817 mConn->disconnect(reply); 818 } 819 820 break; 821 } 822 823 case 'aliv': 824 { 825 int32_t generation; 826 CHECK(msg->findInt32("generation", &generation)); 827 828 if (generation != mKeepAliveGeneration) { 829 // obsolete event. 830 break; 831 } 832 833 AString request; 834 request.append("OPTIONS "); 835 request.append(mSessionURL); 836 request.append(" RTSP/1.0\r\n"); 837 request.append("Session: "); 838 request.append(mSessionID); 839 request.append("\r\n"); 840 request.append("\r\n"); 841 842 sp<AMessage> reply = new AMessage('opts', this); 843 reply->setInt32("generation", mKeepAliveGeneration); 844 mConn->sendRequest(request.c_str(), reply); 845 break; 846 } 847 848 case 'opts': 849 { 850 int32_t result; 851 CHECK(msg->findInt32("result", &result)); 852 853 ALOGI("OPTIONS completed with result %d (%s)", 854 result, strerror(-result)); 855 856 int32_t generation; 857 CHECK(msg->findInt32("generation", &generation)); 858 859 if (generation != mKeepAliveGeneration) { 860 // obsolete event. 861 break; 862 } 863 864 postKeepAlive(); 865 break; 866 } 867 868 case 'abor': 869 { 870 for (size_t i = 0; i < mTracks.size(); ++i) { 871 TrackInfo *info = &mTracks.editItemAt(i); 872 873 if (!mFirstAccessUnit) { 874 postQueueEOS(i, ERROR_END_OF_STREAM); 875 } 876 877 if (!info->mUsingInterleavedTCP) { 878 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket); 879 880 // Clear the tag 881 if (mUIDValid) { 882 HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket); 883 HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket); 884 HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket); 885 HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket); 886 } 887 888 close(info->mRTPSocket); 889 close(info->mRTCPSocket); 890 } 891 } 892 mTracks.clear(); 893 mSetupTracksSuccessful = false; 894 mSeekPending = false; 895 mFirstAccessUnit = true; 896 mAllTracksHaveTime = false; 897 mNTPAnchorUs = -1; 898 mMediaAnchorUs = -1; 899 mNumAccessUnitsReceived = 0; 900 mReceivedFirstRTCPPacket = false; 901 mReceivedFirstRTPPacket = false; 902 mPausing = false; 903 mSeekable = true; 904 905 sp<AMessage> reply = new AMessage('tear', this); 906 907 int32_t reconnect; 908 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 909 reply->setInt32("reconnect", true); 910 } 911 912 AString request; 913 request = "TEARDOWN "; 914 915 // XXX should use aggregate url from SDP here... 916 request.append(mSessionURL); 917 request.append(" RTSP/1.0\r\n"); 918 919 request.append("Session: "); 920 request.append(mSessionID); 921 request.append("\r\n"); 922 923 request.append("\r\n"); 924 925 mConn->sendRequest(request.c_str(), reply); 926 break; 927 } 928 929 case 'tear': 930 { 931 int32_t result; 932 CHECK(msg->findInt32("result", &result)); 933 934 ALOGI("TEARDOWN completed with result %d (%s)", 935 result, strerror(-result)); 936 937 sp<AMessage> reply = new AMessage('disc', this); 938 939 int32_t reconnect; 940 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 941 reply->setInt32("reconnect", true); 942 } 943 944 mConn->disconnect(reply); 945 break; 946 } 947 948 case 'quit': 949 { 950 sp<AMessage> msg = mNotify->dup(); 951 msg->setInt32("what", kWhatDisconnected); 952 msg->setInt32("result", UNKNOWN_ERROR); 953 msg->post(); 954 break; 955 } 956 957 case 'chek': 958 { 959 int32_t generation; 960 CHECK(msg->findInt32("generation", &generation)); 961 if (generation != mCheckGeneration) { 962 // This is an outdated message. Ignore. 963 break; 964 } 965 966 if (mNumAccessUnitsReceived == 0) { 967#if 1 968 ALOGI("stream ended? aborting."); 969 (new AMessage('abor', this))->post(); 970 break; 971#else 972 ALOGI("haven't seen an AU in a looong time."); 973#endif 974 } 975 976 mNumAccessUnitsReceived = 0; 977 msg->post(kAccessUnitTimeoutUs); 978 break; 979 } 980 981 case 'accu': 982 { 983 if (mSeekPending) { 984 ALOGV("Stale access unit."); 985 break; 986 } 987 988 int32_t timeUpdate; 989 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) { 990 size_t trackIndex; 991 CHECK(msg->findSize("track-index", &trackIndex)); 992 993 uint32_t rtpTime; 994 uint64_t ntpTime; 995 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime)); 996 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime)); 997 998 onTimeUpdate(trackIndex, rtpTime, ntpTime); 999 break; 1000 } 1001 1002 int32_t first; 1003 if (msg->findInt32("first-rtcp", &first)) { 1004 mReceivedFirstRTCPPacket = true; 1005 break; 1006 } 1007 1008 if (msg->findInt32("first-rtp", &first)) { 1009 mReceivedFirstRTPPacket = true; 1010 break; 1011 } 1012 1013 ++mNumAccessUnitsReceived; 1014 postAccessUnitTimeoutCheck(); 1015 1016 size_t trackIndex; 1017 CHECK(msg->findSize("track-index", &trackIndex)); 1018 1019 if (trackIndex >= mTracks.size()) { 1020 ALOGV("late packets ignored."); 1021 break; 1022 } 1023 1024 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1025 1026 int32_t eos; 1027 if (msg->findInt32("eos", &eos)) { 1028 ALOGI("received BYE on track index %zu", trackIndex); 1029 if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) { 1030 ALOGI("No time established => fake existing data"); 1031 1032 track->mEOSReceived = true; 1033 mTryFakeRTCP = true; 1034 mReceivedFirstRTCPPacket = true; 1035 fakeTimestamps(); 1036 } else { 1037 postQueueEOS(trackIndex, ERROR_END_OF_STREAM); 1038 } 1039 return; 1040 } 1041 1042 if (mSeekPending) { 1043 ALOGV("we're seeking, dropping stale packet."); 1044 break; 1045 } 1046 1047 sp<ABuffer> accessUnit; 1048 CHECK(msg->findBuffer("access-unit", &accessUnit)); 1049 onAccessUnitComplete(trackIndex, accessUnit); 1050 break; 1051 } 1052 1053 case 'paus': 1054 { 1055 int32_t generation; 1056 CHECK(msg->findInt32("pausecheck", &generation)); 1057 if (generation != mPauseGeneration) { 1058 ALOGV("Ignoring outdated pause message."); 1059 break; 1060 } 1061 1062 if (!mSeekable) { 1063 ALOGW("This is a live stream, ignoring pause request."); 1064 break; 1065 } 1066 1067 if (mPausing) { 1068 ALOGV("This stream is already paused."); 1069 break; 1070 } 1071 1072 mCheckPending = true; 1073 ++mCheckGeneration; 1074 mPausing = true; 1075 1076 AString request = "PAUSE "; 1077 request.append(mControlURL); 1078 request.append(" RTSP/1.0\r\n"); 1079 1080 request.append("Session: "); 1081 request.append(mSessionID); 1082 request.append("\r\n"); 1083 1084 request.append("\r\n"); 1085 1086 sp<AMessage> reply = new AMessage('pau2', this); 1087 mConn->sendRequest(request.c_str(), reply); 1088 break; 1089 } 1090 1091 case 'pau2': 1092 { 1093 int32_t result; 1094 CHECK(msg->findInt32("result", &result)); 1095 mCheckTimeoutGeneration++; 1096 1097 ALOGI("PAUSE completed with result %d (%s)", 1098 result, strerror(-result)); 1099 break; 1100 } 1101 1102 case 'resu': 1103 { 1104 if (mPausing && mSeekPending) { 1105 // If seeking, Play will be sent from see1 instead 1106 break; 1107 } 1108 1109 if (!mPausing) { 1110 // Dont send PLAY if we have not paused 1111 break; 1112 } 1113 AString request = "PLAY "; 1114 request.append(mControlURL); 1115 request.append(" RTSP/1.0\r\n"); 1116 1117 request.append("Session: "); 1118 request.append(mSessionID); 1119 request.append("\r\n"); 1120 1121 request.append("\r\n"); 1122 1123 sp<AMessage> reply = new AMessage('res2', this); 1124 mConn->sendRequest(request.c_str(), reply); 1125 break; 1126 } 1127 1128 case 'res2': 1129 { 1130 int32_t result; 1131 CHECK(msg->findInt32("result", &result)); 1132 1133 ALOGI("PLAY (for resume) completed with result %d (%s)", 1134 result, strerror(-result)); 1135 1136 mCheckPending = false; 1137 ++mCheckGeneration; 1138 postAccessUnitTimeoutCheck(); 1139 1140 if (result == OK) { 1141 sp<RefBase> obj; 1142 CHECK(msg->findObject("response", &obj)); 1143 sp<ARTSPResponse> response = 1144 static_cast<ARTSPResponse *>(obj.get()); 1145 1146 if (response->mStatusCode != 200) { 1147 result = UNKNOWN_ERROR; 1148 } else { 1149 parsePlayResponse(response); 1150 1151 // Post new timeout in order to make sure to use 1152 // fake timestamps if no new Sender Reports arrive 1153 postTimeout(); 1154 } 1155 } 1156 1157 if (result != OK) { 1158 ALOGE("resume failed, aborting."); 1159 (new AMessage('abor', this))->post(); 1160 } 1161 1162 mPausing = false; 1163 break; 1164 } 1165 1166 case 'seek': 1167 { 1168 if (!mSeekable) { 1169 ALOGW("This is a live stream, ignoring seek request."); 1170 1171 sp<AMessage> msg = mNotify->dup(); 1172 msg->setInt32("what", kWhatSeekDone); 1173 msg->post(); 1174 break; 1175 } 1176 1177 int64_t timeUs; 1178 CHECK(msg->findInt64("time", &timeUs)); 1179 1180 mSeekPending = true; 1181 1182 // Disable the access unit timeout until we resumed 1183 // playback again. 1184 mCheckPending = true; 1185 ++mCheckGeneration; 1186 1187 sp<AMessage> reply = new AMessage('see0', this); 1188 reply->setInt64("time", timeUs); 1189 1190 if (mPausing) { 1191 // PAUSE already sent 1192 ALOGI("Pause already sent"); 1193 reply->post(); 1194 break; 1195 } 1196 AString request = "PAUSE "; 1197 request.append(mControlURL); 1198 request.append(" RTSP/1.0\r\n"); 1199 1200 request.append("Session: "); 1201 request.append(mSessionID); 1202 request.append("\r\n"); 1203 1204 request.append("\r\n"); 1205 1206 mConn->sendRequest(request.c_str(), reply); 1207 break; 1208 } 1209 1210 case 'see0': 1211 { 1212 // Session is paused now. 1213 status_t err = OK; 1214 msg->findInt32("result", &err); 1215 1216 int64_t timeUs; 1217 CHECK(msg->findInt64("time", &timeUs)); 1218 1219 sp<AMessage> notify = mNotify->dup(); 1220 notify->setInt32("what", kWhatSeekPaused); 1221 notify->setInt32("err", err); 1222 notify->setInt64("time", timeUs); 1223 notify->post(); 1224 break; 1225 1226 } 1227 1228 case 'see1': 1229 { 1230 for (size_t i = 0; i < mTracks.size(); ++i) { 1231 TrackInfo *info = &mTracks.editItemAt(i); 1232 1233 postQueueSeekDiscontinuity(i); 1234 info->mEOSReceived = false; 1235 1236 info->mRTPAnchor = 0; 1237 info->mNTPAnchorUs = -1; 1238 } 1239 1240 mAllTracksHaveTime = false; 1241 mNTPAnchorUs = -1; 1242 1243 // Start new timeoutgeneration to avoid getting timeout 1244 // before PLAY response arrive 1245 postTimeout(); 1246 1247 int64_t timeUs; 1248 CHECK(msg->findInt64("time", &timeUs)); 1249 1250 AString request = "PLAY "; 1251 request.append(mControlURL); 1252 request.append(" RTSP/1.0\r\n"); 1253 1254 request.append("Session: "); 1255 request.append(mSessionID); 1256 request.append("\r\n"); 1257 1258 request.append( 1259 AStringPrintf( 1260 "Range: npt=%lld-\r\n", timeUs / 1000000ll)); 1261 1262 request.append("\r\n"); 1263 1264 sp<AMessage> reply = new AMessage('see2', this); 1265 mConn->sendRequest(request.c_str(), reply); 1266 break; 1267 } 1268 1269 case 'see2': 1270 { 1271 if (mTracks.size() == 0) { 1272 // We have already hit abor, break 1273 break; 1274 } 1275 1276 int32_t result; 1277 CHECK(msg->findInt32("result", &result)); 1278 1279 ALOGI("PLAY (for seek) completed with result %d (%s)", 1280 result, strerror(-result)); 1281 1282 mCheckPending = false; 1283 ++mCheckGeneration; 1284 postAccessUnitTimeoutCheck(); 1285 1286 if (result == OK) { 1287 sp<RefBase> obj; 1288 CHECK(msg->findObject("response", &obj)); 1289 sp<ARTSPResponse> response = 1290 static_cast<ARTSPResponse *>(obj.get()); 1291 1292 if (response->mStatusCode != 200) { 1293 result = UNKNOWN_ERROR; 1294 } else { 1295 parsePlayResponse(response); 1296 1297 // Post new timeout in order to make sure to use 1298 // fake timestamps if no new Sender Reports arrive 1299 postTimeout(); 1300 1301 ssize_t i = response->mHeaders.indexOfKey("rtp-info"); 1302 CHECK_GE(i, 0); 1303 1304 ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str()); 1305 1306 ALOGI("seek completed."); 1307 } 1308 } 1309 1310 if (result != OK) { 1311 ALOGE("seek failed, aborting."); 1312 (new AMessage('abor', this))->post(); 1313 } 1314 1315 mPausing = false; 1316 mSeekPending = false; 1317 1318 // Discard all stale access units. 1319 for (size_t i = 0; i < mTracks.size(); ++i) { 1320 TrackInfo *track = &mTracks.editItemAt(i); 1321 track->mPackets.clear(); 1322 } 1323 1324 sp<AMessage> msg = mNotify->dup(); 1325 msg->setInt32("what", kWhatSeekDone); 1326 msg->post(); 1327 break; 1328 } 1329 1330 case 'biny': 1331 { 1332 sp<ABuffer> buffer; 1333 CHECK(msg->findBuffer("buffer", &buffer)); 1334 1335 int32_t index; 1336 CHECK(buffer->meta()->findInt32("index", &index)); 1337 1338 mRTPConn->injectPacket(index, buffer); 1339 break; 1340 } 1341 1342 case 'tiou': 1343 { 1344 int32_t timeoutGenerationCheck; 1345 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck)); 1346 if (timeoutGenerationCheck != mCheckTimeoutGeneration) { 1347 // This is an outdated message. Ignore. 1348 // This typically happens if a lot of seeks are 1349 // performed, since new timeout messages now are 1350 // posted at seek as well. 1351 break; 1352 } 1353 if (!mReceivedFirstRTCPPacket) { 1354 if (dataReceivedOnAllChannels() && !mTryFakeRTCP) { 1355 ALOGW("We received RTP packets but no RTCP packets, " 1356 "using fake timestamps."); 1357 1358 mTryFakeRTCP = true; 1359 1360 mReceivedFirstRTCPPacket = true; 1361 1362 fakeTimestamps(); 1363 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) { 1364 ALOGW("Never received any data, switching transports."); 1365 1366 mTryTCPInterleaving = true; 1367 1368 sp<AMessage> msg = new AMessage('abor', this); 1369 msg->setInt32("reconnect", true); 1370 msg->post(); 1371 } else { 1372 ALOGW("Never received any data, disconnecting."); 1373 (new AMessage('abor', this))->post(); 1374 } 1375 } else { 1376 if (!mAllTracksHaveTime) { 1377 ALOGW("We received some RTCP packets, but time " 1378 "could not be established on all tracks, now " 1379 "using fake timestamps"); 1380 1381 fakeTimestamps(); 1382 } 1383 } 1384 break; 1385 } 1386 1387 default: 1388 TRESPASS(); 1389 break; 1390 } 1391 } 1392 1393 void postKeepAlive() { 1394 sp<AMessage> msg = new AMessage('aliv', this); 1395 msg->setInt32("generation", mKeepAliveGeneration); 1396 msg->post((mKeepAliveTimeoutUs * 9) / 10); 1397 } 1398 1399 void cancelAccessUnitTimeoutCheck() { 1400 ALOGV("cancelAccessUnitTimeoutCheck"); 1401 ++mCheckGeneration; 1402 } 1403 1404 void postAccessUnitTimeoutCheck() { 1405 if (mCheckPending) { 1406 return; 1407 } 1408 1409 mCheckPending = true; 1410 sp<AMessage> check = new AMessage('chek', this); 1411 check->setInt32("generation", mCheckGeneration); 1412 check->post(kAccessUnitTimeoutUs); 1413 } 1414 1415 static void SplitString( 1416 const AString &s, const char *separator, List<AString> *items) { 1417 items->clear(); 1418 size_t start = 0; 1419 while (start < s.size()) { 1420 ssize_t offset = s.find(separator, start); 1421 1422 if (offset < 0) { 1423 items->push_back(AString(s, start, s.size() - start)); 1424 break; 1425 } 1426 1427 items->push_back(AString(s, start, offset - start)); 1428 start = offset + strlen(separator); 1429 } 1430 } 1431 1432 void parsePlayResponse(const sp<ARTSPResponse> &response) { 1433 mPlayResponseParsed = true; 1434 if (mTracks.size() == 0) { 1435 ALOGV("parsePlayResponse: late packets ignored."); 1436 return; 1437 } 1438 1439 ssize_t i = response->mHeaders.indexOfKey("range"); 1440 if (i < 0) { 1441 // Server doesn't even tell use what range it is going to 1442 // play, therefore we won't support seeking. 1443 return; 1444 } 1445 1446 AString range = response->mHeaders.valueAt(i); 1447 ALOGV("Range: %s", range.c_str()); 1448 1449 AString val; 1450 CHECK(GetAttribute(range.c_str(), "npt", &val)); 1451 1452 float npt1, npt2; 1453 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) { 1454 // This is a live stream and therefore not seekable. 1455 1456 ALOGI("This is a live stream"); 1457 return; 1458 } 1459 1460 i = response->mHeaders.indexOfKey("rtp-info"); 1461 CHECK_GE(i, 0); 1462 1463 AString rtpInfo = response->mHeaders.valueAt(i); 1464 List<AString> streamInfos; 1465 SplitString(rtpInfo, ",", &streamInfos); 1466 1467 int n = 1; 1468 for (List<AString>::iterator it = streamInfos.begin(); 1469 it != streamInfos.end(); ++it) { 1470 (*it).trim(); 1471 ALOGV("streamInfo[%d] = %s", n, (*it).c_str()); 1472 1473 CHECK(GetAttribute((*it).c_str(), "url", &val)); 1474 1475 size_t trackIndex = 0; 1476 while (trackIndex < mTracks.size() 1477 && !(val == mTracks.editItemAt(trackIndex).mURL)) { 1478 ++trackIndex; 1479 } 1480 CHECK_LT(trackIndex, mTracks.size()); 1481 1482 CHECK(GetAttribute((*it).c_str(), "seq", &val)); 1483 1484 char *end; 1485 unsigned long seq = strtoul(val.c_str(), &end, 10); 1486 1487 TrackInfo *info = &mTracks.editItemAt(trackIndex); 1488 info->mFirstSeqNumInSegment = seq; 1489 info->mNewSegment = true; 1490 info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits; 1491 1492 CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); 1493 1494 uint32_t rtpTime = strtoul(val.c_str(), &end, 10); 1495 1496 ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1); 1497 1498 info->mNormalPlayTimeRTP = rtpTime; 1499 info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6); 1500 1501 if (!mFirstAccessUnit) { 1502 postNormalPlayTimeMapping( 1503 trackIndex, 1504 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1505 } 1506 1507 ++n; 1508 } 1509 } 1510 1511 sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) { 1512 CHECK_GE(index, 0u); 1513 CHECK_LT(index, mTracks.size()); 1514 1515 const TrackInfo &info = mTracks.itemAt(index); 1516 1517 *timeScale = info.mTimeScale; 1518 1519 return info.mPacketSource->getFormat(); 1520 } 1521 1522 size_t countTracks() const { 1523 return mTracks.size(); 1524 } 1525 1526private: 1527 struct TrackInfo { 1528 AString mURL; 1529 int mRTPSocket; 1530 int mRTCPSocket; 1531 bool mUsingInterleavedTCP; 1532 uint32_t mFirstSeqNumInSegment; 1533 bool mNewSegment; 1534 int32_t mAllowedStaleAccessUnits; 1535 1536 uint32_t mRTPAnchor; 1537 int64_t mNTPAnchorUs; 1538 int32_t mTimeScale; 1539 bool mEOSReceived; 1540 1541 uint32_t mNormalPlayTimeRTP; 1542 int64_t mNormalPlayTimeUs; 1543 1544 sp<APacketSource> mPacketSource; 1545 1546 // Stores packets temporarily while no notion of time 1547 // has been established yet. 1548 List<sp<ABuffer> > mPackets; 1549 }; 1550 1551 sp<AMessage> mNotify; 1552 bool mUIDValid; 1553 uid_t mUID; 1554 sp<ALooper> mNetLooper; 1555 sp<ARTSPConnection> mConn; 1556 sp<ARTPConnection> mRTPConn; 1557 sp<ASessionDescription> mSessionDesc; 1558 AString mOriginalSessionURL; // This one still has user:pass@ 1559 AString mSessionURL; 1560 AString mSessionHost; 1561 AString mBaseURL; 1562 AString mControlURL; 1563 AString mSessionID; 1564 bool mSetupTracksSuccessful; 1565 bool mSeekPending; 1566 bool mFirstAccessUnit; 1567 1568 bool mAllTracksHaveTime; 1569 int64_t mNTPAnchorUs; 1570 int64_t mMediaAnchorUs; 1571 int64_t mLastMediaTimeUs; 1572 1573 int64_t mNumAccessUnitsReceived; 1574 bool mCheckPending; 1575 int32_t mCheckGeneration; 1576 int32_t mCheckTimeoutGeneration; 1577 bool mTryTCPInterleaving; 1578 bool mTryFakeRTCP; 1579 bool mReceivedFirstRTCPPacket; 1580 bool mReceivedFirstRTPPacket; 1581 bool mSeekable; 1582 int64_t mKeepAliveTimeoutUs; 1583 int32_t mKeepAliveGeneration; 1584 bool mPausing; 1585 int32_t mPauseGeneration; 1586 1587 Vector<TrackInfo> mTracks; 1588 1589 bool mPlayResponseParsed; 1590 1591 void setupTrack(size_t index) { 1592 sp<APacketSource> source = 1593 new APacketSource(mSessionDesc, index); 1594 1595 if (source->initCheck() != OK) { 1596 ALOGW("Unsupported format. Ignoring track #%zu.", index); 1597 1598 sp<AMessage> reply = new AMessage('setu', this); 1599 reply->setSize("index", index); 1600 reply->setInt32("result", ERROR_UNSUPPORTED); 1601 reply->post(); 1602 return; 1603 } 1604 1605 AString url; 1606 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 1607 1608 AString trackURL; 1609 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 1610 1611 mTracks.push(TrackInfo()); 1612 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 1613 info->mURL = trackURL; 1614 info->mPacketSource = source; 1615 info->mUsingInterleavedTCP = false; 1616 info->mFirstSeqNumInSegment = 0; 1617 info->mNewSegment = true; 1618 info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits; 1619 info->mRTPSocket = -1; 1620 info->mRTCPSocket = -1; 1621 info->mRTPAnchor = 0; 1622 info->mNTPAnchorUs = -1; 1623 info->mNormalPlayTimeRTP = 0; 1624 info->mNormalPlayTimeUs = 0ll; 1625 1626 unsigned long PT; 1627 AString formatDesc; 1628 AString formatParams; 1629 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams); 1630 1631 int32_t timescale; 1632 int32_t numChannels; 1633 ASessionDescription::ParseFormatDesc( 1634 formatDesc.c_str(), ×cale, &numChannels); 1635 1636 info->mTimeScale = timescale; 1637 info->mEOSReceived = false; 1638 1639 ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str()); 1640 1641 AString request = "SETUP "; 1642 request.append(trackURL); 1643 request.append(" RTSP/1.0\r\n"); 1644 1645 if (mTryTCPInterleaving) { 1646 size_t interleaveIndex = 2 * (mTracks.size() - 1); 1647 info->mUsingInterleavedTCP = true; 1648 info->mRTPSocket = interleaveIndex; 1649 info->mRTCPSocket = interleaveIndex + 1; 1650 1651 request.append("Transport: RTP/AVP/TCP;interleaved="); 1652 request.append(interleaveIndex); 1653 request.append("-"); 1654 request.append(interleaveIndex + 1); 1655 } else { 1656 unsigned rtpPort; 1657 ARTPConnection::MakePortPair( 1658 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 1659 1660 if (mUIDValid) { 1661 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID, 1662 (uint32_t)*(uint32_t*) "RTP_"); 1663 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID, 1664 (uint32_t)*(uint32_t*) "RTP_"); 1665 HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID); 1666 HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID); 1667 } 1668 1669 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 1670 request.append(rtpPort); 1671 request.append("-"); 1672 request.append(rtpPort + 1); 1673 } 1674 1675 request.append("\r\n"); 1676 1677 if (index > 1) { 1678 request.append("Session: "); 1679 request.append(mSessionID); 1680 request.append("\r\n"); 1681 } 1682 1683 request.append("\r\n"); 1684 1685 sp<AMessage> reply = new AMessage('setu', this); 1686 reply->setSize("index", index); 1687 reply->setSize("track-index", mTracks.size() - 1); 1688 mConn->sendRequest(request.c_str(), reply); 1689 } 1690 1691 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 1692 out->clear(); 1693 1694 if (strncasecmp("rtsp://", baseURL, 7)) { 1695 // Base URL must be absolute 1696 return false; 1697 } 1698 1699 if (!strncasecmp("rtsp://", url, 7)) { 1700 // "url" is already an absolute URL, ignore base URL. 1701 out->setTo(url); 1702 return true; 1703 } 1704 1705 size_t n = strlen(baseURL); 1706 out->setTo(baseURL); 1707 if (baseURL[n - 1] != '/') { 1708 out->append("/"); 1709 } 1710 out->append(url); 1711 1712 return true; 1713 } 1714 1715 void fakeTimestamps() { 1716 mNTPAnchorUs = -1ll; 1717 for (size_t i = 0; i < mTracks.size(); ++i) { 1718 onTimeUpdate(i, 0, 0ll); 1719 } 1720 } 1721 1722 bool dataReceivedOnAllChannels() { 1723 TrackInfo *track; 1724 for (size_t i = 0; i < mTracks.size(); ++i) { 1725 track = &mTracks.editItemAt(i); 1726 if (track->mPackets.empty()) { 1727 return false; 1728 } 1729 } 1730 return true; 1731 } 1732 1733 void handleFirstAccessUnit() { 1734 if (mFirstAccessUnit) { 1735 sp<AMessage> msg = mNotify->dup(); 1736 msg->setInt32("what", kWhatConnected); 1737 msg->post(); 1738 1739 if (mSeekable) { 1740 for (size_t i = 0; i < mTracks.size(); ++i) { 1741 TrackInfo *info = &mTracks.editItemAt(i); 1742 1743 postNormalPlayTimeMapping( 1744 i, 1745 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1746 } 1747 } 1748 1749 mFirstAccessUnit = false; 1750 } 1751 } 1752 1753 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) { 1754 ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx", 1755 trackIndex, rtpTime, (long long)ntpTime); 1756 1757 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); 1758 1759 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1760 1761 track->mRTPAnchor = rtpTime; 1762 track->mNTPAnchorUs = ntpTimeUs; 1763 1764 if (mNTPAnchorUs < 0) { 1765 mNTPAnchorUs = ntpTimeUs; 1766 mMediaAnchorUs = mLastMediaTimeUs; 1767 } 1768 1769 if (!mAllTracksHaveTime) { 1770 bool allTracksHaveTime = (mTracks.size() > 0); 1771 for (size_t i = 0; i < mTracks.size(); ++i) { 1772 TrackInfo *track = &mTracks.editItemAt(i); 1773 if (track->mNTPAnchorUs < 0) { 1774 allTracksHaveTime = false; 1775 break; 1776 } 1777 } 1778 if (allTracksHaveTime) { 1779 mAllTracksHaveTime = true; 1780 ALOGI("Time now established for all tracks."); 1781 } 1782 } 1783 if (mAllTracksHaveTime && dataReceivedOnAllChannels()) { 1784 handleFirstAccessUnit(); 1785 1786 // Time is now established, lets start timestamping immediately 1787 for (size_t i = 0; i < mTracks.size(); ++i) { 1788 if (OK != processAccessUnitQueue(i)) { 1789 return; 1790 } 1791 } 1792 for (size_t i = 0; i < mTracks.size(); ++i) { 1793 TrackInfo *trackInfo = &mTracks.editItemAt(i); 1794 if (trackInfo->mEOSReceived) { 1795 postQueueEOS(i, ERROR_END_OF_STREAM); 1796 trackInfo->mEOSReceived = false; 1797 } 1798 } 1799 } 1800 } 1801 1802 status_t processAccessUnitQueue(int32_t trackIndex) { 1803 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1804 while (!track->mPackets.empty()) { 1805 sp<ABuffer> accessUnit = *track->mPackets.begin(); 1806 track->mPackets.erase(track->mPackets.begin()); 1807 1808 uint32_t seqNum = (uint32_t)accessUnit->int32Data(); 1809 if (track->mNewSegment) { 1810 // The sequence number from RTP packet has only 16 bits and is extended 1811 // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of 1812 // RTSP "PLAY" command should be used to detect the first RTP packet 1813 // after seeking. 1814 if (mSeekable) { 1815 if (track->mAllowedStaleAccessUnits > 0) { 1816 uint32_t seqNum16 = seqNum & 0xffff; 1817 uint32_t firstSeqNumInSegment16 = track->mFirstSeqNumInSegment & 0xffff; 1818 if (seqNum16 > firstSeqNumInSegment16 + kMaxAllowedStaleAccessUnits 1819 || seqNum16 < firstSeqNumInSegment16) { 1820 // Not the first rtp packet of the stream after seeking, discarding. 1821 track->mAllowedStaleAccessUnits--; 1822 ALOGV("discarding stale access unit (0x%x : 0x%x)", 1823 seqNum, track->mFirstSeqNumInSegment); 1824 continue; 1825 } 1826 ALOGW_IF(seqNum16 != firstSeqNumInSegment16, 1827 "Missing the first packet(%u), now take packet(%u) as first one", 1828 track->mFirstSeqNumInSegment, seqNum); 1829 } else { // track->mAllowedStaleAccessUnits <= 0 1830 mNumAccessUnitsReceived = 0; 1831 ALOGW_IF(track->mAllowedStaleAccessUnits == 0, 1832 "Still no first rtp packet after %d stale ones", 1833 kMaxAllowedStaleAccessUnits); 1834 track->mAllowedStaleAccessUnits = -1; 1835 return UNKNOWN_ERROR; 1836 } 1837 } 1838 1839 // Now found the first rtp packet of the stream after seeking. 1840 track->mFirstSeqNumInSegment = seqNum; 1841 track->mNewSegment = false; 1842 } 1843 1844 if (seqNum < track->mFirstSeqNumInSegment) { 1845 ALOGV("dropping stale access-unit (%d < %d)", 1846 seqNum, track->mFirstSeqNumInSegment); 1847 continue; 1848 } 1849 1850 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1851 postQueueAccessUnit(trackIndex, accessUnit); 1852 } 1853 } 1854 return OK; 1855 } 1856 1857 void onAccessUnitComplete( 1858 int32_t trackIndex, const sp<ABuffer> &accessUnit) { 1859 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1860 track->mPackets.push_back(accessUnit); 1861 1862 uint32_t seqNum = (uint32_t)accessUnit->int32Data(); 1863 ALOGV("onAccessUnitComplete track %d storing accessunit %u", trackIndex, seqNum); 1864 1865 if(!mPlayResponseParsed){ 1866 ALOGV("play response is not parsed"); 1867 return; 1868 } 1869 1870 handleFirstAccessUnit(); 1871 1872 if (!mAllTracksHaveTime) { 1873 ALOGV("storing accessUnit, no time established yet"); 1874 return; 1875 } 1876 1877 if (OK != processAccessUnitQueue(trackIndex)) { 1878 return; 1879 } 1880 1881 if (track->mEOSReceived) { 1882 postQueueEOS(trackIndex, ERROR_END_OF_STREAM); 1883 track->mEOSReceived = false; 1884 } 1885 } 1886 1887 bool addMediaTimestamp( 1888 int32_t trackIndex, const TrackInfo *track, 1889 const sp<ABuffer> &accessUnit) { 1890 UNUSED_UNLESS_VERBOSE(trackIndex); 1891 1892 uint32_t rtpTime; 1893 CHECK(accessUnit->meta()->findInt32( 1894 "rtp-time", (int32_t *)&rtpTime)); 1895 1896 int64_t relRtpTimeUs = 1897 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll) 1898 / track->mTimeScale; 1899 1900 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs; 1901 1902 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs; 1903 1904 if (mediaTimeUs > mLastMediaTimeUs) { 1905 mLastMediaTimeUs = mediaTimeUs; 1906 } 1907 1908 if (mediaTimeUs < 0) { 1909 ALOGV("dropping early accessUnit."); 1910 return false; 1911 } 1912 1913 ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)", 1914 trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6); 1915 1916 accessUnit->meta()->setInt64("timeUs", mediaTimeUs); 1917 1918 return true; 1919 } 1920 1921 void postQueueAccessUnit( 1922 size_t trackIndex, const sp<ABuffer> &accessUnit) { 1923 sp<AMessage> msg = mNotify->dup(); 1924 msg->setInt32("what", kWhatAccessUnit); 1925 msg->setSize("trackIndex", trackIndex); 1926 msg->setBuffer("accessUnit", accessUnit); 1927 msg->post(); 1928 } 1929 1930 void postQueueEOS(size_t trackIndex, status_t finalResult) { 1931 sp<AMessage> msg = mNotify->dup(); 1932 msg->setInt32("what", kWhatEOS); 1933 msg->setSize("trackIndex", trackIndex); 1934 msg->setInt32("finalResult", finalResult); 1935 msg->post(); 1936 } 1937 1938 void postQueueSeekDiscontinuity(size_t trackIndex) { 1939 sp<AMessage> msg = mNotify->dup(); 1940 msg->setInt32("what", kWhatSeekDiscontinuity); 1941 msg->setSize("trackIndex", trackIndex); 1942 msg->post(); 1943 } 1944 1945 void postNormalPlayTimeMapping( 1946 size_t trackIndex, uint32_t rtpTime, int64_t nptUs) { 1947 sp<AMessage> msg = mNotify->dup(); 1948 msg->setInt32("what", kWhatNormalPlayTimeMapping); 1949 msg->setSize("trackIndex", trackIndex); 1950 msg->setInt32("rtpTime", rtpTime); 1951 msg->setInt64("nptUs", nptUs); 1952 msg->post(); 1953 } 1954 1955 void postTimeout() { 1956 sp<AMessage> timeout = new AMessage('tiou', this); 1957 mCheckTimeoutGeneration++; 1958 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1959 1960 int64_t startupTimeoutUs; 1961 startupTimeoutUs = property_get_int64("media.rtsp.timeout-us", kStartupTimeoutUs); 1962 timeout->post(startupTimeoutUs); 1963 } 1964 1965 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 1966}; 1967 1968} // namespace android 1969 1970#endif // MY_HANDLER_H_ 1971