MyHandler.h revision 2bfdd428c56c7524d1a11979f200a1762866032d
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#ifndef MY_HANDLER_H_ 18 19#define MY_HANDLER_H_ 20 21//#define LOG_NDEBUG 0 22#define LOG_TAG "MyHandler" 23#include <utils/Log.h> 24 25#include "APacketSource.h" 26#include "ARTPConnection.h" 27#include "ARTSPConnection.h" 28#include "ASessionDescription.h" 29 30#include <ctype.h> 31#include <cutils/properties.h> 32 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/ALooper.h> 36#include <media/stagefright/foundation/AMessage.h> 37#include <media/stagefright/MediaErrors.h> 38 39#include <arpa/inet.h> 40#include <sys/socket.h> 41#include <netdb.h> 42 43#include "HTTPBase.h" 44 45// If no access units are received within 5 secs, assume that the rtp 46// stream has ended and signal end of stream. 47static int64_t kAccessUnitTimeoutUs = 5000000ll; 48 49// If no access units arrive for the first 10 secs after starting the 50// stream, assume none ever will and signal EOS or switch transports. 51static int64_t kStartupTimeoutUs = 10000000ll; 52 53namespace android { 54 55static void MakeUserAgentString(AString *s) { 56 s->setTo("stagefright/1.1 (Linux;Android "); 57 58#if (PROPERTY_VALUE_MAX < 8) 59#error "PROPERTY_VALUE_MAX must be at least 8" 60#endif 61 62 char value[PROPERTY_VALUE_MAX]; 63 property_get("ro.build.version.release", value, "Unknown"); 64 s->append(value); 65 s->append(")"); 66} 67 68static bool GetAttribute(const char *s, const char *key, AString *value) { 69 value->clear(); 70 71 size_t keyLen = strlen(key); 72 73 for (;;) { 74 while (isspace(*s)) { 75 ++s; 76 } 77 78 const char *colonPos = strchr(s, ';'); 79 80 size_t len = 81 (colonPos == NULL) ? strlen(s) : colonPos - s; 82 83 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { 84 value->setTo(&s[keyLen + 1], len - keyLen - 1); 85 return true; 86 } 87 88 if (colonPos == NULL) { 89 return false; 90 } 91 92 s = colonPos + 1; 93 } 94} 95 96struct MyHandler : public AHandler { 97 enum { 98 kWhatConnected = 'conn', 99 kWhatDisconnected = 'disc', 100 kWhatSeekDone = 'sdon', 101 102 kWhatAccessUnit = 'accU', 103 kWhatEOS = 'eos!', 104 kWhatSeekDiscontinuity = 'seeD', 105 kWhatNormalPlayTimeMapping = 'nptM', 106 }; 107 108 MyHandler( 109 const char *url, 110 const sp<AMessage> ¬ify, 111 bool uidValid = false, uid_t uid = 0) 112 : mNotify(notify), 113 mUIDValid(uidValid), 114 mUID(uid), 115 mNetLooper(new ALooper), 116 mConn(new ARTSPConnection(mUIDValid, mUID)), 117 mRTPConn(new ARTPConnection), 118 mOriginalSessionURL(url), 119 mSessionURL(url), 120 mSetupTracksSuccessful(false), 121 mSeekPending(false), 122 mFirstAccessUnit(true), 123 mNTPAnchorUs(-1), 124 mMediaAnchorUs(-1), 125 mLastMediaTimeUs(0), 126 mNumAccessUnitsReceived(0), 127 mCheckPending(false), 128 mCheckGeneration(0), 129 mTryTCPInterleaving(false), 130 mTryFakeRTCP(false), 131 mReceivedFirstRTCPPacket(false), 132 mReceivedFirstRTPPacket(false), 133 mSeekable(false) { 134 mNetLooper->setName("rtsp net"); 135 mNetLooper->start(false /* runOnCallingThread */, 136 false /* canCallJava */, 137 PRIORITY_HIGHEST); 138 139 // Strip any authentication info from the session url, we don't 140 // want to transmit user/pass in cleartext. 141 AString host, path, user, pass; 142 unsigned port; 143 CHECK(ARTSPConnection::ParseURL( 144 mSessionURL.c_str(), &host, &port, &path, &user, &pass)); 145 146 if (user.size() > 0) { 147 mSessionURL.clear(); 148 mSessionURL.append("rtsp://"); 149 mSessionURL.append(host); 150 mSessionURL.append(":"); 151 mSessionURL.append(StringPrintf("%u", port)); 152 mSessionURL.append(path); 153 154 LOGI("rewritten session url: '%s'", mSessionURL.c_str()); 155 } 156 157 mSessionHost = host; 158 } 159 160 void connect() { 161 looper()->registerHandler(mConn); 162 (1 ? mNetLooper : looper())->registerHandler(mRTPConn); 163 164 sp<AMessage> notify = new AMessage('biny', id()); 165 mConn->observeBinaryData(notify); 166 167 sp<AMessage> reply = new AMessage('conn', id()); 168 mConn->connect(mOriginalSessionURL.c_str(), reply); 169 } 170 171 void disconnect() { 172 (new AMessage('abor', id()))->post(); 173 } 174 175 void seek(int64_t timeUs) { 176 sp<AMessage> msg = new AMessage('seek', id()); 177 msg->setInt64("time", timeUs); 178 msg->post(); 179 } 180 181 static void addRR(const sp<ABuffer> &buf) { 182 uint8_t *ptr = buf->data() + buf->size(); 183 ptr[0] = 0x80 | 0; 184 ptr[1] = 201; // RR 185 ptr[2] = 0; 186 ptr[3] = 1; 187 ptr[4] = 0xde; // SSRC 188 ptr[5] = 0xad; 189 ptr[6] = 0xbe; 190 ptr[7] = 0xef; 191 192 buf->setRange(0, buf->size() + 8); 193 } 194 195 static void addSDES(int s, const sp<ABuffer> &buffer) { 196 struct sockaddr_in addr; 197 socklen_t addrSize = sizeof(addr); 198 CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize)); 199 200 uint8_t *data = buffer->data() + buffer->size(); 201 data[0] = 0x80 | 1; 202 data[1] = 202; // SDES 203 data[4] = 0xde; // SSRC 204 data[5] = 0xad; 205 data[6] = 0xbe; 206 data[7] = 0xef; 207 208 size_t offset = 8; 209 210 data[offset++] = 1; // CNAME 211 212 AString cname = "stagefright@"; 213 cname.append(inet_ntoa(addr.sin_addr)); 214 data[offset++] = cname.size(); 215 216 memcpy(&data[offset], cname.c_str(), cname.size()); 217 offset += cname.size(); 218 219 data[offset++] = 6; // TOOL 220 221 AString tool; 222 MakeUserAgentString(&tool); 223 224 data[offset++] = tool.size(); 225 226 memcpy(&data[offset], tool.c_str(), tool.size()); 227 offset += tool.size(); 228 229 data[offset++] = 0; 230 231 if ((offset % 4) > 0) { 232 size_t count = 4 - (offset % 4); 233 switch (count) { 234 case 3: 235 data[offset++] = 0; 236 case 2: 237 data[offset++] = 0; 238 case 1: 239 data[offset++] = 0; 240 } 241 } 242 243 size_t numWords = (offset / 4) - 1; 244 data[2] = numWords >> 8; 245 data[3] = numWords & 0xff; 246 247 buffer->setRange(buffer->offset(), buffer->size() + offset); 248 } 249 250 // In case we're behind NAT, fire off two UDP packets to the remote 251 // rtp/rtcp ports to poke a hole into the firewall for future incoming 252 // packets. We're going to send an RR/SDES RTCP packet to both of them. 253 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) { 254 struct sockaddr_in addr; 255 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 256 addr.sin_family = AF_INET; 257 258 AString source; 259 AString server_port; 260 if (!GetAttribute(transport.c_str(), 261 "source", 262 &source)) { 263 LOGW("Missing 'source' field in Transport response. Using " 264 "RTSP endpoint address."); 265 266 struct hostent *ent = gethostbyname(mSessionHost.c_str()); 267 if (ent == NULL) { 268 LOGE("Failed to look up address of session host '%s'", 269 mSessionHost.c_str()); 270 271 return false; 272 } 273 274 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 275 } else { 276 addr.sin_addr.s_addr = inet_addr(source.c_str()); 277 } 278 279 if (!GetAttribute(transport.c_str(), 280 "server_port", 281 &server_port)) { 282 LOGI("Missing 'server_port' field in Transport response."); 283 return false; 284 } 285 286 int rtpPort, rtcpPort; 287 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2 288 || rtpPort <= 0 || rtpPort > 65535 289 || rtcpPort <=0 || rtcpPort > 65535 290 || rtcpPort != rtpPort + 1) { 291 LOGE("Server picked invalid RTP/RTCP port pair %s," 292 " RTP port must be even, RTCP port must be one higher.", 293 server_port.c_str()); 294 295 return false; 296 } 297 298 if (rtpPort & 1) { 299 LOGW("Server picked an odd RTP port, it should've picked an " 300 "even one, we'll let it pass for now, but this may break " 301 "in the future."); 302 } 303 304 if (addr.sin_addr.s_addr == INADDR_NONE) { 305 return true; 306 } 307 308 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) { 309 // No firewalls to traverse on the loopback interface. 310 return true; 311 } 312 313 // Make up an RR/SDES RTCP packet. 314 sp<ABuffer> buf = new ABuffer(65536); 315 buf->setRange(0, 0); 316 addRR(buf); 317 addSDES(rtpSocket, buf); 318 319 addr.sin_port = htons(rtpPort); 320 321 ssize_t n = sendto( 322 rtpSocket, buf->data(), buf->size(), 0, 323 (const sockaddr *)&addr, sizeof(addr)); 324 325 if (n < (ssize_t)buf->size()) { 326 LOGE("failed to poke a hole for RTP packets"); 327 return false; 328 } 329 330 addr.sin_port = htons(rtcpPort); 331 332 n = sendto( 333 rtcpSocket, buf->data(), buf->size(), 0, 334 (const sockaddr *)&addr, sizeof(addr)); 335 336 if (n < (ssize_t)buf->size()) { 337 LOGE("failed to poke a hole for RTCP packets"); 338 return false; 339 } 340 341 LOGV("successfully poked holes."); 342 343 return true; 344 } 345 346 virtual void onMessageReceived(const sp<AMessage> &msg) { 347 switch (msg->what()) { 348 case 'conn': 349 { 350 int32_t result; 351 CHECK(msg->findInt32("result", &result)); 352 353 LOGI("connection request completed with result %d (%s)", 354 result, strerror(-result)); 355 356 if (result == OK) { 357 AString request; 358 request = "DESCRIBE "; 359 request.append(mSessionURL); 360 request.append(" RTSP/1.0\r\n"); 361 request.append("Accept: application/sdp\r\n"); 362 request.append("\r\n"); 363 364 sp<AMessage> reply = new AMessage('desc', id()); 365 mConn->sendRequest(request.c_str(), reply); 366 } else { 367 (new AMessage('disc', id()))->post(); 368 } 369 break; 370 } 371 372 case 'disc': 373 { 374 int32_t reconnect; 375 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 376 sp<AMessage> reply = new AMessage('conn', id()); 377 mConn->connect(mOriginalSessionURL.c_str(), reply); 378 } else { 379 (new AMessage('quit', id()))->post(); 380 } 381 break; 382 } 383 384 case 'desc': 385 { 386 int32_t result; 387 CHECK(msg->findInt32("result", &result)); 388 389 LOGI("DESCRIBE completed with result %d (%s)", 390 result, strerror(-result)); 391 392 if (result == OK) { 393 sp<RefBase> obj; 394 CHECK(msg->findObject("response", &obj)); 395 sp<ARTSPResponse> response = 396 static_cast<ARTSPResponse *>(obj.get()); 397 398 if (response->mStatusCode == 302) { 399 ssize_t i = response->mHeaders.indexOfKey("location"); 400 CHECK_GE(i, 0); 401 402 mSessionURL = response->mHeaders.valueAt(i); 403 404 AString request; 405 request = "DESCRIBE "; 406 request.append(mSessionURL); 407 request.append(" RTSP/1.0\r\n"); 408 request.append("Accept: application/sdp\r\n"); 409 request.append("\r\n"); 410 411 sp<AMessage> reply = new AMessage('desc', id()); 412 mConn->sendRequest(request.c_str(), reply); 413 break; 414 } 415 416 if (response->mStatusCode != 200) { 417 result = UNKNOWN_ERROR; 418 } else { 419 mSessionDesc = new ASessionDescription; 420 421 mSessionDesc->setTo( 422 response->mContent->data(), 423 response->mContent->size()); 424 425 if (!mSessionDesc->isValid()) { 426 LOGE("Failed to parse session description."); 427 result = ERROR_MALFORMED; 428 } else { 429 ssize_t i = response->mHeaders.indexOfKey("content-base"); 430 if (i >= 0) { 431 mBaseURL = response->mHeaders.valueAt(i); 432 } else { 433 i = response->mHeaders.indexOfKey("content-location"); 434 if (i >= 0) { 435 mBaseURL = response->mHeaders.valueAt(i); 436 } else { 437 mBaseURL = mSessionURL; 438 } 439 } 440 441 if (!mBaseURL.startsWith("rtsp://")) { 442 // Some misbehaving servers specify a relative 443 // URL in one of the locations above, combine 444 // it with the absolute session URL to get 445 // something usable... 446 447 LOGW("Server specified a non-absolute base URL" 448 ", combining it with the session URL to " 449 "get something usable..."); 450 451 AString tmp; 452 CHECK(MakeURL( 453 mSessionURL.c_str(), 454 mBaseURL.c_str(), 455 &tmp)); 456 457 mBaseURL = tmp; 458 } 459 460 CHECK_GT(mSessionDesc->countTracks(), 1u); 461 setupTrack(1); 462 } 463 } 464 } 465 466 if (result != OK) { 467 sp<AMessage> reply = new AMessage('disc', id()); 468 mConn->disconnect(reply); 469 } 470 break; 471 } 472 473 case 'setu': 474 { 475 size_t index; 476 CHECK(msg->findSize("index", &index)); 477 478 TrackInfo *track = NULL; 479 size_t trackIndex; 480 if (msg->findSize("track-index", &trackIndex)) { 481 track = &mTracks.editItemAt(trackIndex); 482 } 483 484 int32_t result; 485 CHECK(msg->findInt32("result", &result)); 486 487 LOGI("SETUP(%d) completed with result %d (%s)", 488 index, result, strerror(-result)); 489 490 if (result == OK) { 491 CHECK(track != NULL); 492 493 sp<RefBase> obj; 494 CHECK(msg->findObject("response", &obj)); 495 sp<ARTSPResponse> response = 496 static_cast<ARTSPResponse *>(obj.get()); 497 498 if (response->mStatusCode != 200) { 499 result = UNKNOWN_ERROR; 500 } else { 501 ssize_t i = response->mHeaders.indexOfKey("session"); 502 CHECK_GE(i, 0); 503 504 mSessionID = response->mHeaders.valueAt(i); 505 i = mSessionID.find(";"); 506 if (i >= 0) { 507 // Remove options, i.e. ";timeout=90" 508 mSessionID.erase(i, mSessionID.size() - i); 509 } 510 511 sp<AMessage> notify = new AMessage('accu', id()); 512 notify->setSize("track-index", trackIndex); 513 514 i = response->mHeaders.indexOfKey("transport"); 515 CHECK_GE(i, 0); 516 517 if (!track->mUsingInterleavedTCP) { 518 AString transport = response->mHeaders.valueAt(i); 519 520 // We are going to continue even if we were 521 // unable to poke a hole into the firewall... 522 pokeAHole( 523 track->mRTPSocket, 524 track->mRTCPSocket, 525 transport); 526 } 527 528 mRTPConn->addStream( 529 track->mRTPSocket, track->mRTCPSocket, 530 mSessionDesc, index, 531 notify, track->mUsingInterleavedTCP); 532 533 mSetupTracksSuccessful = true; 534 } 535 } 536 537 if (result != OK) { 538 if (track) { 539 if (!track->mUsingInterleavedTCP) { 540 // Clear the tag 541 if (mUIDValid) { 542 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket); 543 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket); 544 } 545 546 close(track->mRTPSocket); 547 close(track->mRTCPSocket); 548 } 549 550 mTracks.removeItemsAt(trackIndex); 551 } 552 } 553 554 ++index; 555 if (index < mSessionDesc->countTracks()) { 556 setupTrack(index); 557 } else if (mSetupTracksSuccessful) { 558 AString request = "PLAY "; 559 request.append(mSessionURL); 560 request.append(" RTSP/1.0\r\n"); 561 562 request.append("Session: "); 563 request.append(mSessionID); 564 request.append("\r\n"); 565 566 request.append("\r\n"); 567 568 sp<AMessage> reply = new AMessage('play', id()); 569 mConn->sendRequest(request.c_str(), reply); 570 } else { 571 sp<AMessage> reply = new AMessage('disc', id()); 572 mConn->disconnect(reply); 573 } 574 break; 575 } 576 577 case 'play': 578 { 579 int32_t result; 580 CHECK(msg->findInt32("result", &result)); 581 582 LOGI("PLAY completed with result %d (%s)", 583 result, strerror(-result)); 584 585 if (result == OK) { 586 sp<RefBase> obj; 587 CHECK(msg->findObject("response", &obj)); 588 sp<ARTSPResponse> response = 589 static_cast<ARTSPResponse *>(obj.get()); 590 591 if (response->mStatusCode != 200) { 592 result = UNKNOWN_ERROR; 593 } else { 594 parsePlayResponse(response); 595 596 sp<AMessage> timeout = new AMessage('tiou', id()); 597 timeout->post(kStartupTimeoutUs); 598 } 599 } 600 601 if (result != OK) { 602 sp<AMessage> reply = new AMessage('disc', id()); 603 mConn->disconnect(reply); 604 } 605 606 break; 607 } 608 609 case 'abor': 610 { 611 for (size_t i = 0; i < mTracks.size(); ++i) { 612 TrackInfo *info = &mTracks.editItemAt(i); 613 614 if (!mFirstAccessUnit) { 615 postQueueEOS(i, ERROR_END_OF_STREAM); 616 } 617 618 if (!info->mUsingInterleavedTCP) { 619 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket); 620 621 // Clear the tag 622 if (mUIDValid) { 623 HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket); 624 HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket); 625 } 626 627 close(info->mRTPSocket); 628 close(info->mRTCPSocket); 629 } 630 } 631 mTracks.clear(); 632 mSetupTracksSuccessful = false; 633 mSeekPending = false; 634 mFirstAccessUnit = true; 635 mNTPAnchorUs = -1; 636 mMediaAnchorUs = -1; 637 mNumAccessUnitsReceived = 0; 638 mReceivedFirstRTCPPacket = false; 639 mReceivedFirstRTPPacket = false; 640 mSeekable = false; 641 642 sp<AMessage> reply = new AMessage('tear', id()); 643 644 int32_t reconnect; 645 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 646 reply->setInt32("reconnect", true); 647 } 648 649 AString request; 650 request = "TEARDOWN "; 651 652 // XXX should use aggregate url from SDP here... 653 request.append(mSessionURL); 654 request.append(" RTSP/1.0\r\n"); 655 656 request.append("Session: "); 657 request.append(mSessionID); 658 request.append("\r\n"); 659 660 request.append("\r\n"); 661 662 mConn->sendRequest(request.c_str(), reply); 663 break; 664 } 665 666 case 'tear': 667 { 668 int32_t result; 669 CHECK(msg->findInt32("result", &result)); 670 671 LOGI("TEARDOWN completed with result %d (%s)", 672 result, strerror(-result)); 673 674 sp<AMessage> reply = new AMessage('disc', id()); 675 676 int32_t reconnect; 677 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 678 reply->setInt32("reconnect", true); 679 } 680 681 mConn->disconnect(reply); 682 break; 683 } 684 685 case 'quit': 686 { 687 sp<AMessage> msg = mNotify->dup(); 688 msg->setInt32("what", kWhatDisconnected); 689 msg->setInt32("result", UNKNOWN_ERROR); 690 msg->post(); 691 break; 692 } 693 694 case 'chek': 695 { 696 int32_t generation; 697 CHECK(msg->findInt32("generation", &generation)); 698 if (generation != mCheckGeneration) { 699 // This is an outdated message. Ignore. 700 break; 701 } 702 703 if (mNumAccessUnitsReceived == 0) { 704 LOGI("stream ended? aborting."); 705 (new AMessage('abor', id()))->post(); 706 break; 707 } 708 709 mNumAccessUnitsReceived = 0; 710 msg->post(kAccessUnitTimeoutUs); 711 break; 712 } 713 714 case 'accu': 715 { 716 int32_t timeUpdate; 717 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) { 718 size_t trackIndex; 719 CHECK(msg->findSize("track-index", &trackIndex)); 720 721 uint32_t rtpTime; 722 uint64_t ntpTime; 723 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime)); 724 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime)); 725 726 onTimeUpdate(trackIndex, rtpTime, ntpTime); 727 break; 728 } 729 730 int32_t first; 731 if (msg->findInt32("first-rtcp", &first)) { 732 mReceivedFirstRTCPPacket = true; 733 break; 734 } 735 736 if (msg->findInt32("first-rtp", &first)) { 737 mReceivedFirstRTPPacket = true; 738 break; 739 } 740 741 ++mNumAccessUnitsReceived; 742 postAccessUnitTimeoutCheck(); 743 744 size_t trackIndex; 745 CHECK(msg->findSize("track-index", &trackIndex)); 746 747 if (trackIndex >= mTracks.size()) { 748 LOGV("late packets ignored."); 749 break; 750 } 751 752 TrackInfo *track = &mTracks.editItemAt(trackIndex); 753 754 int32_t eos; 755 if (msg->findInt32("eos", &eos)) { 756 LOGI("received BYE on track index %d", trackIndex); 757#if 0 758 track->mPacketSource->signalEOS(ERROR_END_OF_STREAM); 759#endif 760 return; 761 } 762 763 sp<RefBase> obj; 764 CHECK(msg->findObject("access-unit", &obj)); 765 766 sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get()); 767 768 uint32_t seqNum = (uint32_t)accessUnit->int32Data(); 769 770 if (mSeekPending) { 771 LOGV("we're seeking, dropping stale packet."); 772 break; 773 } 774 775 if (seqNum < track->mFirstSeqNumInSegment) { 776 LOGV("dropping stale access-unit (%d < %d)", 777 seqNum, track->mFirstSeqNumInSegment); 778 break; 779 } 780 781 if (track->mNewSegment) { 782 track->mNewSegment = false; 783 } 784 785 onAccessUnitComplete(trackIndex, accessUnit); 786 break; 787 } 788 789 case 'seek': 790 { 791 if (!mSeekable) { 792 LOGW("This is a live stream, ignoring seek request."); 793 794 sp<AMessage> msg = mNotify->dup(); 795 msg->setInt32("what", kWhatSeekDone); 796 msg->post(); 797 break; 798 } 799 800 int64_t timeUs; 801 CHECK(msg->findInt64("time", &timeUs)); 802 803 mSeekPending = true; 804 805 // Disable the access unit timeout until we resumed 806 // playback again. 807 mCheckPending = true; 808 ++mCheckGeneration; 809 810 AString request = "PAUSE "; 811 request.append(mSessionURL); 812 request.append(" RTSP/1.0\r\n"); 813 814 request.append("Session: "); 815 request.append(mSessionID); 816 request.append("\r\n"); 817 818 request.append("\r\n"); 819 820 sp<AMessage> reply = new AMessage('see1', id()); 821 reply->setInt64("time", timeUs); 822 mConn->sendRequest(request.c_str(), reply); 823 break; 824 } 825 826 case 'see1': 827 { 828 // Session is paused now. 829 for (size_t i = 0; i < mTracks.size(); ++i) { 830 TrackInfo *info = &mTracks.editItemAt(i); 831 832 postQueueSeekDiscontinuity(i); 833 834 info->mRTPAnchor = 0; 835 info->mNTPAnchorUs = -1; 836 } 837 838 mNTPAnchorUs = -1; 839 840 int64_t timeUs; 841 CHECK(msg->findInt64("time", &timeUs)); 842 843 AString request = "PLAY "; 844 request.append(mSessionURL); 845 request.append(" RTSP/1.0\r\n"); 846 847 request.append("Session: "); 848 request.append(mSessionID); 849 request.append("\r\n"); 850 851 request.append( 852 StringPrintf( 853 "Range: npt=%lld-\r\n", timeUs / 1000000ll)); 854 855 request.append("\r\n"); 856 857 sp<AMessage> reply = new AMessage('see2', id()); 858 mConn->sendRequest(request.c_str(), reply); 859 break; 860 } 861 862 case 'see2': 863 { 864 CHECK(mSeekPending); 865 866 int32_t result; 867 CHECK(msg->findInt32("result", &result)); 868 869 LOGI("PLAY completed with result %d (%s)", 870 result, strerror(-result)); 871 872 mCheckPending = false; 873 postAccessUnitTimeoutCheck(); 874 875 if (result == OK) { 876 sp<RefBase> obj; 877 CHECK(msg->findObject("response", &obj)); 878 sp<ARTSPResponse> response = 879 static_cast<ARTSPResponse *>(obj.get()); 880 881 if (response->mStatusCode != 200) { 882 result = UNKNOWN_ERROR; 883 } else { 884 parsePlayResponse(response); 885 886 ssize_t i = response->mHeaders.indexOfKey("rtp-info"); 887 CHECK_GE(i, 0); 888 889 LOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str()); 890 891 LOGI("seek completed."); 892 } 893 } 894 895 if (result != OK) { 896 LOGE("seek failed, aborting."); 897 (new AMessage('abor', id()))->post(); 898 } 899 900 mSeekPending = false; 901 902 sp<AMessage> msg = mNotify->dup(); 903 msg->setInt32("what", kWhatSeekDone); 904 msg->post(); 905 break; 906 } 907 908 case 'biny': 909 { 910 sp<RefBase> obj; 911 CHECK(msg->findObject("buffer", &obj)); 912 sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get()); 913 914 int32_t index; 915 CHECK(buffer->meta()->findInt32("index", &index)); 916 917 mRTPConn->injectPacket(index, buffer); 918 break; 919 } 920 921 case 'tiou': 922 { 923 if (!mReceivedFirstRTCPPacket) { 924 if (mReceivedFirstRTPPacket && !mTryFakeRTCP) { 925 LOGW("We received RTP packets but no RTCP packets, " 926 "using fake timestamps."); 927 928 mTryFakeRTCP = true; 929 930 mReceivedFirstRTCPPacket = true; 931 932 fakeTimestamps(); 933 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) { 934 LOGW("Never received any data, switching transports."); 935 936 mTryTCPInterleaving = true; 937 938 sp<AMessage> msg = new AMessage('abor', id()); 939 msg->setInt32("reconnect", true); 940 msg->post(); 941 } else { 942 LOGW("Never received any data, disconnecting."); 943 (new AMessage('abor', id()))->post(); 944 } 945 } 946 break; 947 } 948 949 default: 950 TRESPASS(); 951 break; 952 } 953 } 954 955 void postAccessUnitTimeoutCheck() { 956 if (mCheckPending) { 957 return; 958 } 959 960 mCheckPending = true; 961 sp<AMessage> check = new AMessage('chek', id()); 962 check->setInt32("generation", mCheckGeneration); 963 check->post(kAccessUnitTimeoutUs); 964 } 965 966 static void SplitString( 967 const AString &s, const char *separator, List<AString> *items) { 968 items->clear(); 969 size_t start = 0; 970 while (start < s.size()) { 971 ssize_t offset = s.find(separator, start); 972 973 if (offset < 0) { 974 items->push_back(AString(s, start, s.size() - start)); 975 break; 976 } 977 978 items->push_back(AString(s, start, offset - start)); 979 start = offset + strlen(separator); 980 } 981 } 982 983 void parsePlayResponse(const sp<ARTSPResponse> &response) { 984 mSeekable = false; 985 986 ssize_t i = response->mHeaders.indexOfKey("range"); 987 if (i < 0) { 988 // Server doesn't even tell use what range it is going to 989 // play, therefore we won't support seeking. 990 return; 991 } 992 993 AString range = response->mHeaders.valueAt(i); 994 LOGV("Range: %s", range.c_str()); 995 996 AString val; 997 CHECK(GetAttribute(range.c_str(), "npt", &val)); 998 999 float npt1, npt2; 1000 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) { 1001 // This is a live stream and therefore not seekable. 1002 return; 1003 } 1004 1005 i = response->mHeaders.indexOfKey("rtp-info"); 1006 CHECK_GE(i, 0); 1007 1008 AString rtpInfo = response->mHeaders.valueAt(i); 1009 List<AString> streamInfos; 1010 SplitString(rtpInfo, ",", &streamInfos); 1011 1012 int n = 1; 1013 for (List<AString>::iterator it = streamInfos.begin(); 1014 it != streamInfos.end(); ++it) { 1015 (*it).trim(); 1016 LOGV("streamInfo[%d] = %s", n, (*it).c_str()); 1017 1018 CHECK(GetAttribute((*it).c_str(), "url", &val)); 1019 1020 size_t trackIndex = 0; 1021 while (trackIndex < mTracks.size() 1022 && !(val == mTracks.editItemAt(trackIndex).mURL)) { 1023 ++trackIndex; 1024 } 1025 CHECK_LT(trackIndex, mTracks.size()); 1026 1027 CHECK(GetAttribute((*it).c_str(), "seq", &val)); 1028 1029 char *end; 1030 unsigned long seq = strtoul(val.c_str(), &end, 10); 1031 1032 TrackInfo *info = &mTracks.editItemAt(trackIndex); 1033 info->mFirstSeqNumInSegment = seq; 1034 info->mNewSegment = true; 1035 1036 CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); 1037 1038 uint32_t rtpTime = strtoul(val.c_str(), &end, 10); 1039 1040 LOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1); 1041 1042 info->mNormalPlayTimeRTP = rtpTime; 1043 info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6); 1044 1045 if (!mFirstAccessUnit) { 1046 postNormalPlayTimeMapping( 1047 trackIndex, 1048 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1049 } 1050 1051 ++n; 1052 } 1053 1054 mSeekable = true; 1055 } 1056 1057 sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) { 1058 CHECK_GE(index, 0u); 1059 CHECK_LT(index, mTracks.size()); 1060 1061 const TrackInfo &info = mTracks.itemAt(index); 1062 1063 *timeScale = info.mTimeScale; 1064 1065 return info.mPacketSource->getFormat(); 1066 } 1067 1068 size_t countTracks() const { 1069 return mTracks.size(); 1070 } 1071 1072private: 1073 struct TrackInfo { 1074 AString mURL; 1075 int mRTPSocket; 1076 int mRTCPSocket; 1077 bool mUsingInterleavedTCP; 1078 uint32_t mFirstSeqNumInSegment; 1079 bool mNewSegment; 1080 1081 uint32_t mRTPAnchor; 1082 int64_t mNTPAnchorUs; 1083 int32_t mTimeScale; 1084 1085 uint32_t mNormalPlayTimeRTP; 1086 int64_t mNormalPlayTimeUs; 1087 1088 sp<APacketSource> mPacketSource; 1089 1090 // Stores packets temporarily while no notion of time 1091 // has been established yet. 1092 List<sp<ABuffer> > mPackets; 1093 }; 1094 1095 sp<AMessage> mNotify; 1096 bool mUIDValid; 1097 uid_t mUID; 1098 sp<ALooper> mNetLooper; 1099 sp<ARTSPConnection> mConn; 1100 sp<ARTPConnection> mRTPConn; 1101 sp<ASessionDescription> mSessionDesc; 1102 AString mOriginalSessionURL; // This one still has user:pass@ 1103 AString mSessionURL; 1104 AString mSessionHost; 1105 AString mBaseURL; 1106 AString mSessionID; 1107 bool mSetupTracksSuccessful; 1108 bool mSeekPending; 1109 bool mFirstAccessUnit; 1110 1111 int64_t mNTPAnchorUs; 1112 int64_t mMediaAnchorUs; 1113 int64_t mLastMediaTimeUs; 1114 1115 int64_t mNumAccessUnitsReceived; 1116 bool mCheckPending; 1117 int32_t mCheckGeneration; 1118 bool mTryTCPInterleaving; 1119 bool mTryFakeRTCP; 1120 bool mReceivedFirstRTCPPacket; 1121 bool mReceivedFirstRTPPacket; 1122 bool mSeekable; 1123 1124 Vector<TrackInfo> mTracks; 1125 1126 void setupTrack(size_t index) { 1127 sp<APacketSource> source = 1128 new APacketSource(mSessionDesc, index); 1129 1130 if (source->initCheck() != OK) { 1131 LOGW("Unsupported format. Ignoring track #%d.", index); 1132 1133 sp<AMessage> reply = new AMessage('setu', id()); 1134 reply->setSize("index", index); 1135 reply->setInt32("result", ERROR_UNSUPPORTED); 1136 reply->post(); 1137 return; 1138 } 1139 1140 AString url; 1141 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 1142 1143 AString trackURL; 1144 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 1145 1146 mTracks.push(TrackInfo()); 1147 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 1148 info->mURL = trackURL; 1149 info->mPacketSource = source; 1150 info->mUsingInterleavedTCP = false; 1151 info->mFirstSeqNumInSegment = 0; 1152 info->mNewSegment = true; 1153 info->mRTPAnchor = 0; 1154 info->mNTPAnchorUs = -1; 1155 info->mNormalPlayTimeRTP = 0; 1156 info->mNormalPlayTimeUs = 0ll; 1157 1158 unsigned long PT; 1159 AString formatDesc; 1160 AString formatParams; 1161 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams); 1162 1163 int32_t timescale; 1164 int32_t numChannels; 1165 ASessionDescription::ParseFormatDesc( 1166 formatDesc.c_str(), ×cale, &numChannels); 1167 1168 info->mTimeScale = timescale; 1169 1170 LOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str()); 1171 1172 AString request = "SETUP "; 1173 request.append(trackURL); 1174 request.append(" RTSP/1.0\r\n"); 1175 1176 if (mTryTCPInterleaving) { 1177 size_t interleaveIndex = 2 * (mTracks.size() - 1); 1178 info->mUsingInterleavedTCP = true; 1179 info->mRTPSocket = interleaveIndex; 1180 info->mRTCPSocket = interleaveIndex + 1; 1181 1182 request.append("Transport: RTP/AVP/TCP;interleaved="); 1183 request.append(interleaveIndex); 1184 request.append("-"); 1185 request.append(interleaveIndex + 1); 1186 } else { 1187 unsigned rtpPort; 1188 ARTPConnection::MakePortPair( 1189 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 1190 1191 if (mUIDValid) { 1192 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID, 1193 (uint32_t)*(uint32_t*) "RTP_"); 1194 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID, 1195 (uint32_t)*(uint32_t*) "RTP_"); 1196 } 1197 1198 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 1199 request.append(rtpPort); 1200 request.append("-"); 1201 request.append(rtpPort + 1); 1202 } 1203 1204 request.append("\r\n"); 1205 1206 if (index > 1) { 1207 request.append("Session: "); 1208 request.append(mSessionID); 1209 request.append("\r\n"); 1210 } 1211 1212 request.append("\r\n"); 1213 1214 sp<AMessage> reply = new AMessage('setu', id()); 1215 reply->setSize("index", index); 1216 reply->setSize("track-index", mTracks.size() - 1); 1217 mConn->sendRequest(request.c_str(), reply); 1218 } 1219 1220 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 1221 out->clear(); 1222 1223 if (strncasecmp("rtsp://", baseURL, 7)) { 1224 // Base URL must be absolute 1225 return false; 1226 } 1227 1228 if (!strncasecmp("rtsp://", url, 7)) { 1229 // "url" is already an absolute URL, ignore base URL. 1230 out->setTo(url); 1231 return true; 1232 } 1233 1234 size_t n = strlen(baseURL); 1235 if (baseURL[n - 1] == '/') { 1236 out->setTo(baseURL); 1237 out->append(url); 1238 } else { 1239 const char *slashPos = strrchr(baseURL, '/'); 1240 1241 if (slashPos > &baseURL[6]) { 1242 out->setTo(baseURL, slashPos - baseURL); 1243 } else { 1244 out->setTo(baseURL); 1245 } 1246 1247 out->append("/"); 1248 out->append(url); 1249 } 1250 1251 return true; 1252 } 1253 1254 void fakeTimestamps() { 1255 for (size_t i = 0; i < mTracks.size(); ++i) { 1256 onTimeUpdate(i, 0, 0ll); 1257 } 1258 } 1259 1260 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) { 1261 LOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx", 1262 trackIndex, rtpTime, ntpTime); 1263 1264 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); 1265 1266 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1267 1268 track->mRTPAnchor = rtpTime; 1269 track->mNTPAnchorUs = ntpTimeUs; 1270 1271 if (mNTPAnchorUs < 0) { 1272 mNTPAnchorUs = ntpTimeUs; 1273 mMediaAnchorUs = mLastMediaTimeUs; 1274 } 1275 } 1276 1277 void onAccessUnitComplete( 1278 int32_t trackIndex, const sp<ABuffer> &accessUnit) { 1279 LOGV("onAccessUnitComplete track %d", trackIndex); 1280 1281 if (mFirstAccessUnit) { 1282 sp<AMessage> msg = mNotify->dup(); 1283 msg->setInt32("what", kWhatConnected); 1284 msg->post(); 1285 1286 for (size_t i = 0; i < mTracks.size(); ++i) { 1287 TrackInfo *info = &mTracks.editItemAt(i); 1288 1289 postNormalPlayTimeMapping( 1290 i, 1291 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1292 } 1293 1294 mFirstAccessUnit = false; 1295 } 1296 1297 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1298 1299 if (mNTPAnchorUs < 0 || mMediaAnchorUs < 0 || track->mNTPAnchorUs < 0) { 1300 LOGV("storing accessUnit, no time established yet"); 1301 track->mPackets.push_back(accessUnit); 1302 return; 1303 } 1304 1305 while (!track->mPackets.empty()) { 1306 sp<ABuffer> accessUnit = *track->mPackets.begin(); 1307 track->mPackets.erase(track->mPackets.begin()); 1308 1309 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1310 postQueueAccessUnit(trackIndex, accessUnit); 1311 } 1312 } 1313 1314 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1315 postQueueAccessUnit(trackIndex, accessUnit); 1316 } 1317 } 1318 1319 bool addMediaTimestamp( 1320 int32_t trackIndex, const TrackInfo *track, 1321 const sp<ABuffer> &accessUnit) { 1322 uint32_t rtpTime; 1323 CHECK(accessUnit->meta()->findInt32( 1324 "rtp-time", (int32_t *)&rtpTime)); 1325 1326 int64_t relRtpTimeUs = 1327 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll) 1328 / track->mTimeScale; 1329 1330 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs; 1331 1332 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs; 1333 1334 if (mediaTimeUs > mLastMediaTimeUs) { 1335 mLastMediaTimeUs = mediaTimeUs; 1336 } 1337 1338 if (mediaTimeUs < 0) { 1339 LOGV("dropping early accessUnit."); 1340 return false; 1341 } 1342 1343 LOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)", 1344 trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6); 1345 1346 accessUnit->meta()->setInt64("timeUs", mediaTimeUs); 1347 1348 return true; 1349 } 1350 1351 void postQueueAccessUnit( 1352 size_t trackIndex, const sp<ABuffer> &accessUnit) { 1353 sp<AMessage> msg = mNotify->dup(); 1354 msg->setInt32("what", kWhatAccessUnit); 1355 msg->setSize("trackIndex", trackIndex); 1356 msg->setObject("accessUnit", accessUnit); 1357 msg->post(); 1358 } 1359 1360 void postQueueEOS(size_t trackIndex, status_t finalResult) { 1361 sp<AMessage> msg = mNotify->dup(); 1362 msg->setInt32("what", kWhatEOS); 1363 msg->setSize("trackIndex", trackIndex); 1364 msg->setInt32("finalResult", finalResult); 1365 msg->post(); 1366 } 1367 1368 void postQueueSeekDiscontinuity(size_t trackIndex) { 1369 sp<AMessage> msg = mNotify->dup(); 1370 msg->setInt32("what", kWhatSeekDiscontinuity); 1371 msg->setSize("trackIndex", trackIndex); 1372 msg->post(); 1373 } 1374 1375 void postNormalPlayTimeMapping( 1376 size_t trackIndex, uint32_t rtpTime, int64_t nptUs) { 1377 sp<AMessage> msg = mNotify->dup(); 1378 msg->setInt32("what", kWhatNormalPlayTimeMapping); 1379 msg->setSize("trackIndex", trackIndex); 1380 msg->setInt32("rtpTime", rtpTime); 1381 msg->setInt64("nptUs", nptUs); 1382 msg->post(); 1383 } 1384 1385 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 1386}; 1387 1388} // namespace android 1389 1390#endif // MY_HANDLER_H_ 1391