MyHandler.h revision 100a4408968b90e314526185d572c72ea4cc784a
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#ifndef MY_HANDLER_H_ 18 19#define MY_HANDLER_H_ 20 21//#define LOG_NDEBUG 0 22#define LOG_TAG "MyHandler" 23#include <utils/Log.h> 24 25#include "APacketSource.h" 26#include "ARTPConnection.h" 27#include "ARTSPConnection.h" 28#include "ASessionDescription.h" 29 30#include <ctype.h> 31#include <cutils/properties.h> 32 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/ALooper.h> 36#include <media/stagefright/foundation/AMessage.h> 37#include <media/stagefright/MediaErrors.h> 38 39#include <arpa/inet.h> 40#include <sys/socket.h> 41 42// If no access units are received within 5 secs, assume that the rtp 43// stream has ended and signal end of stream. 44static int64_t kAccessUnitTimeoutUs = 5000000ll; 45 46// If no access units arrive for the first 10 secs after starting the 47// stream, assume none ever will and signal EOS or switch transports. 48static int64_t kStartupTimeoutUs = 10000000ll; 49 50namespace android { 51 52static void MakeUserAgentString(AString *s) { 53 s->setTo("stagefright/1.1 (Linux;Android "); 54 55#if (PROPERTY_VALUE_MAX < 8) 56#error "PROPERTY_VALUE_MAX must be at least 8" 57#endif 58 59 char value[PROPERTY_VALUE_MAX]; 60 property_get("ro.build.version.release", value, "Unknown"); 61 s->append(value); 62 s->append(")"); 63} 64 65static bool GetAttribute(const char *s, const char *key, AString *value) { 66 value->clear(); 67 68 size_t keyLen = strlen(key); 69 70 for (;;) { 71 while (isspace(*s)) { 72 ++s; 73 } 74 75 const char *colonPos = strchr(s, ';'); 76 77 size_t len = 78 (colonPos == NULL) ? strlen(s) : colonPos - s; 79 80 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { 81 value->setTo(&s[keyLen + 1], len - keyLen - 1); 82 return true; 83 } 84 85 if (colonPos == NULL) { 86 return false; 87 } 88 89 s = colonPos + 1; 90 } 91} 92 93struct MyHandler : public AHandler { 94 MyHandler(const char *url, const sp<ALooper> &looper) 95 : mLooper(looper), 96 mNetLooper(new ALooper), 97 mConn(new ARTSPConnection), 98 mRTPConn(new ARTPConnection), 99 mOriginalSessionURL(url), 100 mSessionURL(url), 101 mSetupTracksSuccessful(false), 102 mSeekPending(false), 103 mFirstAccessUnit(true), 104 mNTPAnchorUs(-1), 105 mMediaAnchorUs(-1), 106 mLastMediaTimeUs(0), 107 mNumAccessUnitsReceived(0), 108 mCheckPending(false), 109 mCheckGeneration(0), 110 mTryTCPInterleaving(false), 111 mTryFakeRTCP(false), 112 mReceivedFirstRTCPPacket(false), 113 mReceivedFirstRTPPacket(false), 114 mSeekable(false) { 115 mNetLooper->setName("rtsp net"); 116 mNetLooper->start(false /* runOnCallingThread */, 117 false /* canCallJava */, 118 PRIORITY_HIGHEST); 119 120 // Strip any authentication info from the session url, we don't 121 // want to transmit user/pass in cleartext. 122 AString host, path, user, pass; 123 unsigned port; 124 if (ARTSPConnection::ParseURL( 125 mSessionURL.c_str(), &host, &port, &path, &user, &pass) 126 && user.size() > 0) { 127 mSessionURL.clear(); 128 mSessionURL.append("rtsp://"); 129 mSessionURL.append(host); 130 mSessionURL.append(":"); 131 mSessionURL.append(StringPrintf("%u", port)); 132 mSessionURL.append(path); 133 134 LOGI("rewritten session url: '%s'", mSessionURL.c_str()); 135 } 136 } 137 138 void connect(const sp<AMessage> &doneMsg) { 139 mDoneMsg = doneMsg; 140 141 mLooper->registerHandler(this); 142 mLooper->registerHandler(mConn); 143 (1 ? mNetLooper : mLooper)->registerHandler(mRTPConn); 144 145 sp<AMessage> notify = new AMessage('biny', id()); 146 mConn->observeBinaryData(notify); 147 148 sp<AMessage> reply = new AMessage('conn', id()); 149 mConn->connect(mOriginalSessionURL.c_str(), reply); 150 } 151 152 void disconnect(const sp<AMessage> &doneMsg) { 153 mDoneMsg = doneMsg; 154 155 (new AMessage('abor', id()))->post(); 156 } 157 158 void seek(int64_t timeUs, const sp<AMessage> &doneMsg) { 159 sp<AMessage> msg = new AMessage('seek', id()); 160 msg->setInt64("time", timeUs); 161 msg->setMessage("doneMsg", doneMsg); 162 msg->post(); 163 } 164 165 int64_t getNormalPlayTimeUs() { 166 int64_t maxTimeUs = 0; 167 for (size_t i = 0; i < mTracks.size(); ++i) { 168 int64_t timeUs = mTracks.editItemAt(i).mPacketSource 169 ->getNormalPlayTimeUs(); 170 171 if (i == 0 || timeUs > maxTimeUs) { 172 maxTimeUs = timeUs; 173 } 174 } 175 176 return maxTimeUs; 177 } 178 179 static void addRR(const sp<ABuffer> &buf) { 180 uint8_t *ptr = buf->data() + buf->size(); 181 ptr[0] = 0x80 | 0; 182 ptr[1] = 201; // RR 183 ptr[2] = 0; 184 ptr[3] = 1; 185 ptr[4] = 0xde; // SSRC 186 ptr[5] = 0xad; 187 ptr[6] = 0xbe; 188 ptr[7] = 0xef; 189 190 buf->setRange(0, buf->size() + 8); 191 } 192 193 static void addSDES(int s, const sp<ABuffer> &buffer) { 194 struct sockaddr_in addr; 195 socklen_t addrSize = sizeof(addr); 196 CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize)); 197 198 uint8_t *data = buffer->data() + buffer->size(); 199 data[0] = 0x80 | 1; 200 data[1] = 202; // SDES 201 data[4] = 0xde; // SSRC 202 data[5] = 0xad; 203 data[6] = 0xbe; 204 data[7] = 0xef; 205 206 size_t offset = 8; 207 208 data[offset++] = 1; // CNAME 209 210 AString cname = "stagefright@"; 211 cname.append(inet_ntoa(addr.sin_addr)); 212 data[offset++] = cname.size(); 213 214 memcpy(&data[offset], cname.c_str(), cname.size()); 215 offset += cname.size(); 216 217 data[offset++] = 6; // TOOL 218 219 AString tool; 220 MakeUserAgentString(&tool); 221 222 data[offset++] = tool.size(); 223 224 memcpy(&data[offset], tool.c_str(), tool.size()); 225 offset += tool.size(); 226 227 data[offset++] = 0; 228 229 if ((offset % 4) > 0) { 230 size_t count = 4 - (offset % 4); 231 switch (count) { 232 case 3: 233 data[offset++] = 0; 234 case 2: 235 data[offset++] = 0; 236 case 1: 237 data[offset++] = 0; 238 } 239 } 240 241 size_t numWords = (offset / 4) - 1; 242 data[2] = numWords >> 8; 243 data[3] = numWords & 0xff; 244 245 buffer->setRange(buffer->offset(), buffer->size() + offset); 246 } 247 248 // In case we're behind NAT, fire off two UDP packets to the remote 249 // rtp/rtcp ports to poke a hole into the firewall for future incoming 250 // packets. We're going to send an RR/SDES RTCP packet to both of them. 251 void pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) { 252 AString source; 253 AString server_port; 254 if (!GetAttribute(transport.c_str(), 255 "source", 256 &source) 257 || !GetAttribute(transport.c_str(), 258 "server_port", 259 &server_port)) { 260 return; 261 } 262 263 int rtpPort, rtcpPort; 264 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2 265 || rtpPort <= 0 || rtpPort > 65535 266 || rtcpPort <=0 || rtcpPort > 65535 267 || rtcpPort != rtpPort + 1 268 || (rtpPort & 1) != 0) { 269 return; 270 } 271 272 struct sockaddr_in addr; 273 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 274 addr.sin_family = AF_INET; 275 addr.sin_addr.s_addr = inet_addr(source.c_str()); 276 277 if (addr.sin_addr.s_addr == INADDR_NONE) { 278 return; 279 } 280 281 // Make up an RR/SDES RTCP packet. 282 sp<ABuffer> buf = new ABuffer(65536); 283 buf->setRange(0, 0); 284 addRR(buf); 285 addSDES(rtpSocket, buf); 286 287 addr.sin_port = htons(rtpPort); 288 289 ssize_t n = sendto( 290 rtpSocket, buf->data(), buf->size(), 0, 291 (const sockaddr *)&addr, sizeof(addr)); 292 CHECK_EQ(n, (ssize_t)buf->size()); 293 294 addr.sin_port = htons(rtcpPort); 295 296 n = sendto( 297 rtcpSocket, buf->data(), buf->size(), 0, 298 (const sockaddr *)&addr, sizeof(addr)); 299 CHECK_EQ(n, (ssize_t)buf->size()); 300 301 LOGV("successfully poked holes."); 302 } 303 304 virtual void onMessageReceived(const sp<AMessage> &msg) { 305 switch (msg->what()) { 306 case 'conn': 307 { 308 int32_t result; 309 CHECK(msg->findInt32("result", &result)); 310 311 LOGI("connection request completed with result %d (%s)", 312 result, strerror(-result)); 313 314 if (result == OK) { 315 AString request; 316 request = "DESCRIBE "; 317 request.append(mSessionURL); 318 request.append(" RTSP/1.0\r\n"); 319 request.append("Accept: application/sdp\r\n"); 320 request.append("\r\n"); 321 322 sp<AMessage> reply = new AMessage('desc', id()); 323 mConn->sendRequest(request.c_str(), reply); 324 } else { 325 (new AMessage('disc', id()))->post(); 326 } 327 break; 328 } 329 330 case 'disc': 331 { 332 int32_t reconnect; 333 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 334 sp<AMessage> reply = new AMessage('conn', id()); 335 mConn->connect(mOriginalSessionURL.c_str(), reply); 336 } else { 337 (new AMessage('quit', id()))->post(); 338 } 339 break; 340 } 341 342 case 'desc': 343 { 344 int32_t result; 345 CHECK(msg->findInt32("result", &result)); 346 347 LOGI("DESCRIBE completed with result %d (%s)", 348 result, strerror(-result)); 349 350 if (result == OK) { 351 sp<RefBase> obj; 352 CHECK(msg->findObject("response", &obj)); 353 sp<ARTSPResponse> response = 354 static_cast<ARTSPResponse *>(obj.get()); 355 356 if (response->mStatusCode == 302) { 357 ssize_t i = response->mHeaders.indexOfKey("location"); 358 CHECK_GE(i, 0); 359 360 mSessionURL = response->mHeaders.valueAt(i); 361 362 AString request; 363 request = "DESCRIBE "; 364 request.append(mSessionURL); 365 request.append(" RTSP/1.0\r\n"); 366 request.append("Accept: application/sdp\r\n"); 367 request.append("\r\n"); 368 369 sp<AMessage> reply = new AMessage('desc', id()); 370 mConn->sendRequest(request.c_str(), reply); 371 break; 372 } 373 374 if (response->mStatusCode != 200) { 375 result = UNKNOWN_ERROR; 376 } else { 377 mSessionDesc = new ASessionDescription; 378 379 mSessionDesc->setTo( 380 response->mContent->data(), 381 response->mContent->size()); 382 383 if (!mSessionDesc->isValid()) { 384 result = ERROR_MALFORMED; 385 } else { 386 ssize_t i = response->mHeaders.indexOfKey("content-base"); 387 if (i >= 0) { 388 mBaseURL = response->mHeaders.valueAt(i); 389 } else { 390 i = response->mHeaders.indexOfKey("content-location"); 391 if (i >= 0) { 392 mBaseURL = response->mHeaders.valueAt(i); 393 } else { 394 mBaseURL = mSessionURL; 395 } 396 } 397 398 CHECK_GT(mSessionDesc->countTracks(), 1u); 399 setupTrack(1); 400 } 401 } 402 } 403 404 if (result != OK) { 405 sp<AMessage> reply = new AMessage('disc', id()); 406 mConn->disconnect(reply); 407 } 408 break; 409 } 410 411 case 'setu': 412 { 413 size_t index; 414 CHECK(msg->findSize("index", &index)); 415 416 TrackInfo *track = NULL; 417 size_t trackIndex; 418 if (msg->findSize("track-index", &trackIndex)) { 419 track = &mTracks.editItemAt(trackIndex); 420 } 421 422 int32_t result; 423 CHECK(msg->findInt32("result", &result)); 424 425 LOGI("SETUP(%d) completed with result %d (%s)", 426 index, result, strerror(-result)); 427 428 if (result == OK) { 429 CHECK(track != NULL); 430 431 sp<RefBase> obj; 432 CHECK(msg->findObject("response", &obj)); 433 sp<ARTSPResponse> response = 434 static_cast<ARTSPResponse *>(obj.get()); 435 436 if (response->mStatusCode != 200) { 437 result = UNKNOWN_ERROR; 438 } else { 439 ssize_t i = response->mHeaders.indexOfKey("session"); 440 CHECK_GE(i, 0); 441 442 mSessionID = response->mHeaders.valueAt(i); 443 i = mSessionID.find(";"); 444 if (i >= 0) { 445 // Remove options, i.e. ";timeout=90" 446 mSessionID.erase(i, mSessionID.size() - i); 447 } 448 449 sp<AMessage> notify = new AMessage('accu', id()); 450 notify->setSize("track-index", trackIndex); 451 452 i = response->mHeaders.indexOfKey("transport"); 453 CHECK_GE(i, 0); 454 455 if (!track->mUsingInterleavedTCP) { 456 AString transport = response->mHeaders.valueAt(i); 457 458 pokeAHole(track->mRTPSocket, 459 track->mRTCPSocket, 460 transport); 461 } 462 463 mRTPConn->addStream( 464 track->mRTPSocket, track->mRTCPSocket, 465 mSessionDesc, index, 466 notify, track->mUsingInterleavedTCP); 467 468 mSetupTracksSuccessful = true; 469 } 470 } 471 472 if (result != OK) { 473 if (track) { 474 if (!track->mUsingInterleavedTCP) { 475 close(track->mRTPSocket); 476 close(track->mRTCPSocket); 477 } 478 479 mTracks.removeItemsAt(trackIndex); 480 } 481 } 482 483 ++index; 484 if (index < mSessionDesc->countTracks()) { 485 setupTrack(index); 486 } else if (mSetupTracksSuccessful) { 487 AString request = "PLAY "; 488 request.append(mSessionURL); 489 request.append(" RTSP/1.0\r\n"); 490 491 request.append("Session: "); 492 request.append(mSessionID); 493 request.append("\r\n"); 494 495 request.append("\r\n"); 496 497 sp<AMessage> reply = new AMessage('play', id()); 498 mConn->sendRequest(request.c_str(), reply); 499 } else { 500 sp<AMessage> reply = new AMessage('disc', id()); 501 mConn->disconnect(reply); 502 } 503 break; 504 } 505 506 case 'play': 507 { 508 int32_t result; 509 CHECK(msg->findInt32("result", &result)); 510 511 LOGI("PLAY completed with result %d (%s)", 512 result, strerror(-result)); 513 514 if (result == OK) { 515 sp<RefBase> obj; 516 CHECK(msg->findObject("response", &obj)); 517 sp<ARTSPResponse> response = 518 static_cast<ARTSPResponse *>(obj.get()); 519 520 if (response->mStatusCode != 200) { 521 result = UNKNOWN_ERROR; 522 } else { 523 parsePlayResponse(response); 524 525 sp<AMessage> timeout = new AMessage('tiou', id()); 526 timeout->post(kStartupTimeoutUs); 527 } 528 } 529 530 if (result != OK) { 531 sp<AMessage> reply = new AMessage('disc', id()); 532 mConn->disconnect(reply); 533 } 534 535 break; 536 } 537 538 case 'abor': 539 { 540 for (size_t i = 0; i < mTracks.size(); ++i) { 541 TrackInfo *info = &mTracks.editItemAt(i); 542 543 info->mPacketSource->signalEOS(ERROR_END_OF_STREAM); 544 545 if (!info->mUsingInterleavedTCP) { 546 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket); 547 548 close(info->mRTPSocket); 549 close(info->mRTCPSocket); 550 } 551 } 552 mTracks.clear(); 553 mSetupTracksSuccessful = false; 554 mSeekPending = false; 555 mFirstAccessUnit = true; 556 mNTPAnchorUs = -1; 557 mMediaAnchorUs = -1; 558 mNumAccessUnitsReceived = 0; 559 mReceivedFirstRTCPPacket = false; 560 mReceivedFirstRTPPacket = false; 561 mSeekable = false; 562 563 sp<AMessage> reply = new AMessage('tear', id()); 564 565 int32_t reconnect; 566 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 567 reply->setInt32("reconnect", true); 568 } 569 570 AString request; 571 request = "TEARDOWN "; 572 573 // XXX should use aggregate url from SDP here... 574 request.append(mSessionURL); 575 request.append(" RTSP/1.0\r\n"); 576 577 request.append("Session: "); 578 request.append(mSessionID); 579 request.append("\r\n"); 580 581 request.append("\r\n"); 582 583 mConn->sendRequest(request.c_str(), reply); 584 break; 585 } 586 587 case 'tear': 588 { 589 int32_t result; 590 CHECK(msg->findInt32("result", &result)); 591 592 LOGI("TEARDOWN completed with result %d (%s)", 593 result, strerror(-result)); 594 595 sp<AMessage> reply = new AMessage('disc', id()); 596 597 int32_t reconnect; 598 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 599 reply->setInt32("reconnect", true); 600 } 601 602 mConn->disconnect(reply); 603 break; 604 } 605 606 case 'quit': 607 { 608 if (mDoneMsg != NULL) { 609 mDoneMsg->setInt32("result", UNKNOWN_ERROR); 610 mDoneMsg->post(); 611 mDoneMsg = NULL; 612 } 613 break; 614 } 615 616 case 'chek': 617 { 618 int32_t generation; 619 CHECK(msg->findInt32("generation", &generation)); 620 if (generation != mCheckGeneration) { 621 // This is an outdated message. Ignore. 622 break; 623 } 624 625 if (mNumAccessUnitsReceived == 0) { 626 LOGI("stream ended? aborting."); 627 (new AMessage('abor', id()))->post(); 628 break; 629 } 630 631 mNumAccessUnitsReceived = 0; 632 msg->post(kAccessUnitTimeoutUs); 633 break; 634 } 635 636 case 'accu': 637 { 638 int32_t timeUpdate; 639 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) { 640 size_t trackIndex; 641 CHECK(msg->findSize("track-index", &trackIndex)); 642 643 uint32_t rtpTime; 644 uint64_t ntpTime; 645 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime)); 646 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime)); 647 648 onTimeUpdate(trackIndex, rtpTime, ntpTime); 649 break; 650 } 651 652 int32_t first; 653 if (msg->findInt32("first-rtcp", &first)) { 654 mReceivedFirstRTCPPacket = true; 655 break; 656 } 657 658 if (msg->findInt32("first-rtp", &first)) { 659 mReceivedFirstRTPPacket = true; 660 break; 661 } 662 663 ++mNumAccessUnitsReceived; 664 postAccessUnitTimeoutCheck(); 665 666 size_t trackIndex; 667 CHECK(msg->findSize("track-index", &trackIndex)); 668 669 if (trackIndex >= mTracks.size()) { 670 LOGV("late packets ignored."); 671 break; 672 } 673 674 TrackInfo *track = &mTracks.editItemAt(trackIndex); 675 676 int32_t eos; 677 if (msg->findInt32("eos", &eos)) { 678 LOGI("received BYE on track index %d", trackIndex); 679#if 0 680 track->mPacketSource->signalEOS(ERROR_END_OF_STREAM); 681#endif 682 return; 683 } 684 685 sp<RefBase> obj; 686 CHECK(msg->findObject("access-unit", &obj)); 687 688 sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get()); 689 690 uint32_t seqNum = (uint32_t)accessUnit->int32Data(); 691 692 if (mSeekPending) { 693 LOGV("we're seeking, dropping stale packet."); 694 break; 695 } 696 697 if (seqNum < track->mFirstSeqNumInSegment) { 698 LOGV("dropping stale access-unit (%d < %d)", 699 seqNum, track->mFirstSeqNumInSegment); 700 break; 701 } 702 703 if (track->mNewSegment) { 704 track->mNewSegment = false; 705 } 706 707 onAccessUnitComplete(trackIndex, accessUnit); 708 break; 709 } 710 711 case 'seek': 712 { 713 sp<AMessage> doneMsg; 714 CHECK(msg->findMessage("doneMsg", &doneMsg)); 715 716 if (mSeekPending) { 717 doneMsg->post(); 718 break; 719 } 720 721 if (!mSeekable) { 722 LOGW("This is a live stream, ignoring seek request."); 723 doneMsg->post(); 724 break; 725 } 726 727 int64_t timeUs; 728 CHECK(msg->findInt64("time", &timeUs)); 729 730 mSeekPending = true; 731 732 // Disable the access unit timeout until we resumed 733 // playback again. 734 mCheckPending = true; 735 ++mCheckGeneration; 736 737 AString request = "PAUSE "; 738 request.append(mSessionURL); 739 request.append(" RTSP/1.0\r\n"); 740 741 request.append("Session: "); 742 request.append(mSessionID); 743 request.append("\r\n"); 744 745 request.append("\r\n"); 746 747 sp<AMessage> reply = new AMessage('see1', id()); 748 reply->setInt64("time", timeUs); 749 reply->setMessage("doneMsg", doneMsg); 750 mConn->sendRequest(request.c_str(), reply); 751 break; 752 } 753 754 case 'see1': 755 { 756 // Session is paused now. 757 for (size_t i = 0; i < mTracks.size(); ++i) { 758 TrackInfo *info = &mTracks.editItemAt(i); 759 760 info->mPacketSource->flushQueue(); 761 info->mRTPAnchor = 0; 762 info->mNTPAnchorUs = -1; 763 } 764 765 mNTPAnchorUs = -1; 766 767 int64_t timeUs; 768 CHECK(msg->findInt64("time", &timeUs)); 769 770 AString request = "PLAY "; 771 request.append(mSessionURL); 772 request.append(" RTSP/1.0\r\n"); 773 774 request.append("Session: "); 775 request.append(mSessionID); 776 request.append("\r\n"); 777 778 request.append( 779 StringPrintf( 780 "Range: npt=%lld-\r\n", timeUs / 1000000ll)); 781 782 request.append("\r\n"); 783 784 sp<AMessage> doneMsg; 785 CHECK(msg->findMessage("doneMsg", &doneMsg)); 786 787 sp<AMessage> reply = new AMessage('see2', id()); 788 reply->setMessage("doneMsg", doneMsg); 789 mConn->sendRequest(request.c_str(), reply); 790 break; 791 } 792 793 case 'see2': 794 { 795 CHECK(mSeekPending); 796 797 int32_t result; 798 CHECK(msg->findInt32("result", &result)); 799 800 LOGI("PLAY completed with result %d (%s)", 801 result, strerror(-result)); 802 803 mCheckPending = false; 804 postAccessUnitTimeoutCheck(); 805 806 if (result == OK) { 807 sp<RefBase> obj; 808 CHECK(msg->findObject("response", &obj)); 809 sp<ARTSPResponse> response = 810 static_cast<ARTSPResponse *>(obj.get()); 811 812 if (response->mStatusCode != 200) { 813 result = UNKNOWN_ERROR; 814 } else { 815 parsePlayResponse(response); 816 817 ssize_t i = response->mHeaders.indexOfKey("rtp-info"); 818 CHECK_GE(i, 0); 819 820 LOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str()); 821 822 LOGI("seek completed."); 823 } 824 } 825 826 if (result != OK) { 827 LOGE("seek failed, aborting."); 828 (new AMessage('abor', id()))->post(); 829 } 830 831 mSeekPending = false; 832 833 sp<AMessage> doneMsg; 834 CHECK(msg->findMessage("doneMsg", &doneMsg)); 835 836 doneMsg->post(); 837 break; 838 } 839 840 case 'biny': 841 { 842 sp<RefBase> obj; 843 CHECK(msg->findObject("buffer", &obj)); 844 sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get()); 845 846 int32_t index; 847 CHECK(buffer->meta()->findInt32("index", &index)); 848 849 mRTPConn->injectPacket(index, buffer); 850 break; 851 } 852 853 case 'tiou': 854 { 855 if (!mReceivedFirstRTCPPacket) { 856 if (mTryFakeRTCP) { 857 LOGW("Never received any data, disconnecting."); 858 (new AMessage('abor', id()))->post(); 859 } else if (mTryTCPInterleaving && mReceivedFirstRTPPacket) { 860 LOGW("We received RTP packets but no RTCP packets, " 861 "using fake timestamps."); 862 863 mTryFakeRTCP = true; 864 865 mReceivedFirstRTCPPacket = true; 866 } else { 867 LOGW("Never received any data, switching transports."); 868 869 mTryTCPInterleaving = true; 870 871 sp<AMessage> msg = new AMessage('abor', id()); 872 msg->setInt32("reconnect", true); 873 msg->post(); 874 } 875 } 876 break; 877 } 878 879 default: 880 TRESPASS(); 881 break; 882 } 883 } 884 885 void postAccessUnitTimeoutCheck() { 886 if (mCheckPending) { 887 return; 888 } 889 890 mCheckPending = true; 891 sp<AMessage> check = new AMessage('chek', id()); 892 check->setInt32("generation", mCheckGeneration); 893 check->post(kAccessUnitTimeoutUs); 894 } 895 896 static void SplitString( 897 const AString &s, const char *separator, List<AString> *items) { 898 items->clear(); 899 size_t start = 0; 900 while (start < s.size()) { 901 ssize_t offset = s.find(separator, start); 902 903 if (offset < 0) { 904 items->push_back(AString(s, start, s.size() - start)); 905 break; 906 } 907 908 items->push_back(AString(s, start, offset - start)); 909 start = offset + strlen(separator); 910 } 911 } 912 913 void parsePlayResponse(const sp<ARTSPResponse> &response) { 914 mSeekable = false; 915 916 ssize_t i = response->mHeaders.indexOfKey("range"); 917 if (i < 0) { 918 // Server doesn't even tell use what range it is going to 919 // play, therefore we won't support seeking. 920 return; 921 } 922 923 AString range = response->mHeaders.valueAt(i); 924 LOGV("Range: %s", range.c_str()); 925 926 AString val; 927 CHECK(GetAttribute(range.c_str(), "npt", &val)); 928 929 float npt1, npt2; 930 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) { 931 // This is a live stream and therefore not seekable. 932 return; 933 } 934 935 i = response->mHeaders.indexOfKey("rtp-info"); 936 CHECK_GE(i, 0); 937 938 AString rtpInfo = response->mHeaders.valueAt(i); 939 List<AString> streamInfos; 940 SplitString(rtpInfo, ",", &streamInfos); 941 942 int n = 1; 943 for (List<AString>::iterator it = streamInfos.begin(); 944 it != streamInfos.end(); ++it) { 945 (*it).trim(); 946 LOGV("streamInfo[%d] = %s", n, (*it).c_str()); 947 948 CHECK(GetAttribute((*it).c_str(), "url", &val)); 949 950 size_t trackIndex = 0; 951 while (trackIndex < mTracks.size() 952 && !(val == mTracks.editItemAt(trackIndex).mURL)) { 953 ++trackIndex; 954 } 955 CHECK_LT(trackIndex, mTracks.size()); 956 957 CHECK(GetAttribute((*it).c_str(), "seq", &val)); 958 959 char *end; 960 unsigned long seq = strtoul(val.c_str(), &end, 10); 961 962 TrackInfo *info = &mTracks.editItemAt(trackIndex); 963 info->mFirstSeqNumInSegment = seq; 964 info->mNewSegment = true; 965 966 CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); 967 968 uint32_t rtpTime = strtoul(val.c_str(), &end, 10); 969 970 LOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1); 971 972 info->mPacketSource->setNormalPlayTimeMapping( 973 rtpTime, (int64_t)(npt1 * 1E6)); 974 975 ++n; 976 } 977 978 mSeekable = true; 979 } 980 981 sp<APacketSource> getPacketSource(size_t index) { 982 CHECK_GE(index, 0u); 983 CHECK_LT(index, mTracks.size()); 984 985 return mTracks.editItemAt(index).mPacketSource; 986 } 987 988 size_t countTracks() const { 989 return mTracks.size(); 990 } 991 992private: 993 struct TrackInfo { 994 AString mURL; 995 int mRTPSocket; 996 int mRTCPSocket; 997 bool mUsingInterleavedTCP; 998 uint32_t mFirstSeqNumInSegment; 999 bool mNewSegment; 1000 1001 uint32_t mRTPAnchor; 1002 int64_t mNTPAnchorUs; 1003 int32_t mTimeScale; 1004 1005 sp<APacketSource> mPacketSource; 1006 1007 // Stores packets temporarily while no notion of time 1008 // has been established yet. 1009 List<sp<ABuffer> > mPackets; 1010 }; 1011 1012 sp<ALooper> mLooper; 1013 sp<ALooper> mNetLooper; 1014 sp<ARTSPConnection> mConn; 1015 sp<ARTPConnection> mRTPConn; 1016 sp<ASessionDescription> mSessionDesc; 1017 AString mOriginalSessionURL; // This one still has user:pass@ 1018 AString mSessionURL; 1019 AString mBaseURL; 1020 AString mSessionID; 1021 bool mSetupTracksSuccessful; 1022 bool mSeekPending; 1023 bool mFirstAccessUnit; 1024 1025 int64_t mNTPAnchorUs; 1026 int64_t mMediaAnchorUs; 1027 int64_t mLastMediaTimeUs; 1028 1029 int64_t mNumAccessUnitsReceived; 1030 bool mCheckPending; 1031 int32_t mCheckGeneration; 1032 bool mTryTCPInterleaving; 1033 bool mTryFakeRTCP; 1034 bool mReceivedFirstRTCPPacket; 1035 bool mReceivedFirstRTPPacket; 1036 bool mSeekable; 1037 1038 Vector<TrackInfo> mTracks; 1039 1040 sp<AMessage> mDoneMsg; 1041 1042 void setupTrack(size_t index) { 1043 sp<APacketSource> source = 1044 new APacketSource(mSessionDesc, index); 1045 1046 if (source->initCheck() != OK) { 1047 LOGW("Unsupported format. Ignoring track #%d.", index); 1048 1049 sp<AMessage> reply = new AMessage('setu', id()); 1050 reply->setSize("index", index); 1051 reply->setInt32("result", ERROR_UNSUPPORTED); 1052 reply->post(); 1053 return; 1054 } 1055 1056 AString url; 1057 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 1058 1059 AString trackURL; 1060 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 1061 1062 mTracks.push(TrackInfo()); 1063 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 1064 info->mURL = trackURL; 1065 info->mPacketSource = source; 1066 info->mUsingInterleavedTCP = false; 1067 info->mFirstSeqNumInSegment = 0; 1068 info->mNewSegment = true; 1069 info->mRTPAnchor = 0; 1070 info->mNTPAnchorUs = -1; 1071 1072 unsigned long PT; 1073 AString formatDesc; 1074 AString formatParams; 1075 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams); 1076 1077 int32_t timescale; 1078 int32_t numChannels; 1079 ASessionDescription::ParseFormatDesc( 1080 formatDesc.c_str(), ×cale, &numChannels); 1081 1082 info->mTimeScale = timescale; 1083 1084 LOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str()); 1085 1086 AString request = "SETUP "; 1087 request.append(trackURL); 1088 request.append(" RTSP/1.0\r\n"); 1089 1090 if (mTryTCPInterleaving) { 1091 size_t interleaveIndex = 2 * (mTracks.size() - 1); 1092 info->mUsingInterleavedTCP = true; 1093 info->mRTPSocket = interleaveIndex; 1094 info->mRTCPSocket = interleaveIndex + 1; 1095 1096 request.append("Transport: RTP/AVP/TCP;interleaved="); 1097 request.append(interleaveIndex); 1098 request.append("-"); 1099 request.append(interleaveIndex + 1); 1100 } else { 1101 unsigned rtpPort; 1102 ARTPConnection::MakePortPair( 1103 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 1104 1105 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 1106 request.append(rtpPort); 1107 request.append("-"); 1108 request.append(rtpPort + 1); 1109 } 1110 1111 request.append("\r\n"); 1112 1113 if (index > 1) { 1114 request.append("Session: "); 1115 request.append(mSessionID); 1116 request.append("\r\n"); 1117 } 1118 1119 request.append("\r\n"); 1120 1121 sp<AMessage> reply = new AMessage('setu', id()); 1122 reply->setSize("index", index); 1123 reply->setSize("track-index", mTracks.size() - 1); 1124 mConn->sendRequest(request.c_str(), reply); 1125 } 1126 1127 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 1128 out->clear(); 1129 1130 if (strncasecmp("rtsp://", baseURL, 7)) { 1131 // Base URL must be absolute 1132 return false; 1133 } 1134 1135 if (!strncasecmp("rtsp://", url, 7)) { 1136 // "url" is already an absolute URL, ignore base URL. 1137 out->setTo(url); 1138 return true; 1139 } 1140 1141 size_t n = strlen(baseURL); 1142 if (baseURL[n - 1] == '/') { 1143 out->setTo(baseURL); 1144 out->append(url); 1145 } else { 1146 const char *slashPos = strrchr(baseURL, '/'); 1147 1148 if (slashPos > &baseURL[6]) { 1149 out->setTo(baseURL, slashPos - baseURL); 1150 } else { 1151 out->setTo(baseURL); 1152 } 1153 1154 out->append("/"); 1155 out->append(url); 1156 } 1157 1158 return true; 1159 } 1160 1161 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) { 1162 LOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx", 1163 trackIndex, rtpTime, ntpTime); 1164 1165 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); 1166 1167 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1168 1169 track->mRTPAnchor = rtpTime; 1170 track->mNTPAnchorUs = ntpTimeUs; 1171 1172 if (mNTPAnchorUs < 0) { 1173 mNTPAnchorUs = ntpTimeUs; 1174 mMediaAnchorUs = mLastMediaTimeUs; 1175 } 1176 } 1177 1178 void onAccessUnitComplete( 1179 int32_t trackIndex, const sp<ABuffer> &accessUnit) { 1180 LOGV("onAccessUnitComplete track %d", trackIndex); 1181 1182 if (mFirstAccessUnit) { 1183 mDoneMsg->setInt32("result", OK); 1184 mDoneMsg->post(); 1185 mDoneMsg = NULL; 1186 1187 mFirstAccessUnit = false; 1188 } 1189 1190 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1191 1192 if (mNTPAnchorUs < 0 || mMediaAnchorUs < 0 || track->mNTPAnchorUs < 0) { 1193 LOGV("storing accessUnit, no time established yet"); 1194 track->mPackets.push_back(accessUnit); 1195 return; 1196 } 1197 1198 while (!track->mPackets.empty()) { 1199 sp<ABuffer> accessUnit = *track->mPackets.begin(); 1200 track->mPackets.erase(track->mPackets.begin()); 1201 1202 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1203 track->mPacketSource->queueAccessUnit(accessUnit); 1204 } 1205 } 1206 1207 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1208 track->mPacketSource->queueAccessUnit(accessUnit); 1209 } 1210 } 1211 1212 bool addMediaTimestamp( 1213 int32_t trackIndex, const TrackInfo *track, 1214 const sp<ABuffer> &accessUnit) { 1215 uint32_t rtpTime; 1216 CHECK(accessUnit->meta()->findInt32( 1217 "rtp-time", (int32_t *)&rtpTime)); 1218 1219 int64_t relRtpTimeUs = 1220 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll) 1221 / track->mTimeScale; 1222 1223 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs; 1224 1225 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs; 1226 1227 if (mediaTimeUs > mLastMediaTimeUs) { 1228 mLastMediaTimeUs = mediaTimeUs; 1229 } 1230 1231 if (mediaTimeUs < 0) { 1232 LOGV("dropping early accessUnit."); 1233 return false; 1234 } 1235 1236 LOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)", 1237 trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6); 1238 1239 accessUnit->meta()->setInt64("timeUs", mediaTimeUs); 1240 1241 return true; 1242 } 1243 1244 1245 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 1246}; 1247 1248} // namespace android 1249 1250#endif // MY_HANDLER_H_ 1251