MyHandler.h revision a44501ea0896c2508bd6b807185d9049be6752f3
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#ifndef MY_HANDLER_H_ 18 19#define MY_HANDLER_H_ 20 21//#define LOG_NDEBUG 0 22#define LOG_TAG "MyHandler" 23#include <utils/Log.h> 24 25#include "APacketSource.h" 26#include "ARTPConnection.h" 27#include "ARTSPConnection.h" 28#include "ASessionDescription.h" 29 30#include <ctype.h> 31#include <cutils/properties.h> 32 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/ALooper.h> 36#include <media/stagefright/foundation/AMessage.h> 37#include <media/stagefright/MediaErrors.h> 38 39#include <arpa/inet.h> 40#include <sys/socket.h> 41 42// If no access units are received within 3 secs, assume that the rtp 43// stream has ended and signal end of stream. 44static int64_t kAccessUnitTimeoutUs = 3000000ll; 45 46// If no access units arrive for the first 10 secs after starting the 47// stream, assume none ever will and signal EOS or switch transports. 48static int64_t kStartupTimeoutUs = 10000000ll; 49 50namespace android { 51 52static void MakeUserAgentString(AString *s) { 53 s->setTo("stagefright/1.1 (Linux;Android "); 54 55#if (PROPERTY_VALUE_MAX < 8) 56#error "PROPERTY_VALUE_MAX must be at least 8" 57#endif 58 59 char value[PROPERTY_VALUE_MAX]; 60 property_get("ro.build.version.release", value, "Unknown"); 61 s->append(value); 62 s->append(")"); 63} 64 65static bool GetAttribute(const char *s, const char *key, AString *value) { 66 value->clear(); 67 68 size_t keyLen = strlen(key); 69 70 for (;;) { 71 while (isspace(*s)) { 72 ++s; 73 } 74 75 const char *colonPos = strchr(s, ';'); 76 77 size_t len = 78 (colonPos == NULL) ? strlen(s) : colonPos - s; 79 80 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { 81 value->setTo(&s[keyLen + 1], len - keyLen - 1); 82 return true; 83 } 84 85 if (colonPos == NULL) { 86 return false; 87 } 88 89 s = colonPos + 1; 90 } 91} 92 93struct MyHandler : public AHandler { 94 MyHandler(const char *url, const sp<ALooper> &looper) 95 : mLooper(looper), 96 mNetLooper(new ALooper), 97 mConn(new ARTSPConnection), 98 mRTPConn(new ARTPConnection), 99 mSessionURL(url), 100 mSetupTracksSuccessful(false), 101 mSeekPending(false), 102 mFirstAccessUnit(true), 103 mFirstAccessUnitNTP(0), 104 mNumAccessUnitsReceived(0), 105 mCheckPending(false), 106 mCheckGeneration(0), 107 mTryTCPInterleaving(false), 108 mTryFakeRTCP(false), 109 mReceivedFirstRTCPPacket(false), 110 mReceivedFirstRTPPacket(false), 111 mSeekable(false) { 112 mNetLooper->setName("rtsp net"); 113 mNetLooper->start(false /* runOnCallingThread */, 114 false /* canCallJava */, 115 PRIORITY_HIGHEST); 116 } 117 118 void connect(const sp<AMessage> &doneMsg) { 119 mDoneMsg = doneMsg; 120 121 mLooper->registerHandler(this); 122 mLooper->registerHandler(mConn); 123 (1 ? mNetLooper : mLooper)->registerHandler(mRTPConn); 124 125 sp<AMessage> notify = new AMessage('biny', id()); 126 mConn->observeBinaryData(notify); 127 128 sp<AMessage> reply = new AMessage('conn', id()); 129 mConn->connect(mSessionURL.c_str(), reply); 130 } 131 132 void disconnect(const sp<AMessage> &doneMsg) { 133 mDoneMsg = doneMsg; 134 135 (new AMessage('abor', id()))->post(); 136 } 137 138 void seek(int64_t timeUs, const sp<AMessage> &doneMsg) { 139 sp<AMessage> msg = new AMessage('seek', id()); 140 msg->setInt64("time", timeUs); 141 msg->setMessage("doneMsg", doneMsg); 142 msg->post(); 143 } 144 145 int64_t getNormalPlayTimeUs() { 146 int64_t maxTimeUs = 0; 147 for (size_t i = 0; i < mTracks.size(); ++i) { 148 int64_t timeUs = mTracks.editItemAt(i).mPacketSource 149 ->getNormalPlayTimeUs(); 150 151 if (i == 0 || timeUs > maxTimeUs) { 152 maxTimeUs = timeUs; 153 } 154 } 155 156 return maxTimeUs; 157 } 158 159 static void addRR(const sp<ABuffer> &buf) { 160 uint8_t *ptr = buf->data() + buf->size(); 161 ptr[0] = 0x80 | 0; 162 ptr[1] = 201; // RR 163 ptr[2] = 0; 164 ptr[3] = 1; 165 ptr[4] = 0xde; // SSRC 166 ptr[5] = 0xad; 167 ptr[6] = 0xbe; 168 ptr[7] = 0xef; 169 170 buf->setRange(0, buf->size() + 8); 171 } 172 173 static void addSDES(int s, const sp<ABuffer> &buffer) { 174 struct sockaddr_in addr; 175 socklen_t addrSize = sizeof(addr); 176 CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize)); 177 178 uint8_t *data = buffer->data() + buffer->size(); 179 data[0] = 0x80 | 1; 180 data[1] = 202; // SDES 181 data[4] = 0xde; // SSRC 182 data[5] = 0xad; 183 data[6] = 0xbe; 184 data[7] = 0xef; 185 186 size_t offset = 8; 187 188 data[offset++] = 1; // CNAME 189 190 AString cname = "stagefright@"; 191 cname.append(inet_ntoa(addr.sin_addr)); 192 data[offset++] = cname.size(); 193 194 memcpy(&data[offset], cname.c_str(), cname.size()); 195 offset += cname.size(); 196 197 data[offset++] = 6; // TOOL 198 199 AString tool; 200 MakeUserAgentString(&tool); 201 202 data[offset++] = tool.size(); 203 204 memcpy(&data[offset], tool.c_str(), tool.size()); 205 offset += tool.size(); 206 207 data[offset++] = 0; 208 209 if ((offset % 4) > 0) { 210 size_t count = 4 - (offset % 4); 211 switch (count) { 212 case 3: 213 data[offset++] = 0; 214 case 2: 215 data[offset++] = 0; 216 case 1: 217 data[offset++] = 0; 218 } 219 } 220 221 size_t numWords = (offset / 4) - 1; 222 data[2] = numWords >> 8; 223 data[3] = numWords & 0xff; 224 225 buffer->setRange(buffer->offset(), buffer->size() + offset); 226 } 227 228 // In case we're behind NAT, fire off two UDP packets to the remote 229 // rtp/rtcp ports to poke a hole into the firewall for future incoming 230 // packets. We're going to send an RR/SDES RTCP packet to both of them. 231 void pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) { 232 AString source; 233 AString server_port; 234 if (!GetAttribute(transport.c_str(), 235 "source", 236 &source) 237 || !GetAttribute(transport.c_str(), 238 "server_port", 239 &server_port)) { 240 return; 241 } 242 243 int rtpPort, rtcpPort; 244 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2 245 || rtpPort <= 0 || rtpPort > 65535 246 || rtcpPort <=0 || rtcpPort > 65535 247 || rtcpPort != rtpPort + 1 248 || (rtpPort & 1) != 0) { 249 return; 250 } 251 252 struct sockaddr_in addr; 253 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 254 addr.sin_family = AF_INET; 255 addr.sin_addr.s_addr = inet_addr(source.c_str()); 256 257 if (addr.sin_addr.s_addr == INADDR_NONE) { 258 return; 259 } 260 261 // Make up an RR/SDES RTCP packet. 262 sp<ABuffer> buf = new ABuffer(65536); 263 buf->setRange(0, 0); 264 addRR(buf); 265 addSDES(rtpSocket, buf); 266 267 addr.sin_port = htons(rtpPort); 268 269 ssize_t n = sendto( 270 rtpSocket, buf->data(), buf->size(), 0, 271 (const sockaddr *)&addr, sizeof(addr)); 272 CHECK_EQ(n, (ssize_t)buf->size()); 273 274 addr.sin_port = htons(rtcpPort); 275 276 n = sendto( 277 rtcpSocket, buf->data(), buf->size(), 0, 278 (const sockaddr *)&addr, sizeof(addr)); 279 CHECK_EQ(n, (ssize_t)buf->size()); 280 281 LOGV("successfully poked holes."); 282 } 283 284 virtual void onMessageReceived(const sp<AMessage> &msg) { 285 switch (msg->what()) { 286 case 'conn': 287 { 288 int32_t result; 289 CHECK(msg->findInt32("result", &result)); 290 291 LOGI("connection request completed with result %d (%s)", 292 result, strerror(-result)); 293 294 if (result == OK) { 295 AString request; 296 request = "DESCRIBE "; 297 request.append(mSessionURL); 298 request.append(" RTSP/1.0\r\n"); 299 request.append("Accept: application/sdp\r\n"); 300 request.append("\r\n"); 301 302 sp<AMessage> reply = new AMessage('desc', id()); 303 mConn->sendRequest(request.c_str(), reply); 304 } else { 305 (new AMessage('disc', id()))->post(); 306 } 307 break; 308 } 309 310 case 'disc': 311 { 312 int32_t reconnect; 313 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 314 sp<AMessage> reply = new AMessage('conn', id()); 315 mConn->connect(mSessionURL.c_str(), reply); 316 } else { 317 (new AMessage('quit', id()))->post(); 318 } 319 break; 320 } 321 322 case 'desc': 323 { 324 int32_t result; 325 CHECK(msg->findInt32("result", &result)); 326 327 LOGI("DESCRIBE completed with result %d (%s)", 328 result, strerror(-result)); 329 330 if (result == OK) { 331 sp<RefBase> obj; 332 CHECK(msg->findObject("response", &obj)); 333 sp<ARTSPResponse> response = 334 static_cast<ARTSPResponse *>(obj.get()); 335 336 if (response->mStatusCode == 302) { 337 ssize_t i = response->mHeaders.indexOfKey("location"); 338 CHECK_GE(i, 0); 339 340 mSessionURL = response->mHeaders.valueAt(i); 341 342 AString request; 343 request = "DESCRIBE "; 344 request.append(mSessionURL); 345 request.append(" RTSP/1.0\r\n"); 346 request.append("Accept: application/sdp\r\n"); 347 request.append("\r\n"); 348 349 sp<AMessage> reply = new AMessage('desc', id()); 350 mConn->sendRequest(request.c_str(), reply); 351 break; 352 } 353 354 if (response->mStatusCode != 200) { 355 result = UNKNOWN_ERROR; 356 } else { 357 mSessionDesc = new ASessionDescription; 358 359 mSessionDesc->setTo( 360 response->mContent->data(), 361 response->mContent->size()); 362 363 if (!mSessionDesc->isValid()) { 364 result = ERROR_MALFORMED; 365 } else { 366 ssize_t i = response->mHeaders.indexOfKey("content-base"); 367 if (i >= 0) { 368 mBaseURL = response->mHeaders.valueAt(i); 369 } else { 370 i = response->mHeaders.indexOfKey("content-location"); 371 if (i >= 0) { 372 mBaseURL = response->mHeaders.valueAt(i); 373 } else { 374 mBaseURL = mSessionURL; 375 } 376 } 377 378 CHECK_GT(mSessionDesc->countTracks(), 1u); 379 setupTrack(1); 380 } 381 } 382 } 383 384 if (result != OK) { 385 sp<AMessage> reply = new AMessage('disc', id()); 386 mConn->disconnect(reply); 387 } 388 break; 389 } 390 391 case 'setu': 392 { 393 size_t index; 394 CHECK(msg->findSize("index", &index)); 395 396 TrackInfo *track = NULL; 397 size_t trackIndex; 398 if (msg->findSize("track-index", &trackIndex)) { 399 track = &mTracks.editItemAt(trackIndex); 400 } 401 402 int32_t result; 403 CHECK(msg->findInt32("result", &result)); 404 405 LOGI("SETUP(%d) completed with result %d (%s)", 406 index, result, strerror(-result)); 407 408 if (result == OK) { 409 CHECK(track != NULL); 410 411 sp<RefBase> obj; 412 CHECK(msg->findObject("response", &obj)); 413 sp<ARTSPResponse> response = 414 static_cast<ARTSPResponse *>(obj.get()); 415 416 if (response->mStatusCode != 200) { 417 result = UNKNOWN_ERROR; 418 } else { 419 ssize_t i = response->mHeaders.indexOfKey("session"); 420 CHECK_GE(i, 0); 421 422 mSessionID = response->mHeaders.valueAt(i); 423 i = mSessionID.find(";"); 424 if (i >= 0) { 425 // Remove options, i.e. ";timeout=90" 426 mSessionID.erase(i, mSessionID.size() - i); 427 } 428 429 sp<AMessage> notify = new AMessage('accu', id()); 430 notify->setSize("track-index", trackIndex); 431 432 i = response->mHeaders.indexOfKey("transport"); 433 CHECK_GE(i, 0); 434 435 if (!track->mUsingInterleavedTCP) { 436 AString transport = response->mHeaders.valueAt(i); 437 438 pokeAHole(track->mRTPSocket, 439 track->mRTCPSocket, 440 transport); 441 } 442 443 mRTPConn->addStream( 444 track->mRTPSocket, track->mRTCPSocket, 445 mSessionDesc, index, 446 notify, track->mUsingInterleavedTCP); 447 448 mSetupTracksSuccessful = true; 449 } 450 } 451 452 if (result != OK) { 453 if (track) { 454 if (!track->mUsingInterleavedTCP) { 455 close(track->mRTPSocket); 456 close(track->mRTCPSocket); 457 } 458 459 mTracks.removeItemsAt(trackIndex); 460 } 461 } 462 463 ++index; 464 if (index < mSessionDesc->countTracks()) { 465 setupTrack(index); 466 } else if (mSetupTracksSuccessful) { 467 AString request = "PLAY "; 468 request.append(mSessionURL); 469 request.append(" RTSP/1.0\r\n"); 470 471 request.append("Session: "); 472 request.append(mSessionID); 473 request.append("\r\n"); 474 475 request.append("\r\n"); 476 477 sp<AMessage> reply = new AMessage('play', id()); 478 mConn->sendRequest(request.c_str(), reply); 479 } else { 480 sp<AMessage> reply = new AMessage('disc', id()); 481 mConn->disconnect(reply); 482 } 483 break; 484 } 485 486 case 'play': 487 { 488 int32_t result; 489 CHECK(msg->findInt32("result", &result)); 490 491 LOGI("PLAY completed with result %d (%s)", 492 result, strerror(-result)); 493 494 if (result == OK) { 495 sp<RefBase> obj; 496 CHECK(msg->findObject("response", &obj)); 497 sp<ARTSPResponse> response = 498 static_cast<ARTSPResponse *>(obj.get()); 499 500 if (response->mStatusCode != 200) { 501 result = UNKNOWN_ERROR; 502 } else { 503 parsePlayResponse(response); 504 505 sp<AMessage> timeout = new AMessage('tiou', id()); 506 timeout->post(kStartupTimeoutUs); 507 } 508 } 509 510 if (result != OK) { 511 sp<AMessage> reply = new AMessage('disc', id()); 512 mConn->disconnect(reply); 513 } 514 515 break; 516 } 517 518 case 'abor': 519 { 520 for (size_t i = 0; i < mTracks.size(); ++i) { 521 TrackInfo *info = &mTracks.editItemAt(i); 522 523 info->mPacketSource->signalEOS(ERROR_END_OF_STREAM); 524 525 if (!info->mUsingInterleavedTCP) { 526 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket); 527 528 close(info->mRTPSocket); 529 close(info->mRTCPSocket); 530 } 531 } 532 mTracks.clear(); 533 mSetupTracksSuccessful = false; 534 mSeekPending = false; 535 mFirstAccessUnit = true; 536 mFirstAccessUnitNTP = 0; 537 mNumAccessUnitsReceived = 0; 538 mReceivedFirstRTCPPacket = false; 539 mReceivedFirstRTPPacket = false; 540 mSeekable = false; 541 542 sp<AMessage> reply = new AMessage('tear', id()); 543 544 int32_t reconnect; 545 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 546 reply->setInt32("reconnect", true); 547 } 548 549 AString request; 550 request = "TEARDOWN "; 551 552 // XXX should use aggregate url from SDP here... 553 request.append(mSessionURL); 554 request.append(" RTSP/1.0\r\n"); 555 556 request.append("Session: "); 557 request.append(mSessionID); 558 request.append("\r\n"); 559 560 request.append("\r\n"); 561 562 mConn->sendRequest(request.c_str(), reply); 563 break; 564 } 565 566 case 'tear': 567 { 568 int32_t result; 569 CHECK(msg->findInt32("result", &result)); 570 571 LOGI("TEARDOWN completed with result %d (%s)", 572 result, strerror(-result)); 573 574 sp<AMessage> reply = new AMessage('disc', id()); 575 576 int32_t reconnect; 577 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 578 reply->setInt32("reconnect", true); 579 } 580 581 mConn->disconnect(reply); 582 break; 583 } 584 585 case 'quit': 586 { 587 if (mDoneMsg != NULL) { 588 mDoneMsg->setInt32("result", UNKNOWN_ERROR); 589 mDoneMsg->post(); 590 mDoneMsg = NULL; 591 } 592 break; 593 } 594 595 case 'chek': 596 { 597 int32_t generation; 598 CHECK(msg->findInt32("generation", &generation)); 599 if (generation != mCheckGeneration) { 600 // This is an outdated message. Ignore. 601 break; 602 } 603 604 if (mNumAccessUnitsReceived == 0) { 605 LOGI("stream ended? aborting."); 606 (new AMessage('abor', id()))->post(); 607 break; 608 } 609 610 mNumAccessUnitsReceived = 0; 611 msg->post(kAccessUnitTimeoutUs); 612 break; 613 } 614 615 case 'accu': 616 { 617 int32_t first; 618 if (msg->findInt32("first-rtcp", &first)) { 619 mReceivedFirstRTCPPacket = true; 620 break; 621 } 622 623 if (msg->findInt32("first-rtp", &first)) { 624 mReceivedFirstRTPPacket = true; 625 break; 626 } 627 628 ++mNumAccessUnitsReceived; 629 postAccessUnitTimeoutCheck(); 630 631 size_t trackIndex; 632 CHECK(msg->findSize("track-index", &trackIndex)); 633 634 if (trackIndex >= mTracks.size()) { 635 LOGV("late packets ignored."); 636 break; 637 } 638 639 TrackInfo *track = &mTracks.editItemAt(trackIndex); 640 641 int32_t eos; 642 if (msg->findInt32("eos", &eos)) { 643 LOGI("received BYE on track index %d", trackIndex); 644#if 0 645 track->mPacketSource->signalEOS(ERROR_END_OF_STREAM); 646#endif 647 return; 648 } 649 650 sp<RefBase> obj; 651 CHECK(msg->findObject("access-unit", &obj)); 652 653 sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get()); 654 655 uint32_t seqNum = (uint32_t)accessUnit->int32Data(); 656 657 if (mSeekPending) { 658 LOGV("we're seeking, dropping stale packet."); 659 break; 660 } 661 662 if (seqNum < track->mFirstSeqNumInSegment) { 663 LOGV("dropping stale access-unit (%d < %d)", 664 seqNum, track->mFirstSeqNumInSegment); 665 break; 666 } 667 668 uint64_t ntpTime; 669 CHECK(accessUnit->meta()->findInt64( 670 "ntp-time", (int64_t *)&ntpTime)); 671 672 uint32_t rtpTime; 673 CHECK(accessUnit->meta()->findInt32( 674 "rtp-time", (int32_t *)&rtpTime)); 675 676 if (track->mNewSegment) { 677 track->mNewSegment = false; 678 679 LOGV("first segment unit ntpTime=0x%016llx rtpTime=%u seq=%d", 680 ntpTime, rtpTime, seqNum); 681 } 682 683 if (mFirstAccessUnit) { 684 mDoneMsg->setInt32("result", OK); 685 mDoneMsg->post(); 686 mDoneMsg = NULL; 687 688 mFirstAccessUnit = false; 689 mFirstAccessUnitNTP = ntpTime; 690 } 691 692 if (ntpTime >= mFirstAccessUnitNTP) { 693 ntpTime -= mFirstAccessUnitNTP; 694 } else { 695 ntpTime = 0; 696 } 697 698 int64_t timeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); 699 700 accessUnit->meta()->setInt64("timeUs", timeUs); 701 702#if 0 703 int32_t damaged; 704 if (accessUnit->meta()->findInt32("damaged", &damaged) 705 && damaged != 0) { 706 LOGI("ignoring damaged AU"); 707 } else 708#endif 709 { 710 TrackInfo *track = &mTracks.editItemAt(trackIndex); 711 track->mPacketSource->queueAccessUnit(accessUnit); 712 } 713 break; 714 } 715 716 case 'seek': 717 { 718 sp<AMessage> doneMsg; 719 CHECK(msg->findMessage("doneMsg", &doneMsg)); 720 721 if (mSeekPending) { 722 doneMsg->post(); 723 break; 724 } 725 726 if (!mSeekable) { 727 LOGW("This is a live stream, ignoring seek request."); 728 doneMsg->post(); 729 break; 730 } 731 732 int64_t timeUs; 733 CHECK(msg->findInt64("time", &timeUs)); 734 735 mSeekPending = true; 736 737 // Disable the access unit timeout until we resumed 738 // playback again. 739 mCheckPending = true; 740 ++mCheckGeneration; 741 742 AString request = "PAUSE "; 743 request.append(mSessionURL); 744 request.append(" RTSP/1.0\r\n"); 745 746 request.append("Session: "); 747 request.append(mSessionID); 748 request.append("\r\n"); 749 750 request.append("\r\n"); 751 752 sp<AMessage> reply = new AMessage('see1', id()); 753 reply->setInt64("time", timeUs); 754 reply->setMessage("doneMsg", doneMsg); 755 mConn->sendRequest(request.c_str(), reply); 756 break; 757 } 758 759 case 'see1': 760 { 761 // Session is paused now. 762 for (size_t i = 0; i < mTracks.size(); ++i) { 763 mTracks.editItemAt(i).mPacketSource->flushQueue(); 764 } 765 766 int64_t timeUs; 767 CHECK(msg->findInt64("time", &timeUs)); 768 769 AString request = "PLAY "; 770 request.append(mSessionURL); 771 request.append(" RTSP/1.0\r\n"); 772 773 request.append("Session: "); 774 request.append(mSessionID); 775 request.append("\r\n"); 776 777 request.append( 778 StringPrintf( 779 "Range: npt=%lld-\r\n", timeUs / 1000000ll)); 780 781 request.append("\r\n"); 782 783 sp<AMessage> doneMsg; 784 CHECK(msg->findMessage("doneMsg", &doneMsg)); 785 786 sp<AMessage> reply = new AMessage('see2', id()); 787 reply->setMessage("doneMsg", doneMsg); 788 mConn->sendRequest(request.c_str(), reply); 789 break; 790 } 791 792 case 'see2': 793 { 794 CHECK(mSeekPending); 795 796 int32_t result; 797 CHECK(msg->findInt32("result", &result)); 798 799 LOGI("PLAY completed with result %d (%s)", 800 result, strerror(-result)); 801 802 mCheckPending = false; 803 postAccessUnitTimeoutCheck(); 804 805 if (result == OK) { 806 sp<RefBase> obj; 807 CHECK(msg->findObject("response", &obj)); 808 sp<ARTSPResponse> response = 809 static_cast<ARTSPResponse *>(obj.get()); 810 811 if (response->mStatusCode != 200) { 812 result = UNKNOWN_ERROR; 813 } else { 814 parsePlayResponse(response); 815 816 LOGI("seek completed."); 817 } 818 } 819 820 if (result != OK) { 821 LOGE("seek failed, aborting."); 822 (new AMessage('abor', id()))->post(); 823 } 824 825 mSeekPending = false; 826 827 sp<AMessage> doneMsg; 828 CHECK(msg->findMessage("doneMsg", &doneMsg)); 829 830 doneMsg->post(); 831 break; 832 } 833 834 case 'biny': 835 { 836 sp<RefBase> obj; 837 CHECK(msg->findObject("buffer", &obj)); 838 sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get()); 839 840 int32_t index; 841 CHECK(buffer->meta()->findInt32("index", &index)); 842 843 mRTPConn->injectPacket(index, buffer); 844 break; 845 } 846 847 case 'tiou': 848 { 849 if (!mReceivedFirstRTCPPacket) { 850 if (mTryFakeRTCP) { 851 LOGW("Never received any data, disconnecting."); 852 (new AMessage('abor', id()))->post(); 853 } else if (mTryTCPInterleaving && mReceivedFirstRTPPacket) { 854 LOGW("We received RTP packets but no RTCP packets, " 855 "using fake timestamps."); 856 857 mTryFakeRTCP = true; 858 859 mReceivedFirstRTCPPacket = true; 860 mRTPConn->fakeTimestamps(); 861 } else { 862 LOGW("Never received any data, switching transports."); 863 864 mTryTCPInterleaving = true; 865 866 sp<AMessage> msg = new AMessage('abor', id()); 867 msg->setInt32("reconnect", true); 868 msg->post(); 869 } 870 } 871 break; 872 } 873 874 default: 875 TRESPASS(); 876 break; 877 } 878 } 879 880 void postAccessUnitTimeoutCheck() { 881 if (mCheckPending) { 882 return; 883 } 884 885 mCheckPending = true; 886 sp<AMessage> check = new AMessage('chek', id()); 887 check->setInt32("generation", mCheckGeneration); 888 check->post(kAccessUnitTimeoutUs); 889 } 890 891 static void SplitString( 892 const AString &s, const char *separator, List<AString> *items) { 893 items->clear(); 894 size_t start = 0; 895 while (start < s.size()) { 896 ssize_t offset = s.find(separator, start); 897 898 if (offset < 0) { 899 items->push_back(AString(s, start, s.size() - start)); 900 break; 901 } 902 903 items->push_back(AString(s, start, offset - start)); 904 start = offset + strlen(separator); 905 } 906 } 907 908 void parsePlayResponse(const sp<ARTSPResponse> &response) { 909 mSeekable = false; 910 911 ssize_t i = response->mHeaders.indexOfKey("range"); 912 if (i < 0) { 913 // Server doesn't even tell use what range it is going to 914 // play, therefore we won't support seeking. 915 return; 916 } 917 918 AString range = response->mHeaders.valueAt(i); 919 LOGV("Range: %s", range.c_str()); 920 921 AString val; 922 CHECK(GetAttribute(range.c_str(), "npt", &val)); 923 float npt1, npt2; 924 925 if (val == "now-") { 926 // This is a live stream and therefore not seekable. 927 return; 928 } else { 929 CHECK_EQ(sscanf(val.c_str(), "%f-%f", &npt1, &npt2), 2); 930 } 931 932 i = response->mHeaders.indexOfKey("rtp-info"); 933 CHECK_GE(i, 0); 934 935 AString rtpInfo = response->mHeaders.valueAt(i); 936 List<AString> streamInfos; 937 SplitString(rtpInfo, ",", &streamInfos); 938 939 int n = 1; 940 for (List<AString>::iterator it = streamInfos.begin(); 941 it != streamInfos.end(); ++it) { 942 (*it).trim(); 943 LOGV("streamInfo[%d] = %s", n, (*it).c_str()); 944 945 CHECK(GetAttribute((*it).c_str(), "url", &val)); 946 947 size_t trackIndex = 0; 948 while (trackIndex < mTracks.size() 949 && !(val == mTracks.editItemAt(trackIndex).mURL)) { 950 ++trackIndex; 951 } 952 CHECK_LT(trackIndex, mTracks.size()); 953 954 CHECK(GetAttribute((*it).c_str(), "seq", &val)); 955 956 char *end; 957 unsigned long seq = strtoul(val.c_str(), &end, 10); 958 959 TrackInfo *info = &mTracks.editItemAt(trackIndex); 960 info->mFirstSeqNumInSegment = seq; 961 info->mNewSegment = true; 962 963 CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); 964 965 uint32_t rtpTime = strtoul(val.c_str(), &end, 10); 966 967 LOGV("track #%d: rtpTime=%u <=> ntp=%.2f", n, rtpTime, npt1); 968 969 info->mPacketSource->setNormalPlayTimeMapping( 970 rtpTime, (int64_t)(npt1 * 1E6)); 971 972 ++n; 973 } 974 975 mSeekable = true; 976 } 977 978 sp<APacketSource> getPacketSource(size_t index) { 979 CHECK_GE(index, 0u); 980 CHECK_LT(index, mTracks.size()); 981 982 return mTracks.editItemAt(index).mPacketSource; 983 } 984 985 size_t countTracks() const { 986 return mTracks.size(); 987 } 988 989private: 990 sp<ALooper> mLooper; 991 sp<ALooper> mNetLooper; 992 sp<ARTSPConnection> mConn; 993 sp<ARTPConnection> mRTPConn; 994 sp<ASessionDescription> mSessionDesc; 995 AString mSessionURL; 996 AString mBaseURL; 997 AString mSessionID; 998 bool mSetupTracksSuccessful; 999 bool mSeekPending; 1000 bool mFirstAccessUnit; 1001 uint64_t mFirstAccessUnitNTP; 1002 int64_t mNumAccessUnitsReceived; 1003 bool mCheckPending; 1004 int32_t mCheckGeneration; 1005 bool mTryTCPInterleaving; 1006 bool mTryFakeRTCP; 1007 bool mReceivedFirstRTCPPacket; 1008 bool mReceivedFirstRTPPacket; 1009 bool mSeekable; 1010 1011 struct TrackInfo { 1012 AString mURL; 1013 int mRTPSocket; 1014 int mRTCPSocket; 1015 bool mUsingInterleavedTCP; 1016 uint32_t mFirstSeqNumInSegment; 1017 bool mNewSegment; 1018 1019 sp<APacketSource> mPacketSource; 1020 }; 1021 Vector<TrackInfo> mTracks; 1022 1023 sp<AMessage> mDoneMsg; 1024 1025 void setupTrack(size_t index) { 1026 sp<APacketSource> source = 1027 new APacketSource(mSessionDesc, index); 1028 1029 if (source->initCheck() != OK) { 1030 LOGW("Unsupported format. Ignoring track #%d.", index); 1031 1032 sp<AMessage> reply = new AMessage('setu', id()); 1033 reply->setSize("index", index); 1034 reply->setInt32("result", ERROR_UNSUPPORTED); 1035 reply->post(); 1036 return; 1037 } 1038 1039 AString url; 1040 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 1041 1042 AString trackURL; 1043 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 1044 1045 mTracks.push(TrackInfo()); 1046 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 1047 info->mURL = trackURL; 1048 info->mPacketSource = source; 1049 info->mUsingInterleavedTCP = false; 1050 info->mFirstSeqNumInSegment = 0; 1051 info->mNewSegment = true; 1052 1053 LOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str()); 1054 1055 AString request = "SETUP "; 1056 request.append(trackURL); 1057 request.append(" RTSP/1.0\r\n"); 1058 1059 if (mTryTCPInterleaving) { 1060 size_t interleaveIndex = 2 * (mTracks.size() - 1); 1061 info->mUsingInterleavedTCP = true; 1062 info->mRTPSocket = interleaveIndex; 1063 info->mRTCPSocket = interleaveIndex + 1; 1064 1065 request.append("Transport: RTP/AVP/TCP;interleaved="); 1066 request.append(interleaveIndex); 1067 request.append("-"); 1068 request.append(interleaveIndex + 1); 1069 } else { 1070 unsigned rtpPort; 1071 ARTPConnection::MakePortPair( 1072 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 1073 1074 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 1075 request.append(rtpPort); 1076 request.append("-"); 1077 request.append(rtpPort + 1); 1078 } 1079 1080 request.append("\r\n"); 1081 1082 if (index > 1) { 1083 request.append("Session: "); 1084 request.append(mSessionID); 1085 request.append("\r\n"); 1086 } 1087 1088 request.append("\r\n"); 1089 1090 sp<AMessage> reply = new AMessage('setu', id()); 1091 reply->setSize("index", index); 1092 reply->setSize("track-index", mTracks.size() - 1); 1093 mConn->sendRequest(request.c_str(), reply); 1094 } 1095 1096 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 1097 out->clear(); 1098 1099 if (strncasecmp("rtsp://", baseURL, 7)) { 1100 // Base URL must be absolute 1101 return false; 1102 } 1103 1104 if (!strncasecmp("rtsp://", url, 7)) { 1105 // "url" is already an absolute URL, ignore base URL. 1106 out->setTo(url); 1107 return true; 1108 } 1109 1110 size_t n = strlen(baseURL); 1111 if (baseURL[n - 1] == '/') { 1112 out->setTo(baseURL); 1113 out->append(url); 1114 } else { 1115 const char *slashPos = strrchr(baseURL, '/'); 1116 1117 if (slashPos > &baseURL[6]) { 1118 out->setTo(baseURL, slashPos - baseURL); 1119 } else { 1120 out->setTo(baseURL); 1121 } 1122 1123 out->append("/"); 1124 out->append(url); 1125 } 1126 1127 return true; 1128 } 1129 1130 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 1131}; 1132 1133} // namespace android 1134 1135#endif // MY_HANDLER_H_ 1136