/*
 * Copyright (C) 2010 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef MY_HANDLER_H_

#define MY_HANDLER_H_

//#define LOG_NDEBUG 0

#ifndef LOG_TAG
#define LOG_TAG "MyHandler"
#endif

#include <utils/Log.h>

#include "APacketSource.h"
#include "ARTPConnection.h"
#include "ARTSPConnection.h"
#include "ASessionDescription.h"

#include <ctype.h>

#include <media/stagefright/foundation/ABuffer.h>
#include <media/stagefright/foundation/ADebug.h>
#include <media/stagefright/foundation/ALooper.h>
#include <media/stagefright/foundation/AMessage.h>
#include <media/stagefright/MediaErrors.h>
#include <media/stagefright/Utils.h>

#include <arpa/inet.h>
#include <sys/socket.h>
#include <netdb.h>

#include "HTTPBase.h"

// Expands to a "mark as used" cast when LOG_NDEBUG is non-zero and to
// nothing otherwise.
#if LOG_NDEBUG
#define UNUSED_UNLESS_VERBOSE(x) (void)(x)
#else
#define UNUSED_UNLESS_VERBOSE(x)
#endif

// If no access units are received within 10 secs, assume that the rtp
// stream has ended and signal end of stream.
static int64_t kAccessUnitTimeoutUs = 10000000ll;

// If no access units arrive for the first 10 secs after starting the
// stream, assume none ever will and signal EOS or switch transports.
61static int64_t kStartupTimeoutUs = 10000000ll; 62 63static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll; 64 65static int64_t kPauseDelayUs = 3000000ll; 66 67namespace android { 68 69static bool GetAttribute(const char *s, const char *key, AString *value) { 70 value->clear(); 71 72 size_t keyLen = strlen(key); 73 74 for (;;) { 75 while (isspace(*s)) { 76 ++s; 77 } 78 79 const char *colonPos = strchr(s, ';'); 80 81 size_t len = 82 (colonPos == NULL) ? strlen(s) : colonPos - s; 83 84 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { 85 value->setTo(&s[keyLen + 1], len - keyLen - 1); 86 return true; 87 } 88 89 if (colonPos == NULL) { 90 return false; 91 } 92 93 s = colonPos + 1; 94 } 95} 96 97struct MyHandler : public AHandler { 98 enum { 99 kWhatConnected = 'conn', 100 kWhatDisconnected = 'disc', 101 kWhatSeekDone = 'sdon', 102 103 kWhatAccessUnit = 'accU', 104 kWhatEOS = 'eos!', 105 kWhatSeekDiscontinuity = 'seeD', 106 kWhatNormalPlayTimeMapping = 'nptM', 107 }; 108 109 MyHandler( 110 const char *url, 111 const sp<AMessage> ¬ify, 112 bool uidValid = false, uid_t uid = 0) 113 : mNotify(notify), 114 mUIDValid(uidValid), 115 mUID(uid), 116 mNetLooper(new ALooper), 117 mConn(new ARTSPConnection(mUIDValid, mUID)), 118 mRTPConn(new ARTPConnection), 119 mOriginalSessionURL(url), 120 mSessionURL(url), 121 mSetupTracksSuccessful(false), 122 mSeekPending(false), 123 mFirstAccessUnit(true), 124 mAllTracksHaveTime(false), 125 mNTPAnchorUs(-1), 126 mMediaAnchorUs(-1), 127 mLastMediaTimeUs(0), 128 mNumAccessUnitsReceived(0), 129 mCheckPending(false), 130 mCheckGeneration(0), 131 mCheckTimeoutGeneration(0), 132 mTryTCPInterleaving(false), 133 mTryFakeRTCP(false), 134 mReceivedFirstRTCPPacket(false), 135 mReceivedFirstRTPPacket(false), 136 mSeekable(true), 137 mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs), 138 mKeepAliveGeneration(0), 139 mPausing(false), 140 mPauseGeneration(0), 141 mPlayResponseParsed(false) { 142 mNetLooper->setName("rtsp net"); 143 
mNetLooper->start(false /* runOnCallingThread */, 144 false /* canCallJava */, 145 PRIORITY_HIGHEST); 146 147 // Strip any authentication info from the session url, we don't 148 // want to transmit user/pass in cleartext. 149 AString host, path, user, pass; 150 unsigned port; 151 CHECK(ARTSPConnection::ParseURL( 152 mSessionURL.c_str(), &host, &port, &path, &user, &pass)); 153 154 if (user.size() > 0) { 155 mSessionURL.clear(); 156 mSessionURL.append("rtsp://"); 157 mSessionURL.append(host); 158 mSessionURL.append(":"); 159 mSessionURL.append(StringPrintf("%u", port)); 160 mSessionURL.append(path); 161 162 ALOGV("rewritten session url: '%s'", mSessionURL.c_str()); 163 } 164 165 mSessionHost = host; 166 } 167 168 void connect() { 169 looper()->registerHandler(mConn); 170 (1 ? mNetLooper : looper())->registerHandler(mRTPConn); 171 172 sp<AMessage> notify = new AMessage('biny', id()); 173 mConn->observeBinaryData(notify); 174 175 sp<AMessage> reply = new AMessage('conn', id()); 176 mConn->connect(mOriginalSessionURL.c_str(), reply); 177 } 178 179 void loadSDP(const sp<ASessionDescription>& desc) { 180 looper()->registerHandler(mConn); 181 (1 ? 
mNetLooper : looper())->registerHandler(mRTPConn); 182 183 sp<AMessage> notify = new AMessage('biny', id()); 184 mConn->observeBinaryData(notify); 185 186 sp<AMessage> reply = new AMessage('sdpl', id()); 187 reply->setObject("description", desc); 188 mConn->connect(mOriginalSessionURL.c_str(), reply); 189 } 190 191 AString getControlURL() { 192 AString sessionLevelControlURL; 193 if (mSessionDesc->findAttribute( 194 0, 195 "a=control", 196 &sessionLevelControlURL)) { 197 if (sessionLevelControlURL.compare("*") == 0) { 198 return mBaseURL; 199 } else { 200 AString controlURL; 201 CHECK(MakeURL( 202 mBaseURL.c_str(), 203 sessionLevelControlURL.c_str(), 204 &controlURL)); 205 return controlURL; 206 } 207 } else { 208 return mSessionURL; 209 } 210 } 211 212 void disconnect() { 213 (new AMessage('abor', id()))->post(); 214 } 215 216 void seek(int64_t timeUs) { 217 sp<AMessage> msg = new AMessage('seek', id()); 218 msg->setInt64("time", timeUs); 219 mPauseGeneration++; 220 msg->post(); 221 } 222 223 bool isSeekable() const { 224 return mSeekable; 225 } 226 227 void pause() { 228 sp<AMessage> msg = new AMessage('paus', id()); 229 mPauseGeneration++; 230 msg->setInt32("pausecheck", mPauseGeneration); 231 msg->post(kPauseDelayUs); 232 } 233 234 void resume() { 235 sp<AMessage> msg = new AMessage('resu', id()); 236 mPauseGeneration++; 237 msg->post(); 238 } 239 240 static void addRR(const sp<ABuffer> &buf) { 241 uint8_t *ptr = buf->data() + buf->size(); 242 ptr[0] = 0x80 | 0; 243 ptr[1] = 201; // RR 244 ptr[2] = 0; 245 ptr[3] = 1; 246 ptr[4] = 0xde; // SSRC 247 ptr[5] = 0xad; 248 ptr[6] = 0xbe; 249 ptr[7] = 0xef; 250 251 buf->setRange(0, buf->size() + 8); 252 } 253 254 static void addSDES(int s, const sp<ABuffer> &buffer) { 255 struct sockaddr_in addr; 256 socklen_t addrSize = sizeof(addr); 257 if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) { 258 inet_aton("0.0.0.0", &(addr.sin_addr)); 259 } 260 261 uint8_t *data = buffer->data() + buffer->size(); 262 data[0] = 0x80 
| 1; 263 data[1] = 202; // SDES 264 data[4] = 0xde; // SSRC 265 data[5] = 0xad; 266 data[6] = 0xbe; 267 data[7] = 0xef; 268 269 size_t offset = 8; 270 271 data[offset++] = 1; // CNAME 272 273 AString cname = "stagefright@"; 274 cname.append(inet_ntoa(addr.sin_addr)); 275 data[offset++] = cname.size(); 276 277 memcpy(&data[offset], cname.c_str(), cname.size()); 278 offset += cname.size(); 279 280 data[offset++] = 6; // TOOL 281 282 AString tool = MakeUserAgent(); 283 284 data[offset++] = tool.size(); 285 286 memcpy(&data[offset], tool.c_str(), tool.size()); 287 offset += tool.size(); 288 289 data[offset++] = 0; 290 291 if ((offset % 4) > 0) { 292 size_t count = 4 - (offset % 4); 293 switch (count) { 294 case 3: 295 data[offset++] = 0; 296 case 2: 297 data[offset++] = 0; 298 case 1: 299 data[offset++] = 0; 300 } 301 } 302 303 size_t numWords = (offset / 4) - 1; 304 data[2] = numWords >> 8; 305 data[3] = numWords & 0xff; 306 307 buffer->setRange(buffer->offset(), buffer->size() + offset); 308 } 309 310 // In case we're behind NAT, fire off two UDP packets to the remote 311 // rtp/rtcp ports to poke a hole into the firewall for future incoming 312 // packets. We're going to send an RR/SDES RTCP packet to both of them. 313 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) { 314 struct sockaddr_in addr; 315 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 316 addr.sin_family = AF_INET; 317 318 AString source; 319 AString server_port; 320 if (!GetAttribute(transport.c_str(), 321 "source", 322 &source)) { 323 ALOGW("Missing 'source' field in Transport response. 
Using " 324 "RTSP endpoint address."); 325 326 struct hostent *ent = gethostbyname(mSessionHost.c_str()); 327 if (ent == NULL) { 328 ALOGE("Failed to look up address of session host '%s'", 329 mSessionHost.c_str()); 330 331 return false; 332 } 333 334 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 335 } else { 336 addr.sin_addr.s_addr = inet_addr(source.c_str()); 337 } 338 339 if (!GetAttribute(transport.c_str(), 340 "server_port", 341 &server_port)) { 342 ALOGI("Missing 'server_port' field in Transport response."); 343 return false; 344 } 345 346 int rtpPort, rtcpPort; 347 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2 348 || rtpPort <= 0 || rtpPort > 65535 349 || rtcpPort <=0 || rtcpPort > 65535 350 || rtcpPort != rtpPort + 1) { 351 ALOGE("Server picked invalid RTP/RTCP port pair %s," 352 " RTP port must be even, RTCP port must be one higher.", 353 server_port.c_str()); 354 355 return false; 356 } 357 358 if (rtpPort & 1) { 359 ALOGW("Server picked an odd RTP port, it should've picked an " 360 "even one, we'll let it pass for now, but this may break " 361 "in the future."); 362 } 363 364 if (addr.sin_addr.s_addr == INADDR_NONE) { 365 return true; 366 } 367 368 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) { 369 // No firewalls to traverse on the loopback interface. 370 return true; 371 } 372 373 // Make up an RR/SDES RTCP packet. 
374 sp<ABuffer> buf = new ABuffer(65536); 375 buf->setRange(0, 0); 376 addRR(buf); 377 addSDES(rtpSocket, buf); 378 379 addr.sin_port = htons(rtpPort); 380 381 ssize_t n = sendto( 382 rtpSocket, buf->data(), buf->size(), 0, 383 (const sockaddr *)&addr, sizeof(addr)); 384 385 if (n < (ssize_t)buf->size()) { 386 ALOGE("failed to poke a hole for RTP packets"); 387 return false; 388 } 389 390 addr.sin_port = htons(rtcpPort); 391 392 n = sendto( 393 rtcpSocket, buf->data(), buf->size(), 0, 394 (const sockaddr *)&addr, sizeof(addr)); 395 396 if (n < (ssize_t)buf->size()) { 397 ALOGE("failed to poke a hole for RTCP packets"); 398 return false; 399 } 400 401 ALOGV("successfully poked holes."); 402 403 return true; 404 } 405 406 static bool isLiveStream(const sp<ASessionDescription> &desc) { 407 AString attrLiveStream; 408 if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) { 409 ssize_t semicolonPos = attrLiveStream.find(";", 2); 410 411 const char* liveStreamValue; 412 if (semicolonPos < 0) { 413 liveStreamValue = attrLiveStream.c_str(); 414 } else { 415 AString valString; 416 valString.setTo(attrLiveStream, 417 semicolonPos + 1, 418 attrLiveStream.size() - semicolonPos - 1); 419 liveStreamValue = valString.c_str(); 420 } 421 422 uint32_t value = strtoul(liveStreamValue, NULL, 10); 423 if (value == 1) { 424 ALOGV("found live stream"); 425 return true; 426 } 427 } else { 428 // It is a live stream if no duration is returned 429 int64_t durationUs; 430 if (!desc->getDurationUs(&durationUs)) { 431 ALOGV("No duration found, assume live stream"); 432 return true; 433 } 434 } 435 436 return false; 437 } 438 439 virtual void onMessageReceived(const sp<AMessage> &msg) { 440 switch (msg->what()) { 441 case 'conn': 442 { 443 int32_t result; 444 CHECK(msg->findInt32("result", &result)); 445 446 ALOGI("connection request completed with result %d (%s)", 447 result, strerror(-result)); 448 449 if (result == OK) { 450 AString request; 451 request = "DESCRIBE "; 452 
request.append(mSessionURL); 453 request.append(" RTSP/1.0\r\n"); 454 request.append("Accept: application/sdp\r\n"); 455 request.append("\r\n"); 456 457 sp<AMessage> reply = new AMessage('desc', id()); 458 mConn->sendRequest(request.c_str(), reply); 459 } else { 460 (new AMessage('disc', id()))->post(); 461 } 462 break; 463 } 464 465 case 'disc': 466 { 467 ++mKeepAliveGeneration; 468 469 int32_t reconnect; 470 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 471 sp<AMessage> reply = new AMessage('conn', id()); 472 mConn->connect(mOriginalSessionURL.c_str(), reply); 473 } else { 474 (new AMessage('quit', id()))->post(); 475 } 476 break; 477 } 478 479 case 'desc': 480 { 481 int32_t result; 482 CHECK(msg->findInt32("result", &result)); 483 484 ALOGI("DESCRIBE completed with result %d (%s)", 485 result, strerror(-result)); 486 487 if (result == OK) { 488 sp<RefBase> obj; 489 CHECK(msg->findObject("response", &obj)); 490 sp<ARTSPResponse> response = 491 static_cast<ARTSPResponse *>(obj.get()); 492 493 if (response->mStatusCode == 301 || response->mStatusCode == 302) { 494 ssize_t i = response->mHeaders.indexOfKey("location"); 495 CHECK_GE(i, 0); 496 497 mOriginalSessionURL = response->mHeaders.valueAt(i); 498 mSessionURL = mOriginalSessionURL; 499 500 // Strip any authentication info from the session url, we don't 501 // want to transmit user/pass in cleartext. 
502 AString host, path, user, pass; 503 unsigned port; 504 if (ARTSPConnection::ParseURL( 505 mSessionURL.c_str(), &host, &port, &path, &user, &pass) 506 && user.size() > 0) { 507 mSessionURL.clear(); 508 mSessionURL.append("rtsp://"); 509 mSessionURL.append(host); 510 mSessionURL.append(":"); 511 mSessionURL.append(StringPrintf("%u", port)); 512 mSessionURL.append(path); 513 514 ALOGI("rewritten session url: '%s'", mSessionURL.c_str()); 515 } 516 517 sp<AMessage> reply = new AMessage('conn', id()); 518 mConn->connect(mOriginalSessionURL.c_str(), reply); 519 break; 520 } 521 522 if (response->mStatusCode != 200) { 523 result = UNKNOWN_ERROR; 524 } else if (response->mContent == NULL) { 525 result = ERROR_MALFORMED; 526 ALOGE("The response has no content."); 527 } else { 528 mSessionDesc = new ASessionDescription; 529 530 mSessionDesc->setTo( 531 response->mContent->data(), 532 response->mContent->size()); 533 534 if (!mSessionDesc->isValid()) { 535 ALOGE("Failed to parse session description."); 536 result = ERROR_MALFORMED; 537 } else { 538 ssize_t i = response->mHeaders.indexOfKey("content-base"); 539 if (i >= 0) { 540 mBaseURL = response->mHeaders.valueAt(i); 541 } else { 542 i = response->mHeaders.indexOfKey("content-location"); 543 if (i >= 0) { 544 mBaseURL = response->mHeaders.valueAt(i); 545 } else { 546 mBaseURL = mSessionURL; 547 } 548 } 549 550 mSeekable = !isLiveStream(mSessionDesc); 551 552 if (!mBaseURL.startsWith("rtsp://")) { 553 // Some misbehaving servers specify a relative 554 // URL in one of the locations above, combine 555 // it with the absolute session URL to get 556 // something usable... 
557 558 ALOGW("Server specified a non-absolute base URL" 559 ", combining it with the session URL to " 560 "get something usable..."); 561 562 AString tmp; 563 CHECK(MakeURL( 564 mSessionURL.c_str(), 565 mBaseURL.c_str(), 566 &tmp)); 567 568 mBaseURL = tmp; 569 } 570 571 mControlURL = getControlURL(); 572 573 if (mSessionDesc->countTracks() < 2) { 574 // There's no actual tracks in this session. 575 // The first "track" is merely session meta 576 // data. 577 578 ALOGW("Session doesn't contain any playable " 579 "tracks. Aborting."); 580 result = ERROR_UNSUPPORTED; 581 } else { 582 setupTrack(1); 583 } 584 } 585 } 586 } 587 588 if (result != OK) { 589 sp<AMessage> reply = new AMessage('disc', id()); 590 mConn->disconnect(reply); 591 } 592 break; 593 } 594 595 case 'sdpl': 596 { 597 int32_t result; 598 CHECK(msg->findInt32("result", &result)); 599 600 ALOGI("SDP connection request completed with result %d (%s)", 601 result, strerror(-result)); 602 603 if (result == OK) { 604 sp<RefBase> obj; 605 CHECK(msg->findObject("description", &obj)); 606 mSessionDesc = 607 static_cast<ASessionDescription *>(obj.get()); 608 609 if (!mSessionDesc->isValid()) { 610 ALOGE("Failed to parse session description."); 611 result = ERROR_MALFORMED; 612 } else { 613 mBaseURL = mSessionURL; 614 615 mSeekable = !isLiveStream(mSessionDesc); 616 617 mControlURL = getControlURL(); 618 619 if (mSessionDesc->countTracks() < 2) { 620 // There's no actual tracks in this session. 621 // The first "track" is merely session meta 622 // data. 623 624 ALOGW("Session doesn't contain any playable " 625 "tracks. 
Aborting."); 626 result = ERROR_UNSUPPORTED; 627 } else { 628 setupTrack(1); 629 } 630 } 631 } 632 633 if (result != OK) { 634 sp<AMessage> reply = new AMessage('disc', id()); 635 mConn->disconnect(reply); 636 } 637 break; 638 } 639 640 case 'setu': 641 { 642 size_t index; 643 CHECK(msg->findSize("index", &index)); 644 645 TrackInfo *track = NULL; 646 size_t trackIndex; 647 if (msg->findSize("track-index", &trackIndex)) { 648 track = &mTracks.editItemAt(trackIndex); 649 } 650 651 int32_t result; 652 CHECK(msg->findInt32("result", &result)); 653 654 ALOGI("SETUP(%d) completed with result %d (%s)", 655 index, result, strerror(-result)); 656 657 if (result == OK) { 658 CHECK(track != NULL); 659 660 sp<RefBase> obj; 661 CHECK(msg->findObject("response", &obj)); 662 sp<ARTSPResponse> response = 663 static_cast<ARTSPResponse *>(obj.get()); 664 665 if (response->mStatusCode != 200) { 666 result = UNKNOWN_ERROR; 667 } else { 668 ssize_t i = response->mHeaders.indexOfKey("session"); 669 CHECK_GE(i, 0); 670 671 mSessionID = response->mHeaders.valueAt(i); 672 673 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 674 AString timeoutStr; 675 if (GetAttribute( 676 mSessionID.c_str(), "timeout", &timeoutStr)) { 677 char *end; 678 unsigned long timeoutSecs = 679 strtoul(timeoutStr.c_str(), &end, 10); 680 681 if (end == timeoutStr.c_str() || *end != '\0') { 682 ALOGW("server specified malformed timeout '%s'", 683 timeoutStr.c_str()); 684 685 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 686 } else if (timeoutSecs < 15) { 687 ALOGW("server specified too short a timeout " 688 "(%lu secs), using default.", 689 timeoutSecs); 690 691 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 692 } else { 693 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll; 694 695 ALOGI("server specified timeout of %lu secs.", 696 timeoutSecs); 697 } 698 } 699 700 i = mSessionID.find(";"); 701 if (i >= 0) { 702 // Remove options, i.e. 
";timeout=90" 703 mSessionID.erase(i, mSessionID.size() - i); 704 } 705 706 sp<AMessage> notify = new AMessage('accu', id()); 707 notify->setSize("track-index", trackIndex); 708 709 i = response->mHeaders.indexOfKey("transport"); 710 CHECK_GE(i, 0); 711 712 if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) { 713 if (!track->mUsingInterleavedTCP) { 714 AString transport = response->mHeaders.valueAt(i); 715 716 // We are going to continue even if we were 717 // unable to poke a hole into the firewall... 718 pokeAHole( 719 track->mRTPSocket, 720 track->mRTCPSocket, 721 transport); 722 } 723 724 mRTPConn->addStream( 725 track->mRTPSocket, track->mRTCPSocket, 726 mSessionDesc, index, 727 notify, track->mUsingInterleavedTCP); 728 729 mSetupTracksSuccessful = true; 730 } else { 731 result = BAD_VALUE; 732 } 733 } 734 } 735 736 if (result != OK) { 737 if (track) { 738 if (!track->mUsingInterleavedTCP) { 739 // Clear the tag 740 if (mUIDValid) { 741 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket); 742 HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket); 743 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket); 744 HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket); 745 } 746 747 close(track->mRTPSocket); 748 close(track->mRTCPSocket); 749 } 750 751 mTracks.removeItemsAt(trackIndex); 752 } 753 } 754 755 ++index; 756 if (result == OK && index < mSessionDesc->countTracks()) { 757 setupTrack(index); 758 } else if (mSetupTracksSuccessful) { 759 ++mKeepAliveGeneration; 760 postKeepAlive(); 761 762 AString request = "PLAY "; 763 request.append(mControlURL); 764 request.append(" RTSP/1.0\r\n"); 765 766 request.append("Session: "); 767 request.append(mSessionID); 768 request.append("\r\n"); 769 770 request.append("\r\n"); 771 772 sp<AMessage> reply = new AMessage('play', id()); 773 mConn->sendRequest(request.c_str(), reply); 774 } else { 775 sp<AMessage> reply = new AMessage('disc', id()); 776 mConn->disconnect(reply); 777 } 778 break; 779 } 780 781 case 'play': 
782 { 783 int32_t result; 784 CHECK(msg->findInt32("result", &result)); 785 786 ALOGI("PLAY completed with result %d (%s)", 787 result, strerror(-result)); 788 789 if (result == OK) { 790 sp<RefBase> obj; 791 CHECK(msg->findObject("response", &obj)); 792 sp<ARTSPResponse> response = 793 static_cast<ARTSPResponse *>(obj.get()); 794 795 if (response->mStatusCode != 200) { 796 result = UNKNOWN_ERROR; 797 } else { 798 parsePlayResponse(response); 799 800 sp<AMessage> timeout = new AMessage('tiou', id()); 801 mCheckTimeoutGeneration++; 802 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 803 timeout->post(kStartupTimeoutUs); 804 } 805 } 806 807 if (result != OK) { 808 sp<AMessage> reply = new AMessage('disc', id()); 809 mConn->disconnect(reply); 810 } 811 812 break; 813 } 814 815 case 'aliv': 816 { 817 int32_t generation; 818 CHECK(msg->findInt32("generation", &generation)); 819 820 if (generation != mKeepAliveGeneration) { 821 // obsolete event. 822 break; 823 } 824 825 AString request; 826 request.append("OPTIONS "); 827 request.append(mSessionURL); 828 request.append(" RTSP/1.0\r\n"); 829 request.append("Session: "); 830 request.append(mSessionID); 831 request.append("\r\n"); 832 request.append("\r\n"); 833 834 sp<AMessage> reply = new AMessage('opts', id()); 835 reply->setInt32("generation", mKeepAliveGeneration); 836 mConn->sendRequest(request.c_str(), reply); 837 break; 838 } 839 840 case 'opts': 841 { 842 int32_t result; 843 CHECK(msg->findInt32("result", &result)); 844 845 ALOGI("OPTIONS completed with result %d (%s)", 846 result, strerror(-result)); 847 848 int32_t generation; 849 CHECK(msg->findInt32("generation", &generation)); 850 851 if (generation != mKeepAliveGeneration) { 852 // obsolete event. 
853 break; 854 } 855 856 postKeepAlive(); 857 break; 858 } 859 860 case 'abor': 861 { 862 for (size_t i = 0; i < mTracks.size(); ++i) { 863 TrackInfo *info = &mTracks.editItemAt(i); 864 865 if (!mFirstAccessUnit) { 866 postQueueEOS(i, ERROR_END_OF_STREAM); 867 } 868 869 if (!info->mUsingInterleavedTCP) { 870 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket); 871 872 // Clear the tag 873 if (mUIDValid) { 874 HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket); 875 HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket); 876 HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket); 877 HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket); 878 } 879 880 close(info->mRTPSocket); 881 close(info->mRTCPSocket); 882 } 883 } 884 mTracks.clear(); 885 mSetupTracksSuccessful = false; 886 mSeekPending = false; 887 mFirstAccessUnit = true; 888 mAllTracksHaveTime = false; 889 mNTPAnchorUs = -1; 890 mMediaAnchorUs = -1; 891 mNumAccessUnitsReceived = 0; 892 mReceivedFirstRTCPPacket = false; 893 mReceivedFirstRTPPacket = false; 894 mPausing = false; 895 mSeekable = true; 896 897 sp<AMessage> reply = new AMessage('tear', id()); 898 899 int32_t reconnect; 900 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 901 reply->setInt32("reconnect", true); 902 } 903 904 AString request; 905 request = "TEARDOWN "; 906 907 // XXX should use aggregate url from SDP here... 
908 request.append(mSessionURL); 909 request.append(" RTSP/1.0\r\n"); 910 911 request.append("Session: "); 912 request.append(mSessionID); 913 request.append("\r\n"); 914 915 request.append("\r\n"); 916 917 mConn->sendRequest(request.c_str(), reply); 918 break; 919 } 920 921 case 'tear': 922 { 923 int32_t result; 924 CHECK(msg->findInt32("result", &result)); 925 926 ALOGI("TEARDOWN completed with result %d (%s)", 927 result, strerror(-result)); 928 929 sp<AMessage> reply = new AMessage('disc', id()); 930 931 int32_t reconnect; 932 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 933 reply->setInt32("reconnect", true); 934 } 935 936 mConn->disconnect(reply); 937 break; 938 } 939 940 case 'quit': 941 { 942 sp<AMessage> msg = mNotify->dup(); 943 msg->setInt32("what", kWhatDisconnected); 944 msg->setInt32("result", UNKNOWN_ERROR); 945 msg->post(); 946 break; 947 } 948 949 case 'chek': 950 { 951 int32_t generation; 952 CHECK(msg->findInt32("generation", &generation)); 953 if (generation != mCheckGeneration) { 954 // This is an outdated message. Ignore. 955 break; 956 } 957 958 if (mNumAccessUnitsReceived == 0) { 959#if 1 960 ALOGI("stream ended? 
aborting."); 961 (new AMessage('abor', id()))->post(); 962 break; 963#else 964 ALOGI("haven't seen an AU in a looong time."); 965#endif 966 } 967 968 mNumAccessUnitsReceived = 0; 969 msg->post(kAccessUnitTimeoutUs); 970 break; 971 } 972 973 case 'accu': 974 { 975 int32_t timeUpdate; 976 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) { 977 size_t trackIndex; 978 CHECK(msg->findSize("track-index", &trackIndex)); 979 980 uint32_t rtpTime; 981 uint64_t ntpTime; 982 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime)); 983 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime)); 984 985 onTimeUpdate(trackIndex, rtpTime, ntpTime); 986 break; 987 } 988 989 int32_t first; 990 if (msg->findInt32("first-rtcp", &first)) { 991 mReceivedFirstRTCPPacket = true; 992 break; 993 } 994 995 if (msg->findInt32("first-rtp", &first)) { 996 mReceivedFirstRTPPacket = true; 997 break; 998 } 999 1000 ++mNumAccessUnitsReceived; 1001 postAccessUnitTimeoutCheck(); 1002 1003 size_t trackIndex; 1004 CHECK(msg->findSize("track-index", &trackIndex)); 1005 1006 if (trackIndex >= mTracks.size()) { 1007 ALOGV("late packets ignored."); 1008 break; 1009 } 1010 1011 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1012 1013 int32_t eos; 1014 if (msg->findInt32("eos", &eos)) { 1015 ALOGI("received BYE on track index %d", trackIndex); 1016 if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) { 1017 ALOGI("No time established => fake existing data"); 1018 1019 track->mEOSReceived = true; 1020 mTryFakeRTCP = true; 1021 mReceivedFirstRTCPPacket = true; 1022 fakeTimestamps(); 1023 } else { 1024 postQueueEOS(trackIndex, ERROR_END_OF_STREAM); 1025 } 1026 return; 1027 } 1028 1029 sp<ABuffer> accessUnit; 1030 CHECK(msg->findBuffer("access-unit", &accessUnit)); 1031 1032 uint32_t seqNum = (uint32_t)accessUnit->int32Data(); 1033 1034 if (mSeekPending) { 1035 ALOGV("we're seeking, dropping stale packet."); 1036 break; 1037 } 1038 1039 if (seqNum < track->mFirstSeqNumInSegment) { 1040 
ALOGV("dropping stale access-unit (%d < %d)", 1041 seqNum, track->mFirstSeqNumInSegment); 1042 break; 1043 } 1044 1045 if (track->mNewSegment) { 1046 track->mNewSegment = false; 1047 } 1048 1049 onAccessUnitComplete(trackIndex, accessUnit); 1050 break; 1051 } 1052 1053 case 'paus': 1054 { 1055 int32_t generation; 1056 CHECK(msg->findInt32("pausecheck", &generation)); 1057 if (generation != mPauseGeneration) { 1058 ALOGV("Ignoring outdated pause message."); 1059 break; 1060 } 1061 1062 if (!mSeekable) { 1063 ALOGW("This is a live stream, ignoring pause request."); 1064 break; 1065 } 1066 mCheckPending = true; 1067 ++mCheckGeneration; 1068 mPausing = true; 1069 1070 AString request = "PAUSE "; 1071 request.append(mControlURL); 1072 request.append(" RTSP/1.0\r\n"); 1073 1074 request.append("Session: "); 1075 request.append(mSessionID); 1076 request.append("\r\n"); 1077 1078 request.append("\r\n"); 1079 1080 sp<AMessage> reply = new AMessage('pau2', id()); 1081 mConn->sendRequest(request.c_str(), reply); 1082 break; 1083 } 1084 1085 case 'pau2': 1086 { 1087 int32_t result; 1088 CHECK(msg->findInt32("result", &result)); 1089 mCheckTimeoutGeneration++; 1090 1091 ALOGI("PAUSE completed with result %d (%s)", 1092 result, strerror(-result)); 1093 break; 1094 } 1095 1096 case 'resu': 1097 { 1098 if (mPausing && mSeekPending) { 1099 // If seeking, Play will be sent from see1 instead 1100 break; 1101 } 1102 1103 if (!mPausing) { 1104 // Dont send PLAY if we have not paused 1105 break; 1106 } 1107 AString request = "PLAY "; 1108 request.append(mControlURL); 1109 request.append(" RTSP/1.0\r\n"); 1110 1111 request.append("Session: "); 1112 request.append(mSessionID); 1113 request.append("\r\n"); 1114 1115 request.append("\r\n"); 1116 1117 sp<AMessage> reply = new AMessage('res2', id()); 1118 mConn->sendRequest(request.c_str(), reply); 1119 break; 1120 } 1121 1122 case 'res2': 1123 { 1124 int32_t result; 1125 CHECK(msg->findInt32("result", &result)); 1126 1127 ALOGI("PLAY 
completed with result %d (%s)", 1128 result, strerror(-result)); 1129 1130 mCheckPending = false; 1131 postAccessUnitTimeoutCheck(); 1132 1133 if (result == OK) { 1134 sp<RefBase> obj; 1135 CHECK(msg->findObject("response", &obj)); 1136 sp<ARTSPResponse> response = 1137 static_cast<ARTSPResponse *>(obj.get()); 1138 1139 if (response->mStatusCode != 200) { 1140 result = UNKNOWN_ERROR; 1141 } else { 1142 parsePlayResponse(response); 1143 1144 // Post new timeout in order to make sure to use 1145 // fake timestamps if no new Sender Reports arrive 1146 sp<AMessage> timeout = new AMessage('tiou', id()); 1147 mCheckTimeoutGeneration++; 1148 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1149 timeout->post(kStartupTimeoutUs); 1150 } 1151 } 1152 1153 if (result != OK) { 1154 ALOGE("resume failed, aborting."); 1155 (new AMessage('abor', id()))->post(); 1156 } 1157 1158 mPausing = false; 1159 break; 1160 } 1161 1162 case 'seek': 1163 { 1164 if (!mSeekable) { 1165 ALOGW("This is a live stream, ignoring seek request."); 1166 1167 sp<AMessage> msg = mNotify->dup(); 1168 msg->setInt32("what", kWhatSeekDone); 1169 msg->post(); 1170 break; 1171 } 1172 1173 int64_t timeUs; 1174 CHECK(msg->findInt64("time", &timeUs)); 1175 1176 mSeekPending = true; 1177 1178 // Disable the access unit timeout until we resumed 1179 // playback again. 1180 mCheckPending = true; 1181 ++mCheckGeneration; 1182 1183 sp<AMessage> reply = new AMessage('see1', id()); 1184 reply->setInt64("time", timeUs); 1185 1186 if (mPausing) { 1187 // PAUSE already sent 1188 ALOGI("Pause already sent"); 1189 reply->post(); 1190 break; 1191 } 1192 AString request = "PAUSE "; 1193 request.append(mControlURL); 1194 request.append(" RTSP/1.0\r\n"); 1195 1196 request.append("Session: "); 1197 request.append(mSessionID); 1198 request.append("\r\n"); 1199 1200 request.append("\r\n"); 1201 1202 mConn->sendRequest(request.c_str(), reply); 1203 break; 1204 } 1205 1206 case 'see1': 1207 { 1208 // Session is paused now. 
1209 for (size_t i = 0; i < mTracks.size(); ++i) { 1210 TrackInfo *info = &mTracks.editItemAt(i); 1211 1212 postQueueSeekDiscontinuity(i); 1213 info->mEOSReceived = false; 1214 1215 info->mRTPAnchor = 0; 1216 info->mNTPAnchorUs = -1; 1217 } 1218 1219 mAllTracksHaveTime = false; 1220 mNTPAnchorUs = -1; 1221 1222 // Start new timeoutgeneration to avoid getting timeout 1223 // before PLAY response arrive 1224 sp<AMessage> timeout = new AMessage('tiou', id()); 1225 mCheckTimeoutGeneration++; 1226 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1227 timeout->post(kStartupTimeoutUs); 1228 1229 int64_t timeUs; 1230 CHECK(msg->findInt64("time", &timeUs)); 1231 1232 AString request = "PLAY "; 1233 request.append(mControlURL); 1234 request.append(" RTSP/1.0\r\n"); 1235 1236 request.append("Session: "); 1237 request.append(mSessionID); 1238 request.append("\r\n"); 1239 1240 request.append( 1241 StringPrintf( 1242 "Range: npt=%lld-\r\n", timeUs / 1000000ll)); 1243 1244 request.append("\r\n"); 1245 1246 sp<AMessage> reply = new AMessage('see2', id()); 1247 mConn->sendRequest(request.c_str(), reply); 1248 break; 1249 } 1250 1251 case 'see2': 1252 { 1253 if (mTracks.size() == 0) { 1254 // We have already hit abor, break 1255 break; 1256 } 1257 1258 int32_t result; 1259 CHECK(msg->findInt32("result", &result)); 1260 1261 ALOGI("PLAY completed with result %d (%s)", 1262 result, strerror(-result)); 1263 1264 mCheckPending = false; 1265 postAccessUnitTimeoutCheck(); 1266 1267 if (result == OK) { 1268 sp<RefBase> obj; 1269 CHECK(msg->findObject("response", &obj)); 1270 sp<ARTSPResponse> response = 1271 static_cast<ARTSPResponse *>(obj.get()); 1272 1273 if (response->mStatusCode != 200) { 1274 result = UNKNOWN_ERROR; 1275 } else { 1276 parsePlayResponse(response); 1277 1278 // Post new timeout in order to make sure to use 1279 // fake timestamps if no new Sender Reports arrive 1280 sp<AMessage> timeout = new AMessage('tiou', id()); 1281 mCheckTimeoutGeneration++; 1282 
timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1283 timeout->post(kStartupTimeoutUs); 1284 1285 ssize_t i = response->mHeaders.indexOfKey("rtp-info"); 1286 CHECK_GE(i, 0); 1287 1288 ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str()); 1289 1290 ALOGI("seek completed."); 1291 } 1292 } 1293 1294 if (result != OK) { 1295 ALOGE("seek failed, aborting."); 1296 (new AMessage('abor', id()))->post(); 1297 } 1298 1299 mPausing = false; 1300 mSeekPending = false; 1301 1302 sp<AMessage> msg = mNotify->dup(); 1303 msg->setInt32("what", kWhatSeekDone); 1304 msg->post(); 1305 break; 1306 } 1307 1308 case 'biny': 1309 { 1310 sp<ABuffer> buffer; 1311 CHECK(msg->findBuffer("buffer", &buffer)); 1312 1313 int32_t index; 1314 CHECK(buffer->meta()->findInt32("index", &index)); 1315 1316 mRTPConn->injectPacket(index, buffer); 1317 break; 1318 } 1319 1320 case 'tiou': 1321 { 1322 int32_t timeoutGenerationCheck; 1323 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck)); 1324 if (timeoutGenerationCheck != mCheckTimeoutGeneration) { 1325 // This is an outdated message. Ignore. 1326 // This typically happens if a lot of seeks are 1327 // performed, since new timeout messages now are 1328 // posted at seek as well. 
1329 break; 1330 } 1331 if (!mReceivedFirstRTCPPacket) { 1332 if (dataReceivedOnAllChannels() && !mTryFakeRTCP) { 1333 ALOGW("We received RTP packets but no RTCP packets, " 1334 "using fake timestamps."); 1335 1336 mTryFakeRTCP = true; 1337 1338 mReceivedFirstRTCPPacket = true; 1339 1340 fakeTimestamps(); 1341 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) { 1342 ALOGW("Never received any data, switching transports."); 1343 1344 mTryTCPInterleaving = true; 1345 1346 sp<AMessage> msg = new AMessage('abor', id()); 1347 msg->setInt32("reconnect", true); 1348 msg->post(); 1349 } else { 1350 ALOGW("Never received any data, disconnecting."); 1351 (new AMessage('abor', id()))->post(); 1352 } 1353 } else { 1354 if (!mAllTracksHaveTime) { 1355 ALOGW("We received some RTCP packets, but time " 1356 "could not be established on all tracks, now " 1357 "using fake timestamps"); 1358 1359 fakeTimestamps(); 1360 } 1361 } 1362 break; 1363 } 1364 1365 default: 1366 TRESPASS(); 1367 break; 1368 } 1369 } 1370 1371 void postKeepAlive() { 1372 sp<AMessage> msg = new AMessage('aliv', id()); 1373 msg->setInt32("generation", mKeepAliveGeneration); 1374 msg->post((mKeepAliveTimeoutUs * 9) / 10); 1375 } 1376 1377 void postAccessUnitTimeoutCheck() { 1378 if (mCheckPending) { 1379 return; 1380 } 1381 1382 mCheckPending = true; 1383 sp<AMessage> check = new AMessage('chek', id()); 1384 check->setInt32("generation", mCheckGeneration); 1385 check->post(kAccessUnitTimeoutUs); 1386 } 1387 1388 static void SplitString( 1389 const AString &s, const char *separator, List<AString> *items) { 1390 items->clear(); 1391 size_t start = 0; 1392 while (start < s.size()) { 1393 ssize_t offset = s.find(separator, start); 1394 1395 if (offset < 0) { 1396 items->push_back(AString(s, start, s.size() - start)); 1397 break; 1398 } 1399 1400 items->push_back(AString(s, start, offset - start)); 1401 start = offset + strlen(separator); 1402 } 1403 } 1404 1405 void parsePlayResponse(const sp<ARTSPResponse> 
&response) { 1406 mPlayResponseParsed = true; 1407 if (mTracks.size() == 0) { 1408 ALOGV("parsePlayResponse: late packets ignored."); 1409 return; 1410 } 1411 1412 ssize_t i = response->mHeaders.indexOfKey("range"); 1413 if (i < 0) { 1414 // Server doesn't even tell use what range it is going to 1415 // play, therefore we won't support seeking. 1416 return; 1417 } 1418 1419 AString range = response->mHeaders.valueAt(i); 1420 ALOGV("Range: %s", range.c_str()); 1421 1422 AString val; 1423 CHECK(GetAttribute(range.c_str(), "npt", &val)); 1424 1425 float npt1, npt2; 1426 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) { 1427 // This is a live stream and therefore not seekable. 1428 1429 ALOGI("This is a live stream"); 1430 return; 1431 } 1432 1433 i = response->mHeaders.indexOfKey("rtp-info"); 1434 CHECK_GE(i, 0); 1435 1436 AString rtpInfo = response->mHeaders.valueAt(i); 1437 List<AString> streamInfos; 1438 SplitString(rtpInfo, ",", &streamInfos); 1439 1440 int n = 1; 1441 for (List<AString>::iterator it = streamInfos.begin(); 1442 it != streamInfos.end(); ++it) { 1443 (*it).trim(); 1444 ALOGV("streamInfo[%d] = %s", n, (*it).c_str()); 1445 1446 CHECK(GetAttribute((*it).c_str(), "url", &val)); 1447 1448 size_t trackIndex = 0; 1449 while (trackIndex < mTracks.size() 1450 && !(val == mTracks.editItemAt(trackIndex).mURL)) { 1451 ++trackIndex; 1452 } 1453 CHECK_LT(trackIndex, mTracks.size()); 1454 1455 CHECK(GetAttribute((*it).c_str(), "seq", &val)); 1456 1457 char *end; 1458 unsigned long seq = strtoul(val.c_str(), &end, 10); 1459 1460 TrackInfo *info = &mTracks.editItemAt(trackIndex); 1461 info->mFirstSeqNumInSegment = seq; 1462 info->mNewSegment = true; 1463 1464 CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); 1465 1466 uint32_t rtpTime = strtoul(val.c_str(), &end, 10); 1467 1468 ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1); 1469 1470 info->mNormalPlayTimeRTP = rtpTime; 1471 info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6); 1472 
1473 if (!mFirstAccessUnit) { 1474 postNormalPlayTimeMapping( 1475 trackIndex, 1476 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1477 } 1478 1479 ++n; 1480 } 1481 } 1482 1483 sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) { 1484 CHECK_GE(index, 0u); 1485 CHECK_LT(index, mTracks.size()); 1486 1487 const TrackInfo &info = mTracks.itemAt(index); 1488 1489 *timeScale = info.mTimeScale; 1490 1491 return info.mPacketSource->getFormat(); 1492 } 1493 1494 size_t countTracks() const { 1495 return mTracks.size(); 1496 } 1497 1498private: 1499 struct TrackInfo { 1500 AString mURL; 1501 int mRTPSocket; 1502 int mRTCPSocket; 1503 bool mUsingInterleavedTCP; 1504 uint32_t mFirstSeqNumInSegment; 1505 bool mNewSegment; 1506 1507 uint32_t mRTPAnchor; 1508 int64_t mNTPAnchorUs; 1509 int32_t mTimeScale; 1510 bool mEOSReceived; 1511 1512 uint32_t mNormalPlayTimeRTP; 1513 int64_t mNormalPlayTimeUs; 1514 1515 sp<APacketSource> mPacketSource; 1516 1517 // Stores packets temporarily while no notion of time 1518 // has been established yet. 
1519 List<sp<ABuffer> > mPackets; 1520 }; 1521 1522 sp<AMessage> mNotify; 1523 bool mUIDValid; 1524 uid_t mUID; 1525 sp<ALooper> mNetLooper; 1526 sp<ARTSPConnection> mConn; 1527 sp<ARTPConnection> mRTPConn; 1528 sp<ASessionDescription> mSessionDesc; 1529 AString mOriginalSessionURL; // This one still has user:pass@ 1530 AString mSessionURL; 1531 AString mSessionHost; 1532 AString mBaseURL; 1533 AString mControlURL; 1534 AString mSessionID; 1535 bool mSetupTracksSuccessful; 1536 bool mSeekPending; 1537 bool mFirstAccessUnit; 1538 1539 bool mAllTracksHaveTime; 1540 int64_t mNTPAnchorUs; 1541 int64_t mMediaAnchorUs; 1542 int64_t mLastMediaTimeUs; 1543 1544 int64_t mNumAccessUnitsReceived; 1545 bool mCheckPending; 1546 int32_t mCheckGeneration; 1547 int32_t mCheckTimeoutGeneration; 1548 bool mTryTCPInterleaving; 1549 bool mTryFakeRTCP; 1550 bool mReceivedFirstRTCPPacket; 1551 bool mReceivedFirstRTPPacket; 1552 bool mSeekable; 1553 int64_t mKeepAliveTimeoutUs; 1554 int32_t mKeepAliveGeneration; 1555 bool mPausing; 1556 int32_t mPauseGeneration; 1557 1558 Vector<TrackInfo> mTracks; 1559 1560 bool mPlayResponseParsed; 1561 1562 void setupTrack(size_t index) { 1563 sp<APacketSource> source = 1564 new APacketSource(mSessionDesc, index); 1565 1566 if (source->initCheck() != OK) { 1567 ALOGW("Unsupported format. 
Ignoring track #%d.", index); 1568 1569 sp<AMessage> reply = new AMessage('setu', id()); 1570 reply->setSize("index", index); 1571 reply->setInt32("result", ERROR_UNSUPPORTED); 1572 reply->post(); 1573 return; 1574 } 1575 1576 AString url; 1577 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 1578 1579 AString trackURL; 1580 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 1581 1582 mTracks.push(TrackInfo()); 1583 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 1584 info->mURL = trackURL; 1585 info->mPacketSource = source; 1586 info->mUsingInterleavedTCP = false; 1587 info->mFirstSeqNumInSegment = 0; 1588 info->mNewSegment = true; 1589 info->mRTPSocket = -1; 1590 info->mRTCPSocket = -1; 1591 info->mRTPAnchor = 0; 1592 info->mNTPAnchorUs = -1; 1593 info->mNormalPlayTimeRTP = 0; 1594 info->mNormalPlayTimeUs = 0ll; 1595 1596 unsigned long PT; 1597 AString formatDesc; 1598 AString formatParams; 1599 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams); 1600 1601 int32_t timescale; 1602 int32_t numChannels; 1603 ASessionDescription::ParseFormatDesc( 1604 formatDesc.c_str(), ×cale, &numChannels); 1605 1606 info->mTimeScale = timescale; 1607 info->mEOSReceived = false; 1608 1609 ALOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str()); 1610 1611 AString request = "SETUP "; 1612 request.append(trackURL); 1613 request.append(" RTSP/1.0\r\n"); 1614 1615 if (mTryTCPInterleaving) { 1616 size_t interleaveIndex = 2 * (mTracks.size() - 1); 1617 info->mUsingInterleavedTCP = true; 1618 info->mRTPSocket = interleaveIndex; 1619 info->mRTCPSocket = interleaveIndex + 1; 1620 1621 request.append("Transport: RTP/AVP/TCP;interleaved="); 1622 request.append(interleaveIndex); 1623 request.append("-"); 1624 request.append(interleaveIndex + 1); 1625 } else { 1626 unsigned rtpPort; 1627 ARTPConnection::MakePortPair( 1628 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 1629 1630 if (mUIDValid) { 1631 
HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID, 1632 (uint32_t)*(uint32_t*) "RTP_"); 1633 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID, 1634 (uint32_t)*(uint32_t*) "RTP_"); 1635 HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID); 1636 HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID); 1637 } 1638 1639 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 1640 request.append(rtpPort); 1641 request.append("-"); 1642 request.append(rtpPort + 1); 1643 } 1644 1645 request.append("\r\n"); 1646 1647 if (index > 1) { 1648 request.append("Session: "); 1649 request.append(mSessionID); 1650 request.append("\r\n"); 1651 } 1652 1653 request.append("\r\n"); 1654 1655 sp<AMessage> reply = new AMessage('setu', id()); 1656 reply->setSize("index", index); 1657 reply->setSize("track-index", mTracks.size() - 1); 1658 mConn->sendRequest(request.c_str(), reply); 1659 } 1660 1661 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 1662 out->clear(); 1663 1664 if (strncasecmp("rtsp://", baseURL, 7)) { 1665 // Base URL must be absolute 1666 return false; 1667 } 1668 1669 if (!strncasecmp("rtsp://", url, 7)) { 1670 // "url" is already an absolute URL, ignore base URL. 
1671 out->setTo(url); 1672 return true; 1673 } 1674 1675 size_t n = strlen(baseURL); 1676 if (baseURL[n - 1] == '/') { 1677 out->setTo(baseURL); 1678 out->append(url); 1679 } else { 1680 const char *slashPos = strrchr(baseURL, '/'); 1681 1682 if (slashPos > &baseURL[6]) { 1683 out->setTo(baseURL, slashPos - baseURL); 1684 } else { 1685 out->setTo(baseURL); 1686 } 1687 1688 out->append("/"); 1689 out->append(url); 1690 } 1691 1692 return true; 1693 } 1694 1695 void fakeTimestamps() { 1696 mNTPAnchorUs = -1ll; 1697 for (size_t i = 0; i < mTracks.size(); ++i) { 1698 onTimeUpdate(i, 0, 0ll); 1699 } 1700 } 1701 1702 bool dataReceivedOnAllChannels() { 1703 TrackInfo *track; 1704 for (size_t i = 0; i < mTracks.size(); ++i) { 1705 track = &mTracks.editItemAt(i); 1706 if (track->mPackets.empty()) { 1707 return false; 1708 } 1709 } 1710 return true; 1711 } 1712 1713 void handleFirstAccessUnit() { 1714 if (mFirstAccessUnit) { 1715 sp<AMessage> msg = mNotify->dup(); 1716 msg->setInt32("what", kWhatConnected); 1717 msg->post(); 1718 1719 if (mSeekable) { 1720 for (size_t i = 0; i < mTracks.size(); ++i) { 1721 TrackInfo *info = &mTracks.editItemAt(i); 1722 1723 postNormalPlayTimeMapping( 1724 i, 1725 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1726 } 1727 } 1728 1729 mFirstAccessUnit = false; 1730 } 1731 } 1732 1733 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) { 1734 ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx", 1735 trackIndex, rtpTime, ntpTime); 1736 1737 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); 1738 1739 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1740 1741 track->mRTPAnchor = rtpTime; 1742 track->mNTPAnchorUs = ntpTimeUs; 1743 1744 if (mNTPAnchorUs < 0) { 1745 mNTPAnchorUs = ntpTimeUs; 1746 mMediaAnchorUs = mLastMediaTimeUs; 1747 } 1748 1749 if (!mAllTracksHaveTime) { 1750 bool allTracksHaveTime = true; 1751 for (size_t i = 0; i < mTracks.size(); ++i) { 1752 TrackInfo *track = 
&mTracks.editItemAt(i); 1753 if (track->mNTPAnchorUs < 0) { 1754 allTracksHaveTime = false; 1755 break; 1756 } 1757 } 1758 if (allTracksHaveTime) { 1759 mAllTracksHaveTime = true; 1760 ALOGI("Time now established for all tracks."); 1761 } 1762 } 1763 if (mAllTracksHaveTime && dataReceivedOnAllChannels()) { 1764 handleFirstAccessUnit(); 1765 1766 // Time is now established, lets start timestamping immediately 1767 for (size_t i = 0; i < mTracks.size(); ++i) { 1768 TrackInfo *trackInfo = &mTracks.editItemAt(i); 1769 while (!trackInfo->mPackets.empty()) { 1770 sp<ABuffer> accessUnit = *trackInfo->mPackets.begin(); 1771 trackInfo->mPackets.erase(trackInfo->mPackets.begin()); 1772 1773 if (addMediaTimestamp(i, trackInfo, accessUnit)) { 1774 postQueueAccessUnit(i, accessUnit); 1775 } 1776 } 1777 } 1778 for (size_t i = 0; i < mTracks.size(); ++i) { 1779 TrackInfo *trackInfo = &mTracks.editItemAt(i); 1780 if (trackInfo->mEOSReceived) { 1781 postQueueEOS(i, ERROR_END_OF_STREAM); 1782 trackInfo->mEOSReceived = false; 1783 } 1784 } 1785 } 1786 } 1787 1788 void onAccessUnitComplete( 1789 int32_t trackIndex, const sp<ABuffer> &accessUnit) { 1790 ALOGV("onAccessUnitComplete track %d", trackIndex); 1791 1792 if(!mPlayResponseParsed){ 1793 ALOGI("play response is not parsed, storing accessunit"); 1794 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1795 track->mPackets.push_back(accessUnit); 1796 return; 1797 } 1798 1799 handleFirstAccessUnit(); 1800 1801 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1802 1803 if (!mAllTracksHaveTime) { 1804 ALOGV("storing accessUnit, no time established yet"); 1805 track->mPackets.push_back(accessUnit); 1806 return; 1807 } 1808 1809 while (!track->mPackets.empty()) { 1810 sp<ABuffer> accessUnit = *track->mPackets.begin(); 1811 track->mPackets.erase(track->mPackets.begin()); 1812 1813 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1814 postQueueAccessUnit(trackIndex, accessUnit); 1815 } 1816 } 1817 1818 if 
(addMediaTimestamp(trackIndex, track, accessUnit)) { 1819 postQueueAccessUnit(trackIndex, accessUnit); 1820 } 1821 1822 if (track->mEOSReceived) { 1823 postQueueEOS(trackIndex, ERROR_END_OF_STREAM); 1824 track->mEOSReceived = false; 1825 } 1826 } 1827 1828 bool addMediaTimestamp( 1829 int32_t trackIndex, const TrackInfo *track, 1830 const sp<ABuffer> &accessUnit) { 1831 UNUSED_UNLESS_VERBOSE(trackIndex); 1832 1833 uint32_t rtpTime; 1834 CHECK(accessUnit->meta()->findInt32( 1835 "rtp-time", (int32_t *)&rtpTime)); 1836 1837 int64_t relRtpTimeUs = 1838 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll) 1839 / track->mTimeScale; 1840 1841 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs; 1842 1843 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs; 1844 1845 if (mediaTimeUs > mLastMediaTimeUs) { 1846 mLastMediaTimeUs = mediaTimeUs; 1847 } 1848 1849 if (mediaTimeUs < 0) { 1850 ALOGV("dropping early accessUnit."); 1851 return false; 1852 } 1853 1854 ALOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)", 1855 trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6); 1856 1857 accessUnit->meta()->setInt64("timeUs", mediaTimeUs); 1858 1859 return true; 1860 } 1861 1862 void postQueueAccessUnit( 1863 size_t trackIndex, const sp<ABuffer> &accessUnit) { 1864 sp<AMessage> msg = mNotify->dup(); 1865 msg->setInt32("what", kWhatAccessUnit); 1866 msg->setSize("trackIndex", trackIndex); 1867 msg->setBuffer("accessUnit", accessUnit); 1868 msg->post(); 1869 } 1870 1871 void postQueueEOS(size_t trackIndex, status_t finalResult) { 1872 sp<AMessage> msg = mNotify->dup(); 1873 msg->setInt32("what", kWhatEOS); 1874 msg->setSize("trackIndex", trackIndex); 1875 msg->setInt32("finalResult", finalResult); 1876 msg->post(); 1877 } 1878 1879 void postQueueSeekDiscontinuity(size_t trackIndex) { 1880 sp<AMessage> msg = mNotify->dup(); 1881 msg->setInt32("what", kWhatSeekDiscontinuity); 1882 msg->setSize("trackIndex", trackIndex); 1883 msg->post(); 1884 } 1885 1886 
void postNormalPlayTimeMapping( 1887 size_t trackIndex, uint32_t rtpTime, int64_t nptUs) { 1888 sp<AMessage> msg = mNotify->dup(); 1889 msg->setInt32("what", kWhatNormalPlayTimeMapping); 1890 msg->setSize("trackIndex", trackIndex); 1891 msg->setInt32("rtpTime", rtpTime); 1892 msg->setInt64("nptUs", nptUs); 1893 msg->post(); 1894 } 1895 1896 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 1897}; 1898 1899} // namespace android 1900 1901#endif // MY_HANDLER_H_ 1902