1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#ifndef MY_HANDLER_H_ 18 19#define MY_HANDLER_H_ 20 21//#define LOG_NDEBUG 0 22 23#ifndef LOG_TAG 24#define LOG_TAG "MyHandler" 25#endif 26 27#include <utils/Log.h> 28 29#include "APacketSource.h" 30#include "ARTPConnection.h" 31#include "ARTSPConnection.h" 32#include "ASessionDescription.h" 33 34#include <ctype.h> 35 36#include <media/stagefright/foundation/ABuffer.h> 37#include <media/stagefright/foundation/ADebug.h> 38#include <media/stagefright/foundation/ALooper.h> 39#include <media/stagefright/foundation/AMessage.h> 40#include <media/stagefright/MediaErrors.h> 41#include <media/stagefright/Utils.h> 42 43#include <arpa/inet.h> 44#include <sys/socket.h> 45#include <netdb.h> 46 47#include "HTTPBase.h" 48 49#if LOG_NDEBUG 50#define UNUSED_UNLESS_VERBOSE(x) (void)(x) 51#else 52#define UNUSED_UNLESS_VERBOSE(x) 53#endif 54 55// If no access units are received within 10 secs, assume that the rtp 56// stream has ended and signal end of stream. 57static int64_t kAccessUnitTimeoutUs = 10000000ll; 58 59// If no access units arrive for the first 10 secs after starting the 60// stream, assume none ever will and signal EOS or switch transports. 
61static int64_t kStartupTimeoutUs = 10000000ll; 62 63static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll; 64 65static int64_t kPauseDelayUs = 3000000ll; 66 67namespace android { 68 69static bool GetAttribute(const char *s, const char *key, AString *value) { 70 value->clear(); 71 72 size_t keyLen = strlen(key); 73 74 for (;;) { 75 while (isspace(*s)) { 76 ++s; 77 } 78 79 const char *colonPos = strchr(s, ';'); 80 81 size_t len = 82 (colonPos == NULL) ? strlen(s) : colonPos - s; 83 84 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { 85 value->setTo(&s[keyLen + 1], len - keyLen - 1); 86 return true; 87 } 88 89 if (colonPos == NULL) { 90 return false; 91 } 92 93 s = colonPos + 1; 94 } 95} 96 97struct MyHandler : public AHandler { 98 enum { 99 kWhatConnected = 'conn', 100 kWhatDisconnected = 'disc', 101 kWhatSeekPaused = 'spau', 102 kWhatSeekDone = 'sdon', 103 104 kWhatAccessUnit = 'accU', 105 kWhatEOS = 'eos!', 106 kWhatSeekDiscontinuity = 'seeD', 107 kWhatNormalPlayTimeMapping = 'nptM', 108 }; 109 110 MyHandler( 111 const char *url, 112 const sp<AMessage> ¬ify, 113 bool uidValid = false, uid_t uid = 0) 114 : mNotify(notify), 115 mUIDValid(uidValid), 116 mUID(uid), 117 mNetLooper(new ALooper), 118 mConn(new ARTSPConnection(mUIDValid, mUID)), 119 mRTPConn(new ARTPConnection), 120 mOriginalSessionURL(url), 121 mSessionURL(url), 122 mSetupTracksSuccessful(false), 123 mSeekPending(false), 124 mFirstAccessUnit(true), 125 mAllTracksHaveTime(false), 126 mNTPAnchorUs(-1), 127 mMediaAnchorUs(-1), 128 mLastMediaTimeUs(0), 129 mNumAccessUnitsReceived(0), 130 mCheckPending(false), 131 mCheckGeneration(0), 132 mCheckTimeoutGeneration(0), 133 mTryTCPInterleaving(false), 134 mTryFakeRTCP(false), 135 mReceivedFirstRTCPPacket(false), 136 mReceivedFirstRTPPacket(false), 137 mSeekable(true), 138 mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs), 139 mKeepAliveGeneration(0), 140 mPausing(false), 141 mPauseGeneration(0), 142 mPlayResponseParsed(false) { 143 
mNetLooper->setName("rtsp net");
        // The dedicated network looper runs on its own thread at the highest
        // priority; mRTPConn is registered on it in connect()/loadSDP().
        mNetLooper->start(false /* runOnCallingThread */,
                          false /* canCallJava */,
                          PRIORITY_HIGHEST);

        // Strip any authentication info from the session url, we don't
        // want to transmit user/pass in cleartext.
        AString host, path, user, pass;
        unsigned port;
        CHECK(ARTSPConnection::ParseURL(
                    mSessionURL.c_str(), &host, &port, &path, &user, &pass));

        if (user.size() > 0) {
            mSessionURL.clear();
            mSessionURL.append("rtsp://");
            mSessionURL.append(host);
            mSessionURL.append(":");
            mSessionURL.append(AStringPrintf("%u", port));
            mSessionURL.append(path);

            ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
        }

        // Remembered for pokeAHole(), which falls back to this host when the
        // Transport response carries no "source" field.
        mSessionHost = host;
    }

    // Starts the session: registers the connection handlers, routes binary
    // data from the RTSP connection back to this handler as 'biny', and
    // connects using the original URL. Completion arrives as 'conn'.
    void connect() {
        looper()->registerHandler(mConn);
        // The condition is the constant 1, so mRTPConn always goes onto
        // mNetLooper; the looper() alternative is never selected.
        (1 ? mNetLooper : looper())->registerHandler(mRTPConn);

        sp<AMessage> notify = new AMessage('biny', this);
        mConn->observeBinaryData(notify);

        sp<AMessage> reply = new AMessage('conn', this);
        mConn->connect(mOriginalSessionURL.c_str(), reply);
    }

    // Same as connect(), but for a caller that already has a session
    // description: |desc| is attached to the reply as "description" and
    // completion arrives as 'sdpl' instead of 'conn'.
    void loadSDP(const sp<ASessionDescription>& desc) {
        looper()->registerHandler(mConn);
        // Constant condition: mRTPConn is always registered on mNetLooper.
        (1 ? mNetLooper : looper())->registerHandler(mRTPConn);

        sp<AMessage> notify = new AMessage('biny', this);
        mConn->observeBinaryData(notify);

        sp<AMessage> reply = new AMessage('sdpl', this);
        reply->setObject("description", desc);
        mConn->connect(mOriginalSessionURL.c_str(), reply);
    }

    // Returns the URL used for aggregate requests (PLAY/PAUSE):
    //  - session-level "a=control" equal to "*"  -> mBaseURL
    //  - any other session-level "a=control"     -> that value resolved
    //                                               against mBaseURL
    //  - no session-level "a=control"            -> mSessionURL
    // Reads mSessionDesc, so it must only be called once that is set.
    AString getControlURL() {
        AString sessionLevelControlURL;
        if (mSessionDesc->findAttribute(
                0,
                "a=control",
                &sessionLevelControlURL)) {
            if (sessionLevelControlURL.compare("*") == 0) {
                return mBaseURL;
            } else {
                AString controlURL;
                CHECK(MakeURL(
                        mBaseURL.c_str(),
                        sessionLevelControlURL.c_str(),
                        &controlURL));
                return controlURL;
            }
        } else {
            return mSessionURL;
        }
    }

    // Requests teardown by posting 'abor' to this handler.
    void disconnect() {
        (new AMessage('abor', this))->post();
    }

    // Requests a seek to |timeUs| by posting 'seek'. Bumping
    // mPauseGeneration makes any still-delayed 'paus' message stale.
    void seek(int64_t timeUs) {
        sp<AMessage> msg = new AMessage('seek', this);
        msg->setInt64("time", timeUs);
        mPauseGeneration++;
        msg->post();
    }

    // Second half of a seek: posts 'see1', whose handler issues the PLAY
    // request carrying the Range header for |timeUs|.
    void continueSeekAfterPause(int64_t timeUs) {
        sp<AMessage> msg = new AMessage('see1', this);
        msg->setInt64("time", timeUs);
        msg->post();
    }

    // Returns mSeekable, which is set to !isLiveStream(...) once a session
    // description is available and is true before that.
    bool isSeekable() const {
        return mSeekable;
    }

    // Schedules a PAUSE: 'paus' is delivered after kPauseDelayUs and carries
    // the generation current at the time of the call, so the handler can
    // drop it if seek()/resume()/pause() was called again in the meantime.
    void pause() {
        sp<AMessage> msg = new AMessage('paus', this);
        mPauseGeneration++;
        msg->setInt32("pausecheck", mPauseGeneration);
        msg->post(kPauseDelayUs);
    }

    // Requests resumption by posting 'resu'; also invalidates any pending
    // delayed 'paus' by bumping mPauseGeneration.
    void resume() {
        sp<AMessage> msg = new AMessage('resu', this);
        mPauseGeneration++;
        msg->post();
    }

    // Appends an 8-byte RTCP header with packet type 201 (RR), length field
    // 1 and SSRC 0xdeadbeef at the end of |buf| and grows its range by 8.
    // No capacity check is done here; the caller must provide the room.
    static void addRR(const sp<ABuffer> &buf) {
        uint8_t *ptr = buf->data() + buf->size();
        ptr[0] = 0x80 | 0;
        ptr[1] = 201;  // RR
        ptr[2] = 0;
        ptr[3] = 1;
        ptr[4] = 0xde;  // SSRC
        ptr[5] = 0xad;
        ptr[6] = 0xbe;
        ptr[7] = 0xef;

        buf->setRange(0, buf->size() + 8);
    }

    // Appends an RTCP SDES packet (type 202) at the end of |buffer|:
    // SSRC 0xdeadbeef, a CNAME item "stagefright@<local address of s>" and a
    // TOOL item holding MakeUserAgent(), terminated and zero-padded to a
    // multiple of 4 bytes. If getsockname() fails the address 0.0.0.0 is
    // used. As in addRR(), no capacity check is done here.
    static void addSDES(int s, const sp<ABuffer> &buffer) {
        struct sockaddr_in addr;
        socklen_t addrSize = sizeof(addr);
        if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
            inet_aton("0.0.0.0", &(addr.sin_addr));
        }

        uint8_t *data = buffer->data() + buffer->size();
        data[0] = 0x80 | 1;
        data[1] = 202;  // SDES
        // data[2..3] (length) are filled in at the end, once it is known.
        data[4] = 0xde;  // SSRC
        data[5] = 0xad;
        data[6] = 0xbe;
        data[7] = 0xef;

        size_t offset = 8;

        data[offset++] = 1;  // CNAME

        AString cname = "stagefright@";
        cname.append(inet_ntoa(addr.sin_addr));
        // Item length is a single byte.
        data[offset++] = cname.size();

        memcpy(&data[offset], cname.c_str(), cname.size());
        offset += cname.size();

        data[offset++] = 6;  // TOOL

        AString tool = MakeUserAgent();

        // Item length is a single byte.
        data[offset++] = tool.size();

        memcpy(&data[offset], tool.c_str(), tool.size());
        offset += tool.size();

        // End-of-items marker.
        data[offset++] = 0;

        // Pad with zeros up to the next 4-byte boundary; the cases fall
        // through on purpose so that exactly |count| bytes are written.
        if ((offset % 4) > 0) {
            size_t count = 4 - (offset % 4);
            switch (count) {
                case 3:
                    data[offset++] = 0;
                    // fall through
                case 2:
                    data[offset++] = 0;
                    // fall through
                case 1:
                    data[offset++] = 0;
            }
        }

        // Length field: packet size in 32-bit words minus one.
        size_t numWords = (offset / 4) - 1;
        data[2] = numWords >> 8;
        data[3] = numWords & 0xff;

        buffer->setRange(buffer->offset(), buffer->size() + offset);
    }

    // In case we're behind NAT, fire off two UDP packets to the remote
    // rtp/rtcp ports to poke a hole into the firewall for future incoming
    // packets. We're going to send an RR/SDES RTCP packet to both of them.
    //
    // |transport| is the value of the Transport header of the SETUP
    // response. Returns false if the server address cannot be resolved, the
    // "server_port" field is missing or invalid, or a send comes up short;
    // returns true without sending anything when the "source" value does not
    // parse as an IPv4 address or the target is a loopback address.
    bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
        struct sockaddr_in addr;
        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
        addr.sin_family = AF_INET;

        AString source;
        AString server_port;
        if (!GetAttribute(transport.c_str(),
                          "source",
                          &source)) {
            ALOGW("Missing 'source' field in Transport response. Using "
                 "RTSP endpoint address.");

            struct hostent *ent = gethostbyname(mSessionHost.c_str());
            if (ent == NULL) {
                ALOGE("Failed to look up address of session host '%s'",
                     mSessionHost.c_str());

                return false;
            }

            addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
        } else {
            // inet_addr() yields INADDR_NONE for unparsable input; that
            // case is handled further down.
            addr.sin_addr.s_addr = inet_addr(source.c_str());
        }

        if (!GetAttribute(transport.c_str(),
                          "server_port",
                          &server_port)) {
            ALOGI("Missing 'server_port' field in Transport response.");
            return false;
        }

        // Expect "<rtp>-<rtcp>" with both ports in 1..65535 and the RTCP
        // port directly following the RTP port. Evenness of the RTP port is
        // not enforced here, only warned about below.
        int rtpPort, rtcpPort;
        if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
                || rtpPort <= 0 || rtpPort > 65535
                || rtcpPort <=0 || rtcpPort > 65535
                || rtcpPort != rtpPort + 1) {
            ALOGE("Server picked invalid RTP/RTCP port pair %s,"
                 " RTP port must be even, RTCP port must be one higher.",
                 server_port.c_str());

            return false;
        }

        if (rtpPort & 1) {
            ALOGW("Server picked an odd RTP port, it should've picked an "
                 "even one, we'll let it pass for now, but this may break "
                 "in the future.");
        }

        // "source" was not a dotted-quad address: nothing to send to, but
        // this is not reported as a failure.
        if (addr.sin_addr.s_addr == INADDR_NONE) {
            return true;
        }

        if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
            // No firewalls to traverse on the loopback interface.
            return true;
        }

        // Make up an RR/SDES RTCP packet.
