MyHandler.h revision a6925e6149faf4a936a5b557a769d117454413d8
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#ifndef MY_HANDLER_H_ 18 19#define MY_HANDLER_H_ 20 21//#define LOG_NDEBUG 0 22#define LOG_TAG "MyHandler" 23#include <utils/Log.h> 24 25#include "APacketSource.h" 26#include "ARTPConnection.h" 27#include "ARTSPConnection.h" 28#include "ASessionDescription.h" 29 30#include <ctype.h> 31#include <cutils/properties.h> 32 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/ALooper.h> 36#include <media/stagefright/foundation/AMessage.h> 37#include <media/stagefright/MediaErrors.h> 38 39#include <arpa/inet.h> 40#include <sys/socket.h> 41#include <netdb.h> 42 43// If no access units are received within 5 secs, assume that the rtp 44// stream has ended and signal end of stream. 45static int64_t kAccessUnitTimeoutUs = 5000000ll; 46 47// If no access units arrive for the first 10 secs after starting the 48// stream, assume none ever will and signal EOS or switch transports. 49static int64_t kStartupTimeoutUs = 10000000ll; 50 51namespace android { 52 53static void MakeUserAgentString(AString *s) { 54 s->setTo("stagefright/1.1 (Linux;Android "); 55 56#if (PROPERTY_VALUE_MAX < 8) 57#error "PROPERTY_VALUE_MAX must be at least 8" 58#endif 59 60 char value[PROPERTY_VALUE_MAX]; 61 property_get("ro.build.version.release", value, "Unknown"); 62 s->append(value); 63 s->append(")"); 64} 65 66static bool GetAttribute(const char *s, const char *key, AString *value) { 67 value->clear(); 68 69 size_t keyLen = strlen(key); 70 71 for (;;) { 72 while (isspace(*s)) { 73 ++s; 74 } 75 76 const char *colonPos = strchr(s, ';'); 77 78 size_t len = 79 (colonPos == NULL) ? strlen(s) : colonPos - s; 80 81 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { 82 value->setTo(&s[keyLen + 1], len - keyLen - 1); 83 return true; 84 } 85 86 if (colonPos == NULL) { 87 return false; 88 } 89 90 s = colonPos + 1; 91 } 92} 93 94struct MyHandler : public AHandler { 95 MyHandler(const char *url, const sp<ALooper> &looper) 96 : mLooper(looper), 97 mNetLooper(new ALooper), 98 mConn(new ARTSPConnection), 99 mRTPConn(new ARTPConnection), 100 mOriginalSessionURL(url), 101 mSessionURL(url), 102 mSetupTracksSuccessful(false), 103 mSeekPending(false), 104 mFirstAccessUnit(true), 105 mNTPAnchorUs(-1), 106 mMediaAnchorUs(-1), 107 mLastMediaTimeUs(0), 108 mNumAccessUnitsReceived(0), 109 mCheckPending(false), 110 mCheckGeneration(0), 111 mTryTCPInterleaving(false), 112 mTryFakeRTCP(false), 113 mReceivedFirstRTCPPacket(false), 114 mReceivedFirstRTPPacket(false), 115 mSeekable(false) { 116 mNetLooper->setName("rtsp net"); 117 mNetLooper->start(false /* runOnCallingThread */, 118 false /* canCallJava */, 119 PRIORITY_HIGHEST); 120 121 // Strip any authentication info from the session url, we don't 122 // want to transmit user/pass in cleartext. 123 AString host, path, user, pass; 124 unsigned port; 125 CHECK(ARTSPConnection::ParseURL( 126 mSessionURL.c_str(), &host, &port, &path, &user, &pass)); 127 128 if (user.size() > 0) { 129 mSessionURL.clear(); 130 mSessionURL.append("rtsp://"); 131 mSessionURL.append(host); 132 mSessionURL.append(":"); 133 mSessionURL.append(StringPrintf("%u", port)); 134 mSessionURL.append(path); 135 136 LOGI("rewritten session url: '%s'", mSessionURL.c_str()); 137 } 138 139 mSessionHost = host; 140 } 141 142 void connect(const sp<AMessage> &doneMsg) { 143 mDoneMsg = doneMsg; 144 145 mLooper->registerHandler(this); 146 mLooper->registerHandler(mConn); 147 (1 ? mNetLooper : mLooper)->registerHandler(mRTPConn); 148 149 sp<AMessage> notify = new AMessage('biny', id()); 150 mConn->observeBinaryData(notify); 151 152 sp<AMessage> reply = new AMessage('conn', id()); 153 mConn->connect(mOriginalSessionURL.c_str(), reply); 154 } 155 156 void disconnect(const sp<AMessage> &doneMsg) { 157 mDoneMsg = doneMsg; 158 159 (new AMessage('abor', id()))->post(); 160 } 161 162 void seek(int64_t timeUs, const sp<AMessage> &doneMsg) { 163 sp<AMessage> msg = new AMessage('seek', id()); 164 msg->setInt64("time", timeUs); 165 msg->setMessage("doneMsg", doneMsg); 166 msg->post(); 167 } 168 169 int64_t getNormalPlayTimeUs() { 170 int64_t maxTimeUs = 0; 171 for (size_t i = 0; i < mTracks.size(); ++i) { 172 int64_t timeUs = mTracks.editItemAt(i).mPacketSource 173 ->getNormalPlayTimeUs(); 174 175 if (i == 0 || timeUs > maxTimeUs) { 176 maxTimeUs = timeUs; 177 } 178 } 179 180 return maxTimeUs; 181 } 182 183 static void addRR(const sp<ABuffer> &buf) { 184 uint8_t *ptr = buf->data() + buf->size(); 185 ptr[0] = 0x80 | 0; 186 ptr[1] = 201; // RR 187 ptr[2] = 0; 188 ptr[3] = 1; 189 ptr[4] = 0xde; // SSRC 190 ptr[5] = 0xad; 191 ptr[6] = 0xbe; 192 ptr[7] = 0xef; 193 194 buf->setRange(0, buf->size() + 8); 195 } 196 197 static void addSDES(int s, const sp<ABuffer> &buffer) { 198 struct sockaddr_in addr; 199 socklen_t addrSize = sizeof(addr); 200 CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize)); 201 202 uint8_t *data = buffer->data() + buffer->size(); 203 data[0] = 0x80 | 1; 204 data[1] = 202; // SDES 205 data[4] = 0xde; // SSRC 206 data[5] = 0xad; 207 data[6] = 0xbe; 208 data[7] = 0xef; 209 210 size_t offset = 8; 211 212 data[offset++] = 1; // CNAME 213 214 AString cname = "stagefright@"; 215 cname.append(inet_ntoa(addr.sin_addr)); 216 data[offset++] = cname.size(); 217 218 memcpy(&data[offset], cname.c_str(), cname.size()); 219 offset += cname.size(); 220 221 data[offset++] = 6; // TOOL 222 223 AString tool; 224 MakeUserAgentString(&tool); 225 226 data[offset++] = tool.size(); 227 228 memcpy(&data[offset], tool.c_str(), tool.size()); 229 offset += tool.size(); 230 231 data[offset++] = 0; 232 233 if ((offset % 4) > 0) { 234 size_t count = 4 - (offset % 4); 235 switch (count) { 236 case 3: 237 data[offset++] = 0; 238 case 2: 239 data[offset++] = 0; 240 case 1: 241 data[offset++] = 0; 242 } 243 } 244 245 size_t numWords = (offset / 4) - 1; 246 data[2] = numWords >> 8; 247 data[3] = numWords & 0xff; 248 249 buffer->setRange(buffer->offset(), buffer->size() + offset); 250 } 251 252 // In case we're behind NAT, fire off two UDP packets to the remote 253 // rtp/rtcp ports to poke a hole into the firewall for future incoming 254 // packets. We're going to send an RR/SDES RTCP packet to both of them. 255 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) { 256 struct sockaddr_in addr; 257 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 258 addr.sin_family = AF_INET; 259 260 AString source; 261 AString server_port; 262 if (!GetAttribute(transport.c_str(), 263 "source", 264 &source)) { 265 LOGW("Missing 'source' field in Transport response. Using " 266 "RTSP endpoint address."); 267 268 struct hostent *ent = gethostbyname(mSessionHost.c_str()); 269 if (ent == NULL) { 270 LOGE("Failed to look up address of session host '%s'", 271 mSessionHost.c_str()); 272 273 return false; 274 } 275 276 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 277 } else { 278 addr.sin_addr.s_addr = inet_addr(source.c_str()); 279 } 280 281 if (!GetAttribute(transport.c_str(), 282 "server_port", 283 &server_port)) { 284 LOGI("Missing 'server_port' field in Transport response."); 285 return false; 286 } 287 288 int rtpPort, rtcpPort; 289 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2 290 || rtpPort <= 0 || rtpPort > 65535 291 || rtcpPort <=0 || rtcpPort > 65535 292 || rtcpPort != rtpPort + 1) { 293 LOGE("Server picked invalid RTP/RTCP port pair %s," 294 " RTP port must be even, RTCP port must be one higher.", 295 server_port.c_str()); 296 297 return false; 298 } 299 300 if (rtpPort & 1) { 301 LOGW("Server picked an odd RTP port, it should've picked an " 302 "even one, we'll let it pass for now, but this may break " 303 "in the future."); 304 } 305 306 if (addr.sin_addr.s_addr == INADDR_NONE) { 307 return true; 308 } 309 310 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) { 311 // No firewalls to traverse on the loopback interface. 312 return true; 313 } 314 315 // Make up an RR/SDES RTCP packet. 316 sp<ABuffer> buf = new ABuffer(65536); 317 buf->setRange(0, 0); 318 addRR(buf); 319 addSDES(rtpSocket, buf); 320 321 addr.sin_port = htons(rtpPort); 322 323 ssize_t n = sendto( 324 rtpSocket, buf->data(), buf->size(), 0, 325 (const sockaddr *)&addr, sizeof(addr)); 326 327 if (n < (ssize_t)buf->size()) { 328 LOGE("failed to poke a hole for RTP packets"); 329 return false; 330 } 331 332 addr.sin_port = htons(rtcpPort); 333 334 n = sendto( 335 rtcpSocket, buf->data(), buf->size(), 0, 336 (const sockaddr *)&addr, sizeof(addr)); 337 338 if (n < (ssize_t)buf->size()) { 339 LOGE("failed to poke a hole for RTCP packets"); 340 return false; 341 } 342 343 LOGV("successfully poked holes."); 344 345 return true; 346 } 347 348 virtual void onMessageReceived(const sp<AMessage> &msg) { 349 switch (msg->what()) { 350 case 'conn': 351 { 352 int32_t result; 353 CHECK(msg->findInt32("result", &result)); 354 355 LOGI("connection request completed with result %d (%s)", 356 result, strerror(-result)); 357 358 if (result == OK) { 359 AString request; 360 request = "DESCRIBE "; 361 request.append(mSessionURL); 362 request.append(" RTSP/1.0\r\n"); 363 request.append("Accept: application/sdp\r\n"); 364 request.append("\r\n"); 365 366 sp<AMessage> reply = new AMessage('desc', id()); 367 mConn->sendRequest(request.c_str(), reply); 368 } else { 369 (new AMessage('disc', id()))->post(); 370 } 371 break; 372 } 373 374 case 'disc': 375 { 376 int32_t reconnect; 377 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 378 sp<AMessage> reply = new AMessage('conn', id()); 379 mConn->connect(mOriginalSessionURL.c_str(), reply); 380 } else { 381 (new AMessage('quit', id()))->post(); 382 } 383 break; 384 } 385 386 case 'desc': 387 { 388 int32_t result; 389 CHECK(msg->findInt32("result", &result)); 390 391 LOGI("DESCRIBE completed with result %d (%s)", 392 result, strerror(-result)); 393 394 if (result == OK) { 395 sp<RefBase> obj; 396 CHECK(msg->findObject("response", &obj)); 397 sp<ARTSPResponse> response = 398 static_cast<ARTSPResponse *>(obj.get()); 399 400 if (response->mStatusCode == 302) { 401 ssize_t i = response->mHeaders.indexOfKey("location"); 402 CHECK_GE(i, 0); 403 404 mSessionURL = response->mHeaders.valueAt(i); 405 406 AString request; 407 request = "DESCRIBE "; 408 request.append(mSessionURL); 409 request.append(" RTSP/1.0\r\n"); 410 request.append("Accept: application/sdp\r\n"); 411 request.append("\r\n"); 412 413 sp<AMessage> reply = new AMessage('desc', id()); 414 mConn->sendRequest(request.c_str(), reply); 415 break; 416 } 417 418 if (response->mStatusCode != 200) { 419 result = UNKNOWN_ERROR; 420 } else { 421 mSessionDesc = new ASessionDescription; 422 423 mSessionDesc->setTo( 424 response->mContent->data(), 425 response->mContent->size()); 426 427 if (!mSessionDesc->isValid()) { 428 LOGE("Failed to parse session description."); 429 result = ERROR_MALFORMED; 430 } else { 431 ssize_t i = response->mHeaders.indexOfKey("content-base"); 432 if (i >= 0) { 433 mBaseURL = response->mHeaders.valueAt(i); 434 } else { 435 i = response->mHeaders.indexOfKey("content-location"); 436 if (i >= 0) { 437 mBaseURL = response->mHeaders.valueAt(i); 438 } else { 439 mBaseURL = mSessionURL; 440 } 441 } 442 443 if (!mBaseURL.startsWith("rtsp://")) { 444 // Some misbehaving servers specify a relative 445 // URL in one of the locations above, combine 446 // it with the absolute session URL to get 447 // something usable... 448 449 LOGW("Server specified a non-absolute base URL" 450 ", combining it with the session URL to " 451 "get something usable..."); 452 453 AString tmp; 454 CHECK(MakeURL( 455 mSessionURL.c_str(), 456 mBaseURL.c_str(), 457 &tmp)); 458 459 mBaseURL = tmp; 460 } 461 462 CHECK_GT(mSessionDesc->countTracks(), 1u); 463 setupTrack(1); 464 } 465 } 466 } 467 468 if (result != OK) { 469 sp<AMessage> reply = new AMessage('disc', id()); 470 mConn->disconnect(reply); 471 } 472 break; 473 } 474 475 case 'setu': 476 { 477 size_t index; 478 CHECK(msg->findSize("index", &index)); 479 480 TrackInfo *track = NULL; 481 size_t trackIndex; 482 if (msg->findSize("track-index", &trackIndex)) { 483 track = &mTracks.editItemAt(trackIndex); 484 } 485 486 int32_t result; 487 CHECK(msg->findInt32("result", &result)); 488 489 LOGI("SETUP(%d) completed with result %d (%s)", 490 index, result, strerror(-result)); 491 492 if (result == OK) { 493 CHECK(track != NULL); 494 495 sp<RefBase> obj; 496 CHECK(msg->findObject("response", &obj)); 497 sp<ARTSPResponse> response = 498 static_cast<ARTSPResponse *>(obj.get()); 499 500 if (response->mStatusCode != 200) { 501 result = UNKNOWN_ERROR; 502 } else { 503 ssize_t i = response->mHeaders.indexOfKey("session"); 504 CHECK_GE(i, 0); 505 506 mSessionID = response->mHeaders.valueAt(i); 507 i = mSessionID.find(";"); 508 if (i >= 0) { 509 // Remove options, i.e. ";timeout=90" 510 mSessionID.erase(i, mSessionID.size() - i); 511 } 512 513 sp<AMessage> notify = new AMessage('accu', id()); 514 notify->setSize("track-index", trackIndex); 515 516 i = response->mHeaders.indexOfKey("transport"); 517 CHECK_GE(i, 0); 518 519 if (!track->mUsingInterleavedTCP) { 520 AString transport = response->mHeaders.valueAt(i); 521 522 // We are going to continue even if we were 523 // unable to poke a hole into the firewall... 524 pokeAHole( 525 track->mRTPSocket, 526 track->mRTCPSocket, 527 transport); 528 } 529 530 mRTPConn->addStream( 531 track->mRTPSocket, track->mRTCPSocket, 532 mSessionDesc, index, 533 notify, track->mUsingInterleavedTCP); 534 535 mSetupTracksSuccessful = true; 536 } 537 } 538 539 if (result != OK) { 540 if (track) { 541 if (!track->mUsingInterleavedTCP) { 542 close(track->mRTPSocket); 543 close(track->mRTCPSocket); 544 } 545 546 mTracks.removeItemsAt(trackIndex); 547 } 548 } 549 550 ++index; 551 if (index < mSessionDesc->countTracks()) { 552 setupTrack(index); 553 } else if (mSetupTracksSuccessful) { 554 AString request = "PLAY "; 555 request.append(mSessionURL); 556 request.append(" RTSP/1.0\r\n"); 557 558 request.append("Session: "); 559 request.append(mSessionID); 560 request.append("\r\n"); 561 562 request.append("\r\n"); 563 564 sp<AMessage> reply = new AMessage('play', id()); 565 mConn->sendRequest(request.c_str(), reply); 566 } else { 567 sp<AMessage> reply = new AMessage('disc', id()); 568 mConn->disconnect(reply); 569 } 570 break; 571 } 572 573 case 'play': 574 { 575 int32_t result; 576 CHECK(msg->findInt32("result", &result)); 577 578 LOGI("PLAY completed with result %d (%s)", 579 result, strerror(-result)); 580 581 if (result == OK) { 582 sp<RefBase> obj; 583 CHECK(msg->findObject("response", &obj)); 584 sp<ARTSPResponse> response = 585 static_cast<ARTSPResponse *>(obj.get()); 586 587 if (response->mStatusCode != 200) { 588 result = UNKNOWN_ERROR; 589 } else { 590 parsePlayResponse(response); 591 592 sp<AMessage> timeout = new AMessage('tiou', id()); 593 timeout->post(kStartupTimeoutUs); 594 } 595 } 596 597 if (result != OK) { 598 sp<AMessage> reply = new AMessage('disc', id()); 599 mConn->disconnect(reply); 600 } 601 602 break; 603 } 604 605 case 'abor': 606 { 607 for (size_t i = 0; i < mTracks.size(); ++i) { 608 TrackInfo *info = &mTracks.editItemAt(i); 609 610 info->mPacketSource->signalEOS(ERROR_END_OF_STREAM); 611 612 if (!info->mUsingInterleavedTCP) { 613 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket); 614 615 close(info->mRTPSocket); 616 close(info->mRTCPSocket); 617 } 618 } 619 mTracks.clear(); 620 mSetupTracksSuccessful = false; 621 mSeekPending = false; 622 mFirstAccessUnit = true; 623 mNTPAnchorUs = -1; 624 mMediaAnchorUs = -1; 625 mNumAccessUnitsReceived = 0; 626 mReceivedFirstRTCPPacket = false; 627 mReceivedFirstRTPPacket = false; 628 mSeekable = false; 629 630 sp<AMessage> reply = new AMessage('tear', id()); 631 632 int32_t reconnect; 633 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 634 reply->setInt32("reconnect", true); 635 } 636 637 AString request; 638 request = "TEARDOWN "; 639 640 // XXX should use aggregate url from SDP here... 641 request.append(mSessionURL); 642 request.append(" RTSP/1.0\r\n"); 643 644 request.append("Session: "); 645 request.append(mSessionID); 646 request.append("\r\n"); 647 648 request.append("\r\n"); 649 650 mConn->sendRequest(request.c_str(), reply); 651 break; 652 } 653 654 case 'tear': 655 { 656 int32_t result; 657 CHECK(msg->findInt32("result", &result)); 658 659 LOGI("TEARDOWN completed with result %d (%s)", 660 result, strerror(-result)); 661 662 sp<AMessage> reply = new AMessage('disc', id()); 663 664 int32_t reconnect; 665 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 666 reply->setInt32("reconnect", true); 667 } 668 669 mConn->disconnect(reply); 670 break; 671 } 672 673 case 'quit': 674 { 675 if (mDoneMsg != NULL) { 676 mDoneMsg->setInt32("result", UNKNOWN_ERROR); 677 mDoneMsg->post(); 678 mDoneMsg = NULL; 679 } 680 break; 681 } 682 683 case 'chek': 684 { 685 int32_t generation; 686 CHECK(msg->findInt32("generation", &generation)); 687 if (generation != mCheckGeneration) { 688 // This is an outdated message. Ignore. 689 break; 690 } 691 692 if (mNumAccessUnitsReceived == 0) { 693 LOGI("stream ended? aborting."); 694 (new AMessage('abor', id()))->post(); 695 break; 696 } 697 698 mNumAccessUnitsReceived = 0; 699 msg->post(kAccessUnitTimeoutUs); 700 break; 701 } 702 703 case 'accu': 704 { 705 int32_t timeUpdate; 706 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) { 707 size_t trackIndex; 708 CHECK(msg->findSize("track-index", &trackIndex)); 709 710 uint32_t rtpTime; 711 uint64_t ntpTime; 712 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime)); 713 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime)); 714 715 onTimeUpdate(trackIndex, rtpTime, ntpTime); 716 break; 717 } 718 719 int32_t first; 720 if (msg->findInt32("first-rtcp", &first)) { 721 mReceivedFirstRTCPPacket = true; 722 break; 723 } 724 725 if (msg->findInt32("first-rtp", &first)) { 726 mReceivedFirstRTPPacket = true; 727 break; 728 } 729 730 ++mNumAccessUnitsReceived; 731 postAccessUnitTimeoutCheck(); 732 733 size_t trackIndex; 734 CHECK(msg->findSize("track-index", &trackIndex)); 735 736 if (trackIndex >= mTracks.size()) { 737 LOGV("late packets ignored."); 738 break; 739 } 740 741 TrackInfo *track = &mTracks.editItemAt(trackIndex); 742 743 int32_t eos; 744 if (msg->findInt32("eos", &eos)) { 745 LOGI("received BYE on track index %d", trackIndex); 746#if 0 747 track->mPacketSource->signalEOS(ERROR_END_OF_STREAM); 748#endif 749 return; 750 } 751 752 sp<RefBase> obj; 753 CHECK(msg->findObject("access-unit", &obj)); 754 755 sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get()); 756 757 uint32_t seqNum = (uint32_t)accessUnit->int32Data(); 758 759 if (mSeekPending) { 760 LOGV("we're seeking, dropping stale packet."); 761 break; 762 } 763 764 if (seqNum < track->mFirstSeqNumInSegment) { 765 LOGV("dropping stale access-unit (%d < %d)", 766 seqNum, track->mFirstSeqNumInSegment); 767 break; 768 } 769 770 if (track->mNewSegment) { 771 track->mNewSegment = false; 772 } 773 774 onAccessUnitComplete(trackIndex, accessUnit); 775 break; 776 } 777 778 case 'seek': 779 { 780 sp<AMessage> doneMsg; 781 CHECK(msg->findMessage("doneMsg", &doneMsg)); 782 783 if (mSeekPending) { 784 doneMsg->post(); 785 break; 786 } 787 788 if (!mSeekable) { 789 LOGW("This is a live stream, ignoring seek request."); 790 doneMsg->post(); 791 break; 792 } 793 794 int64_t timeUs; 795 CHECK(msg->findInt64("time", &timeUs)); 796 797 mSeekPending = true; 798 799 // Disable the access unit timeout until we resumed 800 // playback again. 801 mCheckPending = true; 802 ++mCheckGeneration; 803 804 AString request = "PAUSE "; 805 request.append(mSessionURL); 806 request.append(" RTSP/1.0\r\n"); 807 808 request.append("Session: "); 809 request.append(mSessionID); 810 request.append("\r\n"); 811 812 request.append("\r\n"); 813 814 sp<AMessage> reply = new AMessage('see1', id()); 815 reply->setInt64("time", timeUs); 816 reply->setMessage("doneMsg", doneMsg); 817 mConn->sendRequest(request.c_str(), reply); 818 break; 819 } 820 821 case 'see1': 822 { 823 // Session is paused now. 824 for (size_t i = 0; i < mTracks.size(); ++i) { 825 TrackInfo *info = &mTracks.editItemAt(i); 826 827 info->mPacketSource->flushQueue(); 828 info->mRTPAnchor = 0; 829 info->mNTPAnchorUs = -1; 830 } 831 832 mNTPAnchorUs = -1; 833 834 int64_t timeUs; 835 CHECK(msg->findInt64("time", &timeUs)); 836 837 AString request = "PLAY "; 838 request.append(mSessionURL); 839 request.append(" RTSP/1.0\r\n"); 840 841 request.append("Session: "); 842 request.append(mSessionID); 843 request.append("\r\n"); 844 845 request.append( 846 StringPrintf( 847 "Range: npt=%lld-\r\n", timeUs / 1000000ll)); 848 849 request.append("\r\n"); 850 851 sp<AMessage> doneMsg; 852 CHECK(msg->findMessage("doneMsg", &doneMsg)); 853 854 sp<AMessage> reply = new AMessage('see2', id()); 855 reply->setMessage("doneMsg", doneMsg); 856 mConn->sendRequest(request.c_str(), reply); 857 break; 858 } 859 860 case 'see2': 861 { 862 CHECK(mSeekPending); 863 864 int32_t result; 865 CHECK(msg->findInt32("result", &result)); 866 867 LOGI("PLAY completed with result %d (%s)", 868 result, strerror(-result)); 869 870 mCheckPending = false; 871 postAccessUnitTimeoutCheck(); 872 873 if (result == OK) { 874 sp<RefBase> obj; 875 CHECK(msg->findObject("response", &obj)); 876 sp<ARTSPResponse> response = 877 static_cast<ARTSPResponse *>(obj.get()); 878 879 if (response->mStatusCode != 200) { 880 result = UNKNOWN_ERROR; 881 } else { 882 parsePlayResponse(response); 883 884 ssize_t i = response->mHeaders.indexOfKey("rtp-info"); 885 CHECK_GE(i, 0); 886 887 LOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str()); 888 889 LOGI("seek completed."); 890 } 891 } 892 893 if (result != OK) { 894 LOGE("seek failed, aborting."); 895 (new AMessage('abor', id()))->post(); 896 } 897 898 mSeekPending = false; 899 900 sp<AMessage> doneMsg; 901 CHECK(msg->findMessage("doneMsg", &doneMsg)); 902 903 doneMsg->post(); 904 break; 905 } 906 907 case 'biny': 908 { 909 sp<RefBase> obj; 910 CHECK(msg->findObject("buffer", &obj)); 911 sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get()); 912 913 int32_t index; 914 CHECK(buffer->meta()->findInt32("index", &index)); 915 916 mRTPConn->injectPacket(index, buffer); 917 break; 918 } 919 920 case 'tiou': 921 { 922 if (!mReceivedFirstRTCPPacket) { 923 if (mReceivedFirstRTPPacket && !mTryFakeRTCP) { 924 LOGW("We received RTP packets but no RTCP packets, " 925 "using fake timestamps."); 926 927 mTryFakeRTCP = true; 928 929 mReceivedFirstRTCPPacket = true; 930 931 fakeTimestamps(); 932 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) { 933 LOGW("Never received any data, switching transports."); 934 935 mTryTCPInterleaving = true; 936 937 sp<AMessage> msg = new AMessage('abor', id()); 938 msg->setInt32("reconnect", true); 939 msg->post(); 940 } else { 941 LOGW("Never received any data, disconnecting."); 942 (new AMessage('abor', id()))->post(); 943 } 944 } 945 break; 946 } 947 948 default: 949 TRESPASS(); 950 break; 951 } 952 } 953 954 void postAccessUnitTimeoutCheck() { 955 if (mCheckPending) { 956 return; 957 } 958 959 mCheckPending = true; 960 sp<AMessage> check = new AMessage('chek', id()); 961 check->setInt32("generation", mCheckGeneration); 962 check->post(kAccessUnitTimeoutUs); 963 } 964 965 static void SplitString( 966 const AString &s, const char *separator, List<AString> *items) { 967 items->clear(); 968 size_t start = 0; 969 while (start < s.size()) { 970 ssize_t offset = s.find(separator, start); 971 972 if (offset < 0) { 973 items->push_back(AString(s, start, s.size() - start)); 974 break; 975 } 976 977 items->push_back(AString(s, start, offset - start)); 978 start = offset + strlen(separator); 979 } 980 } 981 982 void parsePlayResponse(const sp<ARTSPResponse> &response) { 983 mSeekable = false; 984 985 ssize_t i = response->mHeaders.indexOfKey("range"); 986 if (i < 0) { 987 // Server doesn't even tell use what range it is going to 988 // play, therefore we won't support seeking. 989 return; 990 } 991 992 AString range = response->mHeaders.valueAt(i); 993 LOGV("Range: %s", range.c_str()); 994 995 AString val; 996 CHECK(GetAttribute(range.c_str(), "npt", &val)); 997 998 bool seekable = true; 999 1000 float npt1, npt2; 1001 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) { 1002 // This is a live stream and therefore not seekable. 1003 seekable = false; 1004 } 1005 1006 i = response->mHeaders.indexOfKey("rtp-info"); 1007 CHECK_GE(i, 0); 1008 1009 AString rtpInfo = response->mHeaders.valueAt(i); 1010 List<AString> streamInfos; 1011 SplitString(rtpInfo, ",", &streamInfos); 1012 1013 int n = 1; 1014 for (List<AString>::iterator it = streamInfos.begin(); 1015 it != streamInfos.end(); ++it) { 1016 (*it).trim(); 1017 LOGV("streamInfo[%d] = %s", n, (*it).c_str()); 1018 1019 CHECK(GetAttribute((*it).c_str(), "url", &val)); 1020 1021 size_t trackIndex = 0; 1022 while (trackIndex < mTracks.size() 1023 && !(val == mTracks.editItemAt(trackIndex).mURL)) { 1024 ++trackIndex; 1025 } 1026 CHECK_LT(trackIndex, mTracks.size()); 1027 1028 CHECK(GetAttribute((*it).c_str(), "seq", &val)); 1029 1030 char *end; 1031 unsigned long seq = strtoul(val.c_str(), &end, 10); 1032 1033 TrackInfo *info = &mTracks.editItemAt(trackIndex); 1034 info->mFirstSeqNumInSegment = seq; 1035 info->mNewSegment = true; 1036 1037 CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); 1038 1039 uint32_t rtpTime = strtoul(val.c_str(), &end, 10); 1040 1041 LOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1); 1042 1043 info->mPacketSource->setNormalPlayTimeMapping( 1044 rtpTime, (int64_t)(npt1 * 1E6)); 1045 1046 ++n; 1047 } 1048 1049 mSeekable = seekable; 1050 } 1051 1052 sp<APacketSource> getPacketSource(size_t index) { 1053 CHECK_GE(index, 0u); 1054 CHECK_LT(index, mTracks.size()); 1055 1056 return mTracks.editItemAt(index).mPacketSource; 1057 } 1058 1059 size_t countTracks() const { 1060 return mTracks.size(); 1061 } 1062 1063private: 1064 struct TrackInfo { 1065 AString mURL; 1066 int mRTPSocket; 1067 int mRTCPSocket; 1068 bool mUsingInterleavedTCP; 1069 uint32_t mFirstSeqNumInSegment; 1070 bool mNewSegment; 1071 1072 uint32_t mRTPAnchor; 1073 int64_t mNTPAnchorUs; 1074 int32_t mTimeScale; 1075 1076 sp<APacketSource> mPacketSource; 1077 1078 // Stores packets temporarily while no notion of time 1079 // has been established yet. 1080 List<sp<ABuffer> > mPackets; 1081 }; 1082 1083 sp<ALooper> mLooper; 1084 sp<ALooper> mNetLooper; 1085 sp<ARTSPConnection> mConn; 1086 sp<ARTPConnection> mRTPConn; 1087 sp<ASessionDescription> mSessionDesc; 1088 AString mOriginalSessionURL; // This one still has user:pass@ 1089 AString mSessionURL; 1090 AString mSessionHost; 1091 AString mBaseURL; 1092 AString mSessionID; 1093 bool mSetupTracksSuccessful; 1094 bool mSeekPending; 1095 bool mFirstAccessUnit; 1096 1097 int64_t mNTPAnchorUs; 1098 int64_t mMediaAnchorUs; 1099 int64_t mLastMediaTimeUs; 1100 1101 int64_t mNumAccessUnitsReceived; 1102 bool mCheckPending; 1103 int32_t mCheckGeneration; 1104 bool mTryTCPInterleaving; 1105 bool mTryFakeRTCP; 1106 bool mReceivedFirstRTCPPacket; 1107 bool mReceivedFirstRTPPacket; 1108 bool mSeekable; 1109 1110 Vector<TrackInfo> mTracks; 1111 1112 sp<AMessage> mDoneMsg; 1113 1114 void setupTrack(size_t index) { 1115 sp<APacketSource> source = 1116 new APacketSource(mSessionDesc, index); 1117 1118 if (source->initCheck() != OK) { 1119 LOGW("Unsupported format. Ignoring track #%d.", index); 1120 1121 sp<AMessage> reply = new AMessage('setu', id()); 1122 reply->setSize("index", index); 1123 reply->setInt32("result", ERROR_UNSUPPORTED); 1124 reply->post(); 1125 return; 1126 } 1127 1128 AString url; 1129 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 1130 1131 AString trackURL; 1132 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 1133 1134 mTracks.push(TrackInfo()); 1135 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 1136 info->mURL = trackURL; 1137 info->mPacketSource = source; 1138 info->mUsingInterleavedTCP = false; 1139 info->mFirstSeqNumInSegment = 0; 1140 info->mNewSegment = true; 1141 info->mRTPAnchor = 0; 1142 info->mNTPAnchorUs = -1; 1143 1144 unsigned long PT; 1145 AString formatDesc; 1146 AString formatParams; 1147 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams); 1148 1149 int32_t timescale; 1150 int32_t numChannels; 1151 ASessionDescription::ParseFormatDesc( 1152 formatDesc.c_str(), ×cale, &numChannels); 1153 1154 info->mTimeScale = timescale; 1155 1156 LOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str()); 1157 1158 AString request = "SETUP "; 1159 request.append(trackURL); 1160 request.append(" RTSP/1.0\r\n"); 1161 1162 if (mTryTCPInterleaving) { 1163 size_t interleaveIndex = 2 * (mTracks.size() - 1); 1164 info->mUsingInterleavedTCP = true; 1165 info->mRTPSocket = interleaveIndex; 1166 info->mRTCPSocket = interleaveIndex + 1; 1167 1168 request.append("Transport: RTP/AVP/TCP;interleaved="); 1169 request.append(interleaveIndex); 1170 request.append("-"); 1171 request.append(interleaveIndex + 1); 1172 } else { 1173 unsigned rtpPort; 1174 ARTPConnection::MakePortPair( 1175 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 1176 1177 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 1178 request.append(rtpPort); 1179 request.append("-"); 1180 request.append(rtpPort + 1); 1181 } 1182 1183 request.append("\r\n"); 1184 1185 if (index > 1) { 1186 request.append("Session: "); 1187 request.append(mSessionID); 1188 request.append("\r\n"); 1189 } 1190 1191 request.append("\r\n"); 1192 1193 sp<AMessage> reply = new AMessage('setu', id()); 1194 reply->setSize("index", index); 1195 reply->setSize("track-index", mTracks.size() - 1); 1196 mConn->sendRequest(request.c_str(), reply); 1197 } 1198 1199 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 1200 out->clear(); 1201 1202 if (strncasecmp("rtsp://", baseURL, 7)) { 1203 // Base URL must be absolute 1204 return false; 1205 } 1206 1207 if (!strncasecmp("rtsp://", url, 7)) { 1208 // "url" is already an absolute URL, ignore base URL. 1209 out->setTo(url); 1210 return true; 1211 } 1212 1213 size_t n = strlen(baseURL); 1214 if (baseURL[n - 1] == '/') { 1215 out->setTo(baseURL); 1216 out->append(url); 1217 } else { 1218 const char *slashPos = strrchr(baseURL, '/'); 1219 1220 if (slashPos > &baseURL[6]) { 1221 out->setTo(baseURL, slashPos - baseURL); 1222 } else { 1223 out->setTo(baseURL); 1224 } 1225 1226 out->append("/"); 1227 out->append(url); 1228 } 1229 1230 return true; 1231 } 1232 1233 void fakeTimestamps() { 1234 for (size_t i = 0; i < mTracks.size(); ++i) { 1235 onTimeUpdate(i, 0, 0ll); 1236 } 1237 } 1238 1239 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) { 1240 LOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx", 1241 trackIndex, rtpTime, ntpTime); 1242 1243 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); 1244 1245 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1246 1247 track->mRTPAnchor = rtpTime; 1248 track->mNTPAnchorUs = ntpTimeUs; 1249 1250 if (mNTPAnchorUs < 0) { 1251 mNTPAnchorUs = ntpTimeUs; 1252 mMediaAnchorUs = mLastMediaTimeUs; 1253 } 1254 } 1255 1256 void onAccessUnitComplete( 1257 int32_t trackIndex, const sp<ABuffer> &accessUnit) { 1258 LOGV("onAccessUnitComplete track %d", trackIndex); 1259 1260 if (mFirstAccessUnit) { 1261 mDoneMsg->setInt32("result", OK); 1262 mDoneMsg->post(); 1263 mDoneMsg = NULL; 1264 1265 mFirstAccessUnit = false; 1266 } 1267 1268 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1269 1270 if (mNTPAnchorUs < 0 || mMediaAnchorUs < 0 || track->mNTPAnchorUs < 0) { 1271 LOGV("storing accessUnit, no time established yet"); 1272 track->mPackets.push_back(accessUnit); 1273 return; 1274 } 1275 1276 while (!track->mPackets.empty()) { 1277 sp<ABuffer> accessUnit = *track->mPackets.begin(); 1278 track->mPackets.erase(track->mPackets.begin()); 1279 1280 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1281 track->mPacketSource->queueAccessUnit(accessUnit); 1282 } 1283 } 1284 1285 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1286 track->mPacketSource->queueAccessUnit(accessUnit); 1287 } 1288 } 1289 1290 bool addMediaTimestamp( 1291 int32_t trackIndex, const TrackInfo *track, 1292 const sp<ABuffer> &accessUnit) { 1293 uint32_t rtpTime; 1294 CHECK(accessUnit->meta()->findInt32( 1295 "rtp-time", (int32_t *)&rtpTime)); 1296 1297 int64_t relRtpTimeUs = 1298 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll) 1299 / track->mTimeScale; 1300 1301 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs; 1302 1303 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs; 1304 1305 if (mediaTimeUs > mLastMediaTimeUs) { 1306 mLastMediaTimeUs = mediaTimeUs; 1307 } 1308 1309 if (mediaTimeUs < 0) { 1310 LOGV("dropping early accessUnit."); 1311 return false; 1312 } 1313 1314 LOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)", 1315 trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6); 1316 1317 accessUnit->meta()->setInt64("timeUs", mediaTimeUs); 1318 1319 return true; 1320 } 1321 1322 1323 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 1324}; 1325 1326} // namespace android 1327 1328#endif // MY_HANDLER_H_ 1329