MyHandler.h revision ea47cb4edea4426b0da7807db10548ddae7104f2
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#ifndef MY_HANDLER_H_ 18 19#define MY_HANDLER_H_ 20 21//#define LOG_NDEBUG 0 22#define LOG_TAG "MyHandler" 23#include <utils/Log.h> 24 25#include "APacketSource.h" 26#include "ARTPConnection.h" 27#include "ARTSPConnection.h" 28#include "ASessionDescription.h" 29 30#include <ctype.h> 31#include <cutils/properties.h> 32 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/ALooper.h> 36#include <media/stagefright/foundation/AMessage.h> 37#include <media/stagefright/MediaErrors.h> 38 39#include <arpa/inet.h> 40#include <sys/socket.h> 41#include <netdb.h> 42 43// If no access units are received within 3 secs, assume that the rtp 44// stream has ended and signal end of stream. 45static int64_t kAccessUnitTimeoutUs = 3000000ll; 46 47// If no access units arrive for the first 10 secs after starting the 48// stream, assume none ever will and signal EOS or switch transports. 49static int64_t kStartupTimeoutUs = 10000000ll; 50 51namespace android { 52 53static void MakeUserAgentString(AString *s) { 54 s->setTo("stagefright/1.1 (Linux;Android "); 55 56#if (PROPERTY_VALUE_MAX < 8) 57#error "PROPERTY_VALUE_MAX must be at least 8" 58#endif 59 60 char value[PROPERTY_VALUE_MAX]; 61 property_get("ro.build.version.release", value, "Unknown"); 62 s->append(value); 63 s->append(")"); 64} 65 66static bool GetAttribute(const char *s, const char *key, AString *value) { 67 value->clear(); 68 69 size_t keyLen = strlen(key); 70 71 for (;;) { 72 while (isspace(*s)) { 73 ++s; 74 } 75 76 const char *colonPos = strchr(s, ';'); 77 78 size_t len = 79 (colonPos == NULL) ? strlen(s) : colonPos - s; 80 81 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { 82 value->setTo(&s[keyLen + 1], len - keyLen - 1); 83 return true; 84 } 85 86 if (colonPos == NULL) { 87 return false; 88 } 89 90 s = colonPos + 1; 91 } 92} 93 94struct MyHandler : public AHandler { 95 MyHandler(const char *url, const sp<ALooper> &looper) 96 : mLooper(looper), 97 mNetLooper(new ALooper), 98 mConn(new ARTSPConnection), 99 mRTPConn(new ARTPConnection), 100 mOriginalSessionURL(url), 101 mSessionURL(url), 102 mSetupTracksSuccessful(false), 103 mSeekPending(false), 104 mFirstAccessUnit(true), 105 mFirstAccessUnitNTP(0), 106 mNumAccessUnitsReceived(0), 107 mCheckPending(false), 108 mCheckGeneration(0), 109 mTryTCPInterleaving(false), 110 mTryFakeRTCP(false), 111 mReceivedFirstRTCPPacket(false), 112 mReceivedFirstRTPPacket(false), 113 mSeekable(false) { 114 mNetLooper->setName("rtsp net"); 115 mNetLooper->start(false /* runOnCallingThread */, 116 false /* canCallJava */, 117 PRIORITY_HIGHEST); 118 119 // Strip any authentication info from the session url, we don't 120 // want to transmit user/pass in cleartext. 121 AString host, path, user, pass; 122 unsigned port; 123 CHECK(ARTSPConnection::ParseURL( 124 mSessionURL.c_str(), &host, &port, &path, &user, &pass)); 125 126 if (user.size() > 0) { 127 mSessionURL.clear(); 128 mSessionURL.append("rtsp://"); 129 mSessionURL.append(host); 130 mSessionURL.append(":"); 131 mSessionURL.append(StringPrintf("%u", port)); 132 mSessionURL.append(path); 133 134 LOGI("rewritten session url: '%s'", mSessionURL.c_str()); 135 } 136 137 mSessionHost = host; 138 } 139 140 void connect(const sp<AMessage> &doneMsg) { 141 mDoneMsg = doneMsg; 142 143 mLooper->registerHandler(this); 144 mLooper->registerHandler(mConn); 145 (1 ? mNetLooper : mLooper)->registerHandler(mRTPConn); 146 147 sp<AMessage> notify = new AMessage('biny', id()); 148 mConn->observeBinaryData(notify); 149 150 sp<AMessage> reply = new AMessage('conn', id()); 151 mConn->connect(mOriginalSessionURL.c_str(), reply); 152 } 153 154 void disconnect(const sp<AMessage> &doneMsg) { 155 mDoneMsg = doneMsg; 156 157 (new AMessage('abor', id()))->post(); 158 } 159 160 void seek(int64_t timeUs, const sp<AMessage> &doneMsg) { 161 sp<AMessage> msg = new AMessage('seek', id()); 162 msg->setInt64("time", timeUs); 163 msg->setMessage("doneMsg", doneMsg); 164 msg->post(); 165 } 166 167 int64_t getNormalPlayTimeUs() { 168 int64_t maxTimeUs = 0; 169 for (size_t i = 0; i < mTracks.size(); ++i) { 170 int64_t timeUs = mTracks.editItemAt(i).mPacketSource 171 ->getNormalPlayTimeUs(); 172 173 if (i == 0 || timeUs > maxTimeUs) { 174 maxTimeUs = timeUs; 175 } 176 } 177 178 return maxTimeUs; 179 } 180 181 static void addRR(const sp<ABuffer> &buf) { 182 uint8_t *ptr = buf->data() + buf->size(); 183 ptr[0] = 0x80 | 0; 184 ptr[1] = 201; // RR 185 ptr[2] = 0; 186 ptr[3] = 1; 187 ptr[4] = 0xde; // SSRC 188 ptr[5] = 0xad; 189 ptr[6] = 0xbe; 190 ptr[7] = 0xef; 191 192 buf->setRange(0, buf->size() + 8); 193 } 194 195 static void addSDES(int s, const sp<ABuffer> &buffer) { 196 struct sockaddr_in addr; 197 socklen_t addrSize = sizeof(addr); 198 CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize)); 199 200 uint8_t *data = buffer->data() + buffer->size(); 201 data[0] = 0x80 | 1; 202 data[1] = 202; // SDES 203 data[4] = 0xde; // SSRC 204 data[5] = 0xad; 205 data[6] = 0xbe; 206 data[7] = 0xef; 207 208 size_t offset = 8; 209 210 data[offset++] = 1; // CNAME 211 212 AString cname = "stagefright@"; 213 cname.append(inet_ntoa(addr.sin_addr)); 214 data[offset++] = cname.size(); 215 216 memcpy(&data[offset], cname.c_str(), cname.size()); 217 offset += cname.size(); 218 219 data[offset++] = 6; // TOOL 220 221 AString tool; 222 MakeUserAgentString(&tool); 223 224 data[offset++] = tool.size(); 225 226 memcpy(&data[offset], tool.c_str(), tool.size()); 227 offset += tool.size(); 228 229 data[offset++] = 0; 230 231 if ((offset % 4) > 0) { 232 size_t count = 4 - (offset % 4); 233 switch (count) { 234 case 3: 235 data[offset++] = 0; 236 case 2: 237 data[offset++] = 0; 238 case 1: 239 data[offset++] = 0; 240 } 241 } 242 243 size_t numWords = (offset / 4) - 1; 244 data[2] = numWords >> 8; 245 data[3] = numWords & 0xff; 246 247 buffer->setRange(buffer->offset(), buffer->size() + offset); 248 } 249 250 // In case we're behind NAT, fire off two UDP packets to the remote 251 // rtp/rtcp ports to poke a hole into the firewall for future incoming 252 // packets. We're going to send an RR/SDES RTCP packet to both of them. 253 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) { 254 struct sockaddr_in addr; 255 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 256 addr.sin_family = AF_INET; 257 258 AString source; 259 AString server_port; 260 if (!GetAttribute(transport.c_str(), 261 "source", 262 &source)) { 263 LOGW("Missing 'source' field in Transport response. Using " 264 "RTSP endpoint address."); 265 266 struct hostent *ent = gethostbyname(mSessionHost.c_str()); 267 if (ent == NULL) { 268 LOGE("Failed to look up address of session host '%s'", 269 mSessionHost.c_str()); 270 271 return false; 272 } 273 274 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 275 } else { 276 addr.sin_addr.s_addr = inet_addr(source.c_str()); 277 } 278 279 if (!GetAttribute(transport.c_str(), 280 "server_port", 281 &server_port)) { 282 LOGI("Missing 'server_port' field in Transport response."); 283 return false; 284 } 285 286 int rtpPort, rtcpPort; 287 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2 288 || rtpPort <= 0 || rtpPort > 65535 289 || rtcpPort <=0 || rtcpPort > 65535 290 || rtcpPort != rtpPort + 1) { 291 LOGE("Server picked invalid RTP/RTCP port pair %s," 292 " RTP port must be even, RTCP port must be one higher.", 293 server_port.c_str()); 294 295 return false; 296 } 297 298 if (rtpPort & 1) { 299 LOGW("Server picked an odd RTP port, it should've picked an " 300 "even one, we'll let it pass for now, but this may break " 301 "in the future."); 302 } 303 304 if (addr.sin_addr.s_addr == INADDR_NONE) { 305 return true; 306 } 307 308 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) { 309 // No firewalls to traverse on the loopback interface. 310 return true; 311 } 312 313 // Make up an RR/SDES RTCP packet. 314 sp<ABuffer> buf = new ABuffer(65536); 315 buf->setRange(0, 0); 316 addRR(buf); 317 addSDES(rtpSocket, buf); 318 319 addr.sin_port = htons(rtpPort); 320 321 ssize_t n = sendto( 322 rtpSocket, buf->data(), buf->size(), 0, 323 (const sockaddr *)&addr, sizeof(addr)); 324 325 if (n < (ssize_t)buf->size()) { 326 LOGE("failed to poke a hole for RTP packets"); 327 return false; 328 } 329 330 addr.sin_port = htons(rtcpPort); 331 332 n = sendto( 333 rtcpSocket, buf->data(), buf->size(), 0, 334 (const sockaddr *)&addr, sizeof(addr)); 335 336 if (n < (ssize_t)buf->size()) { 337 LOGE("failed to poke a hole for RTCP packets"); 338 return false; 339 } 340 341 LOGV("successfully poked holes."); 342 343 return true; 344 } 345 346 virtual void onMessageReceived(const sp<AMessage> &msg) { 347 switch (msg->what()) { 348 case 'conn': 349 { 350 int32_t result; 351 CHECK(msg->findInt32("result", &result)); 352 353 LOGI("connection request completed with result %d (%s)", 354 result, strerror(-result)); 355 356 if (result == OK) { 357 AString request; 358 request = "DESCRIBE "; 359 request.append(mSessionURL); 360 request.append(" RTSP/1.0\r\n"); 361 request.append("Accept: application/sdp\r\n"); 362 request.append("\r\n"); 363 364 sp<AMessage> reply = new AMessage('desc', id()); 365 mConn->sendRequest(request.c_str(), reply); 366 } else { 367 (new AMessage('disc', id()))->post(); 368 } 369 break; 370 } 371 372 case 'disc': 373 { 374 int32_t reconnect; 375 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 376 sp<AMessage> reply = new AMessage('conn', id()); 377 mConn->connect(mOriginalSessionURL.c_str(), reply); 378 } else { 379 (new AMessage('quit', id()))->post(); 380 } 381 break; 382 } 383 384 case 'desc': 385 { 386 int32_t result; 387 CHECK(msg->findInt32("result", &result)); 388 389 LOGI("DESCRIBE completed with result %d (%s)", 390 result, strerror(-result)); 391 392 if (result == OK) { 393 sp<RefBase> obj; 394 CHECK(msg->findObject("response", &obj)); 395 sp<ARTSPResponse> response = 396 static_cast<ARTSPResponse *>(obj.get()); 397 398 if (response->mStatusCode == 302) { 399 ssize_t i = response->mHeaders.indexOfKey("location"); 400 CHECK_GE(i, 0); 401 402 mSessionURL = response->mHeaders.valueAt(i); 403 404 AString request; 405 request = "DESCRIBE "; 406 request.append(mSessionURL); 407 request.append(" RTSP/1.0\r\n"); 408 request.append("Accept: application/sdp\r\n"); 409 request.append("\r\n"); 410 411 sp<AMessage> reply = new AMessage('desc', id()); 412 mConn->sendRequest(request.c_str(), reply); 413 break; 414 } 415 416 if (response->mStatusCode != 200) { 417 result = UNKNOWN_ERROR; 418 } else { 419 mSessionDesc = new ASessionDescription; 420 421 mSessionDesc->setTo( 422 response->mContent->data(), 423 response->mContent->size()); 424 425 if (!mSessionDesc->isValid()) { 426 LOGE("Failed to parse session description."); 427 result = ERROR_MALFORMED; 428 } else { 429 ssize_t i = response->mHeaders.indexOfKey("content-base"); 430 if (i >= 0) { 431 mBaseURL = response->mHeaders.valueAt(i); 432 } else { 433 i = response->mHeaders.indexOfKey("content-location"); 434 if (i >= 0) { 435 mBaseURL = response->mHeaders.valueAt(i); 436 } else { 437 mBaseURL = mSessionURL; 438 } 439 } 440 441 if (!mBaseURL.startsWith("rtsp://")) { 442 // Some misbehaving servers specify a relative 443 // URL in one of the locations above, combine 444 // it with the absolute session URL to get 445 // something usable... 446 447 LOGW("Server specified a non-absolute base URL" 448 ", combining it with the session URL to " 449 "get something usable..."); 450 451 AString tmp; 452 CHECK(MakeURL( 453 mSessionURL.c_str(), 454 mBaseURL.c_str(), 455 &tmp)); 456 457 mBaseURL = tmp; 458 } 459 460 CHECK_GT(mSessionDesc->countTracks(), 1u); 461 setupTrack(1); 462 } 463 } 464 } 465 466 if (result != OK) { 467 sp<AMessage> reply = new AMessage('disc', id()); 468 mConn->disconnect(reply); 469 } 470 break; 471 } 472 473 case 'setu': 474 { 475 size_t index; 476 CHECK(msg->findSize("index", &index)); 477 478 TrackInfo *track = NULL; 479 size_t trackIndex; 480 if (msg->findSize("track-index", &trackIndex)) { 481 track = &mTracks.editItemAt(trackIndex); 482 } 483 484 int32_t result; 485 CHECK(msg->findInt32("result", &result)); 486 487 LOGI("SETUP(%d) completed with result %d (%s)", 488 index, result, strerror(-result)); 489 490 if (result == OK) { 491 CHECK(track != NULL); 492 493 sp<RefBase> obj; 494 CHECK(msg->findObject("response", &obj)); 495 sp<ARTSPResponse> response = 496 static_cast<ARTSPResponse *>(obj.get()); 497 498 if (response->mStatusCode != 200) { 499 result = UNKNOWN_ERROR; 500 } else { 501 ssize_t i = response->mHeaders.indexOfKey("session"); 502 CHECK_GE(i, 0); 503 504 mSessionID = response->mHeaders.valueAt(i); 505 i = mSessionID.find(";"); 506 if (i >= 0) { 507 // Remove options, i.e. ";timeout=90" 508 mSessionID.erase(i, mSessionID.size() - i); 509 } 510 511 sp<AMessage> notify = new AMessage('accu', id()); 512 notify->setSize("track-index", trackIndex); 513 514 i = response->mHeaders.indexOfKey("transport"); 515 CHECK_GE(i, 0); 516 517 if (!track->mUsingInterleavedTCP) { 518 AString transport = response->mHeaders.valueAt(i); 519 520 // We are going to continue even if we were 521 // unable to poke a hole into the firewall... 522 pokeAHole( 523 track->mRTPSocket, 524 track->mRTCPSocket, 525 transport); 526 } 527 528 mRTPConn->addStream( 529 track->mRTPSocket, track->mRTCPSocket, 530 mSessionDesc, index, 531 notify, track->mUsingInterleavedTCP); 532 533 mSetupTracksSuccessful = true; 534 } 535 } 536 537 if (result != OK) { 538 if (track) { 539 if (!track->mUsingInterleavedTCP) { 540 close(track->mRTPSocket); 541 close(track->mRTCPSocket); 542 } 543 544 mTracks.removeItemsAt(trackIndex); 545 } 546 } 547 548 ++index; 549 if (index < mSessionDesc->countTracks()) { 550 setupTrack(index); 551 } else if (mSetupTracksSuccessful) { 552 AString request = "PLAY "; 553 request.append(mSessionURL); 554 request.append(" RTSP/1.0\r\n"); 555 556 request.append("Session: "); 557 request.append(mSessionID); 558 request.append("\r\n"); 559 560 request.append("\r\n"); 561 562 sp<AMessage> reply = new AMessage('play', id()); 563 mConn->sendRequest(request.c_str(), reply); 564 } else { 565 sp<AMessage> reply = new AMessage('disc', id()); 566 mConn->disconnect(reply); 567 } 568 break; 569 } 570 571 case 'play': 572 { 573 int32_t result; 574 CHECK(msg->findInt32("result", &result)); 575 576 LOGI("PLAY completed with result %d (%s)", 577 result, strerror(-result)); 578 579 if (result == OK) { 580 sp<RefBase> obj; 581 CHECK(msg->findObject("response", &obj)); 582 sp<ARTSPResponse> response = 583 static_cast<ARTSPResponse *>(obj.get()); 584 585 if (response->mStatusCode != 200) { 586 result = UNKNOWN_ERROR; 587 } else { 588 parsePlayResponse(response); 589 590 sp<AMessage> timeout = new AMessage('tiou', id()); 591 timeout->post(kStartupTimeoutUs); 592 } 593 } 594 595 if (result != OK) { 596 sp<AMessage> reply = new AMessage('disc', id()); 597 mConn->disconnect(reply); 598 } 599 600 break; 601 } 602 603 case 'abor': 604 { 605 for (size_t i = 0; i < mTracks.size(); ++i) { 606 TrackInfo *info = &mTracks.editItemAt(i); 607 608 info->mPacketSource->signalEOS(ERROR_END_OF_STREAM); 609 610 if (!info->mUsingInterleavedTCP) { 611 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket); 612 613 close(info->mRTPSocket); 614 close(info->mRTCPSocket); 615 } 616 } 617 mTracks.clear(); 618 mSetupTracksSuccessful = false; 619 mSeekPending = false; 620 mFirstAccessUnit = true; 621 mFirstAccessUnitNTP = 0; 622 mNumAccessUnitsReceived = 0; 623 mReceivedFirstRTCPPacket = false; 624 mReceivedFirstRTPPacket = false; 625 mSeekable = false; 626 627 sp<AMessage> reply = new AMessage('tear', id()); 628 629 int32_t reconnect; 630 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 631 reply->setInt32("reconnect", true); 632 } 633 634 AString request; 635 request = "TEARDOWN "; 636 637 // XXX should use aggregate url from SDP here... 638 request.append(mSessionURL); 639 request.append(" RTSP/1.0\r\n"); 640 641 request.append("Session: "); 642 request.append(mSessionID); 643 request.append("\r\n"); 644 645 request.append("\r\n"); 646 647 mConn->sendRequest(request.c_str(), reply); 648 break; 649 } 650 651 case 'tear': 652 { 653 int32_t result; 654 CHECK(msg->findInt32("result", &result)); 655 656 LOGI("TEARDOWN completed with result %d (%s)", 657 result, strerror(-result)); 658 659 sp<AMessage> reply = new AMessage('disc', id()); 660 661 int32_t reconnect; 662 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 663 reply->setInt32("reconnect", true); 664 } 665 666 mConn->disconnect(reply); 667 break; 668 } 669 670 case 'quit': 671 { 672 if (mDoneMsg != NULL) { 673 mDoneMsg->setInt32("result", UNKNOWN_ERROR); 674 mDoneMsg->post(); 675 mDoneMsg = NULL; 676 } 677 break; 678 } 679 680 case 'chek': 681 { 682 int32_t generation; 683 CHECK(msg->findInt32("generation", &generation)); 684 if (generation != mCheckGeneration) { 685 // This is an outdated message. Ignore. 686 break; 687 } 688 689 if (mNumAccessUnitsReceived == 0) { 690 LOGI("stream ended? aborting."); 691 (new AMessage('abor', id()))->post(); 692 break; 693 } 694 695 mNumAccessUnitsReceived = 0; 696 msg->post(kAccessUnitTimeoutUs); 697 break; 698 } 699 700 case 'accu': 701 { 702 int32_t first; 703 if (msg->findInt32("first-rtcp", &first)) { 704 mReceivedFirstRTCPPacket = true; 705 break; 706 } 707 708 if (msg->findInt32("first-rtp", &first)) { 709 mReceivedFirstRTPPacket = true; 710 break; 711 } 712 713 ++mNumAccessUnitsReceived; 714 postAccessUnitTimeoutCheck(); 715 716 size_t trackIndex; 717 CHECK(msg->findSize("track-index", &trackIndex)); 718 719 if (trackIndex >= mTracks.size()) { 720 LOGV("late packets ignored."); 721 break; 722 } 723 724 TrackInfo *track = &mTracks.editItemAt(trackIndex); 725 726 int32_t eos; 727 if (msg->findInt32("eos", &eos)) { 728 LOGI("received BYE on track index %d", trackIndex); 729#if 0 730 track->mPacketSource->signalEOS(ERROR_END_OF_STREAM); 731#endif 732 return; 733 } 734 735 sp<RefBase> obj; 736 CHECK(msg->findObject("access-unit", &obj)); 737 738 sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get()); 739 740 uint32_t seqNum = (uint32_t)accessUnit->int32Data(); 741 742 if (mSeekPending) { 743 LOGV("we're seeking, dropping stale packet."); 744 break; 745 } 746 747 if (seqNum < track->mFirstSeqNumInSegment) { 748 LOGV("dropping stale access-unit (%d < %d)", 749 seqNum, track->mFirstSeqNumInSegment); 750 break; 751 } 752 753 uint64_t ntpTime; 754 CHECK(accessUnit->meta()->findInt64( 755 "ntp-time", (int64_t *)&ntpTime)); 756 757 uint32_t rtpTime; 758 CHECK(accessUnit->meta()->findInt32( 759 "rtp-time", (int32_t *)&rtpTime)); 760 761 if (track->mNewSegment) { 762 track->mNewSegment = false; 763 764 LOGV("first segment unit ntpTime=0x%016llx rtpTime=%u seq=%d", 765 ntpTime, rtpTime, seqNum); 766 } 767 768 if (mFirstAccessUnit) { 769 mDoneMsg->setInt32("result", OK); 770 mDoneMsg->post(); 771 mDoneMsg = NULL; 772 773 mFirstAccessUnit = false; 774 mFirstAccessUnitNTP = ntpTime; 775 } 776 777 if (ntpTime >= mFirstAccessUnitNTP) { 778 ntpTime -= mFirstAccessUnitNTP; 779 } else { 780 ntpTime = 0; 781 } 782 783 int64_t timeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); 784 785 accessUnit->meta()->setInt64("timeUs", timeUs); 786 787#if 0 788 int32_t damaged; 789 if (accessUnit->meta()->findInt32("damaged", &damaged) 790 && damaged != 0) { 791 LOGI("ignoring damaged AU"); 792 } else 793#endif 794 { 795 TrackInfo *track = &mTracks.editItemAt(trackIndex); 796 track->mPacketSource->queueAccessUnit(accessUnit); 797 } 798 break; 799 } 800 801 case 'seek': 802 { 803 sp<AMessage> doneMsg; 804 CHECK(msg->findMessage("doneMsg", &doneMsg)); 805 806 if (mSeekPending) { 807 doneMsg->post(); 808 break; 809 } 810 811 if (!mSeekable) { 812 LOGW("This is a live stream, ignoring seek request."); 813 doneMsg->post(); 814 break; 815 } 816 817 int64_t timeUs; 818 CHECK(msg->findInt64("time", &timeUs)); 819 820 mSeekPending = true; 821 822 // Disable the access unit timeout until we resumed 823 // playback again. 824 mCheckPending = true; 825 ++mCheckGeneration; 826 827 AString request = "PAUSE "; 828 request.append(mSessionURL); 829 request.append(" RTSP/1.0\r\n"); 830 831 request.append("Session: "); 832 request.append(mSessionID); 833 request.append("\r\n"); 834 835 request.append("\r\n"); 836 837 sp<AMessage> reply = new AMessage('see1', id()); 838 reply->setInt64("time", timeUs); 839 reply->setMessage("doneMsg", doneMsg); 840 mConn->sendRequest(request.c_str(), reply); 841 break; 842 } 843 844 case 'see1': 845 { 846 // Session is paused now. 847 for (size_t i = 0; i < mTracks.size(); ++i) { 848 mTracks.editItemAt(i).mPacketSource->flushQueue(); 849 } 850 851 int64_t timeUs; 852 CHECK(msg->findInt64("time", &timeUs)); 853 854 AString request = "PLAY "; 855 request.append(mSessionURL); 856 request.append(" RTSP/1.0\r\n"); 857 858 request.append("Session: "); 859 request.append(mSessionID); 860 request.append("\r\n"); 861 862 request.append( 863 StringPrintf( 864 "Range: npt=%lld-\r\n", timeUs / 1000000ll)); 865 866 request.append("\r\n"); 867 868 sp<AMessage> doneMsg; 869 CHECK(msg->findMessage("doneMsg", &doneMsg)); 870 871 sp<AMessage> reply = new AMessage('see2', id()); 872 reply->setMessage("doneMsg", doneMsg); 873 mConn->sendRequest(request.c_str(), reply); 874 break; 875 } 876 877 case 'see2': 878 { 879 CHECK(mSeekPending); 880 881 int32_t result; 882 CHECK(msg->findInt32("result", &result)); 883 884 LOGI("PLAY completed with result %d (%s)", 885 result, strerror(-result)); 886 887 mCheckPending = false; 888 postAccessUnitTimeoutCheck(); 889 890 if (result == OK) { 891 sp<RefBase> obj; 892 CHECK(msg->findObject("response", &obj)); 893 sp<ARTSPResponse> response = 894 static_cast<ARTSPResponse *>(obj.get()); 895 896 if (response->mStatusCode != 200) { 897 result = UNKNOWN_ERROR; 898 } else { 899 parsePlayResponse(response); 900 901 LOGI("seek completed."); 902 } 903 } 904 905 if (result != OK) { 906 LOGE("seek failed, aborting."); 907 (new AMessage('abor', id()))->post(); 908 } 909 910 mSeekPending = false; 911 912 sp<AMessage> doneMsg; 913 CHECK(msg->findMessage("doneMsg", &doneMsg)); 914 915 doneMsg->post(); 916 break; 917 } 918 919 case 'biny': 920 { 921 sp<RefBase> obj; 922 CHECK(msg->findObject("buffer", &obj)); 923 sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get()); 924 925 int32_t index; 926 CHECK(buffer->meta()->findInt32("index", &index)); 927 928 mRTPConn->injectPacket(index, buffer); 929 break; 930 } 931 932 case 'tiou': 933 { 934 if (!mReceivedFirstRTCPPacket) { 935 if (mReceivedFirstRTPPacket && !mTryFakeRTCP) { 936 LOGW("We received RTP packets but no RTCP packets, " 937 "using fake timestamps."); 938 939 mTryFakeRTCP = true; 940 941 mReceivedFirstRTCPPacket = true; 942 mRTPConn->fakeTimestamps(); 943 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) { 944 LOGW("Never received any data, switching transports."); 945 946 mTryTCPInterleaving = true; 947 948 sp<AMessage> msg = new AMessage('abor', id()); 949 msg->setInt32("reconnect", true); 950 msg->post(); 951 } else { 952 LOGW("Never received any data, disconnecting."); 953 (new AMessage('abor', id()))->post(); 954 } 955 } 956 break; 957 } 958 959 default: 960 TRESPASS(); 961 break; 962 } 963 } 964 965 void postAccessUnitTimeoutCheck() { 966 if (mCheckPending) { 967 return; 968 } 969 970 mCheckPending = true; 971 sp<AMessage> check = new AMessage('chek', id()); 972 check->setInt32("generation", mCheckGeneration); 973 check->post(kAccessUnitTimeoutUs); 974 } 975 976 static void SplitString( 977 const AString &s, const char *separator, List<AString> *items) { 978 items->clear(); 979 size_t start = 0; 980 while (start < s.size()) { 981 ssize_t offset = s.find(separator, start); 982 983 if (offset < 0) { 984 items->push_back(AString(s, start, s.size() - start)); 985 break; 986 } 987 988 items->push_back(AString(s, start, offset - start)); 989 start = offset + strlen(separator); 990 } 991 } 992 993 void parsePlayResponse(const sp<ARTSPResponse> &response) { 994 mSeekable = false; 995 996 ssize_t i = response->mHeaders.indexOfKey("range"); 997 if (i < 0) { 998 // Server doesn't even tell use what range it is going to 999 // play, therefore we won't support seeking. 1000 return; 1001 } 1002 1003 AString range = response->mHeaders.valueAt(i); 1004 LOGV("Range: %s", range.c_str()); 1005 1006 AString val; 1007 CHECK(GetAttribute(range.c_str(), "npt", &val)); 1008 1009 float npt1, npt2; 1010 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) { 1011 // This is a live stream and therefore not seekable. 1012 return; 1013 } 1014 1015 i = response->mHeaders.indexOfKey("rtp-info"); 1016 CHECK_GE(i, 0); 1017 1018 AString rtpInfo = response->mHeaders.valueAt(i); 1019 List<AString> streamInfos; 1020 SplitString(rtpInfo, ",", &streamInfos); 1021 1022 int n = 1; 1023 for (List<AString>::iterator it = streamInfos.begin(); 1024 it != streamInfos.end(); ++it) { 1025 (*it).trim(); 1026 LOGV("streamInfo[%d] = %s", n, (*it).c_str()); 1027 1028 CHECK(GetAttribute((*it).c_str(), "url", &val)); 1029 1030 size_t trackIndex = 0; 1031 while (trackIndex < mTracks.size() 1032 && !(val == mTracks.editItemAt(trackIndex).mURL)) { 1033 ++trackIndex; 1034 } 1035 CHECK_LT(trackIndex, mTracks.size()); 1036 1037 CHECK(GetAttribute((*it).c_str(), "seq", &val)); 1038 1039 char *end; 1040 unsigned long seq = strtoul(val.c_str(), &end, 10); 1041 1042 TrackInfo *info = &mTracks.editItemAt(trackIndex); 1043 info->mFirstSeqNumInSegment = seq; 1044 info->mNewSegment = true; 1045 1046 CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); 1047 1048 uint32_t rtpTime = strtoul(val.c_str(), &end, 10); 1049 1050 LOGV("track #%d: rtpTime=%u <=> ntp=%.2f", n, rtpTime, npt1); 1051 1052 info->mPacketSource->setNormalPlayTimeMapping( 1053 rtpTime, (int64_t)(npt1 * 1E6)); 1054 1055 ++n; 1056 } 1057 1058 mSeekable = true; 1059 } 1060 1061 sp<APacketSource> getPacketSource(size_t index) { 1062 CHECK_GE(index, 0u); 1063 CHECK_LT(index, mTracks.size()); 1064 1065 return mTracks.editItemAt(index).mPacketSource; 1066 } 1067 1068 size_t countTracks() const { 1069 return mTracks.size(); 1070 } 1071 1072private: 1073 sp<ALooper> mLooper; 1074 sp<ALooper> mNetLooper; 1075 sp<ARTSPConnection> mConn; 1076 sp<ARTPConnection> mRTPConn; 1077 sp<ASessionDescription> mSessionDesc; 1078 AString mOriginalSessionURL; // This one still has user:pass@ 1079 AString mSessionURL; 1080 AString mSessionHost; 1081 AString mBaseURL; 1082 AString mSessionID; 1083 bool mSetupTracksSuccessful; 1084 bool mSeekPending; 1085 bool mFirstAccessUnit; 1086 uint64_t mFirstAccessUnitNTP; 1087 int64_t mNumAccessUnitsReceived; 1088 bool mCheckPending; 1089 int32_t mCheckGeneration; 1090 bool mTryTCPInterleaving; 1091 bool mTryFakeRTCP; 1092 bool mReceivedFirstRTCPPacket; 1093 bool mReceivedFirstRTPPacket; 1094 bool mSeekable; 1095 1096 struct TrackInfo { 1097 AString mURL; 1098 int mRTPSocket; 1099 int mRTCPSocket; 1100 bool mUsingInterleavedTCP; 1101 uint32_t mFirstSeqNumInSegment; 1102 bool mNewSegment; 1103 1104 sp<APacketSource> mPacketSource; 1105 }; 1106 Vector<TrackInfo> mTracks; 1107 1108 sp<AMessage> mDoneMsg; 1109 1110 void setupTrack(size_t index) { 1111 sp<APacketSource> source = 1112 new APacketSource(mSessionDesc, index); 1113 1114 if (source->initCheck() != OK) { 1115 LOGW("Unsupported format. Ignoring track #%d.", index); 1116 1117 sp<AMessage> reply = new AMessage('setu', id()); 1118 reply->setSize("index", index); 1119 reply->setInt32("result", ERROR_UNSUPPORTED); 1120 reply->post(); 1121 return; 1122 } 1123 1124 AString url; 1125 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 1126 1127 AString trackURL; 1128 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 1129 1130 mTracks.push(TrackInfo()); 1131 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 1132 info->mURL = trackURL; 1133 info->mPacketSource = source; 1134 info->mUsingInterleavedTCP = false; 1135 info->mFirstSeqNumInSegment = 0; 1136 info->mNewSegment = true; 1137 1138 LOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str()); 1139 1140 AString request = "SETUP "; 1141 request.append(trackURL); 1142 request.append(" RTSP/1.0\r\n"); 1143 1144 if (mTryTCPInterleaving) { 1145 size_t interleaveIndex = 2 * (mTracks.size() - 1); 1146 info->mUsingInterleavedTCP = true; 1147 info->mRTPSocket = interleaveIndex; 1148 info->mRTCPSocket = interleaveIndex + 1; 1149 1150 request.append("Transport: RTP/AVP/TCP;interleaved="); 1151 request.append(interleaveIndex); 1152 request.append("-"); 1153 request.append(interleaveIndex + 1); 1154 } else { 1155 unsigned rtpPort; 1156 ARTPConnection::MakePortPair( 1157 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 1158 1159 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 1160 request.append(rtpPort); 1161 request.append("-"); 1162 request.append(rtpPort + 1); 1163 } 1164 1165 request.append("\r\n"); 1166 1167 if (index > 1) { 1168 request.append("Session: "); 1169 request.append(mSessionID); 1170 request.append("\r\n"); 1171 } 1172 1173 request.append("\r\n"); 1174 1175 sp<AMessage> reply = new AMessage('setu', id()); 1176 reply->setSize("index", index); 1177 reply->setSize("track-index", mTracks.size() - 1); 1178 mConn->sendRequest(request.c_str(), reply); 1179 } 1180 1181 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 1182 out->clear(); 1183 1184 if (strncasecmp("rtsp://", baseURL, 7)) { 1185 // Base URL must be absolute 1186 return false; 1187 } 1188 1189 if (!strncasecmp("rtsp://", url, 7)) { 1190 // "url" is already an absolute URL, ignore base URL. 1191 out->setTo(url); 1192 return true; 1193 } 1194 1195 size_t n = strlen(baseURL); 1196 if (baseURL[n - 1] == '/') { 1197 out->setTo(baseURL); 1198 out->append(url); 1199 } else { 1200 char *slashPos = strrchr(baseURL, '/'); 1201 1202 if (slashPos > &baseURL[6]) { 1203 out->setTo(baseURL, slashPos - baseURL); 1204 } else { 1205 out->setTo(baseURL); 1206 } 1207 1208 out->append("/"); 1209 out->append(url); 1210 } 1211 1212 return true; 1213 } 1214 1215 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 1216}; 1217 1218} // namespace android 1219 1220#endif // MY_HANDLER_H_ 1221