MyHandler.h revision a8b8488f703bb6bda039d7d98f87e4f9d845664d
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#ifndef MY_HANDLER_H_ 18 19#define MY_HANDLER_H_ 20 21//#define LOG_NDEBUG 0 22#define LOG_TAG "MyHandler" 23#include <utils/Log.h> 24 25#include "APacketSource.h" 26#include "ARTPConnection.h" 27#include "ARTSPConnection.h" 28#include "ASessionDescription.h" 29 30#include <ctype.h> 31 32#include <media/stagefright/foundation/ABuffer.h> 33#include <media/stagefright/foundation/ADebug.h> 34#include <media/stagefright/foundation/ALooper.h> 35#include <media/stagefright/foundation/AMessage.h> 36#include <media/stagefright/MediaErrors.h> 37#include <media/stagefright/Utils.h> 38 39#include <arpa/inet.h> 40#include <sys/socket.h> 41#include <netdb.h> 42 43#include "HTTPBase.h" 44 45// If no access units are received within 5 secs, assume that the rtp 46// stream has ended and signal end of stream. 47static int64_t kAccessUnitTimeoutUs = 10000000ll; 48 49// If no access units arrive for the first 10 secs after starting the 50// stream, assume none ever will and signal EOS or switch transports. 51static int64_t kStartupTimeoutUs = 10000000ll; 52 53static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll; 54 55static int64_t kPauseDelayUs = 3000000ll; 56 57namespace android { 58 59static bool GetAttribute(const char *s, const char *key, AString *value) { 60 value->clear(); 61 62 size_t keyLen = strlen(key); 63 64 for (;;) { 65 while (isspace(*s)) { 66 ++s; 67 } 68 69 const char *colonPos = strchr(s, ';'); 70 71 size_t len = 72 (colonPos == NULL) ? strlen(s) : colonPos - s; 73 74 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { 75 value->setTo(&s[keyLen + 1], len - keyLen - 1); 76 return true; 77 } 78 79 if (colonPos == NULL) { 80 return false; 81 } 82 83 s = colonPos + 1; 84 } 85} 86 87struct MyHandler : public AHandler { 88 enum { 89 kWhatConnected = 'conn', 90 kWhatDisconnected = 'disc', 91 kWhatSeekDone = 'sdon', 92 93 kWhatAccessUnit = 'accU', 94 kWhatEOS = 'eos!', 95 kWhatSeekDiscontinuity = 'seeD', 96 kWhatNormalPlayTimeMapping = 'nptM', 97 }; 98 99 MyHandler( 100 const char *url, 101 const sp<AMessage> ¬ify, 102 bool uidValid = false, uid_t uid = 0) 103 : mNotify(notify), 104 mUIDValid(uidValid), 105 mUID(uid), 106 mNetLooper(new ALooper), 107 mConn(new ARTSPConnection(mUIDValid, mUID)), 108 mRTPConn(new ARTPConnection), 109 mOriginalSessionURL(url), 110 mSessionURL(url), 111 mSetupTracksSuccessful(false), 112 mSeekPending(false), 113 mFirstAccessUnit(true), 114 mAllTracksHaveTime(false), 115 mNTPAnchorUs(-1), 116 mMediaAnchorUs(-1), 117 mLastMediaTimeUs(0), 118 mNumAccessUnitsReceived(0), 119 mCheckPending(false), 120 mCheckGeneration(0), 121 mCheckTimeoutGeneration(0), 122 mTryTCPInterleaving(false), 123 mTryFakeRTCP(false), 124 mReceivedFirstRTCPPacket(false), 125 mReceivedFirstRTPPacket(false), 126 mSeekable(true), 127 mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs), 128 mKeepAliveGeneration(0), 129 mPausing(false), 130 mPauseGeneration(0), 131 mPlayResponseParsed(false) { 132 mNetLooper->setName("rtsp net"); 133 mNetLooper->start(false /* runOnCallingThread */, 134 false /* canCallJava */, 135 PRIORITY_HIGHEST); 136 137 // Strip any authentication info from the session url, we don't 138 // want to transmit user/pass in cleartext. 139 AString host, path, user, pass; 140 unsigned port; 141 CHECK(ARTSPConnection::ParseURL( 142 mSessionURL.c_str(), &host, &port, &path, &user, &pass)); 143 144 if (user.size() > 0) { 145 mSessionURL.clear(); 146 mSessionURL.append("rtsp://"); 147 mSessionURL.append(host); 148 mSessionURL.append(":"); 149 mSessionURL.append(StringPrintf("%u", port)); 150 mSessionURL.append(path); 151 152 ALOGV("rewritten session url: '%s'", mSessionURL.c_str()); 153 } 154 155 mSessionHost = host; 156 } 157 158 void connect() { 159 looper()->registerHandler(mConn); 160 (1 ? mNetLooper : looper())->registerHandler(mRTPConn); 161 162 sp<AMessage> notify = new AMessage('biny', id()); 163 mConn->observeBinaryData(notify); 164 165 sp<AMessage> reply = new AMessage('conn', id()); 166 mConn->connect(mOriginalSessionURL.c_str(), reply); 167 } 168 169 void loadSDP(const sp<ASessionDescription>& desc) { 170 looper()->registerHandler(mConn); 171 (1 ? mNetLooper : looper())->registerHandler(mRTPConn); 172 173 sp<AMessage> notify = new AMessage('biny', id()); 174 mConn->observeBinaryData(notify); 175 176 sp<AMessage> reply = new AMessage('sdpl', id()); 177 reply->setObject("description", desc); 178 mConn->connect(mOriginalSessionURL.c_str(), reply); 179 } 180 181 AString getControlURL(sp<ASessionDescription> desc) { 182 AString sessionLevelControlURL; 183 if (mSessionDesc->findAttribute( 184 0, 185 "a=control", 186 &sessionLevelControlURL)) { 187 if (sessionLevelControlURL.compare("*") == 0) { 188 return mBaseURL; 189 } else { 190 AString controlURL; 191 CHECK(MakeURL( 192 mBaseURL.c_str(), 193 sessionLevelControlURL.c_str(), 194 &controlURL)); 195 return controlURL; 196 } 197 } else { 198 return mSessionURL; 199 } 200 } 201 202 void disconnect() { 203 (new AMessage('abor', id()))->post(); 204 } 205 206 void seek(int64_t timeUs) { 207 sp<AMessage> msg = new AMessage('seek', id()); 208 msg->setInt64("time", timeUs); 209 mPauseGeneration++; 210 msg->post(); 211 } 212 213 bool isSeekable() const { 214 return mSeekable; 215 } 216 217 void pause() { 218 sp<AMessage> msg = new AMessage('paus', id()); 219 mPauseGeneration++; 220 msg->setInt32("pausecheck", mPauseGeneration); 221 msg->post(kPauseDelayUs); 222 } 223 224 void resume() { 225 sp<AMessage> msg = new AMessage('resu', id()); 226 mPauseGeneration++; 227 msg->post(); 228 } 229 230 static void addRR(const sp<ABuffer> &buf) { 231 uint8_t *ptr = buf->data() + buf->size(); 232 ptr[0] = 0x80 | 0; 233 ptr[1] = 201; // RR 234 ptr[2] = 0; 235 ptr[3] = 1; 236 ptr[4] = 0xde; // SSRC 237 ptr[5] = 0xad; 238 ptr[6] = 0xbe; 239 ptr[7] = 0xef; 240 241 buf->setRange(0, buf->size() + 8); 242 } 243 244 static void addSDES(int s, const sp<ABuffer> &buffer) { 245 struct sockaddr_in addr; 246 socklen_t addrSize = sizeof(addr); 247 CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize)); 248 249 uint8_t *data = buffer->data() + buffer->size(); 250 data[0] = 0x80 | 1; 251 data[1] = 202; // SDES 252 data[4] = 0xde; // SSRC 253 data[5] = 0xad; 254 data[6] = 0xbe; 255 data[7] = 0xef; 256 257 size_t offset = 8; 258 259 data[offset++] = 1; // CNAME 260 261 AString cname = "stagefright@"; 262 cname.append(inet_ntoa(addr.sin_addr)); 263 data[offset++] = cname.size(); 264 265 memcpy(&data[offset], cname.c_str(), cname.size()); 266 offset += cname.size(); 267 268 data[offset++] = 6; // TOOL 269 270 AString tool = MakeUserAgent(); 271 272 data[offset++] = tool.size(); 273 274 memcpy(&data[offset], tool.c_str(), tool.size()); 275 offset += tool.size(); 276 277 data[offset++] = 0; 278 279 if ((offset % 4) > 0) { 280 size_t count = 4 - (offset % 4); 281 switch (count) { 282 case 3: 283 data[offset++] = 0; 284 case 2: 285 data[offset++] = 0; 286 case 1: 287 data[offset++] = 0; 288 } 289 } 290 291 size_t numWords = (offset / 4) - 1; 292 data[2] = numWords >> 8; 293 data[3] = numWords & 0xff; 294 295 buffer->setRange(buffer->offset(), buffer->size() + offset); 296 } 297 298 // In case we're behind NAT, fire off two UDP packets to the remote 299 // rtp/rtcp ports to poke a hole into the firewall for future incoming 300 // packets. We're going to send an RR/SDES RTCP packet to both of them. 301 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) { 302 struct sockaddr_in addr; 303 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 304 addr.sin_family = AF_INET; 305 306 AString source; 307 AString server_port; 308 if (!GetAttribute(transport.c_str(), 309 "source", 310 &source)) { 311 ALOGW("Missing 'source' field in Transport response. Using " 312 "RTSP endpoint address."); 313 314 struct hostent *ent = gethostbyname(mSessionHost.c_str()); 315 if (ent == NULL) { 316 ALOGE("Failed to look up address of session host '%s'", 317 mSessionHost.c_str()); 318 319 return false; 320 } 321 322 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 323 } else { 324 addr.sin_addr.s_addr = inet_addr(source.c_str()); 325 } 326 327 if (!GetAttribute(transport.c_str(), 328 "server_port", 329 &server_port)) { 330 ALOGI("Missing 'server_port' field in Transport response."); 331 return false; 332 } 333 334 int rtpPort, rtcpPort; 335 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2 336 || rtpPort <= 0 || rtpPort > 65535 337 || rtcpPort <=0 || rtcpPort > 65535 338 || rtcpPort != rtpPort + 1) { 339 ALOGE("Server picked invalid RTP/RTCP port pair %s," 340 " RTP port must be even, RTCP port must be one higher.", 341 server_port.c_str()); 342 343 return false; 344 } 345 346 if (rtpPort & 1) { 347 ALOGW("Server picked an odd RTP port, it should've picked an " 348 "even one, we'll let it pass for now, but this may break " 349 "in the future."); 350 } 351 352 if (addr.sin_addr.s_addr == INADDR_NONE) { 353 return true; 354 } 355 356 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) { 357 // No firewalls to traverse on the loopback interface. 358 return true; 359 } 360 361 // Make up an RR/SDES RTCP packet. 362 sp<ABuffer> buf = new ABuffer(65536); 363 buf->setRange(0, 0); 364 addRR(buf); 365 addSDES(rtpSocket, buf); 366 367 addr.sin_port = htons(rtpPort); 368 369 ssize_t n = sendto( 370 rtpSocket, buf->data(), buf->size(), 0, 371 (const sockaddr *)&addr, sizeof(addr)); 372 373 if (n < (ssize_t)buf->size()) { 374 ALOGE("failed to poke a hole for RTP packets"); 375 return false; 376 } 377 378 addr.sin_port = htons(rtcpPort); 379 380 n = sendto( 381 rtcpSocket, buf->data(), buf->size(), 0, 382 (const sockaddr *)&addr, sizeof(addr)); 383 384 if (n < (ssize_t)buf->size()) { 385 ALOGE("failed to poke a hole for RTCP packets"); 386 return false; 387 } 388 389 ALOGV("successfully poked holes."); 390 391 return true; 392 } 393 394 static bool isLiveStream(const sp<ASessionDescription> &desc) { 395 AString attrLiveStream; 396 if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) { 397 ssize_t semicolonPos = attrLiveStream.find(";", 2); 398 399 const char* liveStreamValue; 400 if (semicolonPos < 0) { 401 liveStreamValue = attrLiveStream.c_str(); 402 } else { 403 AString valString; 404 valString.setTo(attrLiveStream, 405 semicolonPos + 1, 406 attrLiveStream.size() - semicolonPos - 1); 407 liveStreamValue = valString.c_str(); 408 } 409 410 uint32_t value = strtoul(liveStreamValue, NULL, 10); 411 if (value == 1) { 412 ALOGV("found live stream"); 413 return true; 414 } 415 } else { 416 // It is a live stream if no duration is returned 417 int64_t durationUs; 418 if (!desc->getDurationUs(&durationUs)) { 419 ALOGV("No duration found, assume live stream"); 420 return true; 421 } 422 } 423 424 return false; 425 } 426 427 virtual void onMessageReceived(const sp<AMessage> &msg) { 428 switch (msg->what()) { 429 case 'conn': 430 { 431 int32_t result; 432 CHECK(msg->findInt32("result", &result)); 433 434 ALOGI("connection request completed with result %d (%s)", 435 result, strerror(-result)); 436 437 if (result == OK) { 438 AString request; 439 request = "DESCRIBE "; 440 request.append(mSessionURL); 441 request.append(" RTSP/1.0\r\n"); 442 request.append("Accept: application/sdp\r\n"); 443 request.append("\r\n"); 444 445 sp<AMessage> reply = new AMessage('desc', id()); 446 mConn->sendRequest(request.c_str(), reply); 447 } else { 448 (new AMessage('disc', id()))->post(); 449 } 450 break; 451 } 452 453 case 'disc': 454 { 455 ++mKeepAliveGeneration; 456 457 int32_t reconnect; 458 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 459 sp<AMessage> reply = new AMessage('conn', id()); 460 mConn->connect(mOriginalSessionURL.c_str(), reply); 461 } else { 462 (new AMessage('quit', id()))->post(); 463 } 464 break; 465 } 466 467 case 'desc': 468 { 469 int32_t result; 470 CHECK(msg->findInt32("result", &result)); 471 472 ALOGI("DESCRIBE completed with result %d (%s)", 473 result, strerror(-result)); 474 475 if (result == OK) { 476 sp<RefBase> obj; 477 CHECK(msg->findObject("response", &obj)); 478 sp<ARTSPResponse> response = 479 static_cast<ARTSPResponse *>(obj.get()); 480 481 if (response->mStatusCode == 302) { 482 ssize_t i = response->mHeaders.indexOfKey("location"); 483 CHECK_GE(i, 0); 484 485 mSessionURL = response->mHeaders.valueAt(i); 486 487 AString request; 488 request = "DESCRIBE "; 489 request.append(mSessionURL); 490 request.append(" RTSP/1.0\r\n"); 491 request.append("Accept: application/sdp\r\n"); 492 request.append("\r\n"); 493 494 sp<AMessage> reply = new AMessage('desc', id()); 495 mConn->sendRequest(request.c_str(), reply); 496 break; 497 } 498 499 if (response->mStatusCode != 200) { 500 result = UNKNOWN_ERROR; 501 } else if (response->mContent == NULL) { 502 result = ERROR_MALFORMED; 503 ALOGE("The response has no content."); 504 } else { 505 mSessionDesc = new ASessionDescription; 506 507 mSessionDesc->setTo( 508 response->mContent->data(), 509 response->mContent->size()); 510 511 if (!mSessionDesc->isValid()) { 512 ALOGE("Failed to parse session description."); 513 result = ERROR_MALFORMED; 514 } else { 515 ssize_t i = response->mHeaders.indexOfKey("content-base"); 516 if (i >= 0) { 517 mBaseURL = response->mHeaders.valueAt(i); 518 } else { 519 i = response->mHeaders.indexOfKey("content-location"); 520 if (i >= 0) { 521 mBaseURL = response->mHeaders.valueAt(i); 522 } else { 523 mBaseURL = mSessionURL; 524 } 525 } 526 527 mSeekable = !isLiveStream(mSessionDesc); 528 529 if (!mBaseURL.startsWith("rtsp://")) { 530 // Some misbehaving servers specify a relative 531 // URL in one of the locations above, combine 532 // it with the absolute session URL to get 533 // something usable... 534 535 ALOGW("Server specified a non-absolute base URL" 536 ", combining it with the session URL to " 537 "get something usable..."); 538 539 AString tmp; 540 CHECK(MakeURL( 541 mSessionURL.c_str(), 542 mBaseURL.c_str(), 543 &tmp)); 544 545 mBaseURL = tmp; 546 } 547 548 mControlURL = getControlURL(mSessionDesc); 549 550 if (mSessionDesc->countTracks() < 2) { 551 // There's no actual tracks in this session. 552 // The first "track" is merely session meta 553 // data. 554 555 ALOGW("Session doesn't contain any playable " 556 "tracks. Aborting."); 557 result = ERROR_UNSUPPORTED; 558 } else { 559 setupTrack(1); 560 } 561 } 562 } 563 } 564 565 if (result != OK) { 566 sp<AMessage> reply = new AMessage('disc', id()); 567 mConn->disconnect(reply); 568 } 569 break; 570 } 571 572 case 'sdpl': 573 { 574 int32_t result; 575 CHECK(msg->findInt32("result", &result)); 576 577 ALOGI("SDP connection request completed with result %d (%s)", 578 result, strerror(-result)); 579 580 if (result == OK) { 581 sp<RefBase> obj; 582 CHECK(msg->findObject("description", &obj)); 583 mSessionDesc = 584 static_cast<ASessionDescription *>(obj.get()); 585 586 if (!mSessionDesc->isValid()) { 587 ALOGE("Failed to parse session description."); 588 result = ERROR_MALFORMED; 589 } else { 590 mBaseURL = mSessionURL; 591 592 mSeekable = !isLiveStream(mSessionDesc); 593 594 mControlURL = getControlURL(mSessionDesc); 595 596 if (mSessionDesc->countTracks() < 2) { 597 // There's no actual tracks in this session. 598 // The first "track" is merely session meta 599 // data. 600 601 ALOGW("Session doesn't contain any playable " 602 "tracks. Aborting."); 603 result = ERROR_UNSUPPORTED; 604 } else { 605 setupTrack(1); 606 } 607 } 608 } 609 610 if (result != OK) { 611 sp<AMessage> reply = new AMessage('disc', id()); 612 mConn->disconnect(reply); 613 } 614 break; 615 } 616 617 case 'setu': 618 { 619 size_t index; 620 CHECK(msg->findSize("index", &index)); 621 622 TrackInfo *track = NULL; 623 size_t trackIndex; 624 if (msg->findSize("track-index", &trackIndex)) { 625 track = &mTracks.editItemAt(trackIndex); 626 } 627 628 int32_t result; 629 CHECK(msg->findInt32("result", &result)); 630 631 ALOGI("SETUP(%d) completed with result %d (%s)", 632 index, result, strerror(-result)); 633 634 if (result == OK) { 635 CHECK(track != NULL); 636 637 sp<RefBase> obj; 638 CHECK(msg->findObject("response", &obj)); 639 sp<ARTSPResponse> response = 640 static_cast<ARTSPResponse *>(obj.get()); 641 642 if (response->mStatusCode != 200) { 643 result = UNKNOWN_ERROR; 644 } else { 645 ssize_t i = response->mHeaders.indexOfKey("session"); 646 CHECK_GE(i, 0); 647 648 mSessionID = response->mHeaders.valueAt(i); 649 650 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 651 AString timeoutStr; 652 if (GetAttribute( 653 mSessionID.c_str(), "timeout", &timeoutStr)) { 654 char *end; 655 unsigned long timeoutSecs = 656 strtoul(timeoutStr.c_str(), &end, 10); 657 658 if (end == timeoutStr.c_str() || *end != '\0') { 659 ALOGW("server specified malformed timeout '%s'", 660 timeoutStr.c_str()); 661 662 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 663 } else if (timeoutSecs < 15) { 664 ALOGW("server specified too short a timeout " 665 "(%lu secs), using default.", 666 timeoutSecs); 667 668 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 669 } else { 670 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll; 671 672 ALOGI("server specified timeout of %lu secs.", 673 timeoutSecs); 674 } 675 } 676 677 i = mSessionID.find(";"); 678 if (i >= 0) { 679 // Remove options, i.e. ";timeout=90" 680 mSessionID.erase(i, mSessionID.size() - i); 681 } 682 683 sp<AMessage> notify = new AMessage('accu', id()); 684 notify->setSize("track-index", trackIndex); 685 686 i = response->mHeaders.indexOfKey("transport"); 687 CHECK_GE(i, 0); 688 689 if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) { 690 if (!track->mUsingInterleavedTCP) { 691 AString transport = response->mHeaders.valueAt(i); 692 693 // We are going to continue even if we were 694 // unable to poke a hole into the firewall... 695 pokeAHole( 696 track->mRTPSocket, 697 track->mRTCPSocket, 698 transport); 699 } 700 701 mRTPConn->addStream( 702 track->mRTPSocket, track->mRTCPSocket, 703 mSessionDesc, index, 704 notify, track->mUsingInterleavedTCP); 705 706 mSetupTracksSuccessful = true; 707 } else { 708 result = BAD_VALUE; 709 } 710 } 711 } 712 713 if (result != OK) { 714 if (track) { 715 if (!track->mUsingInterleavedTCP) { 716 // Clear the tag 717 if (mUIDValid) { 718 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket); 719 HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket); 720 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket); 721 HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket); 722 } 723 724 close(track->mRTPSocket); 725 close(track->mRTCPSocket); 726 } 727 728 mTracks.removeItemsAt(trackIndex); 729 } 730 } 731 732 ++index; 733 if (result == OK && index < mSessionDesc->countTracks()) { 734 setupTrack(index); 735 } else if (mSetupTracksSuccessful) { 736 ++mKeepAliveGeneration; 737 postKeepAlive(); 738 739 AString request = "PLAY "; 740 request.append(mControlURL); 741 request.append(" RTSP/1.0\r\n"); 742 743 request.append("Session: "); 744 request.append(mSessionID); 745 request.append("\r\n"); 746 747 request.append("\r\n"); 748 749 sp<AMessage> reply = new AMessage('play', id()); 750 mConn->sendRequest(request.c_str(), reply); 751 } else { 752 sp<AMessage> reply = new AMessage('disc', id()); 753 mConn->disconnect(reply); 754 } 755 break; 756 } 757 758 case 'play': 759 { 760 int32_t result; 761 CHECK(msg->findInt32("result", &result)); 762 763 ALOGI("PLAY completed with result %d (%s)", 764 result, strerror(-result)); 765 766 if (result == OK) { 767 sp<RefBase> obj; 768 CHECK(msg->findObject("response", &obj)); 769 sp<ARTSPResponse> response = 770 static_cast<ARTSPResponse *>(obj.get()); 771 772 if (response->mStatusCode != 200) { 773 result = UNKNOWN_ERROR; 774 } else { 775 parsePlayResponse(response); 776 777 sp<AMessage> timeout = new AMessage('tiou', id()); 778 mCheckTimeoutGeneration++; 779 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 780 timeout->post(kStartupTimeoutUs); 781 } 782 } 783 784 if (result != OK) { 785 sp<AMessage> reply = new AMessage('disc', id()); 786 mConn->disconnect(reply); 787 } 788 789 break; 790 } 791 792 case 'aliv': 793 { 794 int32_t generation; 795 CHECK(msg->findInt32("generation", &generation)); 796 797 if (generation != mKeepAliveGeneration) { 798 // obsolete event. 799 break; 800 } 801 802 AString request; 803 request.append("OPTIONS "); 804 request.append(mSessionURL); 805 request.append(" RTSP/1.0\r\n"); 806 request.append("Session: "); 807 request.append(mSessionID); 808 request.append("\r\n"); 809 request.append("\r\n"); 810 811 sp<AMessage> reply = new AMessage('opts', id()); 812 reply->setInt32("generation", mKeepAliveGeneration); 813 mConn->sendRequest(request.c_str(), reply); 814 break; 815 } 816 817 case 'opts': 818 { 819 int32_t result; 820 CHECK(msg->findInt32("result", &result)); 821 822 ALOGI("OPTIONS completed with result %d (%s)", 823 result, strerror(-result)); 824 825 int32_t generation; 826 CHECK(msg->findInt32("generation", &generation)); 827 828 if (generation != mKeepAliveGeneration) { 829 // obsolete event. 830 break; 831 } 832 833 postKeepAlive(); 834 break; 835 } 836 837 case 'abor': 838 { 839 for (size_t i = 0; i < mTracks.size(); ++i) { 840 TrackInfo *info = &mTracks.editItemAt(i); 841 842 if (!mFirstAccessUnit) { 843 postQueueEOS(i, ERROR_END_OF_STREAM); 844 } 845 846 if (!info->mUsingInterleavedTCP) { 847 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket); 848 849 // Clear the tag 850 if (mUIDValid) { 851 HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket); 852 HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket); 853 HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket); 854 HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket); 855 } 856 857 close(info->mRTPSocket); 858 close(info->mRTCPSocket); 859 } 860 } 861 mTracks.clear(); 862 mSetupTracksSuccessful = false; 863 mSeekPending = false; 864 mFirstAccessUnit = true; 865 mAllTracksHaveTime = false; 866 mNTPAnchorUs = -1; 867 mMediaAnchorUs = -1; 868 mNumAccessUnitsReceived = 0; 869 mReceivedFirstRTCPPacket = false; 870 mReceivedFirstRTPPacket = false; 871 mPausing = false; 872 mSeekable = true; 873 874 sp<AMessage> reply = new AMessage('tear', id()); 875 876 int32_t reconnect; 877 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 878 reply->setInt32("reconnect", true); 879 } 880 881 AString request; 882 request = "TEARDOWN "; 883 884 // XXX should use aggregate url from SDP here... 885 request.append(mSessionURL); 886 request.append(" RTSP/1.0\r\n"); 887 888 request.append("Session: "); 889 request.append(mSessionID); 890 request.append("\r\n"); 891 892 request.append("\r\n"); 893 894 mConn->sendRequest(request.c_str(), reply); 895 break; 896 } 897 898 case 'tear': 899 { 900 int32_t result; 901 CHECK(msg->findInt32("result", &result)); 902 903 ALOGI("TEARDOWN completed with result %d (%s)", 904 result, strerror(-result)); 905 906 sp<AMessage> reply = new AMessage('disc', id()); 907 908 int32_t reconnect; 909 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 910 reply->setInt32("reconnect", true); 911 } 912 913 mConn->disconnect(reply); 914 break; 915 } 916 917 case 'quit': 918 { 919 sp<AMessage> msg = mNotify->dup(); 920 msg->setInt32("what", kWhatDisconnected); 921 msg->setInt32("result", UNKNOWN_ERROR); 922 msg->post(); 923 break; 924 } 925 926 case 'chek': 927 { 928 int32_t generation; 929 CHECK(msg->findInt32("generation", &generation)); 930 if (generation != mCheckGeneration) { 931 // This is an outdated message. Ignore. 932 break; 933 } 934 935 if (mNumAccessUnitsReceived == 0) { 936#if 1 937 ALOGI("stream ended? aborting."); 938 (new AMessage('abor', id()))->post(); 939 break; 940#else 941 ALOGI("haven't seen an AU in a looong time."); 942#endif 943 } 944 945 mNumAccessUnitsReceived = 0; 946 msg->post(kAccessUnitTimeoutUs); 947 break; 948 } 949 950 case 'accu': 951 { 952 int32_t timeUpdate; 953 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) { 954 size_t trackIndex; 955 CHECK(msg->findSize("track-index", &trackIndex)); 956 957 uint32_t rtpTime; 958 uint64_t ntpTime; 959 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime)); 960 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime)); 961 962 onTimeUpdate(trackIndex, rtpTime, ntpTime); 963 break; 964 } 965 966 int32_t first; 967 if (msg->findInt32("first-rtcp", &first)) { 968 mReceivedFirstRTCPPacket = true; 969 break; 970 } 971 972 if (msg->findInt32("first-rtp", &first)) { 973 mReceivedFirstRTPPacket = true; 974 break; 975 } 976 977 ++mNumAccessUnitsReceived; 978 postAccessUnitTimeoutCheck(); 979 980 size_t trackIndex; 981 CHECK(msg->findSize("track-index", &trackIndex)); 982 983 if (trackIndex >= mTracks.size()) { 984 ALOGV("late packets ignored."); 985 break; 986 } 987 988 TrackInfo *track = &mTracks.editItemAt(trackIndex); 989 990 int32_t eos; 991 if (msg->findInt32("eos", &eos)) { 992 ALOGI("received BYE on track index %d", trackIndex); 993 if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) { 994 ALOGI("No time established => fake existing data"); 995 996 track->mEOSReceived = true; 997 mTryFakeRTCP = true; 998 mReceivedFirstRTCPPacket = true; 999 fakeTimestamps(); 1000 } else { 1001 postQueueEOS(trackIndex, ERROR_END_OF_STREAM); 1002 } 1003 return; 1004 } 1005 1006 sp<ABuffer> accessUnit; 1007 CHECK(msg->findBuffer("access-unit", &accessUnit)); 1008 1009 uint32_t seqNum = (uint32_t)accessUnit->int32Data(); 1010 1011 if (mSeekPending) { 1012 ALOGV("we're seeking, dropping stale packet."); 1013 break; 1014 } 1015 1016 if (seqNum < track->mFirstSeqNumInSegment) { 1017 ALOGV("dropping stale access-unit (%d < %d)", 1018 seqNum, track->mFirstSeqNumInSegment); 1019 break; 1020 } 1021 1022 if (track->mNewSegment) { 1023 track->mNewSegment = false; 1024 } 1025 1026 onAccessUnitComplete(trackIndex, accessUnit); 1027 break; 1028 } 1029 1030 case 'paus': 1031 { 1032 int32_t generation; 1033 CHECK(msg->findInt32("pausecheck", &generation)); 1034 if (generation != mPauseGeneration) { 1035 ALOGV("Ignoring outdated pause message."); 1036 break; 1037 } 1038 1039 if (!mSeekable) { 1040 ALOGW("This is a live stream, ignoring pause request."); 1041 break; 1042 } 1043 mCheckPending = true; 1044 ++mCheckGeneration; 1045 mPausing = true; 1046 1047 AString request = "PAUSE "; 1048 request.append(mControlURL); 1049 request.append(" RTSP/1.0\r\n"); 1050 1051 request.append("Session: "); 1052 request.append(mSessionID); 1053 request.append("\r\n"); 1054 1055 request.append("\r\n"); 1056 1057 sp<AMessage> reply = new AMessage('pau2', id()); 1058 mConn->sendRequest(request.c_str(), reply); 1059 break; 1060 } 1061 1062 case 'pau2': 1063 { 1064 int32_t result; 1065 CHECK(msg->findInt32("result", &result)); 1066 mCheckTimeoutGeneration++; 1067 1068 ALOGI("PAUSE completed with result %d (%s)", 1069 result, strerror(-result)); 1070 break; 1071 } 1072 1073 case 'resu': 1074 { 1075 if (mPausing && mSeekPending) { 1076 // If seeking, Play will be sent from see1 instead 1077 break; 1078 } 1079 1080 if (!mPausing) { 1081 // Dont send PLAY if we have not paused 1082 break; 1083 } 1084 AString request = "PLAY "; 1085 request.append(mControlURL); 1086 request.append(" RTSP/1.0\r\n"); 1087 1088 request.append("Session: "); 1089 request.append(mSessionID); 1090 request.append("\r\n"); 1091 1092 request.append("\r\n"); 1093 1094 sp<AMessage> reply = new AMessage('res2', id()); 1095 mConn->sendRequest(request.c_str(), reply); 1096 break; 1097 } 1098 1099 case 'res2': 1100 { 1101 int32_t result; 1102 CHECK(msg->findInt32("result", &result)); 1103 1104 ALOGI("PLAY completed with result %d (%s)", 1105 result, strerror(-result)); 1106 1107 mCheckPending = false; 1108 postAccessUnitTimeoutCheck(); 1109 1110 if (result == OK) { 1111 sp<RefBase> obj; 1112 CHECK(msg->findObject("response", &obj)); 1113 sp<ARTSPResponse> response = 1114 static_cast<ARTSPResponse *>(obj.get()); 1115 1116 if (response->mStatusCode != 200) { 1117 result = UNKNOWN_ERROR; 1118 } else { 1119 parsePlayResponse(response); 1120 1121 // Post new timeout in order to make sure to use 1122 // fake timestamps if no new Sender Reports arrive 1123 sp<AMessage> timeout = new AMessage('tiou', id()); 1124 mCheckTimeoutGeneration++; 1125 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1126 timeout->post(kStartupTimeoutUs); 1127 } 1128 } 1129 1130 if (result != OK) { 1131 ALOGE("resume failed, aborting."); 1132 (new AMessage('abor', id()))->post(); 1133 } 1134 1135 mPausing = false; 1136 break; 1137 } 1138 1139 case 'seek': 1140 { 1141 if (!mSeekable) { 1142 ALOGW("This is a live stream, ignoring seek request."); 1143 1144 sp<AMessage> msg = mNotify->dup(); 1145 msg->setInt32("what", kWhatSeekDone); 1146 msg->post(); 1147 break; 1148 } 1149 1150 int64_t timeUs; 1151 CHECK(msg->findInt64("time", &timeUs)); 1152 1153 mSeekPending = true; 1154 1155 // Disable the access unit timeout until we resumed 1156 // playback again. 1157 mCheckPending = true; 1158 ++mCheckGeneration; 1159 1160 sp<AMessage> reply = new AMessage('see1', id()); 1161 reply->setInt64("time", timeUs); 1162 1163 if (mPausing) { 1164 // PAUSE already sent 1165 ALOGI("Pause already sent"); 1166 reply->post(); 1167 break; 1168 } 1169 AString request = "PAUSE "; 1170 request.append(mControlURL); 1171 request.append(" RTSP/1.0\r\n"); 1172 1173 request.append("Session: "); 1174 request.append(mSessionID); 1175 request.append("\r\n"); 1176 1177 request.append("\r\n"); 1178 1179 mConn->sendRequest(request.c_str(), reply); 1180 break; 1181 } 1182 1183 case 'see1': 1184 { 1185 // Session is paused now. 1186 for (size_t i = 0; i < mTracks.size(); ++i) { 1187 TrackInfo *info = &mTracks.editItemAt(i); 1188 1189 postQueueSeekDiscontinuity(i); 1190 info->mEOSReceived = false; 1191 1192 info->mRTPAnchor = 0; 1193 info->mNTPAnchorUs = -1; 1194 } 1195 1196 mAllTracksHaveTime = false; 1197 mNTPAnchorUs = -1; 1198 1199 // Start new timeoutgeneration to avoid getting timeout 1200 // before PLAY response arrive 1201 sp<AMessage> timeout = new AMessage('tiou', id()); 1202 mCheckTimeoutGeneration++; 1203 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1204 timeout->post(kStartupTimeoutUs); 1205 1206 int64_t timeUs; 1207 CHECK(msg->findInt64("time", &timeUs)); 1208 1209 AString request = "PLAY "; 1210 request.append(mControlURL); 1211 request.append(" RTSP/1.0\r\n"); 1212 1213 request.append("Session: "); 1214 request.append(mSessionID); 1215 request.append("\r\n"); 1216 1217 request.append( 1218 StringPrintf( 1219 "Range: npt=%lld-\r\n", timeUs / 1000000ll)); 1220 1221 request.append("\r\n"); 1222 1223 sp<AMessage> reply = new AMessage('see2', id()); 1224 mConn->sendRequest(request.c_str(), reply); 1225 break; 1226 } 1227 1228 case 'see2': 1229 { 1230 if (mTracks.size() == 0) { 1231 // We have already hit abor, break 1232 break; 1233 } 1234 1235 int32_t result; 1236 CHECK(msg->findInt32("result", &result)); 1237 1238 ALOGI("PLAY completed with result %d (%s)", 1239 result, strerror(-result)); 1240 1241 mCheckPending = false; 1242 postAccessUnitTimeoutCheck(); 1243 1244 if (result == OK) { 1245 sp<RefBase> obj; 1246 CHECK(msg->findObject("response", &obj)); 1247 sp<ARTSPResponse> response = 1248 static_cast<ARTSPResponse *>(obj.get()); 1249 1250 if (response->mStatusCode != 200) { 1251 result = UNKNOWN_ERROR; 1252 } else { 1253 parsePlayResponse(response); 1254 1255 // Post new timeout in order to make sure to use 1256 // fake timestamps if no new Sender Reports arrive 1257 sp<AMessage> timeout = new AMessage('tiou', id()); 1258 mCheckTimeoutGeneration++; 1259 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1260 timeout->post(kStartupTimeoutUs); 1261 1262 ssize_t i = response->mHeaders.indexOfKey("rtp-info"); 1263 CHECK_GE(i, 0); 1264 1265 ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str()); 1266 1267 ALOGI("seek completed."); 1268 } 1269 } 1270 1271 if (result != OK) { 1272 ALOGE("seek failed, aborting."); 1273 (new AMessage('abor', id()))->post(); 1274 } 1275 1276 mPausing = false; 1277 mSeekPending = false; 1278 1279 sp<AMessage> msg = mNotify->dup(); 1280 msg->setInt32("what", kWhatSeekDone); 1281 msg->post(); 1282 break; 1283 } 1284 1285 case 'biny': 1286 { 1287 sp<ABuffer> buffer; 1288 CHECK(msg->findBuffer("buffer", &buffer)); 1289 1290 int32_t index; 1291 CHECK(buffer->meta()->findInt32("index", &index)); 1292 1293 mRTPConn->injectPacket(index, buffer); 1294 break; 1295 } 1296 1297 case 'tiou': 1298 { 1299 int32_t timeoutGenerationCheck; 1300 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck)); 1301 if (timeoutGenerationCheck != mCheckTimeoutGeneration) { 1302 // This is an outdated message. Ignore. 1303 // This typically happens if a lot of seeks are 1304 // performed, since new timeout messages now are 1305 // posted at seek as well. 1306 break; 1307 } 1308 if (!mReceivedFirstRTCPPacket) { 1309 if (dataReceivedOnAllChannels() && !mTryFakeRTCP) { 1310 ALOGW("We received RTP packets but no RTCP packets, " 1311 "using fake timestamps."); 1312 1313 mTryFakeRTCP = true; 1314 1315 mReceivedFirstRTCPPacket = true; 1316 1317 fakeTimestamps(); 1318 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) { 1319 ALOGW("Never received any data, switching transports."); 1320 1321 mTryTCPInterleaving = true; 1322 1323 sp<AMessage> msg = new AMessage('abor', id()); 1324 msg->setInt32("reconnect", true); 1325 msg->post(); 1326 } else { 1327 ALOGW("Never received any data, disconnecting."); 1328 (new AMessage('abor', id()))->post(); 1329 } 1330 } else { 1331 if (!mAllTracksHaveTime) { 1332 ALOGW("We received some RTCP packets, but time " 1333 "could not be established on all tracks, now " 1334 "using fake timestamps"); 1335 1336 fakeTimestamps(); 1337 } 1338 } 1339 break; 1340 } 1341 1342 default: 1343 TRESPASS(); 1344 break; 1345 } 1346 } 1347 1348 void postKeepAlive() { 1349 sp<AMessage> msg = new AMessage('aliv', id()); 1350 msg->setInt32("generation", mKeepAliveGeneration); 1351 msg->post((mKeepAliveTimeoutUs * 9) / 10); 1352 } 1353 1354 void postAccessUnitTimeoutCheck() { 1355 if (mCheckPending) { 1356 return; 1357 } 1358 1359 mCheckPending = true; 1360 sp<AMessage> check = new AMessage('chek', id()); 1361 check->setInt32("generation", mCheckGeneration); 1362 check->post(kAccessUnitTimeoutUs); 1363 } 1364 1365 static void SplitString( 1366 const AString &s, const char *separator, List<AString> *items) { 1367 items->clear(); 1368 size_t start = 0; 1369 while (start < s.size()) { 1370 ssize_t offset = s.find(separator, start); 1371 1372 if (offset < 0) { 1373 items->push_back(AString(s, start, s.size() - start)); 1374 break; 1375 } 1376 1377 items->push_back(AString(s, start, offset - start)); 1378 start = offset + strlen(separator); 1379 } 1380 } 1381 1382 void parsePlayResponse(const sp<ARTSPResponse> &response) { 1383 mPlayResponseParsed = true; 1384 if (mTracks.size() == 0) { 1385 ALOGV("parsePlayResponse: late packets ignored."); 1386 return; 1387 } 1388 1389 ssize_t i = response->mHeaders.indexOfKey("range"); 1390 if (i < 0) { 1391 // Server doesn't even tell use what range it is going to 1392 // play, therefore we won't support seeking. 1393 return; 1394 } 1395 1396 AString range = response->mHeaders.valueAt(i); 1397 ALOGV("Range: %s", range.c_str()); 1398 1399 AString val; 1400 CHECK(GetAttribute(range.c_str(), "npt", &val)); 1401 1402 float npt1, npt2; 1403 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) { 1404 // This is a live stream and therefore not seekable. 1405 1406 ALOGI("This is a live stream"); 1407 return; 1408 } 1409 1410 i = response->mHeaders.indexOfKey("rtp-info"); 1411 CHECK_GE(i, 0); 1412 1413 AString rtpInfo = response->mHeaders.valueAt(i); 1414 List<AString> streamInfos; 1415 SplitString(rtpInfo, ",", &streamInfos); 1416 1417 int n = 1; 1418 for (List<AString>::iterator it = streamInfos.begin(); 1419 it != streamInfos.end(); ++it) { 1420 (*it).trim(); 1421 ALOGV("streamInfo[%d] = %s", n, (*it).c_str()); 1422 1423 CHECK(GetAttribute((*it).c_str(), "url", &val)); 1424 1425 size_t trackIndex = 0; 1426 while (trackIndex < mTracks.size() 1427 && !(val == mTracks.editItemAt(trackIndex).mURL)) { 1428 ++trackIndex; 1429 } 1430 CHECK_LT(trackIndex, mTracks.size()); 1431 1432 CHECK(GetAttribute((*it).c_str(), "seq", &val)); 1433 1434 char *end; 1435 unsigned long seq = strtoul(val.c_str(), &end, 10); 1436 1437 TrackInfo *info = &mTracks.editItemAt(trackIndex); 1438 info->mFirstSeqNumInSegment = seq; 1439 info->mNewSegment = true; 1440 1441 CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); 1442 1443 uint32_t rtpTime = strtoul(val.c_str(), &end, 10); 1444 1445 ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1); 1446 1447 info->mNormalPlayTimeRTP = rtpTime; 1448 info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6); 1449 1450 if (!mFirstAccessUnit) { 1451 postNormalPlayTimeMapping( 1452 trackIndex, 1453 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1454 } 1455 1456 ++n; 1457 } 1458 } 1459 1460 sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) { 1461 CHECK_GE(index, 0u); 1462 CHECK_LT(index, mTracks.size()); 1463 1464 const TrackInfo &info = mTracks.itemAt(index); 1465 1466 *timeScale = info.mTimeScale; 1467 1468 return info.mPacketSource->getFormat(); 1469 } 1470 1471 size_t countTracks() const { 1472 return mTracks.size(); 1473 } 1474 1475private: 1476 struct TrackInfo { 1477 AString mURL; 1478 int mRTPSocket; 1479 int mRTCPSocket; 1480 bool mUsingInterleavedTCP; 1481 uint32_t mFirstSeqNumInSegment; 1482 bool mNewSegment; 1483 1484 uint32_t mRTPAnchor; 1485 int64_t mNTPAnchorUs; 1486 int32_t mTimeScale; 1487 bool mEOSReceived; 1488 1489 uint32_t mNormalPlayTimeRTP; 1490 int64_t mNormalPlayTimeUs; 1491 1492 sp<APacketSource> mPacketSource; 1493 1494 // Stores packets temporarily while no notion of time 1495 // has been established yet. 1496 List<sp<ABuffer> > mPackets; 1497 }; 1498 1499 sp<AMessage> mNotify; 1500 bool mUIDValid; 1501 uid_t mUID; 1502 sp<ALooper> mNetLooper; 1503 sp<ARTSPConnection> mConn; 1504 sp<ARTPConnection> mRTPConn; 1505 sp<ASessionDescription> mSessionDesc; 1506 AString mOriginalSessionURL; // This one still has user:pass@ 1507 AString mSessionURL; 1508 AString mSessionHost; 1509 AString mBaseURL; 1510 AString mControlURL; 1511 AString mSessionID; 1512 bool mSetupTracksSuccessful; 1513 bool mSeekPending; 1514 bool mFirstAccessUnit; 1515 1516 bool mAllTracksHaveTime; 1517 int64_t mNTPAnchorUs; 1518 int64_t mMediaAnchorUs; 1519 int64_t mLastMediaTimeUs; 1520 1521 int64_t mNumAccessUnitsReceived; 1522 bool mCheckPending; 1523 int32_t mCheckGeneration; 1524 int32_t mCheckTimeoutGeneration; 1525 bool mTryTCPInterleaving; 1526 bool mTryFakeRTCP; 1527 bool mReceivedFirstRTCPPacket; 1528 bool mReceivedFirstRTPPacket; 1529 bool mSeekable; 1530 int64_t mKeepAliveTimeoutUs; 1531 int32_t mKeepAliveGeneration; 1532 bool mPausing; 1533 int32_t mPauseGeneration; 1534 1535 Vector<TrackInfo> mTracks; 1536 1537 bool mPlayResponseParsed; 1538 1539 void setupTrack(size_t index) { 1540 sp<APacketSource> source = 1541 new APacketSource(mSessionDesc, index); 1542 1543 if (source->initCheck() != OK) { 1544 ALOGW("Unsupported format. Ignoring track #%d.", index); 1545 1546 sp<AMessage> reply = new AMessage('setu', id()); 1547 reply->setSize("index", index); 1548 reply->setInt32("result", ERROR_UNSUPPORTED); 1549 reply->post(); 1550 return; 1551 } 1552 1553 AString url; 1554 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 1555 1556 AString trackURL; 1557 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 1558 1559 mTracks.push(TrackInfo()); 1560 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 1561 info->mURL = trackURL; 1562 info->mPacketSource = source; 1563 info->mUsingInterleavedTCP = false; 1564 info->mFirstSeqNumInSegment = 0; 1565 info->mNewSegment = true; 1566 info->mRTPSocket = -1; 1567 info->mRTCPSocket = -1; 1568 info->mRTPAnchor = 0; 1569 info->mNTPAnchorUs = -1; 1570 info->mNormalPlayTimeRTP = 0; 1571 info->mNormalPlayTimeUs = 0ll; 1572 1573 unsigned long PT; 1574 AString formatDesc; 1575 AString formatParams; 1576 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams); 1577 1578 int32_t timescale; 1579 int32_t numChannels; 1580 ASessionDescription::ParseFormatDesc( 1581 formatDesc.c_str(), ×cale, &numChannels); 1582 1583 info->mTimeScale = timescale; 1584 info->mEOSReceived = false; 1585 1586 ALOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str()); 1587 1588 AString request = "SETUP "; 1589 request.append(trackURL); 1590 request.append(" RTSP/1.0\r\n"); 1591 1592 if (mTryTCPInterleaving) { 1593 size_t interleaveIndex = 2 * (mTracks.size() - 1); 1594 info->mUsingInterleavedTCP = true; 1595 info->mRTPSocket = interleaveIndex; 1596 info->mRTCPSocket = interleaveIndex + 1; 1597 1598 request.append("Transport: RTP/AVP/TCP;interleaved="); 1599 request.append(interleaveIndex); 1600 request.append("-"); 1601 request.append(interleaveIndex + 1); 1602 } else { 1603 unsigned rtpPort; 1604 ARTPConnection::MakePortPair( 1605 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 1606 1607 if (mUIDValid) { 1608 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID, 1609 (uint32_t)*(uint32_t*) "RTP_"); 1610 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID, 1611 (uint32_t)*(uint32_t*) "RTP_"); 1612 HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID); 1613 HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID); 1614 } 1615 1616 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 1617 request.append(rtpPort); 1618 request.append("-"); 1619 request.append(rtpPort + 1); 1620 } 1621 1622 request.append("\r\n"); 1623 1624 if (index > 1) { 1625 request.append("Session: "); 1626 request.append(mSessionID); 1627 request.append("\r\n"); 1628 } 1629 1630 request.append("\r\n"); 1631 1632 sp<AMessage> reply = new AMessage('setu', id()); 1633 reply->setSize("index", index); 1634 reply->setSize("track-index", mTracks.size() - 1); 1635 mConn->sendRequest(request.c_str(), reply); 1636 } 1637 1638 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 1639 out->clear(); 1640 1641 if (strncasecmp("rtsp://", baseURL, 7)) { 1642 // Base URL must be absolute 1643 return false; 1644 } 1645 1646 if (!strncasecmp("rtsp://", url, 7)) { 1647 // "url" is already an absolute URL, ignore base URL. 1648 out->setTo(url); 1649 return true; 1650 } 1651 1652 size_t n = strlen(baseURL); 1653 if (baseURL[n - 1] == '/') { 1654 out->setTo(baseURL); 1655 out->append(url); 1656 } else { 1657 const char *slashPos = strrchr(baseURL, '/'); 1658 1659 if (slashPos > &baseURL[6]) { 1660 out->setTo(baseURL, slashPos - baseURL); 1661 } else { 1662 out->setTo(baseURL); 1663 } 1664 1665 out->append("/"); 1666 out->append(url); 1667 } 1668 1669 return true; 1670 } 1671 1672 void fakeTimestamps() { 1673 mNTPAnchorUs = -1ll; 1674 for (size_t i = 0; i < mTracks.size(); ++i) { 1675 onTimeUpdate(i, 0, 0ll); 1676 } 1677 } 1678 1679 bool dataReceivedOnAllChannels() { 1680 TrackInfo *track; 1681 for (size_t i = 0; i < mTracks.size(); ++i) { 1682 track = &mTracks.editItemAt(i); 1683 if (track->mPackets.empty()) { 1684 return false; 1685 } 1686 } 1687 return true; 1688 } 1689 1690 void handleFirstAccessUnit() { 1691 if (mFirstAccessUnit) { 1692 sp<AMessage> msg = mNotify->dup(); 1693 msg->setInt32("what", kWhatConnected); 1694 msg->post(); 1695 1696 if (mSeekable) { 1697 for (size_t i = 0; i < mTracks.size(); ++i) { 1698 TrackInfo *info = &mTracks.editItemAt(i); 1699 1700 postNormalPlayTimeMapping( 1701 i, 1702 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1703 } 1704 } 1705 1706 mFirstAccessUnit = false; 1707 } 1708 } 1709 1710 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) { 1711 ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx", 1712 trackIndex, rtpTime, ntpTime); 1713 1714 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); 1715 1716 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1717 1718 track->mRTPAnchor = rtpTime; 1719 track->mNTPAnchorUs = ntpTimeUs; 1720 1721 if (mNTPAnchorUs < 0) { 1722 mNTPAnchorUs = ntpTimeUs; 1723 mMediaAnchorUs = mLastMediaTimeUs; 1724 } 1725 1726 if (!mAllTracksHaveTime) { 1727 bool allTracksHaveTime = true; 1728 for (size_t i = 0; i < mTracks.size(); ++i) { 1729 TrackInfo *track = &mTracks.editItemAt(i); 1730 if (track->mNTPAnchorUs < 0) { 1731 allTracksHaveTime = false; 1732 break; 1733 } 1734 } 1735 if (allTracksHaveTime) { 1736 mAllTracksHaveTime = true; 1737 ALOGI("Time now established for all tracks."); 1738 } 1739 } 1740 if (mAllTracksHaveTime && dataReceivedOnAllChannels()) { 1741 handleFirstAccessUnit(); 1742 1743 // Time is now established, lets start timestamping immediately 1744 for (size_t i = 0; i < mTracks.size(); ++i) { 1745 TrackInfo *trackInfo = &mTracks.editItemAt(i); 1746 while (!trackInfo->mPackets.empty()) { 1747 sp<ABuffer> accessUnit = *trackInfo->mPackets.begin(); 1748 trackInfo->mPackets.erase(trackInfo->mPackets.begin()); 1749 1750 if (addMediaTimestamp(i, trackInfo, accessUnit)) { 1751 postQueueAccessUnit(i, accessUnit); 1752 } 1753 } 1754 } 1755 for (size_t i = 0; i < mTracks.size(); ++i) { 1756 TrackInfo *trackInfo = &mTracks.editItemAt(i); 1757 if (trackInfo->mEOSReceived) { 1758 postQueueEOS(i, ERROR_END_OF_STREAM); 1759 trackInfo->mEOSReceived = false; 1760 } 1761 } 1762 } 1763 } 1764 1765 void onAccessUnitComplete( 1766 int32_t trackIndex, const sp<ABuffer> &accessUnit) { 1767 ALOGV("onAccessUnitComplete track %d", trackIndex); 1768 1769 if(!mPlayResponseParsed){ 1770 ALOGI("play response is not parsed, storing accessunit"); 1771 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1772 track->mPackets.push_back(accessUnit); 1773 return; 1774 } 1775 1776 handleFirstAccessUnit(); 1777 1778 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1779 1780 if (!mAllTracksHaveTime) { 1781 ALOGV("storing accessUnit, no time established yet"); 1782 track->mPackets.push_back(accessUnit); 1783 return; 1784 } 1785 1786 while (!track->mPackets.empty()) { 1787 sp<ABuffer> accessUnit = *track->mPackets.begin(); 1788 track->mPackets.erase(track->mPackets.begin()); 1789 1790 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1791 postQueueAccessUnit(trackIndex, accessUnit); 1792 } 1793 } 1794 1795 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1796 postQueueAccessUnit(trackIndex, accessUnit); 1797 } 1798 1799 if (track->mEOSReceived) { 1800 postQueueEOS(trackIndex, ERROR_END_OF_STREAM); 1801 track->mEOSReceived = false; 1802 } 1803 } 1804 1805 bool addMediaTimestamp( 1806 int32_t trackIndex, const TrackInfo *track, 1807 const sp<ABuffer> &accessUnit) { 1808 uint32_t rtpTime; 1809 CHECK(accessUnit->meta()->findInt32( 1810 "rtp-time", (int32_t *)&rtpTime)); 1811 1812 int64_t relRtpTimeUs = 1813 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll) 1814 / track->mTimeScale; 1815 1816 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs; 1817 1818 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs; 1819 1820 if (mediaTimeUs > mLastMediaTimeUs) { 1821 mLastMediaTimeUs = mediaTimeUs; 1822 } 1823 1824 if (mediaTimeUs < 0) { 1825 ALOGV("dropping early accessUnit."); 1826 return false; 1827 } 1828 1829 ALOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)", 1830 trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6); 1831 1832 accessUnit->meta()->setInt64("timeUs", mediaTimeUs); 1833 1834 return true; 1835 } 1836 1837 void postQueueAccessUnit( 1838 size_t trackIndex, const sp<ABuffer> &accessUnit) { 1839 sp<AMessage> msg = mNotify->dup(); 1840 msg->setInt32("what", kWhatAccessUnit); 1841 msg->setSize("trackIndex", trackIndex); 1842 msg->setBuffer("accessUnit", accessUnit); 1843 msg->post(); 1844 } 1845 1846 void postQueueEOS(size_t trackIndex, status_t finalResult) { 1847 sp<AMessage> msg = mNotify->dup(); 1848 msg->setInt32("what", kWhatEOS); 1849 msg->setSize("trackIndex", trackIndex); 1850 msg->setInt32("finalResult", finalResult); 1851 msg->post(); 1852 } 1853 1854 void postQueueSeekDiscontinuity(size_t trackIndex) { 1855 sp<AMessage> msg = mNotify->dup(); 1856 msg->setInt32("what", kWhatSeekDiscontinuity); 1857 msg->setSize("trackIndex", trackIndex); 1858 msg->post(); 1859 } 1860 1861 void postNormalPlayTimeMapping( 1862 size_t trackIndex, uint32_t rtpTime, int64_t nptUs) { 1863 sp<AMessage> msg = mNotify->dup(); 1864 msg->setInt32("what", kWhatNormalPlayTimeMapping); 1865 msg->setSize("trackIndex", trackIndex); 1866 msg->setInt32("rtpTime", rtpTime); 1867 msg->setInt64("nptUs", nptUs); 1868 msg->post(); 1869 } 1870 1871 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 1872}; 1873 1874} // namespace android 1875 1876#endif // MY_HANDLER_H_ 1877