// MyHandler.h revision 4579b7d49f6dd4f37e6043e59debfd72d69b8e7b
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#ifndef MY_HANDLER_H_ 18 19#define MY_HANDLER_H_ 20 21//#define LOG_NDEBUG 0 22#define LOG_TAG "MyHandler" 23#include <utils/Log.h> 24 25#include "APacketSource.h" 26#include "ARTPConnection.h" 27#include "ARTSPConnection.h" 28#include "ASessionDescription.h" 29 30#include <ctype.h> 31#include <cutils/properties.h> 32 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/ALooper.h> 36#include <media/stagefright/foundation/AMessage.h> 37#include <media/stagefright/MediaErrors.h> 38 39#include <arpa/inet.h> 40#include <sys/socket.h> 41 42// If no access units are received within 3 secs, assume that the rtp 43// stream has ended and signal end of stream. 44static int64_t kAccessUnitTimeoutUs = 3000000ll; 45 46// If no access units arrive for the first 10 secs after starting the 47// stream, assume none ever will and signal EOS or switch transports. 
48static int64_t kStartupTimeoutUs = 10000000ll; 49 50namespace android { 51 52static void MakeUserAgentString(AString *s) { 53 s->setTo("stagefright/1.1 (Linux;Android "); 54 55#if (PROPERTY_VALUE_MAX < 8) 56#error "PROPERTY_VALUE_MAX must be at least 8" 57#endif 58 59 char value[PROPERTY_VALUE_MAX]; 60 property_get("ro.build.version.release", value, "Unknown"); 61 s->append(value); 62 s->append(")"); 63} 64 65static bool GetAttribute(const char *s, const char *key, AString *value) { 66 value->clear(); 67 68 size_t keyLen = strlen(key); 69 70 for (;;) { 71 while (isspace(*s)) { 72 ++s; 73 } 74 75 const char *colonPos = strchr(s, ';'); 76 77 size_t len = 78 (colonPos == NULL) ? strlen(s) : colonPos - s; 79 80 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { 81 value->setTo(&s[keyLen + 1], len - keyLen - 1); 82 return true; 83 } 84 85 if (colonPos == NULL) { 86 return false; 87 } 88 89 s = colonPos + 1; 90 } 91} 92 93struct MyHandler : public AHandler { 94 MyHandler(const char *url, const sp<ALooper> &looper) 95 : mLooper(looper), 96 mNetLooper(new ALooper), 97 mConn(new ARTSPConnection), 98 mRTPConn(new ARTPConnection), 99 mOriginalSessionURL(url), 100 mSessionURL(url), 101 mSetupTracksSuccessful(false), 102 mSeekPending(false), 103 mFirstAccessUnit(true), 104 mFirstAccessUnitNTP(0), 105 mNumAccessUnitsReceived(0), 106 mCheckPending(false), 107 mCheckGeneration(0), 108 mTryTCPInterleaving(false), 109 mTryFakeRTCP(false), 110 mReceivedFirstRTCPPacket(false), 111 mReceivedFirstRTPPacket(false), 112 mSeekable(false) { 113 mNetLooper->setName("rtsp net"); 114 mNetLooper->start(false /* runOnCallingThread */, 115 false /* canCallJava */, 116 PRIORITY_HIGHEST); 117 118 // Strip any authentication info from the session url, we don't 119 // want to transmit user/pass in cleartext. 
120 AString host, path, user, pass; 121 unsigned port; 122 if (ARTSPConnection::ParseURL( 123 mSessionURL.c_str(), &host, &port, &path, &user, &pass) 124 && user.size() > 0) { 125 mSessionURL.clear(); 126 mSessionURL.append("rtsp://"); 127 mSessionURL.append(host); 128 mSessionURL.append(":"); 129 mSessionURL.append(StringPrintf("%u", port)); 130 mSessionURL.append(path); 131 132 LOGI("rewritten session url: '%s'", mSessionURL.c_str()); 133 } 134 } 135 136 void connect(const sp<AMessage> &doneMsg) { 137 mDoneMsg = doneMsg; 138 139 mLooper->registerHandler(this); 140 mLooper->registerHandler(mConn); 141 (1 ? mNetLooper : mLooper)->registerHandler(mRTPConn); 142 143 sp<AMessage> notify = new AMessage('biny', id()); 144 mConn->observeBinaryData(notify); 145 146 sp<AMessage> reply = new AMessage('conn', id()); 147 mConn->connect(mOriginalSessionURL.c_str(), reply); 148 } 149 150 void disconnect(const sp<AMessage> &doneMsg) { 151 mDoneMsg = doneMsg; 152 153 (new AMessage('abor', id()))->post(); 154 } 155 156 void seek(int64_t timeUs, const sp<AMessage> &doneMsg) { 157 sp<AMessage> msg = new AMessage('seek', id()); 158 msg->setInt64("time", timeUs); 159 msg->setMessage("doneMsg", doneMsg); 160 msg->post(); 161 } 162 163 int64_t getNormalPlayTimeUs() { 164 int64_t maxTimeUs = 0; 165 for (size_t i = 0; i < mTracks.size(); ++i) { 166 int64_t timeUs = mTracks.editItemAt(i).mPacketSource 167 ->getNormalPlayTimeUs(); 168 169 if (i == 0 || timeUs > maxTimeUs) { 170 maxTimeUs = timeUs; 171 } 172 } 173 174 return maxTimeUs; 175 } 176 177 static void addRR(const sp<ABuffer> &buf) { 178 uint8_t *ptr = buf->data() + buf->size(); 179 ptr[0] = 0x80 | 0; 180 ptr[1] = 201; // RR 181 ptr[2] = 0; 182 ptr[3] = 1; 183 ptr[4] = 0xde; // SSRC 184 ptr[5] = 0xad; 185 ptr[6] = 0xbe; 186 ptr[7] = 0xef; 187 188 buf->setRange(0, buf->size() + 8); 189 } 190 191 static void addSDES(int s, const sp<ABuffer> &buffer) { 192 struct sockaddr_in addr; 193 socklen_t addrSize = sizeof(addr); 194 CHECK_EQ(0, 
getsockname(s, (sockaddr *)&addr, &addrSize)); 195 196 uint8_t *data = buffer->data() + buffer->size(); 197 data[0] = 0x80 | 1; 198 data[1] = 202; // SDES 199 data[4] = 0xde; // SSRC 200 data[5] = 0xad; 201 data[6] = 0xbe; 202 data[7] = 0xef; 203 204 size_t offset = 8; 205 206 data[offset++] = 1; // CNAME 207 208 AString cname = "stagefright@"; 209 cname.append(inet_ntoa(addr.sin_addr)); 210 data[offset++] = cname.size(); 211 212 memcpy(&data[offset], cname.c_str(), cname.size()); 213 offset += cname.size(); 214 215 data[offset++] = 6; // TOOL 216 217 AString tool; 218 MakeUserAgentString(&tool); 219 220 data[offset++] = tool.size(); 221 222 memcpy(&data[offset], tool.c_str(), tool.size()); 223 offset += tool.size(); 224 225 data[offset++] = 0; 226 227 if ((offset % 4) > 0) { 228 size_t count = 4 - (offset % 4); 229 switch (count) { 230 case 3: 231 data[offset++] = 0; 232 case 2: 233 data[offset++] = 0; 234 case 1: 235 data[offset++] = 0; 236 } 237 } 238 239 size_t numWords = (offset / 4) - 1; 240 data[2] = numWords >> 8; 241 data[3] = numWords & 0xff; 242 243 buffer->setRange(buffer->offset(), buffer->size() + offset); 244 } 245 246 // In case we're behind NAT, fire off two UDP packets to the remote 247 // rtp/rtcp ports to poke a hole into the firewall for future incoming 248 // packets. We're going to send an RR/SDES RTCP packet to both of them. 
249 void pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) { 250 AString source; 251 AString server_port; 252 if (!GetAttribute(transport.c_str(), 253 "source", 254 &source) 255 || !GetAttribute(transport.c_str(), 256 "server_port", 257 &server_port)) { 258 return; 259 } 260 261 int rtpPort, rtcpPort; 262 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2 263 || rtpPort <= 0 || rtpPort > 65535 264 || rtcpPort <=0 || rtcpPort > 65535 265 || rtcpPort != rtpPort + 1 266 || (rtpPort & 1) != 0) { 267 return; 268 } 269 270 struct sockaddr_in addr; 271 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 272 addr.sin_family = AF_INET; 273 addr.sin_addr.s_addr = inet_addr(source.c_str()); 274 275 if (addr.sin_addr.s_addr == INADDR_NONE) { 276 return; 277 } 278 279 // Make up an RR/SDES RTCP packet. 280 sp<ABuffer> buf = new ABuffer(65536); 281 buf->setRange(0, 0); 282 addRR(buf); 283 addSDES(rtpSocket, buf); 284 285 addr.sin_port = htons(rtpPort); 286 287 ssize_t n = sendto( 288 rtpSocket, buf->data(), buf->size(), 0, 289 (const sockaddr *)&addr, sizeof(addr)); 290 CHECK_EQ(n, (ssize_t)buf->size()); 291 292 addr.sin_port = htons(rtcpPort); 293 294 n = sendto( 295 rtcpSocket, buf->data(), buf->size(), 0, 296 (const sockaddr *)&addr, sizeof(addr)); 297 CHECK_EQ(n, (ssize_t)buf->size()); 298 299 LOGV("successfully poked holes."); 300 } 301 302 virtual void onMessageReceived(const sp<AMessage> &msg) { 303 switch (msg->what()) { 304 case 'conn': 305 { 306 int32_t result; 307 CHECK(msg->findInt32("result", &result)); 308 309 LOGI("connection request completed with result %d (%s)", 310 result, strerror(-result)); 311 312 if (result == OK) { 313 AString request; 314 request = "DESCRIBE "; 315 request.append(mSessionURL); 316 request.append(" RTSP/1.0\r\n"); 317 request.append("Accept: application/sdp\r\n"); 318 request.append("\r\n"); 319 320 sp<AMessage> reply = new AMessage('desc', id()); 321 mConn->sendRequest(request.c_str(), reply); 322 } else { 
323 (new AMessage('disc', id()))->post(); 324 } 325 break; 326 } 327 328 case 'disc': 329 { 330 int32_t reconnect; 331 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 332 sp<AMessage> reply = new AMessage('conn', id()); 333 mConn->connect(mOriginalSessionURL.c_str(), reply); 334 } else { 335 (new AMessage('quit', id()))->post(); 336 } 337 break; 338 } 339 340 case 'desc': 341 { 342 int32_t result; 343 CHECK(msg->findInt32("result", &result)); 344 345 LOGI("DESCRIBE completed with result %d (%s)", 346 result, strerror(-result)); 347 348 if (result == OK) { 349 sp<RefBase> obj; 350 CHECK(msg->findObject("response", &obj)); 351 sp<ARTSPResponse> response = 352 static_cast<ARTSPResponse *>(obj.get()); 353 354 if (response->mStatusCode == 302) { 355 ssize_t i = response->mHeaders.indexOfKey("location"); 356 CHECK_GE(i, 0); 357 358 mSessionURL = response->mHeaders.valueAt(i); 359 360 AString request; 361 request = "DESCRIBE "; 362 request.append(mSessionURL); 363 request.append(" RTSP/1.0\r\n"); 364 request.append("Accept: application/sdp\r\n"); 365 request.append("\r\n"); 366 367 sp<AMessage> reply = new AMessage('desc', id()); 368 mConn->sendRequest(request.c_str(), reply); 369 break; 370 } 371 372 if (response->mStatusCode != 200) { 373 result = UNKNOWN_ERROR; 374 } else { 375 mSessionDesc = new ASessionDescription; 376 377 mSessionDesc->setTo( 378 response->mContent->data(), 379 response->mContent->size()); 380 381 if (!mSessionDesc->isValid()) { 382 result = ERROR_MALFORMED; 383 } else { 384 ssize_t i = response->mHeaders.indexOfKey("content-base"); 385 if (i >= 0) { 386 mBaseURL = response->mHeaders.valueAt(i); 387 } else { 388 i = response->mHeaders.indexOfKey("content-location"); 389 if (i >= 0) { 390 mBaseURL = response->mHeaders.valueAt(i); 391 } else { 392 mBaseURL = mSessionURL; 393 } 394 } 395 396 CHECK_GT(mSessionDesc->countTracks(), 1u); 397 setupTrack(1); 398 } 399 } 400 } 401 402 if (result != OK) { 403 sp<AMessage> reply = new 
AMessage('disc', id()); 404 mConn->disconnect(reply); 405 } 406 break; 407 } 408 409 case 'setu': 410 { 411 size_t index; 412 CHECK(msg->findSize("index", &index)); 413 414 TrackInfo *track = NULL; 415 size_t trackIndex; 416 if (msg->findSize("track-index", &trackIndex)) { 417 track = &mTracks.editItemAt(trackIndex); 418 } 419 420 int32_t result; 421 CHECK(msg->findInt32("result", &result)); 422 423 LOGI("SETUP(%d) completed with result %d (%s)", 424 index, result, strerror(-result)); 425 426 if (result == OK) { 427 CHECK(track != NULL); 428 429 sp<RefBase> obj; 430 CHECK(msg->findObject("response", &obj)); 431 sp<ARTSPResponse> response = 432 static_cast<ARTSPResponse *>(obj.get()); 433 434 if (response->mStatusCode != 200) { 435 result = UNKNOWN_ERROR; 436 } else { 437 ssize_t i = response->mHeaders.indexOfKey("session"); 438 CHECK_GE(i, 0); 439 440 mSessionID = response->mHeaders.valueAt(i); 441 i = mSessionID.find(";"); 442 if (i >= 0) { 443 // Remove options, i.e. ";timeout=90" 444 mSessionID.erase(i, mSessionID.size() - i); 445 } 446 447 sp<AMessage> notify = new AMessage('accu', id()); 448 notify->setSize("track-index", trackIndex); 449 450 i = response->mHeaders.indexOfKey("transport"); 451 CHECK_GE(i, 0); 452 453 if (!track->mUsingInterleavedTCP) { 454 AString transport = response->mHeaders.valueAt(i); 455 456 pokeAHole(track->mRTPSocket, 457 track->mRTCPSocket, 458 transport); 459 } 460 461 mRTPConn->addStream( 462 track->mRTPSocket, track->mRTCPSocket, 463 mSessionDesc, index, 464 notify, track->mUsingInterleavedTCP); 465 466 mSetupTracksSuccessful = true; 467 } 468 } 469 470 if (result != OK) { 471 if (track) { 472 if (!track->mUsingInterleavedTCP) { 473 close(track->mRTPSocket); 474 close(track->mRTCPSocket); 475 } 476 477 mTracks.removeItemsAt(trackIndex); 478 } 479 } 480 481 ++index; 482 if (index < mSessionDesc->countTracks()) { 483 setupTrack(index); 484 } else if (mSetupTracksSuccessful) { 485 AString request = "PLAY "; 486 
request.append(mSessionURL); 487 request.append(" RTSP/1.0\r\n"); 488 489 request.append("Session: "); 490 request.append(mSessionID); 491 request.append("\r\n"); 492 493 request.append("\r\n"); 494 495 sp<AMessage> reply = new AMessage('play', id()); 496 mConn->sendRequest(request.c_str(), reply); 497 } else { 498 sp<AMessage> reply = new AMessage('disc', id()); 499 mConn->disconnect(reply); 500 } 501 break; 502 } 503 504 case 'play': 505 { 506 int32_t result; 507 CHECK(msg->findInt32("result", &result)); 508 509 LOGI("PLAY completed with result %d (%s)", 510 result, strerror(-result)); 511 512 if (result == OK) { 513 sp<RefBase> obj; 514 CHECK(msg->findObject("response", &obj)); 515 sp<ARTSPResponse> response = 516 static_cast<ARTSPResponse *>(obj.get()); 517 518 if (response->mStatusCode != 200) { 519 result = UNKNOWN_ERROR; 520 } else { 521 parsePlayResponse(response); 522 523 sp<AMessage> timeout = new AMessage('tiou', id()); 524 timeout->post(kStartupTimeoutUs); 525 } 526 } 527 528 if (result != OK) { 529 sp<AMessage> reply = new AMessage('disc', id()); 530 mConn->disconnect(reply); 531 } 532 533 break; 534 } 535 536 case 'abor': 537 { 538 for (size_t i = 0; i < mTracks.size(); ++i) { 539 TrackInfo *info = &mTracks.editItemAt(i); 540 541 info->mPacketSource->signalEOS(ERROR_END_OF_STREAM); 542 543 if (!info->mUsingInterleavedTCP) { 544 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket); 545 546 close(info->mRTPSocket); 547 close(info->mRTCPSocket); 548 } 549 } 550 mTracks.clear(); 551 mSetupTracksSuccessful = false; 552 mSeekPending = false; 553 mFirstAccessUnit = true; 554 mFirstAccessUnitNTP = 0; 555 mNumAccessUnitsReceived = 0; 556 mReceivedFirstRTCPPacket = false; 557 mReceivedFirstRTPPacket = false; 558 mSeekable = false; 559 560 sp<AMessage> reply = new AMessage('tear', id()); 561 562 int32_t reconnect; 563 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 564 reply->setInt32("reconnect", true); 565 } 566 567 AString request; 568 
request = "TEARDOWN "; 569 570 // XXX should use aggregate url from SDP here... 571 request.append(mSessionURL); 572 request.append(" RTSP/1.0\r\n"); 573 574 request.append("Session: "); 575 request.append(mSessionID); 576 request.append("\r\n"); 577 578 request.append("\r\n"); 579 580 mConn->sendRequest(request.c_str(), reply); 581 break; 582 } 583 584 case 'tear': 585 { 586 int32_t result; 587 CHECK(msg->findInt32("result", &result)); 588 589 LOGI("TEARDOWN completed with result %d (%s)", 590 result, strerror(-result)); 591 592 sp<AMessage> reply = new AMessage('disc', id()); 593 594 int32_t reconnect; 595 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 596 reply->setInt32("reconnect", true); 597 } 598 599 mConn->disconnect(reply); 600 break; 601 } 602 603 case 'quit': 604 { 605 if (mDoneMsg != NULL) { 606 mDoneMsg->setInt32("result", UNKNOWN_ERROR); 607 mDoneMsg->post(); 608 mDoneMsg = NULL; 609 } 610 break; 611 } 612 613 case 'chek': 614 { 615 int32_t generation; 616 CHECK(msg->findInt32("generation", &generation)); 617 if (generation != mCheckGeneration) { 618 // This is an outdated message. Ignore. 619 break; 620 } 621 622 if (mNumAccessUnitsReceived == 0) { 623 LOGI("stream ended? 
aborting."); 624 (new AMessage('abor', id()))->post(); 625 break; 626 } 627 628 mNumAccessUnitsReceived = 0; 629 msg->post(kAccessUnitTimeoutUs); 630 break; 631 } 632 633 case 'accu': 634 { 635 int32_t first; 636 if (msg->findInt32("first-rtcp", &first)) { 637 mReceivedFirstRTCPPacket = true; 638 break; 639 } 640 641 if (msg->findInt32("first-rtp", &first)) { 642 mReceivedFirstRTPPacket = true; 643 break; 644 } 645 646 ++mNumAccessUnitsReceived; 647 postAccessUnitTimeoutCheck(); 648 649 size_t trackIndex; 650 CHECK(msg->findSize("track-index", &trackIndex)); 651 652 if (trackIndex >= mTracks.size()) { 653 LOGV("late packets ignored."); 654 break; 655 } 656 657 TrackInfo *track = &mTracks.editItemAt(trackIndex); 658 659 int32_t eos; 660 if (msg->findInt32("eos", &eos)) { 661 LOGI("received BYE on track index %d", trackIndex); 662#if 0 663 track->mPacketSource->signalEOS(ERROR_END_OF_STREAM); 664#endif 665 return; 666 } 667 668 sp<RefBase> obj; 669 CHECK(msg->findObject("access-unit", &obj)); 670 671 sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get()); 672 673 uint32_t seqNum = (uint32_t)accessUnit->int32Data(); 674 675 if (mSeekPending) { 676 LOGV("we're seeking, dropping stale packet."); 677 break; 678 } 679 680 if (seqNum < track->mFirstSeqNumInSegment) { 681 LOGV("dropping stale access-unit (%d < %d)", 682 seqNum, track->mFirstSeqNumInSegment); 683 break; 684 } 685 686 uint64_t ntpTime; 687 CHECK(accessUnit->meta()->findInt64( 688 "ntp-time", (int64_t *)&ntpTime)); 689 690 uint32_t rtpTime; 691 CHECK(accessUnit->meta()->findInt32( 692 "rtp-time", (int32_t *)&rtpTime)); 693 694 if (track->mNewSegment) { 695 track->mNewSegment = false; 696 697 LOGV("first segment unit ntpTime=0x%016llx rtpTime=%u seq=%d", 698 ntpTime, rtpTime, seqNum); 699 } 700 701 if (mFirstAccessUnit) { 702 mDoneMsg->setInt32("result", OK); 703 mDoneMsg->post(); 704 mDoneMsg = NULL; 705 706 mFirstAccessUnit = false; 707 mFirstAccessUnitNTP = ntpTime; 708 } 709 710 if (ntpTime >= 
mFirstAccessUnitNTP) { 711 ntpTime -= mFirstAccessUnitNTP; 712 } else { 713 ntpTime = 0; 714 } 715 716 int64_t timeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); 717 718 accessUnit->meta()->setInt64("timeUs", timeUs); 719 720#if 0 721 int32_t damaged; 722 if (accessUnit->meta()->findInt32("damaged", &damaged) 723 && damaged != 0) { 724 LOGI("ignoring damaged AU"); 725 } else 726#endif 727 { 728 TrackInfo *track = &mTracks.editItemAt(trackIndex); 729 track->mPacketSource->queueAccessUnit(accessUnit); 730 } 731 break; 732 } 733 734 case 'seek': 735 { 736 sp<AMessage> doneMsg; 737 CHECK(msg->findMessage("doneMsg", &doneMsg)); 738 739 if (mSeekPending) { 740 doneMsg->post(); 741 break; 742 } 743 744 if (!mSeekable) { 745 LOGW("This is a live stream, ignoring seek request."); 746 doneMsg->post(); 747 break; 748 } 749 750 int64_t timeUs; 751 CHECK(msg->findInt64("time", &timeUs)); 752 753 mSeekPending = true; 754 755 // Disable the access unit timeout until we resumed 756 // playback again. 757 mCheckPending = true; 758 ++mCheckGeneration; 759 760 AString request = "PAUSE "; 761 request.append(mSessionURL); 762 request.append(" RTSP/1.0\r\n"); 763 764 request.append("Session: "); 765 request.append(mSessionID); 766 request.append("\r\n"); 767 768 request.append("\r\n"); 769 770 sp<AMessage> reply = new AMessage('see1', id()); 771 reply->setInt64("time", timeUs); 772 reply->setMessage("doneMsg", doneMsg); 773 mConn->sendRequest(request.c_str(), reply); 774 break; 775 } 776 777 case 'see1': 778 { 779 // Session is paused now. 
780 for (size_t i = 0; i < mTracks.size(); ++i) { 781 mTracks.editItemAt(i).mPacketSource->flushQueue(); 782 } 783 784 int64_t timeUs; 785 CHECK(msg->findInt64("time", &timeUs)); 786 787 AString request = "PLAY "; 788 request.append(mSessionURL); 789 request.append(" RTSP/1.0\r\n"); 790 791 request.append("Session: "); 792 request.append(mSessionID); 793 request.append("\r\n"); 794 795 request.append( 796 StringPrintf( 797 "Range: npt=%lld-\r\n", timeUs / 1000000ll)); 798 799 request.append("\r\n"); 800 801 sp<AMessage> doneMsg; 802 CHECK(msg->findMessage("doneMsg", &doneMsg)); 803 804 sp<AMessage> reply = new AMessage('see2', id()); 805 reply->setMessage("doneMsg", doneMsg); 806 mConn->sendRequest(request.c_str(), reply); 807 break; 808 } 809 810 case 'see2': 811 { 812 CHECK(mSeekPending); 813 814 int32_t result; 815 CHECK(msg->findInt32("result", &result)); 816 817 LOGI("PLAY completed with result %d (%s)", 818 result, strerror(-result)); 819 820 mCheckPending = false; 821 postAccessUnitTimeoutCheck(); 822 823 if (result == OK) { 824 sp<RefBase> obj; 825 CHECK(msg->findObject("response", &obj)); 826 sp<ARTSPResponse> response = 827 static_cast<ARTSPResponse *>(obj.get()); 828 829 if (response->mStatusCode != 200) { 830 result = UNKNOWN_ERROR; 831 } else { 832 parsePlayResponse(response); 833 834 LOGI("seek completed."); 835 } 836 } 837 838 if (result != OK) { 839 LOGE("seek failed, aborting."); 840 (new AMessage('abor', id()))->post(); 841 } 842 843 mSeekPending = false; 844 845 sp<AMessage> doneMsg; 846 CHECK(msg->findMessage("doneMsg", &doneMsg)); 847 848 doneMsg->post(); 849 break; 850 } 851 852 case 'biny': 853 { 854 sp<RefBase> obj; 855 CHECK(msg->findObject("buffer", &obj)); 856 sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get()); 857 858 int32_t index; 859 CHECK(buffer->meta()->findInt32("index", &index)); 860 861 mRTPConn->injectPacket(index, buffer); 862 break; 863 } 864 865 case 'tiou': 866 { 867 if (!mReceivedFirstRTCPPacket) { 868 if (mTryFakeRTCP) 
{ 869 LOGW("Never received any data, disconnecting."); 870 (new AMessage('abor', id()))->post(); 871 } else if (mTryTCPInterleaving && mReceivedFirstRTPPacket) { 872 LOGW("We received RTP packets but no RTCP packets, " 873 "using fake timestamps."); 874 875 mTryFakeRTCP = true; 876 877 mReceivedFirstRTCPPacket = true; 878 mRTPConn->fakeTimestamps(); 879 } else { 880 LOGW("Never received any data, switching transports."); 881 882 mTryTCPInterleaving = true; 883 884 sp<AMessage> msg = new AMessage('abor', id()); 885 msg->setInt32("reconnect", true); 886 msg->post(); 887 } 888 } 889 break; 890 } 891 892 default: 893 TRESPASS(); 894 break; 895 } 896 } 897 898 void postAccessUnitTimeoutCheck() { 899 if (mCheckPending) { 900 return; 901 } 902 903 mCheckPending = true; 904 sp<AMessage> check = new AMessage('chek', id()); 905 check->setInt32("generation", mCheckGeneration); 906 check->post(kAccessUnitTimeoutUs); 907 } 908 909 static void SplitString( 910 const AString &s, const char *separator, List<AString> *items) { 911 items->clear(); 912 size_t start = 0; 913 while (start < s.size()) { 914 ssize_t offset = s.find(separator, start); 915 916 if (offset < 0) { 917 items->push_back(AString(s, start, s.size() - start)); 918 break; 919 } 920 921 items->push_back(AString(s, start, offset - start)); 922 start = offset + strlen(separator); 923 } 924 } 925 926 void parsePlayResponse(const sp<ARTSPResponse> &response) { 927 mSeekable = false; 928 929 ssize_t i = response->mHeaders.indexOfKey("range"); 930 if (i < 0) { 931 // Server doesn't even tell use what range it is going to 932 // play, therefore we won't support seeking. 933 return; 934 } 935 936 AString range = response->mHeaders.valueAt(i); 937 LOGV("Range: %s", range.c_str()); 938 939 AString val; 940 CHECK(GetAttribute(range.c_str(), "npt", &val)); 941 float npt1, npt2; 942 943 if (val == "now-" || val == "0-") { 944 // This is a live stream and therefore not seekable. 
945 return; 946 } else { 947 CHECK_EQ(sscanf(val.c_str(), "%f-%f", &npt1, &npt2), 2); 948 } 949 950 i = response->mHeaders.indexOfKey("rtp-info"); 951 CHECK_GE(i, 0); 952 953 AString rtpInfo = response->mHeaders.valueAt(i); 954 List<AString> streamInfos; 955 SplitString(rtpInfo, ",", &streamInfos); 956 957 int n = 1; 958 for (List<AString>::iterator it = streamInfos.begin(); 959 it != streamInfos.end(); ++it) { 960 (*it).trim(); 961 LOGV("streamInfo[%d] = %s", n, (*it).c_str()); 962 963 CHECK(GetAttribute((*it).c_str(), "url", &val)); 964 965 size_t trackIndex = 0; 966 while (trackIndex < mTracks.size() 967 && !(val == mTracks.editItemAt(trackIndex).mURL)) { 968 ++trackIndex; 969 } 970 CHECK_LT(trackIndex, mTracks.size()); 971 972 CHECK(GetAttribute((*it).c_str(), "seq", &val)); 973 974 char *end; 975 unsigned long seq = strtoul(val.c_str(), &end, 10); 976 977 TrackInfo *info = &mTracks.editItemAt(trackIndex); 978 info->mFirstSeqNumInSegment = seq; 979 info->mNewSegment = true; 980 981 CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); 982 983 uint32_t rtpTime = strtoul(val.c_str(), &end, 10); 984 985 LOGV("track #%d: rtpTime=%u <=> ntp=%.2f", n, rtpTime, npt1); 986 987 info->mPacketSource->setNormalPlayTimeMapping( 988 rtpTime, (int64_t)(npt1 * 1E6)); 989 990 ++n; 991 } 992 993 mSeekable = true; 994 } 995 996 sp<APacketSource> getPacketSource(size_t index) { 997 CHECK_GE(index, 0u); 998 CHECK_LT(index, mTracks.size()); 999 1000 return mTracks.editItemAt(index).mPacketSource; 1001 } 1002 1003 size_t countTracks() const { 1004 return mTracks.size(); 1005 } 1006 1007private: 1008 sp<ALooper> mLooper; 1009 sp<ALooper> mNetLooper; 1010 sp<ARTSPConnection> mConn; 1011 sp<ARTPConnection> mRTPConn; 1012 sp<ASessionDescription> mSessionDesc; 1013 AString mOriginalSessionURL; // This one still has user:pass@ 1014 AString mSessionURL; 1015 AString mBaseURL; 1016 AString mSessionID; 1017 bool mSetupTracksSuccessful; 1018 bool mSeekPending; 1019 bool mFirstAccessUnit; 1020 
uint64_t mFirstAccessUnitNTP; 1021 int64_t mNumAccessUnitsReceived; 1022 bool mCheckPending; 1023 int32_t mCheckGeneration; 1024 bool mTryTCPInterleaving; 1025 bool mTryFakeRTCP; 1026 bool mReceivedFirstRTCPPacket; 1027 bool mReceivedFirstRTPPacket; 1028 bool mSeekable; 1029 1030 struct TrackInfo { 1031 AString mURL; 1032 int mRTPSocket; 1033 int mRTCPSocket; 1034 bool mUsingInterleavedTCP; 1035 uint32_t mFirstSeqNumInSegment; 1036 bool mNewSegment; 1037 1038 sp<APacketSource> mPacketSource; 1039 }; 1040 Vector<TrackInfo> mTracks; 1041 1042 sp<AMessage> mDoneMsg; 1043 1044 void setupTrack(size_t index) { 1045 sp<APacketSource> source = 1046 new APacketSource(mSessionDesc, index); 1047 1048 if (source->initCheck() != OK) { 1049 LOGW("Unsupported format. Ignoring track #%d.", index); 1050 1051 sp<AMessage> reply = new AMessage('setu', id()); 1052 reply->setSize("index", index); 1053 reply->setInt32("result", ERROR_UNSUPPORTED); 1054 reply->post(); 1055 return; 1056 } 1057 1058 AString url; 1059 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 1060 1061 AString trackURL; 1062 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 1063 1064 mTracks.push(TrackInfo()); 1065 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 1066 info->mURL = trackURL; 1067 info->mPacketSource = source; 1068 info->mUsingInterleavedTCP = false; 1069 info->mFirstSeqNumInSegment = 0; 1070 info->mNewSegment = true; 1071 1072 LOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str()); 1073 1074 AString request = "SETUP "; 1075 request.append(trackURL); 1076 request.append(" RTSP/1.0\r\n"); 1077 1078 if (mTryTCPInterleaving) { 1079 size_t interleaveIndex = 2 * (mTracks.size() - 1); 1080 info->mUsingInterleavedTCP = true; 1081 info->mRTPSocket = interleaveIndex; 1082 info->mRTCPSocket = interleaveIndex + 1; 1083 1084 request.append("Transport: RTP/AVP/TCP;interleaved="); 1085 request.append(interleaveIndex); 1086 request.append("-"); 1087 request.append(interleaveIndex + 
1); 1088 } else { 1089 unsigned rtpPort; 1090 ARTPConnection::MakePortPair( 1091 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 1092 1093 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 1094 request.append(rtpPort); 1095 request.append("-"); 1096 request.append(rtpPort + 1); 1097 } 1098 1099 request.append("\r\n"); 1100 1101 if (index > 1) { 1102 request.append("Session: "); 1103 request.append(mSessionID); 1104 request.append("\r\n"); 1105 } 1106 1107 request.append("\r\n"); 1108 1109 sp<AMessage> reply = new AMessage('setu', id()); 1110 reply->setSize("index", index); 1111 reply->setSize("track-index", mTracks.size() - 1); 1112 mConn->sendRequest(request.c_str(), reply); 1113 } 1114 1115 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 1116 out->clear(); 1117 1118 if (strncasecmp("rtsp://", baseURL, 7)) { 1119 // Base URL must be absolute 1120 return false; 1121 } 1122 1123 if (!strncasecmp("rtsp://", url, 7)) { 1124 // "url" is already an absolute URL, ignore base URL. 1125 out->setTo(url); 1126 return true; 1127 } 1128 1129 size_t n = strlen(baseURL); 1130 if (baseURL[n - 1] == '/') { 1131 out->setTo(baseURL); 1132 out->append(url); 1133 } else { 1134 const char *slashPos = strrchr(baseURL, '/'); 1135 1136 if (slashPos > &baseURL[6]) { 1137 out->setTo(baseURL, slashPos - baseURL); 1138 } else { 1139 out->setTo(baseURL); 1140 } 1141 1142 out->append("/"); 1143 out->append(url); 1144 } 1145 1146 return true; 1147 } 1148 1149 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 1150}; 1151 1152} // namespace android 1153 1154#endif // MY_HANDLER_H_ 1155