381 sp<ABuffer> buf = new ABuffer(65536); 382 buf->setRange(0, 0); 383 addRR(buf); 384 addSDES(rtpSocket, buf); 385 386 addr.sin_port = htons(rtpPort); 387 388 ssize_t n = sendto( 389 rtpSocket, buf->data(), buf->size(), 0, 390 (const sockaddr *)&addr, sizeof(addr)); 391 392 if (n < (ssize_t)buf->size()) { 393 ALOGE("failed to poke a hole for RTP packets"); 394 return false; 395 } 396 397 addr.sin_port = htons(rtcpPort); 398 399 n = sendto( 400 rtcpSocket, buf->data(), buf->size(), 0, 401 (const sockaddr *)&addr, sizeof(addr)); 402 403 if (n < (ssize_t)buf->size()) { 404 ALOGE("failed to poke a hole for RTCP packets"); 405 return false; 406 } 407 408 ALOGV("successfully poked holes."); 409 410 return true; 411 } 412 413 static bool isLiveStream(const sp<ASessionDescription> &desc) { 414 AString attrLiveStream; 415 if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) { 416 ssize_t semicolonPos = attrLiveStream.find(";", 2); 417 418 const char* liveStreamValue; 419 if (semicolonPos < 0) { 420 liveStreamValue = attrLiveStream.c_str(); 421 } else { 422 AString valString; 423 valString.setTo(attrLiveStream, 424 semicolonPos + 1, 425 attrLiveStream.size() - semicolonPos - 1); 426 liveStreamValue = valString.c_str(); 427 } 428 429 uint32_t value = strtoul(liveStreamValue, NULL, 10); 430 if (value == 1) { 431 ALOGV("found live stream"); 432 return true; 433 } 434 } else { 435 // It is a live stream if no duration is returned 436 int64_t durationUs; 437 if (!desc->getDurationUs(&durationUs)) { 438 ALOGV("No duration found, assume live stream"); 439 return true; 440 } 441 } 442 443 return false; 444 } 445 446 virtual void onMessageReceived(const sp<AMessage> &msg) { 447 switch (msg->what()) { 448 case 'conn': 449 { 450 int32_t result; 451 CHECK(msg->findInt32("result", &result)); 452 453 ALOGI("connection request completed with result %d (%s)", 454 result, strerror(-result)); 455 456 if (result == OK) { 457 AString request; 458 request = "DESCRIBE "; 459 
request.append(mSessionURL); 460 request.append(" RTSP/1.0\r\n"); 461 request.append("Accept: application/sdp\r\n"); 462 request.append("\r\n"); 463 464 sp<AMessage> reply = new AMessage('desc', this); 465 mConn->sendRequest(request.c_str(), reply); 466 } else { 467 (new AMessage('disc', this))->post(); 468 } 469 break; 470 } 471 472 case 'disc': 473 { 474 ++mKeepAliveGeneration; 475 476 int32_t reconnect; 477 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 478 sp<AMessage> reply = new AMessage('conn', this); 479 mConn->connect(mOriginalSessionURL.c_str(), reply); 480 } else { 481 (new AMessage('quit', this))->post(); 482 } 483 break; 484 } 485 486 case 'desc': 487 { 488 int32_t result; 489 CHECK(msg->findInt32("result", &result)); 490 491 ALOGI("DESCRIBE completed with result %d (%s)", 492 result, strerror(-result)); 493 494 if (result == OK) { 495 sp<RefBase> obj; 496 CHECK(msg->findObject("response", &obj)); 497 sp<ARTSPResponse> response = 498 static_cast<ARTSPResponse *>(obj.get()); 499 500 if (response->mStatusCode == 301 || response->mStatusCode == 302) { 501 ssize_t i = response->mHeaders.indexOfKey("location"); 502 CHECK_GE(i, 0); 503 504 mOriginalSessionURL = response->mHeaders.valueAt(i); 505 mSessionURL = mOriginalSessionURL; 506 507 // Strip any authentication info from the session url, we don't 508 // want to transmit user/pass in cleartext. 
509 AString host, path, user, pass; 510 unsigned port; 511 if (ARTSPConnection::ParseURL( 512 mSessionURL.c_str(), &host, &port, &path, &user, &pass) 513 && user.size() > 0) { 514 mSessionURL.clear(); 515 mSessionURL.append("rtsp://"); 516 mSessionURL.append(host); 517 mSessionURL.append(":"); 518 mSessionURL.append(AStringPrintf("%u", port)); 519 mSessionURL.append(path); 520 521 ALOGI("rewritten session url: '%s'", mSessionURL.c_str()); 522 } 523 524 sp<AMessage> reply = new AMessage('conn', this); 525 mConn->connect(mOriginalSessionURL.c_str(), reply); 526 break; 527 } 528 529 if (response->mStatusCode != 200) { 530 result = UNKNOWN_ERROR; 531 } else if (response->mContent == NULL) { 532 result = ERROR_MALFORMED; 533 ALOGE("The response has no content."); 534 } else { 535 mSessionDesc = new ASessionDescription; 536 537 mSessionDesc->setTo( 538 response->mContent->data(), 539 response->mContent->size()); 540 541 if (!mSessionDesc->isValid()) { 542 ALOGE("Failed to parse session description."); 543 result = ERROR_MALFORMED; 544 } else { 545 ssize_t i = response->mHeaders.indexOfKey("content-base"); 546 if (i >= 0) { 547 mBaseURL = response->mHeaders.valueAt(i); 548 } else { 549 i = response->mHeaders.indexOfKey("content-location"); 550 if (i >= 0) { 551 mBaseURL = response->mHeaders.valueAt(i); 552 } else { 553 mBaseURL = mSessionURL; 554 } 555 } 556 557 mSeekable = !isLiveStream(mSessionDesc); 558 559 if (!mBaseURL.startsWith("rtsp://")) { 560 // Some misbehaving servers specify a relative 561 // URL in one of the locations above, combine 562 // it with the absolute session URL to get 563 // something usable... 
564 565 ALOGW("Server specified a non-absolute base URL" 566 ", combining it with the session URL to " 567 "get something usable..."); 568 569 AString tmp; 570 CHECK(MakeURL( 571 mSessionURL.c_str(), 572 mBaseURL.c_str(), 573 &tmp)); 574 575 mBaseURL = tmp; 576 } 577 578 mControlURL = getControlURL(); 579 580 if (mSessionDesc->countTracks() < 2) { 581 // There's no actual tracks in this session. 582 // The first "track" is merely session meta 583 // data. 584 585 ALOGW("Session doesn't contain any playable " 586 "tracks. Aborting."); 587 result = ERROR_UNSUPPORTED; 588 } else { 589 setupTrack(1); 590 } 591 } 592 } 593 } 594 595 if (result != OK) { 596 sp<AMessage> reply = new AMessage('disc', this); 597 mConn->disconnect(reply); 598 } 599 break; 600 } 601 602 case 'sdpl': 603 { 604 int32_t result; 605 CHECK(msg->findInt32("result", &result)); 606 607 ALOGI("SDP connection request completed with result %d (%s)", 608 result, strerror(-result)); 609 610 if (result == OK) { 611 sp<RefBase> obj; 612 CHECK(msg->findObject("description", &obj)); 613 mSessionDesc = 614 static_cast<ASessionDescription *>(obj.get()); 615 616 if (!mSessionDesc->isValid()) { 617 ALOGE("Failed to parse session description."); 618 result = ERROR_MALFORMED; 619 } else { 620 mBaseURL = mSessionURL; 621 622 mSeekable = !isLiveStream(mSessionDesc); 623 624 mControlURL = getControlURL(); 625 626 if (mSessionDesc->countTracks() < 2) { 627 // There's no actual tracks in this session. 628 // The first "track" is merely session meta 629 // data. 630 631 ALOGW("Session doesn't contain any playable " 632 "tracks. 
Aborting."); 633 result = ERROR_UNSUPPORTED; 634 } else { 635 setupTrack(1); 636 } 637 } 638 } 639 640 if (result != OK) { 641 sp<AMessage> reply = new AMessage('disc', this); 642 mConn->disconnect(reply); 643 } 644 break; 645 } 646 647 case 'setu': 648 { 649 size_t index; 650 CHECK(msg->findSize("index", &index)); 651 652 TrackInfo *track = NULL; 653 size_t trackIndex; 654 if (msg->findSize("track-index", &trackIndex)) { 655 track = &mTracks.editItemAt(trackIndex); 656 } 657 658 int32_t result; 659 CHECK(msg->findInt32("result", &result)); 660 661 ALOGI("SETUP(%zu) completed with result %d (%s)", 662 index, result, strerror(-result)); 663 664 if (result == OK) { 665 CHECK(track != NULL); 666 667 sp<RefBase> obj; 668 CHECK(msg->findObject("response", &obj)); 669 sp<ARTSPResponse> response = 670 static_cast<ARTSPResponse *>(obj.get()); 671 672 if (response->mStatusCode != 200) { 673 result = UNKNOWN_ERROR; 674 } else { 675 ssize_t i = response->mHeaders.indexOfKey("session"); 676 CHECK_GE(i, 0); 677 678 mSessionID = response->mHeaders.valueAt(i); 679 680 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 681 AString timeoutStr; 682 if (GetAttribute( 683 mSessionID.c_str(), "timeout", &timeoutStr)) { 684 char *end; 685 unsigned long timeoutSecs = 686 strtoul(timeoutStr.c_str(), &end, 10); 687 688 if (end == timeoutStr.c_str() || *end != '\0') { 689 ALOGW("server specified malformed timeout '%s'", 690 timeoutStr.c_str()); 691 692 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 693 } else if (timeoutSecs < 15) { 694 ALOGW("server specified too short a timeout " 695 "(%lu secs), using default.", 696 timeoutSecs); 697 698 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 699 } else { 700 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll; 701 702 ALOGI("server specified timeout of %lu secs.", 703 timeoutSecs); 704 } 705 } 706 707 i = mSessionID.find(";"); 708 if (i >= 0) { 709 // Remove options, i.e. 
";timeout=90" 710 mSessionID.erase(i, mSessionID.size() - i); 711 } 712 713 sp<AMessage> notify = new AMessage('accu', this); 714 notify->setSize("track-index", trackIndex); 715 716 i = response->mHeaders.indexOfKey("transport"); 717 CHECK_GE(i, 0); 718 719 if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) { 720 if (!track->mUsingInterleavedTCP) { 721 AString transport = response->mHeaders.valueAt(i); 722 723 // We are going to continue even if we were 724 // unable to poke a hole into the firewall... 725 pokeAHole( 726 track->mRTPSocket, 727 track->mRTCPSocket, 728 transport); 729 } 730 731 mRTPConn->addStream( 732 track->mRTPSocket, track->mRTCPSocket, 733 mSessionDesc, index, 734 notify, track->mUsingInterleavedTCP); 735 736 mSetupTracksSuccessful = true; 737 } else { 738 result = BAD_VALUE; 739 } 740 } 741 } 742 743 if (result != OK) { 744 if (track) { 745 if (!track->mUsingInterleavedTCP) { 746 // Clear the tag 747 if (mUIDValid) { 748 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket); 749 HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket); 750 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket); 751 HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket); 752 } 753 754 close(track->mRTPSocket); 755 close(track->mRTCPSocket); 756 } 757 758 mTracks.removeItemsAt(trackIndex); 759 } 760 } 761 762 ++index; 763 if (result == OK && index < mSessionDesc->countTracks()) { 764 setupTrack(index); 765 } else if (mSetupTracksSuccessful) { 766 ++mKeepAliveGeneration; 767 postKeepAlive(); 768 769 AString request = "PLAY "; 770 request.append(mControlURL); 771 request.append(" RTSP/1.0\r\n"); 772 773 request.append("Session: "); 774 request.append(mSessionID); 775 request.append("\r\n"); 776 777 request.append("\r\n"); 778 779 sp<AMessage> reply = new AMessage('play', this); 780 mConn->sendRequest(request.c_str(), reply); 781 } else { 782 sp<AMessage> reply = new AMessage('disc', this); 783 mConn->disconnect(reply); 784 } 785 break; 786 } 787 788 case 'play': 
789 { 790 int32_t result; 791 CHECK(msg->findInt32("result", &result)); 792 793 ALOGI("PLAY completed with result %d (%s)", 794 result, strerror(-result)); 795 796 if (result == OK) { 797 sp<RefBase> obj; 798 CHECK(msg->findObject("response", &obj)); 799 sp<ARTSPResponse> response = 800 static_cast<ARTSPResponse *>(obj.get()); 801 802 if (response->mStatusCode != 200) { 803 result = UNKNOWN_ERROR; 804 } else { 805 parsePlayResponse(response); 806 807 sp<AMessage> timeout = new AMessage('tiou', this); 808 mCheckTimeoutGeneration++; 809 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 810 timeout->post(kStartupTimeoutUs); 811 } 812 } 813 814 if (result != OK) { 815 sp<AMessage> reply = new AMessage('disc', this); 816 mConn->disconnect(reply); 817 } 818 819 break; 820 } 821 822 case 'aliv': 823 { 824 int32_t generation; 825 CHECK(msg->findInt32("generation", &generation)); 826 827 if (generation != mKeepAliveGeneration) { 828 // obsolete event. 829 break; 830 } 831 832 AString request; 833 request.append("OPTIONS "); 834 request.append(mSessionURL); 835 request.append(" RTSP/1.0\r\n"); 836 request.append("Session: "); 837 request.append(mSessionID); 838 request.append("\r\n"); 839 request.append("\r\n"); 840 841 sp<AMessage> reply = new AMessage('opts', this); 842 reply->setInt32("generation", mKeepAliveGeneration); 843 mConn->sendRequest(request.c_str(), reply); 844 break; 845 } 846 847 case 'opts': 848 { 849 int32_t result; 850 CHECK(msg->findInt32("result", &result)); 851 852 ALOGI("OPTIONS completed with result %d (%s)", 853 result, strerror(-result)); 854 855 int32_t generation; 856 CHECK(msg->findInt32("generation", &generation)); 857 858 if (generation != mKeepAliveGeneration) { 859 // obsolete event. 
860 break; 861 } 862 863 postKeepAlive(); 864 break; 865 } 866 867 case 'abor': 868 { 869 for (size_t i = 0; i < mTracks.size(); ++i) { 870 TrackInfo *info = &mTracks.editItemAt(i); 871 872 if (!mFirstAccessUnit) { 873 postQueueEOS(i, ERROR_END_OF_STREAM); 874 } 875 876 if (!info->mUsingInterleavedTCP) { 877 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket); 878 879 // Clear the tag 880 if (mUIDValid) { 881 HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket); 882 HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket); 883 HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket); 884 HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket); 885 } 886 887 close(info->mRTPSocket); 888 close(info->mRTCPSocket); 889 } 890 } 891 mTracks.clear(); 892 mSetupTracksSuccessful = false; 893 mSeekPending = false; 894 mFirstAccessUnit = true; 895 mAllTracksHaveTime = false; 896 mNTPAnchorUs = -1; 897 mMediaAnchorUs = -1; 898 mNumAccessUnitsReceived = 0; 899 mReceivedFirstRTCPPacket = false; 900 mReceivedFirstRTPPacket = false; 901 mPausing = false; 902 mSeekable = true; 903 904 sp<AMessage> reply = new AMessage('tear', this); 905 906 int32_t reconnect; 907 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 908 reply->setInt32("reconnect", true); 909 } 910 911 AString request; 912 request = "TEARDOWN "; 913 914 // XXX should use aggregate url from SDP here... 
915 request.append(mSessionURL); 916 request.append(" RTSP/1.0\r\n"); 917 918 request.append("Session: "); 919 request.append(mSessionID); 920 request.append("\r\n"); 921 922 request.append("\r\n"); 923 924 mConn->sendRequest(request.c_str(), reply); 925 break; 926 } 927 928 case 'tear': 929 { 930 int32_t result; 931 CHECK(msg->findInt32("result", &result)); 932 933 ALOGI("TEARDOWN completed with result %d (%s)", 934 result, strerror(-result)); 935 936 sp<AMessage> reply = new AMessage('disc', this); 937 938 int32_t reconnect; 939 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 940 reply->setInt32("reconnect", true); 941 } 942 943 mConn->disconnect(reply); 944 break; 945 } 946 947 case 'quit': 948 { 949 sp<AMessage> msg = mNotify->dup(); 950 msg->setInt32("what", kWhatDisconnected); 951 msg->setInt32("result", UNKNOWN_ERROR); 952 msg->post(); 953 break; 954 } 955 956 case 'chek': 957 { 958 int32_t generation; 959 CHECK(msg->findInt32("generation", &generation)); 960 if (generation != mCheckGeneration) { 961 // This is an outdated message. Ignore. 962 break; 963 } 964 965 if (mNumAccessUnitsReceived == 0) { 966#if 1 967 ALOGI("stream ended? 
aborting."); 968 (new AMessage('abor', this))->post(); 969 break; 970#else 971 ALOGI("haven't seen an AU in a looong time."); 972#endif 973 } 974 975 mNumAccessUnitsReceived = 0; 976 msg->post(kAccessUnitTimeoutUs); 977 break; 978 } 979 980 case 'accu': 981 { 982 int32_t timeUpdate; 983 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) { 984 size_t trackIndex; 985 CHECK(msg->findSize("track-index", &trackIndex)); 986 987 uint32_t rtpTime; 988 uint64_t ntpTime; 989 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime)); 990 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime)); 991 992 onTimeUpdate(trackIndex, rtpTime, ntpTime); 993 break; 994 } 995 996 int32_t first; 997 if (msg->findInt32("first-rtcp", &first)) { 998 mReceivedFirstRTCPPacket = true; 999 break; 1000 } 1001 1002 if (msg->findInt32("first-rtp", &first)) { 1003 mReceivedFirstRTPPacket = true; 1004 break; 1005 } 1006 1007 ++mNumAccessUnitsReceived; 1008 postAccessUnitTimeoutCheck(); 1009 1010 size_t trackIndex; 1011 CHECK(msg->findSize("track-index", &trackIndex)); 1012 1013 if (trackIndex >= mTracks.size()) { 1014 ALOGV("late packets ignored."); 1015 break; 1016 } 1017 1018 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1019 1020 int32_t eos; 1021 if (msg->findInt32("eos", &eos)) { 1022 ALOGI("received BYE on track index %zu", trackIndex); 1023 if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) { 1024 ALOGI("No time established => fake existing data"); 1025 1026 track->mEOSReceived = true; 1027 mTryFakeRTCP = true; 1028 mReceivedFirstRTCPPacket = true; 1029 fakeTimestamps(); 1030 } else { 1031 postQueueEOS(trackIndex, ERROR_END_OF_STREAM); 1032 } 1033 return; 1034 } 1035 1036 sp<ABuffer> accessUnit; 1037 CHECK(msg->findBuffer("access-unit", &accessUnit)); 1038 1039 uint32_t seqNum = (uint32_t)accessUnit->int32Data(); 1040 1041 if (mSeekPending) { 1042 ALOGV("we're seeking, dropping stale packet."); 1043 break; 1044 } 1045 1046 if (seqNum < track->mFirstSeqNumInSegment) { 1047 
ALOGV("dropping stale access-unit (%d < %d)", 1048 seqNum, track->mFirstSeqNumInSegment); 1049 break; 1050 } 1051 1052 if (track->mNewSegment) { 1053 track->mNewSegment = false; 1054 } 1055 1056 onAccessUnitComplete(trackIndex, accessUnit); 1057 break; 1058 } 1059 1060 case 'paus': 1061 { 1062 int32_t generation; 1063 CHECK(msg->findInt32("pausecheck", &generation)); 1064 if (generation != mPauseGeneration) { 1065 ALOGV("Ignoring outdated pause message."); 1066 break; 1067 } 1068 1069 if (!mSeekable) { 1070 ALOGW("This is a live stream, ignoring pause request."); 1071 break; 1072 } 1073 mCheckPending = true; 1074 ++mCheckGeneration; 1075 mPausing = true; 1076 1077 AString request = "PAUSE "; 1078 request.append(mControlURL); 1079 request.append(" RTSP/1.0\r\n"); 1080 1081 request.append("Session: "); 1082 request.append(mSessionID); 1083 request.append("\r\n"); 1084 1085 request.append("\r\n"); 1086 1087 sp<AMessage> reply = new AMessage('pau2', this); 1088 mConn->sendRequest(request.c_str(), reply); 1089 break; 1090 } 1091 1092 case 'pau2': 1093 { 1094 int32_t result; 1095 CHECK(msg->findInt32("result", &result)); 1096 mCheckTimeoutGeneration++; 1097 1098 ALOGI("PAUSE completed with result %d (%s)", 1099 result, strerror(-result)); 1100 break; 1101 } 1102 1103 case 'resu': 1104 { 1105 if (mPausing && mSeekPending) { 1106 // If seeking, Play will be sent from see1 instead 1107 break; 1108 } 1109 1110 if (!mPausing) { 1111 // Dont send PLAY if we have not paused 1112 break; 1113 } 1114 AString request = "PLAY "; 1115 request.append(mControlURL); 1116 request.append(" RTSP/1.0\r\n"); 1117 1118 request.append("Session: "); 1119 request.append(mSessionID); 1120 request.append("\r\n"); 1121 1122 request.append("\r\n"); 1123 1124 sp<AMessage> reply = new AMessage('res2', this); 1125 mConn->sendRequest(request.c_str(), reply); 1126 break; 1127 } 1128 1129 case 'res2': 1130 { 1131 int32_t result; 1132 CHECK(msg->findInt32("result", &result)); 1133 1134 ALOGI("PLAY 
completed with result %d (%s)", 1135 result, strerror(-result)); 1136 1137 mCheckPending = false; 1138 postAccessUnitTimeoutCheck(); 1139 1140 if (result == OK) { 1141 sp<RefBase> obj; 1142 CHECK(msg->findObject("response", &obj)); 1143 sp<ARTSPResponse> response = 1144 static_cast<ARTSPResponse *>(obj.get()); 1145 1146 if (response->mStatusCode != 200) { 1147 result = UNKNOWN_ERROR; 1148 } else { 1149 parsePlayResponse(response); 1150 1151 // Post new timeout in order to make sure to use 1152 // fake timestamps if no new Sender Reports arrive 1153 sp<AMessage> timeout = new AMessage('tiou', this); 1154 mCheckTimeoutGeneration++; 1155 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1156 timeout->post(kStartupTimeoutUs); 1157 } 1158 } 1159 1160 if (result != OK) { 1161 ALOGE("resume failed, aborting."); 1162 (new AMessage('abor', this))->post(); 1163 } 1164 1165 mPausing = false; 1166 break; 1167 } 1168 1169 case 'seek': 1170 { 1171 if (!mSeekable) { 1172 ALOGW("This is a live stream, ignoring seek request."); 1173 1174 sp<AMessage> msg = mNotify->dup(); 1175 msg->setInt32("what", kWhatSeekDone); 1176 msg->post(); 1177 break; 1178 } 1179 1180 int64_t timeUs; 1181 CHECK(msg->findInt64("time", &timeUs)); 1182 1183 mSeekPending = true; 1184 1185 // Disable the access unit timeout until we resumed 1186 // playback again. 1187 mCheckPending = true; 1188 ++mCheckGeneration; 1189 1190 sp<AMessage> reply = new AMessage('see0', this); 1191 reply->setInt64("time", timeUs); 1192 1193 if (mPausing) { 1194 // PAUSE already sent 1195 ALOGI("Pause already sent"); 1196 reply->post(); 1197 break; 1198 } 1199 AString request = "PAUSE "; 1200 request.append(mControlURL); 1201 request.append(" RTSP/1.0\r\n"); 1202 1203 request.append("Session: "); 1204 request.append(mSessionID); 1205 request.append("\r\n"); 1206 1207 request.append("\r\n"); 1208 1209 mConn->sendRequest(request.c_str(), reply); 1210 break; 1211 } 1212 1213 case 'see0': 1214 { 1215 // Session is paused now. 
1216 status_t err = OK; 1217 msg->findInt32("result", &err); 1218 1219 int64_t timeUs; 1220 CHECK(msg->findInt64("time", &timeUs)); 1221 1222 sp<AMessage> notify = mNotify->dup(); 1223 notify->setInt32("what", kWhatSeekPaused); 1224 notify->setInt32("err", err); 1225 notify->setInt64("time", timeUs); 1226 notify->post(); 1227 break; 1228 1229 } 1230 1231 case 'see1': 1232 { 1233 for (size_t i = 0; i < mTracks.size(); ++i) { 1234 TrackInfo *info = &mTracks.editItemAt(i); 1235 1236 postQueueSeekDiscontinuity(i); 1237 info->mEOSReceived = false; 1238 1239 info->mRTPAnchor = 0; 1240 info->mNTPAnchorUs = -1; 1241 } 1242 1243 mAllTracksHaveTime = false; 1244 mNTPAnchorUs = -1; 1245 1246 // Start new timeoutgeneration to avoid getting timeout 1247 // before PLAY response arrive 1248 sp<AMessage> timeout = new AMessage('tiou', this); 1249 mCheckTimeoutGeneration++; 1250 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1251 timeout->post(kStartupTimeoutUs); 1252 1253 int64_t timeUs; 1254 CHECK(msg->findInt64("time", &timeUs)); 1255 1256 AString request = "PLAY "; 1257 request.append(mControlURL); 1258 request.append(" RTSP/1.0\r\n"); 1259 1260 request.append("Session: "); 1261 request.append(mSessionID); 1262 request.append("\r\n"); 1263 1264 request.append( 1265 AStringPrintf( 1266 "Range: npt=%lld-\r\n", timeUs / 1000000ll)); 1267 1268 request.append("\r\n"); 1269 1270 sp<AMessage> reply = new AMessage('see2', this); 1271 mConn->sendRequest(request.c_str(), reply); 1272 break; 1273 } 1274 1275 case 'see2': 1276 { 1277 if (mTracks.size() == 0) { 1278 // We have already hit abor, break 1279 break; 1280 } 1281 1282 int32_t result; 1283 CHECK(msg->findInt32("result", &result)); 1284 1285 ALOGI("PLAY completed with result %d (%s)", 1286 result, strerror(-result)); 1287 1288 mCheckPending = false; 1289 postAccessUnitTimeoutCheck(); 1290 1291 if (result == OK) { 1292 sp<RefBase> obj; 1293 CHECK(msg->findObject("response", &obj)); 1294 sp<ARTSPResponse> response = 1295 
static_cast<ARTSPResponse *>(obj.get()); 1296 1297 if (response->mStatusCode != 200) { 1298 result = UNKNOWN_ERROR; 1299 } else { 1300 parsePlayResponse(response); 1301 1302 // Post new timeout in order to make sure to use 1303 // fake timestamps if no new Sender Reports arrive 1304 sp<AMessage> timeout = new AMessage('tiou', this); 1305 mCheckTimeoutGeneration++; 1306 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1307 timeout->post(kStartupTimeoutUs); 1308 1309 ssize_t i = response->mHeaders.indexOfKey("rtp-info"); 1310 CHECK_GE(i, 0); 1311 1312 ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str()); 1313 1314 ALOGI("seek completed."); 1315 } 1316 } 1317 1318 if (result != OK) { 1319 ALOGE("seek failed, aborting."); 1320 (new AMessage('abor', this))->post(); 1321 } 1322 1323 mPausing = false; 1324 mSeekPending = false; 1325 1326 sp<AMessage> msg = mNotify->dup(); 1327 msg->setInt32("what", kWhatSeekDone); 1328 msg->post(); 1329 break; 1330 } 1331 1332 case 'biny': 1333 { 1334 sp<ABuffer> buffer; 1335 CHECK(msg->findBuffer("buffer", &buffer)); 1336 1337 int32_t index; 1338 CHECK(buffer->meta()->findInt32("index", &index)); 1339 1340 mRTPConn->injectPacket(index, buffer); 1341 break; 1342 } 1343 1344 case 'tiou': 1345 { 1346 int32_t timeoutGenerationCheck; 1347 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck)); 1348 if (timeoutGenerationCheck != mCheckTimeoutGeneration) { 1349 // This is an outdated message. Ignore. 1350 // This typically happens if a lot of seeks are 1351 // performed, since new timeout messages now are 1352 // posted at seek as well. 
1353 break; 1354 } 1355 if (!mReceivedFirstRTCPPacket) { 1356 if (dataReceivedOnAllChannels() && !mTryFakeRTCP) { 1357 ALOGW("We received RTP packets but no RTCP packets, " 1358 "using fake timestamps."); 1359 1360 mTryFakeRTCP = true; 1361 1362 mReceivedFirstRTCPPacket = true; 1363 1364 fakeTimestamps(); 1365 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) { 1366 ALOGW("Never received any data, switching transports."); 1367 1368 mTryTCPInterleaving = true; 1369 1370 sp<AMessage> msg = new AMessage('abor', this); 1371 msg->setInt32("reconnect", true); 1372 msg->post(); 1373 } else { 1374 ALOGW("Never received any data, disconnecting."); 1375 (new AMessage('abor', this))->post(); 1376 } 1377 } else { 1378 if (!mAllTracksHaveTime) { 1379 ALOGW("We received some RTCP packets, but time " 1380 "could not be established on all tracks, now " 1381 "using fake timestamps"); 1382 1383 fakeTimestamps(); 1384 } 1385 } 1386 break; 1387 } 1388 1389 default: 1390 TRESPASS(); 1391 break; 1392 } 1393 } 1394 1395 void postKeepAlive() { 1396 sp<AMessage> msg = new AMessage('aliv', this); 1397 msg->setInt32("generation", mKeepAliveGeneration); 1398 msg->post((mKeepAliveTimeoutUs * 9) / 10); 1399 } 1400 1401 void postAccessUnitTimeoutCheck() { 1402 if (mCheckPending) { 1403 return; 1404 } 1405 1406 mCheckPending = true; 1407 sp<AMessage> check = new AMessage('chek', this); 1408 check->setInt32("generation", mCheckGeneration); 1409 check->post(kAccessUnitTimeoutUs); 1410 } 1411 1412 static void SplitString( 1413 const AString &s, const char *separator, List<AString> *items) { 1414 items->clear(); 1415 size_t start = 0; 1416 while (start < s.size()) { 1417 ssize_t offset = s.find(separator, start); 1418 1419 if (offset < 0) { 1420 items->push_back(AString(s, start, s.size() - start)); 1421 break; 1422 } 1423 1424 items->push_back(AString(s, start, offset - start)); 1425 start = offset + strlen(separator); 1426 } 1427 } 1428 1429 void parsePlayResponse(const sp<ARTSPResponse> 
&response) { 1430 mPlayResponseParsed = true; 1431 if (mTracks.size() == 0) { 1432 ALOGV("parsePlayResponse: late packets ignored."); 1433 return; 1434 } 1435 1436 ssize_t i = response->mHeaders.indexOfKey("range"); 1437 if (i < 0) { 1438 // Server doesn't even tell use what range it is going to 1439 // play, therefore we won't support seeking. 1440 return; 1441 } 1442 1443 AString range = response->mHeaders.valueAt(i); 1444 ALOGV("Range: %s", range.c_str()); 1445 1446 AString val; 1447 CHECK(GetAttribute(range.c_str(), "npt", &val)); 1448 1449 float npt1, npt2; 1450 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) { 1451 // This is a live stream and therefore not seekable. 1452 1453 ALOGI("This is a live stream"); 1454 return; 1455 } 1456 1457 i = response->mHeaders.indexOfKey("rtp-info"); 1458 CHECK_GE(i, 0); 1459 1460 AString rtpInfo = response->mHeaders.valueAt(i); 1461 List<AString> streamInfos; 1462 SplitString(rtpInfo, ",", &streamInfos); 1463 1464 int n = 1; 1465 for (List<AString>::iterator it = streamInfos.begin(); 1466 it != streamInfos.end(); ++it) { 1467 (*it).trim(); 1468 ALOGV("streamInfo[%d] = %s", n, (*it).c_str()); 1469 1470 CHECK(GetAttribute((*it).c_str(), "url", &val)); 1471 1472 size_t trackIndex = 0; 1473 while (trackIndex < mTracks.size() 1474 && !(val == mTracks.editItemAt(trackIndex).mURL)) { 1475 ++trackIndex; 1476 } 1477 CHECK_LT(trackIndex, mTracks.size()); 1478 1479 CHECK(GetAttribute((*it).c_str(), "seq", &val)); 1480 1481 char *end; 1482 unsigned long seq = strtoul(val.c_str(), &end, 10); 1483 1484 TrackInfo *info = &mTracks.editItemAt(trackIndex); 1485 info->mFirstSeqNumInSegment = seq; 1486 info->mNewSegment = true; 1487 1488 CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); 1489 1490 uint32_t rtpTime = strtoul(val.c_str(), &end, 10); 1491 1492 ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1); 1493 1494 info->mNormalPlayTimeRTP = rtpTime; 1495 info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6); 1496 
1497 if (!mFirstAccessUnit) { 1498 postNormalPlayTimeMapping( 1499 trackIndex, 1500 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1501 } 1502 1503 ++n; 1504 } 1505 } 1506 1507 sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) { 1508 CHECK_GE(index, 0u); 1509 CHECK_LT(index, mTracks.size()); 1510 1511 const TrackInfo &info = mTracks.itemAt(index); 1512 1513 *timeScale = info.mTimeScale; 1514 1515 return info.mPacketSource->getFormat(); 1516 } 1517 1518 size_t countTracks() const { 1519 return mTracks.size(); 1520 } 1521 1522private: 1523 struct TrackInfo { 1524 AString mURL; 1525 int mRTPSocket; 1526 int mRTCPSocket; 1527 bool mUsingInterleavedTCP; 1528 uint32_t mFirstSeqNumInSegment; 1529 bool mNewSegment; 1530 1531 uint32_t mRTPAnchor; 1532 int64_t mNTPAnchorUs; 1533 int32_t mTimeScale; 1534 bool mEOSReceived; 1535 1536 uint32_t mNormalPlayTimeRTP; 1537 int64_t mNormalPlayTimeUs; 1538 1539 sp<APacketSource> mPacketSource; 1540 1541 // Stores packets temporarily while no notion of time 1542 // has been established yet. 
1543 List<sp<ABuffer> > mPackets; 1544 }; 1545 1546 sp<AMessage> mNotify; 1547 bool mUIDValid; 1548 uid_t mUID; 1549 sp<ALooper> mNetLooper; 1550 sp<ARTSPConnection> mConn; 1551 sp<ARTPConnection> mRTPConn; 1552 sp<ASessionDescription> mSessionDesc; 1553 AString mOriginalSessionURL; // This one still has user:pass@ 1554 AString mSessionURL; 1555 AString mSessionHost; 1556 AString mBaseURL; 1557 AString mControlURL; 1558 AString mSessionID; 1559 bool mSetupTracksSuccessful; 1560 bool mSeekPending; 1561 bool mFirstAccessUnit; 1562 1563 bool mAllTracksHaveTime; 1564 int64_t mNTPAnchorUs; 1565 int64_t mMediaAnchorUs; 1566 int64_t mLastMediaTimeUs; 1567 1568 int64_t mNumAccessUnitsReceived; 1569 bool mCheckPending; 1570 int32_t mCheckGeneration; 1571 int32_t mCheckTimeoutGeneration; 1572 bool mTryTCPInterleaving; 1573 bool mTryFakeRTCP; 1574 bool mReceivedFirstRTCPPacket; 1575 bool mReceivedFirstRTPPacket; 1576 bool mSeekable; 1577 int64_t mKeepAliveTimeoutUs; 1578 int32_t mKeepAliveGeneration; 1579 bool mPausing; 1580 int32_t mPauseGeneration; 1581 1582 Vector<TrackInfo> mTracks; 1583 1584 bool mPlayResponseParsed; 1585 1586 void setupTrack(size_t index) { 1587 sp<APacketSource> source = 1588 new APacketSource(mSessionDesc, index); 1589 1590 if (source->initCheck() != OK) { 1591 ALOGW("Unsupported format. 
Ignoring track #%zu.", index); 1592 1593 sp<AMessage> reply = new AMessage('setu', this); 1594 reply->setSize("index", index); 1595 reply->setInt32("result", ERROR_UNSUPPORTED); 1596 reply->post(); 1597 return; 1598 } 1599 1600 AString url; 1601 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 1602 1603 AString trackURL; 1604 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 1605 1606 mTracks.push(TrackInfo()); 1607 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 1608 info->mURL = trackURL; 1609 info->mPacketSource = source; 1610 info->mUsingInterleavedTCP = false; 1611 info->mFirstSeqNumInSegment = 0; 1612 info->mNewSegment = true; 1613 info->mRTPSocket = -1; 1614 info->mRTCPSocket = -1; 1615 info->mRTPAnchor = 0; 1616 info->mNTPAnchorUs = -1; 1617 info->mNormalPlayTimeRTP = 0; 1618 info->mNormalPlayTimeUs = 0ll; 1619 1620 unsigned long PT; 1621 AString formatDesc; 1622 AString formatParams; 1623 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams); 1624 1625 int32_t timescale; 1626 int32_t numChannels; 1627 ASessionDescription::ParseFormatDesc( 1628 formatDesc.c_str(), ×cale, &numChannels); 1629 1630 info->mTimeScale = timescale; 1631 info->mEOSReceived = false; 1632 1633 ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str()); 1634 1635 AString request = "SETUP "; 1636 request.append(trackURL); 1637 request.append(" RTSP/1.0\r\n"); 1638 1639 if (mTryTCPInterleaving) { 1640 size_t interleaveIndex = 2 * (mTracks.size() - 1); 1641 info->mUsingInterleavedTCP = true; 1642 info->mRTPSocket = interleaveIndex; 1643 info->mRTCPSocket = interleaveIndex + 1; 1644 1645 request.append("Transport: RTP/AVP/TCP;interleaved="); 1646 request.append(interleaveIndex); 1647 request.append("-"); 1648 request.append(interleaveIndex + 1); 1649 } else { 1650 unsigned rtpPort; 1651 ARTPConnection::MakePortPair( 1652 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 1653 1654 if (mUIDValid) { 1655 
HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID, 1656 (uint32_t)*(uint32_t*) "RTP_"); 1657 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID, 1658 (uint32_t)*(uint32_t*) "RTP_"); 1659 HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID); 1660 HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID); 1661 } 1662 1663 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 1664 request.append(rtpPort); 1665 request.append("-"); 1666 request.append(rtpPort + 1); 1667 } 1668 1669 request.append("\r\n"); 1670 1671 if (index > 1) { 1672 request.append("Session: "); 1673 request.append(mSessionID); 1674 request.append("\r\n"); 1675 } 1676 1677 request.append("\r\n"); 1678 1679 sp<AMessage> reply = new AMessage('setu', this); 1680 reply->setSize("index", index); 1681 reply->setSize("track-index", mTracks.size() - 1); 1682 mConn->sendRequest(request.c_str(), reply); 1683 } 1684 1685 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 1686 out->clear(); 1687 1688 if (strncasecmp("rtsp://", baseURL, 7)) { 1689 // Base URL must be absolute 1690 return false; 1691 } 1692 1693 if (!strncasecmp("rtsp://", url, 7)) { 1694 // "url" is already an absolute URL, ignore base URL. 
1695 out->setTo(url); 1696 return true; 1697 } 1698 1699 size_t n = strlen(baseURL); 1700 out->setTo(baseURL); 1701 if (baseURL[n - 1] != '/') { 1702 out->append("/"); 1703 } 1704 out->append(url); 1705 1706 return true; 1707 } 1708 1709 void fakeTimestamps() { 1710 mNTPAnchorUs = -1ll; 1711 for (size_t i = 0; i < mTracks.size(); ++i) { 1712 onTimeUpdate(i, 0, 0ll); 1713 } 1714 } 1715 1716 bool dataReceivedOnAllChannels() { 1717 TrackInfo *track; 1718 for (size_t i = 0; i < mTracks.size(); ++i) { 1719 track = &mTracks.editItemAt(i); 1720 if (track->mPackets.empty()) { 1721 return false; 1722 } 1723 } 1724 return true; 1725 } 1726 1727 void handleFirstAccessUnit() { 1728 if (mFirstAccessUnit) { 1729 sp<AMessage> msg = mNotify->dup(); 1730 msg->setInt32("what", kWhatConnected); 1731 msg->post(); 1732 1733 if (mSeekable) { 1734 for (size_t i = 0; i < mTracks.size(); ++i) { 1735 TrackInfo *info = &mTracks.editItemAt(i); 1736 1737 postNormalPlayTimeMapping( 1738 i, 1739 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1740 } 1741 } 1742 1743 mFirstAccessUnit = false; 1744 } 1745 } 1746 1747 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) { 1748 ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx", 1749 trackIndex, rtpTime, (long long)ntpTime); 1750 1751 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); 1752 1753 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1754 1755 track->mRTPAnchor = rtpTime; 1756 track->mNTPAnchorUs = ntpTimeUs; 1757 1758 if (mNTPAnchorUs < 0) { 1759 mNTPAnchorUs = ntpTimeUs; 1760 mMediaAnchorUs = mLastMediaTimeUs; 1761 } 1762 1763 if (!mAllTracksHaveTime) { 1764 bool allTracksHaveTime = (mTracks.size() > 0); 1765 for (size_t i = 0; i < mTracks.size(); ++i) { 1766 TrackInfo *track = &mTracks.editItemAt(i); 1767 if (track->mNTPAnchorUs < 0) { 1768 allTracksHaveTime = false; 1769 break; 1770 } 1771 } 1772 if (allTracksHaveTime) { 1773 mAllTracksHaveTime = true; 1774 ALOGI("Time now 
established for all tracks."); 1775 } 1776 } 1777 if (mAllTracksHaveTime && dataReceivedOnAllChannels()) { 1778 handleFirstAccessUnit(); 1779 1780 // Time is now established, lets start timestamping immediately 1781 for (size_t i = 0; i < mTracks.size(); ++i) { 1782 TrackInfo *trackInfo = &mTracks.editItemAt(i); 1783 while (!trackInfo->mPackets.empty()) { 1784 sp<ABuffer> accessUnit = *trackInfo->mPackets.begin(); 1785 trackInfo->mPackets.erase(trackInfo->mPackets.begin()); 1786 1787 if (addMediaTimestamp(i, trackInfo, accessUnit)) { 1788 postQueueAccessUnit(i, accessUnit); 1789 } 1790 } 1791 } 1792 for (size_t i = 0; i < mTracks.size(); ++i) { 1793 TrackInfo *trackInfo = &mTracks.editItemAt(i); 1794 if (trackInfo->mEOSReceived) { 1795 postQueueEOS(i, ERROR_END_OF_STREAM); 1796 trackInfo->mEOSReceived = false; 1797 } 1798 } 1799 } 1800 } 1801 1802 void onAccessUnitComplete( 1803 int32_t trackIndex, const sp<ABuffer> &accessUnit) { 1804 ALOGV("onAccessUnitComplete track %d", trackIndex); 1805 1806 if(!mPlayResponseParsed){ 1807 ALOGI("play response is not parsed, storing accessunit"); 1808 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1809 track->mPackets.push_back(accessUnit); 1810 return; 1811 } 1812 1813 handleFirstAccessUnit(); 1814 1815 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1816 1817 if (!mAllTracksHaveTime) { 1818 ALOGV("storing accessUnit, no time established yet"); 1819 track->mPackets.push_back(accessUnit); 1820 return; 1821 } 1822 1823 while (!track->mPackets.empty()) { 1824 sp<ABuffer> accessUnit = *track->mPackets.begin(); 1825 track->mPackets.erase(track->mPackets.begin()); 1826 1827 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1828 postQueueAccessUnit(trackIndex, accessUnit); 1829 } 1830 } 1831 1832 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1833 postQueueAccessUnit(trackIndex, accessUnit); 1834 } 1835 1836 if (track->mEOSReceived) { 1837 postQueueEOS(trackIndex, ERROR_END_OF_STREAM); 1838 track->mEOSReceived 
= false; 1839 } 1840 } 1841 1842 bool addMediaTimestamp( 1843 int32_t trackIndex, const TrackInfo *track, 1844 const sp<ABuffer> &accessUnit) { 1845 UNUSED_UNLESS_VERBOSE(trackIndex); 1846 1847 uint32_t rtpTime; 1848 CHECK(accessUnit->meta()->findInt32( 1849 "rtp-time", (int32_t *)&rtpTime)); 1850 1851 int64_t relRtpTimeUs = 1852 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll) 1853 / track->mTimeScale; 1854 1855 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs; 1856 1857 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs; 1858 1859 if (mediaTimeUs > mLastMediaTimeUs) { 1860 mLastMediaTimeUs = mediaTimeUs; 1861 } 1862 1863 if (mediaTimeUs < 0) { 1864 ALOGV("dropping early accessUnit."); 1865 return false; 1866 } 1867 1868 ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)", 1869 trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6); 1870 1871 accessUnit->meta()->setInt64("timeUs", mediaTimeUs); 1872 1873 return true; 1874 } 1875 1876 void postQueueAccessUnit( 1877 size_t trackIndex, const sp<ABuffer> &accessUnit) { 1878 sp<AMessage> msg = mNotify->dup(); 1879 msg->setInt32("what", kWhatAccessUnit); 1880 msg->setSize("trackIndex", trackIndex); 1881 msg->setBuffer("accessUnit", accessUnit); 1882 msg->post(); 1883 } 1884 1885 void postQueueEOS(size_t trackIndex, status_t finalResult) { 1886 sp<AMessage> msg = mNotify->dup(); 1887 msg->setInt32("what", kWhatEOS); 1888 msg->setSize("trackIndex", trackIndex); 1889 msg->setInt32("finalResult", finalResult); 1890 msg->post(); 1891 } 1892 1893 void postQueueSeekDiscontinuity(size_t trackIndex) { 1894 sp<AMessage> msg = mNotify->dup(); 1895 msg->setInt32("what", kWhatSeekDiscontinuity); 1896 msg->setSize("trackIndex", trackIndex); 1897 msg->post(); 1898 } 1899 1900 void postNormalPlayTimeMapping( 1901 size_t trackIndex, uint32_t rtpTime, int64_t nptUs) { 1902 sp<AMessage> msg = mNotify->dup(); 1903 msg->setInt32("what", kWhatNormalPlayTimeMapping); 1904 
msg->setSize("trackIndex", trackIndex); 1905 msg->setInt32("rtpTime", rtpTime); 1906 msg->setInt64("nptUs", nptUs); 1907 msg->post(); 1908 } 1909 1910 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 1911}; 1912 1913} // namespace android 1914 1915#endif // MY_HANDLER_H_ 1916