MyHandler.h revision ec29a2bfb5364a5968b77559fd13821b827d173a
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#ifndef MY_HANDLER_H_ 18 19#define MY_HANDLER_H_ 20 21//#define LOG_NDEBUG 0 22#define LOG_TAG "MyHandler" 23#include <utils/Log.h> 24 25#include "APacketSource.h" 26#include "ARTPConnection.h" 27#include "ARTSPConnection.h" 28#include "ASessionDescription.h" 29 30#include <ctype.h> 31#include <cutils/properties.h> 32 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/ALooper.h> 36#include <media/stagefright/foundation/AMessage.h> 37#include <media/stagefright/MediaErrors.h> 38 39#include <arpa/inet.h> 40#include <sys/socket.h> 41#include <netdb.h> 42 43#include "HTTPBase.h" 44 45// If no access units are received within 5 secs, assume that the rtp 46// stream has ended and signal end of stream. 47static int64_t kAccessUnitTimeoutUs = 10000000ll; 48 49// If no access units arrive for the first 10 secs after starting the 50// stream, assume none ever will and signal EOS or switch transports. 51static int64_t kStartupTimeoutUs = 10000000ll; 52 53static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll; 54 55namespace android { 56 57static void MakeUserAgentString(AString *s) { 58 s->setTo("stagefright/1.1 (Linux;Android "); 59 60#if (PROPERTY_VALUE_MAX < 8) 61#error "PROPERTY_VALUE_MAX must be at least 8" 62#endif 63 64 char value[PROPERTY_VALUE_MAX]; 65 property_get("ro.build.version.release", value, "Unknown"); 66 s->append(value); 67 s->append(")"); 68} 69 70static bool GetAttribute(const char *s, const char *key, AString *value) { 71 value->clear(); 72 73 size_t keyLen = strlen(key); 74 75 for (;;) { 76 while (isspace(*s)) { 77 ++s; 78 } 79 80 const char *colonPos = strchr(s, ';'); 81 82 size_t len = 83 (colonPos == NULL) ? strlen(s) : colonPos - s; 84 85 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { 86 value->setTo(&s[keyLen + 1], len - keyLen - 1); 87 return true; 88 } 89 90 if (colonPos == NULL) { 91 return false; 92 } 93 94 s = colonPos + 1; 95 } 96} 97 98struct MyHandler : public AHandler { 99 enum { 100 kWhatConnected = 'conn', 101 kWhatDisconnected = 'disc', 102 kWhatSeekDone = 'sdon', 103 104 kWhatAccessUnit = 'accU', 105 kWhatEOS = 'eos!', 106 kWhatSeekDiscontinuity = 'seeD', 107 kWhatNormalPlayTimeMapping = 'nptM', 108 }; 109 110 MyHandler( 111 const char *url, 112 const sp<AMessage> ¬ify, 113 bool uidValid = false, uid_t uid = 0) 114 : mNotify(notify), 115 mUIDValid(uidValid), 116 mUID(uid), 117 mNetLooper(new ALooper), 118 mConn(new ARTSPConnection(mUIDValid, mUID)), 119 mRTPConn(new ARTPConnection), 120 mOriginalSessionURL(url), 121 mSessionURL(url), 122 mSetupTracksSuccessful(false), 123 mSeekPending(false), 124 mFirstAccessUnit(true), 125 mAllTracksHaveTime(false), 126 mNTPAnchorUs(-1), 127 mMediaAnchorUs(-1), 128 mLastMediaTimeUs(0), 129 mNumAccessUnitsReceived(0), 130 mCheckPending(false), 131 mCheckGeneration(0), 132 mTryTCPInterleaving(false), 133 mTryFakeRTCP(false), 134 mReceivedFirstRTCPPacket(false), 135 mReceivedFirstRTPPacket(false), 136 mSeekable(true), 137 mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs), 138 mKeepAliveGeneration(0) { 139 mNetLooper->setName("rtsp net"); 140 mNetLooper->start(false /* runOnCallingThread */, 141 false /* canCallJava */, 142 PRIORITY_HIGHEST); 143 144 // Strip any authentication info from the session url, we don't 145 // want to transmit user/pass in cleartext. 146 AString host, path, user, pass; 147 unsigned port; 148 CHECK(ARTSPConnection::ParseURL( 149 mSessionURL.c_str(), &host, &port, &path, &user, &pass)); 150 151 if (user.size() > 0) { 152 mSessionURL.clear(); 153 mSessionURL.append("rtsp://"); 154 mSessionURL.append(host); 155 mSessionURL.append(":"); 156 mSessionURL.append(StringPrintf("%u", port)); 157 mSessionURL.append(path); 158 159 ALOGI("rewritten session url: '%s'", mSessionURL.c_str()); 160 } 161 162 mSessionHost = host; 163 } 164 165 void connect() { 166 looper()->registerHandler(mConn); 167 (1 ? mNetLooper : looper())->registerHandler(mRTPConn); 168 169 sp<AMessage> notify = new AMessage('biny', id()); 170 mConn->observeBinaryData(notify); 171 172 sp<AMessage> reply = new AMessage('conn', id()); 173 mConn->connect(mOriginalSessionURL.c_str(), reply); 174 } 175 176 void loadSDP(const sp<ASessionDescription>& desc) { 177 looper()->registerHandler(mConn); 178 (1 ? mNetLooper : looper())->registerHandler(mRTPConn); 179 180 sp<AMessage> notify = new AMessage('biny', id()); 181 mConn->observeBinaryData(notify); 182 183 sp<AMessage> reply = new AMessage('sdpl', id()); 184 reply->setObject("description", desc); 185 mConn->connect(mOriginalSessionURL.c_str(), reply); 186 } 187 188 void disconnect() { 189 (new AMessage('abor', id()))->post(); 190 } 191 192 void seek(int64_t timeUs) { 193 sp<AMessage> msg = new AMessage('seek', id()); 194 msg->setInt64("time", timeUs); 195 msg->post(); 196 } 197 198 static void addRR(const sp<ABuffer> &buf) { 199 uint8_t *ptr = buf->data() + buf->size(); 200 ptr[0] = 0x80 | 0; 201 ptr[1] = 201; // RR 202 ptr[2] = 0; 203 ptr[3] = 1; 204 ptr[4] = 0xde; // SSRC 205 ptr[5] = 0xad; 206 ptr[6] = 0xbe; 207 ptr[7] = 0xef; 208 209 buf->setRange(0, buf->size() + 8); 210 } 211 212 static void addSDES(int s, const sp<ABuffer> &buffer) { 213 struct sockaddr_in addr; 214 socklen_t addrSize = sizeof(addr); 215 CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize)); 216 217 uint8_t *data = buffer->data() + buffer->size(); 218 data[0] = 0x80 | 1; 219 data[1] = 202; // SDES 220 data[4] = 0xde; // SSRC 221 data[5] = 0xad; 222 data[6] = 0xbe; 223 data[7] = 0xef; 224 225 size_t offset = 8; 226 227 data[offset++] = 1; // CNAME 228 229 AString cname = "stagefright@"; 230 cname.append(inet_ntoa(addr.sin_addr)); 231 data[offset++] = cname.size(); 232 233 memcpy(&data[offset], cname.c_str(), cname.size()); 234 offset += cname.size(); 235 236 data[offset++] = 6; // TOOL 237 238 AString tool; 239 MakeUserAgentString(&tool); 240 241 data[offset++] = tool.size(); 242 243 memcpy(&data[offset], tool.c_str(), tool.size()); 244 offset += tool.size(); 245 246 data[offset++] = 0; 247 248 if ((offset % 4) > 0) { 249 size_t count = 4 - (offset % 4); 250 switch (count) { 251 case 3: 252 data[offset++] = 0; 253 case 2: 254 data[offset++] = 0; 255 case 1: 256 data[offset++] = 0; 257 } 258 } 259 260 size_t numWords = (offset / 4) - 1; 261 data[2] = numWords >> 8; 262 data[3] = numWords & 0xff; 263 264 buffer->setRange(buffer->offset(), buffer->size() + offset); 265 } 266 267 // In case we're behind NAT, fire off two UDP packets to the remote 268 // rtp/rtcp ports to poke a hole into the firewall for future incoming 269 // packets. We're going to send an RR/SDES RTCP packet to both of them. 270 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) { 271 struct sockaddr_in addr; 272 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 273 addr.sin_family = AF_INET; 274 275 AString source; 276 AString server_port; 277 if (!GetAttribute(transport.c_str(), 278 "source", 279 &source)) { 280 ALOGW("Missing 'source' field in Transport response. Using " 281 "RTSP endpoint address."); 282 283 struct hostent *ent = gethostbyname(mSessionHost.c_str()); 284 if (ent == NULL) { 285 ALOGE("Failed to look up address of session host '%s'", 286 mSessionHost.c_str()); 287 288 return false; 289 } 290 291 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 292 } else { 293 addr.sin_addr.s_addr = inet_addr(source.c_str()); 294 } 295 296 if (!GetAttribute(transport.c_str(), 297 "server_port", 298 &server_port)) { 299 ALOGI("Missing 'server_port' field in Transport response."); 300 return false; 301 } 302 303 int rtpPort, rtcpPort; 304 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2 305 || rtpPort <= 0 || rtpPort > 65535 306 || rtcpPort <=0 || rtcpPort > 65535 307 || rtcpPort != rtpPort + 1) { 308 ALOGE("Server picked invalid RTP/RTCP port pair %s," 309 " RTP port must be even, RTCP port must be one higher.", 310 server_port.c_str()); 311 312 return false; 313 } 314 315 if (rtpPort & 1) { 316 ALOGW("Server picked an odd RTP port, it should've picked an " 317 "even one, we'll let it pass for now, but this may break " 318 "in the future."); 319 } 320 321 if (addr.sin_addr.s_addr == INADDR_NONE) { 322 return true; 323 } 324 325 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) { 326 // No firewalls to traverse on the loopback interface. 327 return true; 328 } 329 330 // Make up an RR/SDES RTCP packet. 331 sp<ABuffer> buf = new ABuffer(65536); 332 buf->setRange(0, 0); 333 addRR(buf); 334 addSDES(rtpSocket, buf); 335 336 addr.sin_port = htons(rtpPort); 337 338 ssize_t n = sendto( 339 rtpSocket, buf->data(), buf->size(), 0, 340 (const sockaddr *)&addr, sizeof(addr)); 341 342 if (n < (ssize_t)buf->size()) { 343 ALOGE("failed to poke a hole for RTP packets"); 344 return false; 345 } 346 347 addr.sin_port = htons(rtcpPort); 348 349 n = sendto( 350 rtcpSocket, buf->data(), buf->size(), 0, 351 (const sockaddr *)&addr, sizeof(addr)); 352 353 if (n < (ssize_t)buf->size()) { 354 ALOGE("failed to poke a hole for RTCP packets"); 355 return false; 356 } 357 358 ALOGV("successfully poked holes."); 359 360 return true; 361 } 362 363 static bool isLiveStream(const sp<ASessionDescription> &desc) { 364 AString attrLiveStream; 365 if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) { 366 ssize_t semicolonPos = attrLiveStream.find(";", 2); 367 368 const char* liveStreamValue; 369 if (semicolonPos < 0) { 370 liveStreamValue = attrLiveStream.c_str(); 371 } else { 372 AString valString; 373 valString.setTo(attrLiveStream, 374 semicolonPos + 1, 375 attrLiveStream.size() - semicolonPos - 1); 376 liveStreamValue = valString.c_str(); 377 } 378 379 uint32_t value = strtoul(liveStreamValue, NULL, 10); 380 if (value == 1) { 381 ALOGV("found live stream"); 382 return true; 383 } 384 } else { 385 // It is a live stream if no duration is returned 386 int64_t durationUs; 387 if (!desc->getDurationUs(&durationUs)) { 388 ALOGV("No duration found, assume live stream"); 389 return true; 390 } 391 } 392 393 return false; 394 } 395 396 virtual void onMessageReceived(const sp<AMessage> &msg) { 397 switch (msg->what()) { 398 case 'conn': 399 { 400 int32_t result; 401 CHECK(msg->findInt32("result", &result)); 402 403 ALOGI("connection request completed with result %d (%s)", 404 result, strerror(-result)); 405 406 if (result == OK) { 407 AString request; 408 request = "DESCRIBE "; 409 request.append(mSessionURL); 410 request.append(" RTSP/1.0\r\n"); 411 request.append("Accept: application/sdp\r\n"); 412 request.append("\r\n"); 413 414 sp<AMessage> reply = new AMessage('desc', id()); 415 mConn->sendRequest(request.c_str(), reply); 416 } else { 417 (new AMessage('disc', id()))->post(); 418 } 419 break; 420 } 421 422 case 'disc': 423 { 424 ++mKeepAliveGeneration; 425 426 int32_t reconnect; 427 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 428 sp<AMessage> reply = new AMessage('conn', id()); 429 mConn->connect(mOriginalSessionURL.c_str(), reply); 430 } else { 431 (new AMessage('quit', id()))->post(); 432 } 433 break; 434 } 435 436 case 'desc': 437 { 438 int32_t result; 439 CHECK(msg->findInt32("result", &result)); 440 441 ALOGI("DESCRIBE completed with result %d (%s)", 442 result, strerror(-result)); 443 444 if (result == OK) { 445 sp<RefBase> obj; 446 CHECK(msg->findObject("response", &obj)); 447 sp<ARTSPResponse> response = 448 static_cast<ARTSPResponse *>(obj.get()); 449 450 if (response->mStatusCode == 302) { 451 ssize_t i = response->mHeaders.indexOfKey("location"); 452 CHECK_GE(i, 0); 453 454 mSessionURL = response->mHeaders.valueAt(i); 455 456 AString request; 457 request = "DESCRIBE "; 458 request.append(mSessionURL); 459 request.append(" RTSP/1.0\r\n"); 460 request.append("Accept: application/sdp\r\n"); 461 request.append("\r\n"); 462 463 sp<AMessage> reply = new AMessage('desc', id()); 464 mConn->sendRequest(request.c_str(), reply); 465 break; 466 } 467 468 if (response->mStatusCode != 200) { 469 result = UNKNOWN_ERROR; 470 } else { 471 mSessionDesc = new ASessionDescription; 472 473 mSessionDesc->setTo( 474 response->mContent->data(), 475 response->mContent->size()); 476 477 if (!mSessionDesc->isValid()) { 478 ALOGE("Failed to parse session description."); 479 result = ERROR_MALFORMED; 480 } else { 481 ssize_t i = response->mHeaders.indexOfKey("content-base"); 482 if (i >= 0) { 483 mBaseURL = response->mHeaders.valueAt(i); 484 } else { 485 i = response->mHeaders.indexOfKey("content-location"); 486 if (i >= 0) { 487 mBaseURL = response->mHeaders.valueAt(i); 488 } else { 489 mBaseURL = mSessionURL; 490 } 491 } 492 493 mSeekable = !isLiveStream(mSessionDesc); 494 495 if (!mBaseURL.startsWith("rtsp://")) { 496 // Some misbehaving servers specify a relative 497 // URL in one of the locations above, combine 498 // it with the absolute session URL to get 499 // something usable... 500 501 ALOGW("Server specified a non-absolute base URL" 502 ", combining it with the session URL to " 503 "get something usable..."); 504 505 AString tmp; 506 CHECK(MakeURL( 507 mSessionURL.c_str(), 508 mBaseURL.c_str(), 509 &tmp)); 510 511 mBaseURL = tmp; 512 } 513 514 if (mSessionDesc->countTracks() < 2) { 515 // There's no actual tracks in this session. 516 // The first "track" is merely session meta 517 // data. 518 519 ALOGW("Session doesn't contain any playable " 520 "tracks. Aborting."); 521 result = ERROR_UNSUPPORTED; 522 } else { 523 setupTrack(1); 524 } 525 } 526 } 527 } 528 529 if (result != OK) { 530 sp<AMessage> reply = new AMessage('disc', id()); 531 mConn->disconnect(reply); 532 } 533 break; 534 } 535 536 case 'sdpl': 537 { 538 int32_t result; 539 CHECK(msg->findInt32("result", &result)); 540 541 ALOGI("SDP connection request completed with result %d (%s)", 542 result, strerror(-result)); 543 544 if (result == OK) { 545 sp<RefBase> obj; 546 CHECK(msg->findObject("description", &obj)); 547 mSessionDesc = 548 static_cast<ASessionDescription *>(obj.get()); 549 550 if (!mSessionDesc->isValid()) { 551 ALOGE("Failed to parse session description."); 552 result = ERROR_MALFORMED; 553 } else { 554 mBaseURL = mSessionURL; 555 556 mSeekable = !isLiveStream(mSessionDesc); 557 558 if (mSessionDesc->countTracks() < 2) { 559 // There's no actual tracks in this session. 560 // The first "track" is merely session meta 561 // data. 562 563 ALOGW("Session doesn't contain any playable " 564 "tracks. Aborting."); 565 result = ERROR_UNSUPPORTED; 566 } else { 567 setupTrack(1); 568 } 569 } 570 } 571 572 if (result != OK) { 573 sp<AMessage> reply = new AMessage('disc', id()); 574 mConn->disconnect(reply); 575 } 576 break; 577 } 578 579 case 'setu': 580 { 581 size_t index; 582 CHECK(msg->findSize("index", &index)); 583 584 TrackInfo *track = NULL; 585 size_t trackIndex; 586 if (msg->findSize("track-index", &trackIndex)) { 587 track = &mTracks.editItemAt(trackIndex); 588 } 589 590 int32_t result; 591 CHECK(msg->findInt32("result", &result)); 592 593 ALOGI("SETUP(%d) completed with result %d (%s)", 594 index, result, strerror(-result)); 595 596 if (result == OK) { 597 CHECK(track != NULL); 598 599 sp<RefBase> obj; 600 CHECK(msg->findObject("response", &obj)); 601 sp<ARTSPResponse> response = 602 static_cast<ARTSPResponse *>(obj.get()); 603 604 if (response->mStatusCode != 200) { 605 result = UNKNOWN_ERROR; 606 } else { 607 ssize_t i = response->mHeaders.indexOfKey("session"); 608 CHECK_GE(i, 0); 609 610 mSessionID = response->mHeaders.valueAt(i); 611 612 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 613 AString timeoutStr; 614 if (GetAttribute( 615 mSessionID.c_str(), "timeout", &timeoutStr)) { 616 char *end; 617 unsigned long timeoutSecs = 618 strtoul(timeoutStr.c_str(), &end, 10); 619 620 if (end == timeoutStr.c_str() || *end != '\0') { 621 ALOGW("server specified malformed timeout '%s'", 622 timeoutStr.c_str()); 623 624 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 625 } else if (timeoutSecs < 15) { 626 ALOGW("server specified too short a timeout " 627 "(%lu secs), using default.", 628 timeoutSecs); 629 630 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 631 } else { 632 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll; 633 634 ALOGI("server specified timeout of %lu secs.", 635 timeoutSecs); 636 } 637 } 638 639 i = mSessionID.find(";"); 640 if (i >= 0) { 641 // Remove options, i.e. ";timeout=90" 642 mSessionID.erase(i, mSessionID.size() - i); 643 } 644 645 sp<AMessage> notify = new AMessage('accu', id()); 646 notify->setSize("track-index", trackIndex); 647 648 i = response->mHeaders.indexOfKey("transport"); 649 CHECK_GE(i, 0); 650 651 if (!track->mUsingInterleavedTCP) { 652 AString transport = response->mHeaders.valueAt(i); 653 654 // We are going to continue even if we were 655 // unable to poke a hole into the firewall... 656 pokeAHole( 657 track->mRTPSocket, 658 track->mRTCPSocket, 659 transport); 660 } 661 662 mRTPConn->addStream( 663 track->mRTPSocket, track->mRTCPSocket, 664 mSessionDesc, index, 665 notify, track->mUsingInterleavedTCP); 666 667 mSetupTracksSuccessful = true; 668 } 669 } 670 671 if (result != OK) { 672 if (track) { 673 if (!track->mUsingInterleavedTCP) { 674 // Clear the tag 675 if (mUIDValid) { 676 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket); 677 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket); 678 } 679 680 close(track->mRTPSocket); 681 close(track->mRTCPSocket); 682 } 683 684 mTracks.removeItemsAt(trackIndex); 685 } 686 } 687 688 ++index; 689 if (index < mSessionDesc->countTracks()) { 690 setupTrack(index); 691 } else if (mSetupTracksSuccessful) { 692 ++mKeepAliveGeneration; 693 postKeepAlive(); 694 695 AString request = "PLAY "; 696 request.append(mSessionURL); 697 request.append(" RTSP/1.0\r\n"); 698 699 request.append("Session: "); 700 request.append(mSessionID); 701 request.append("\r\n"); 702 703 request.append("\r\n"); 704 705 sp<AMessage> reply = new AMessage('play', id()); 706 mConn->sendRequest(request.c_str(), reply); 707 } else { 708 sp<AMessage> reply = new AMessage('disc', id()); 709 mConn->disconnect(reply); 710 } 711 break; 712 } 713 714 case 'play': 715 { 716 int32_t result; 717 CHECK(msg->findInt32("result", &result)); 718 719 ALOGI("PLAY completed with result %d (%s)", 720 result, strerror(-result)); 721 722 if (result == OK) { 723 sp<RefBase> obj; 724 CHECK(msg->findObject("response", &obj)); 725 sp<ARTSPResponse> response = 726 static_cast<ARTSPResponse *>(obj.get()); 727 728 if (response->mStatusCode != 200) { 729 result = UNKNOWN_ERROR; 730 } else { 731 parsePlayResponse(response); 732 733 sp<AMessage> timeout = new AMessage('tiou', id()); 734 timeout->post(kStartupTimeoutUs); 735 } 736 } 737 738 if (result != OK) { 739 sp<AMessage> reply = new AMessage('disc', id()); 740 mConn->disconnect(reply); 741 } 742 743 break; 744 } 745 746 case 'aliv': 747 { 748 int32_t generation; 749 CHECK(msg->findInt32("generation", &generation)); 750 751 if (generation != mKeepAliveGeneration) { 752 // obsolete event. 753 break; 754 } 755 756 AString request; 757 request.append("OPTIONS "); 758 request.append(mSessionURL); 759 request.append(" RTSP/1.0\r\n"); 760 request.append("Session: "); 761 request.append(mSessionID); 762 request.append("\r\n"); 763 request.append("\r\n"); 764 765 sp<AMessage> reply = new AMessage('opts', id()); 766 reply->setInt32("generation", mKeepAliveGeneration); 767 mConn->sendRequest(request.c_str(), reply); 768 break; 769 } 770 771 case 'opts': 772 { 773 int32_t result; 774 CHECK(msg->findInt32("result", &result)); 775 776 ALOGI("OPTIONS completed with result %d (%s)", 777 result, strerror(-result)); 778 779 int32_t generation; 780 CHECK(msg->findInt32("generation", &generation)); 781 782 if (generation != mKeepAliveGeneration) { 783 // obsolete event. 784 break; 785 } 786 787 postKeepAlive(); 788 break; 789 } 790 791 case 'abor': 792 { 793 for (size_t i = 0; i < mTracks.size(); ++i) { 794 TrackInfo *info = &mTracks.editItemAt(i); 795 796 if (!mFirstAccessUnit) { 797 postQueueEOS(i, ERROR_END_OF_STREAM); 798 } 799 800 if (!info->mUsingInterleavedTCP) { 801 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket); 802 803 // Clear the tag 804 if (mUIDValid) { 805 HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket); 806 HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket); 807 } 808 809 close(info->mRTPSocket); 810 close(info->mRTCPSocket); 811 } 812 } 813 mTracks.clear(); 814 mSetupTracksSuccessful = false; 815 mSeekPending = false; 816 mFirstAccessUnit = true; 817 mAllTracksHaveTime = false; 818 mNTPAnchorUs = -1; 819 mMediaAnchorUs = -1; 820 mNumAccessUnitsReceived = 0; 821 mReceivedFirstRTCPPacket = false; 822 mReceivedFirstRTPPacket = false; 823 mSeekable = true; 824 825 sp<AMessage> reply = new AMessage('tear', id()); 826 827 int32_t reconnect; 828 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 829 reply->setInt32("reconnect", true); 830 } 831 832 AString request; 833 request = "TEARDOWN "; 834 835 // XXX should use aggregate url from SDP here... 836 request.append(mSessionURL); 837 request.append(" RTSP/1.0\r\n"); 838 839 request.append("Session: "); 840 request.append(mSessionID); 841 request.append("\r\n"); 842 843 request.append("\r\n"); 844 845 mConn->sendRequest(request.c_str(), reply); 846 break; 847 } 848 849 case 'tear': 850 { 851 int32_t result; 852 CHECK(msg->findInt32("result", &result)); 853 854 ALOGI("TEARDOWN completed with result %d (%s)", 855 result, strerror(-result)); 856 857 sp<AMessage> reply = new AMessage('disc', id()); 858 859 int32_t reconnect; 860 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 861 reply->setInt32("reconnect", true); 862 } 863 864 mConn->disconnect(reply); 865 break; 866 } 867 868 case 'quit': 869 { 870 sp<AMessage> msg = mNotify->dup(); 871 msg->setInt32("what", kWhatDisconnected); 872 msg->setInt32("result", UNKNOWN_ERROR); 873 msg->post(); 874 break; 875 } 876 877 case 'chek': 878 { 879 int32_t generation; 880 CHECK(msg->findInt32("generation", &generation)); 881 if (generation != mCheckGeneration) { 882 // This is an outdated message. Ignore. 883 break; 884 } 885 886 if (mNumAccessUnitsReceived == 0) { 887#if 1 888 ALOGI("stream ended? aborting."); 889 (new AMessage('abor', id()))->post(); 890 break; 891#else 892 ALOGI("haven't seen an AU in a looong time."); 893#endif 894 } 895 896 mNumAccessUnitsReceived = 0; 897 msg->post(kAccessUnitTimeoutUs); 898 break; 899 } 900 901 case 'accu': 902 { 903 int32_t timeUpdate; 904 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) { 905 size_t trackIndex; 906 CHECK(msg->findSize("track-index", &trackIndex)); 907 908 uint32_t rtpTime; 909 uint64_t ntpTime; 910 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime)); 911 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime)); 912 913 onTimeUpdate(trackIndex, rtpTime, ntpTime); 914 break; 915 } 916 917 int32_t first; 918 if (msg->findInt32("first-rtcp", &first)) { 919 mReceivedFirstRTCPPacket = true; 920 break; 921 } 922 923 if (msg->findInt32("first-rtp", &first)) { 924 mReceivedFirstRTPPacket = true; 925 break; 926 } 927 928 ++mNumAccessUnitsReceived; 929 postAccessUnitTimeoutCheck(); 930 931 size_t trackIndex; 932 CHECK(msg->findSize("track-index", &trackIndex)); 933 934 if (trackIndex >= mTracks.size()) { 935 ALOGV("late packets ignored."); 936 break; 937 } 938 939 TrackInfo *track = &mTracks.editItemAt(trackIndex); 940 941 int32_t eos; 942 if (msg->findInt32("eos", &eos)) { 943 ALOGI("received BYE on track index %d", trackIndex); 944#if 0 945 track->mPacketSource->signalEOS(ERROR_END_OF_STREAM); 946#endif 947 return; 948 } 949 950 sp<ABuffer> accessUnit; 951 CHECK(msg->findBuffer("access-unit", &accessUnit)); 952 953 uint32_t seqNum = (uint32_t)accessUnit->int32Data(); 954 955 if (mSeekPending) { 956 ALOGV("we're seeking, dropping stale packet."); 957 break; 958 } 959 960 if (seqNum < track->mFirstSeqNumInSegment) { 961 ALOGV("dropping stale access-unit (%d < %d)", 962 seqNum, track->mFirstSeqNumInSegment); 963 break; 964 } 965 966 if (track->mNewSegment) { 967 track->mNewSegment = false; 968 } 969 970 onAccessUnitComplete(trackIndex, accessUnit); 971 break; 972 } 973 974 case 'seek': 975 { 976 if (!mSeekable) { 977 ALOGW("This is a live stream, ignoring seek request."); 978 979 sp<AMessage> msg = mNotify->dup(); 980 msg->setInt32("what", kWhatSeekDone); 981 msg->post(); 982 break; 983 } 984 985 int64_t timeUs; 986 CHECK(msg->findInt64("time", &timeUs)); 987 988 mSeekPending = true; 989 990 // Disable the access unit timeout until we resumed 991 // playback again. 992 mCheckPending = true; 993 ++mCheckGeneration; 994 995 AString request = "PAUSE "; 996 request.append(mSessionURL); 997 request.append(" RTSP/1.0\r\n"); 998 999 request.append("Session: "); 1000 request.append(mSessionID); 1001 request.append("\r\n"); 1002 1003 request.append("\r\n"); 1004 1005 sp<AMessage> reply = new AMessage('see1', id()); 1006 reply->setInt64("time", timeUs); 1007 mConn->sendRequest(request.c_str(), reply); 1008 break; 1009 } 1010 1011 case 'see1': 1012 { 1013 // Session is paused now. 1014 for (size_t i = 0; i < mTracks.size(); ++i) { 1015 TrackInfo *info = &mTracks.editItemAt(i); 1016 1017 postQueueSeekDiscontinuity(i); 1018 1019 info->mRTPAnchor = 0; 1020 info->mNTPAnchorUs = -1; 1021 } 1022 1023 mAllTracksHaveTime = false; 1024 mNTPAnchorUs = -1; 1025 1026 int64_t timeUs; 1027 CHECK(msg->findInt64("time", &timeUs)); 1028 1029 AString request = "PLAY "; 1030 request.append(mSessionURL); 1031 request.append(" RTSP/1.0\r\n"); 1032 1033 request.append("Session: "); 1034 request.append(mSessionID); 1035 request.append("\r\n"); 1036 1037 request.append( 1038 StringPrintf( 1039 "Range: npt=%lld-\r\n", timeUs / 1000000ll)); 1040 1041 request.append("\r\n"); 1042 1043 sp<AMessage> reply = new AMessage('see2', id()); 1044 mConn->sendRequest(request.c_str(), reply); 1045 break; 1046 } 1047 1048 case 'see2': 1049 { 1050 CHECK(mSeekPending); 1051 1052 int32_t result; 1053 CHECK(msg->findInt32("result", &result)); 1054 1055 ALOGI("PLAY completed with result %d (%s)", 1056 result, strerror(-result)); 1057 1058 mCheckPending = false; 1059 postAccessUnitTimeoutCheck(); 1060 1061 if (result == OK) { 1062 sp<RefBase> obj; 1063 CHECK(msg->findObject("response", &obj)); 1064 sp<ARTSPResponse> response = 1065 static_cast<ARTSPResponse *>(obj.get()); 1066 1067 if (response->mStatusCode != 200) { 1068 result = UNKNOWN_ERROR; 1069 } else { 1070 parsePlayResponse(response); 1071 1072 ssize_t i = response->mHeaders.indexOfKey("rtp-info"); 1073 CHECK_GE(i, 0); 1074 1075 ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str()); 1076 1077 ALOGI("seek completed."); 1078 } 1079 } 1080 1081 if (result != OK) { 1082 ALOGE("seek failed, aborting."); 1083 (new AMessage('abor', id()))->post(); 1084 } 1085 1086 mSeekPending = false; 1087 1088 sp<AMessage> msg = mNotify->dup(); 1089 msg->setInt32("what", kWhatSeekDone); 1090 msg->post(); 1091 break; 1092 } 1093 1094 case 'biny': 1095 { 1096 sp<ABuffer> buffer; 1097 CHECK(msg->findBuffer("buffer", &buffer)); 1098 1099 int32_t index; 1100 CHECK(buffer->meta()->findInt32("index", &index)); 1101 1102 mRTPConn->injectPacket(index, buffer); 1103 break; 1104 } 1105 1106 case 'tiou': 1107 { 1108 if (!mReceivedFirstRTCPPacket) { 1109 if (mReceivedFirstRTPPacket && !mTryFakeRTCP) { 1110 ALOGW("We received RTP packets but no RTCP packets, " 1111 "using fake timestamps."); 1112 1113 mTryFakeRTCP = true; 1114 1115 mReceivedFirstRTCPPacket = true; 1116 1117 fakeTimestamps(); 1118 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) { 1119 ALOGW("Never received any data, switching transports."); 1120 1121 mTryTCPInterleaving = true; 1122 1123 sp<AMessage> msg = new AMessage('abor', id()); 1124 msg->setInt32("reconnect", true); 1125 msg->post(); 1126 } else { 1127 ALOGW("Never received any data, disconnecting."); 1128 (new AMessage('abor', id()))->post(); 1129 } 1130 } else { 1131 if (!mAllTracksHaveTime) { 1132 ALOGW("We received some RTCP packets, but time " 1133 "could not be established on all tracks, now " 1134 "using fake timestamps"); 1135 1136 fakeTimestamps(); 1137 } 1138 } 1139 break; 1140 } 1141 1142 default: 1143 TRESPASS(); 1144 break; 1145 } 1146 } 1147 1148 void postKeepAlive() { 1149 sp<AMessage> msg = new AMessage('aliv', id()); 1150 msg->setInt32("generation", mKeepAliveGeneration); 1151 msg->post((mKeepAliveTimeoutUs * 9) / 10); 1152 } 1153 1154 void postAccessUnitTimeoutCheck() { 1155 if (mCheckPending) { 1156 return; 1157 } 1158 1159 mCheckPending = true; 1160 sp<AMessage> check = new AMessage('chek', id()); 1161 check->setInt32("generation", mCheckGeneration); 1162 check->post(kAccessUnitTimeoutUs); 1163 } 1164 1165 static void SplitString( 1166 const AString &s, const char *separator, List<AString> *items) { 1167 items->clear(); 1168 size_t start = 0; 1169 while (start < s.size()) { 1170 ssize_t offset = s.find(separator, start); 1171 1172 if (offset < 0) { 1173 items->push_back(AString(s, start, s.size() - start)); 1174 break; 1175 } 1176 1177 items->push_back(AString(s, start, offset - start)); 1178 start = offset + strlen(separator); 1179 } 1180 } 1181 1182 void parsePlayResponse(const sp<ARTSPResponse> &response) { 1183 if (mTracks.size() == 0) { 1184 ALOGV("parsePlayResponse: late packets ignored."); 1185 return; 1186 } 1187 1188 ssize_t i = response->mHeaders.indexOfKey("range"); 1189 if (i < 0) { 1190 // Server doesn't even tell use what range it is going to 1191 // play, therefore we won't support seeking. 1192 return; 1193 } 1194 1195 AString range = response->mHeaders.valueAt(i); 1196 ALOGV("Range: %s", range.c_str()); 1197 1198 AString val; 1199 CHECK(GetAttribute(range.c_str(), "npt", &val)); 1200 1201 float npt1, npt2; 1202 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) { 1203 // This is a live stream and therefore not seekable. 1204 1205 ALOGI("This is a live stream"); 1206 return; 1207 } 1208 1209 i = response->mHeaders.indexOfKey("rtp-info"); 1210 CHECK_GE(i, 0); 1211 1212 AString rtpInfo = response->mHeaders.valueAt(i); 1213 List<AString> streamInfos; 1214 SplitString(rtpInfo, ",", &streamInfos); 1215 1216 int n = 1; 1217 for (List<AString>::iterator it = streamInfos.begin(); 1218 it != streamInfos.end(); ++it) { 1219 (*it).trim(); 1220 ALOGV("streamInfo[%d] = %s", n, (*it).c_str()); 1221 1222 CHECK(GetAttribute((*it).c_str(), "url", &val)); 1223 1224 size_t trackIndex = 0; 1225 while (trackIndex < mTracks.size() 1226 && !(val == mTracks.editItemAt(trackIndex).mURL)) { 1227 ++trackIndex; 1228 } 1229 CHECK_LT(trackIndex, mTracks.size()); 1230 1231 CHECK(GetAttribute((*it).c_str(), "seq", &val)); 1232 1233 char *end; 1234 unsigned long seq = strtoul(val.c_str(), &end, 10); 1235 1236 TrackInfo *info = &mTracks.editItemAt(trackIndex); 1237 info->mFirstSeqNumInSegment = seq; 1238 info->mNewSegment = true; 1239 1240 CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); 1241 1242 uint32_t rtpTime = strtoul(val.c_str(), &end, 10); 1243 1244 ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1); 1245 1246 info->mNormalPlayTimeRTP = rtpTime; 1247 info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6); 1248 1249 if (!mFirstAccessUnit) { 1250 postNormalPlayTimeMapping( 1251 trackIndex, 1252 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1253 } 1254 1255 ++n; 1256 } 1257 } 1258 1259 sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) { 1260 CHECK_GE(index, 0u); 1261 CHECK_LT(index, mTracks.size()); 1262 1263 const TrackInfo &info = mTracks.itemAt(index); 1264 1265 *timeScale = info.mTimeScale; 1266 1267 return info.mPacketSource->getFormat(); 1268 } 1269 1270 size_t countTracks() const { 1271 return mTracks.size(); 1272 } 1273 1274private: 1275 struct TrackInfo { 1276 AString mURL; 1277 int mRTPSocket; 1278 int mRTCPSocket; 1279 bool mUsingInterleavedTCP; 1280 uint32_t mFirstSeqNumInSegment; 1281 bool mNewSegment; 1282 1283 uint32_t mRTPAnchor; 1284 int64_t mNTPAnchorUs; 1285 int32_t mTimeScale; 1286 1287 uint32_t mNormalPlayTimeRTP; 1288 int64_t mNormalPlayTimeUs; 1289 1290 sp<APacketSource> mPacketSource; 1291 1292 // Stores packets temporarily while no notion of time 1293 // has been established yet. 1294 List<sp<ABuffer> > mPackets; 1295 }; 1296 1297 sp<AMessage> mNotify; 1298 bool mUIDValid; 1299 uid_t mUID; 1300 sp<ALooper> mNetLooper; 1301 sp<ARTSPConnection> mConn; 1302 sp<ARTPConnection> mRTPConn; 1303 sp<ASessionDescription> mSessionDesc; 1304 AString mOriginalSessionURL; // This one still has user:pass@ 1305 AString mSessionURL; 1306 AString mSessionHost; 1307 AString mBaseURL; 1308 AString mSessionID; 1309 bool mSetupTracksSuccessful; 1310 bool mSeekPending; 1311 bool mFirstAccessUnit; 1312 1313 bool mAllTracksHaveTime; 1314 int64_t mNTPAnchorUs; 1315 int64_t mMediaAnchorUs; 1316 int64_t mLastMediaTimeUs; 1317 1318 int64_t mNumAccessUnitsReceived; 1319 bool mCheckPending; 1320 int32_t mCheckGeneration; 1321 bool mTryTCPInterleaving; 1322 bool mTryFakeRTCP; 1323 bool mReceivedFirstRTCPPacket; 1324 bool mReceivedFirstRTPPacket; 1325 bool mSeekable; 1326 int64_t mKeepAliveTimeoutUs; 1327 int32_t mKeepAliveGeneration; 1328 1329 Vector<TrackInfo> mTracks; 1330 1331 void setupTrack(size_t index) { 1332 sp<APacketSource> source = 1333 new APacketSource(mSessionDesc, index); 1334 1335 if (source->initCheck() != OK) { 1336 ALOGW("Unsupported format. Ignoring track #%d.", index); 1337 1338 sp<AMessage> reply = new AMessage('setu', id()); 1339 reply->setSize("index", index); 1340 reply->setInt32("result", ERROR_UNSUPPORTED); 1341 reply->post(); 1342 return; 1343 } 1344 1345 AString url; 1346 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 1347 1348 AString trackURL; 1349 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 1350 1351 mTracks.push(TrackInfo()); 1352 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 1353 info->mURL = trackURL; 1354 info->mPacketSource = source; 1355 info->mUsingInterleavedTCP = false; 1356 info->mFirstSeqNumInSegment = 0; 1357 info->mNewSegment = true; 1358 info->mRTPAnchor = 0; 1359 info->mNTPAnchorUs = -1; 1360 info->mNormalPlayTimeRTP = 0; 1361 info->mNormalPlayTimeUs = 0ll; 1362 1363 unsigned long PT; 1364 AString formatDesc; 1365 AString formatParams; 1366 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams); 1367 1368 int32_t timescale; 1369 int32_t numChannels; 1370 ASessionDescription::ParseFormatDesc( 1371 formatDesc.c_str(), ×cale, &numChannels); 1372 1373 info->mTimeScale = timescale; 1374 1375 ALOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str()); 1376 1377 AString request = "SETUP "; 1378 request.append(trackURL); 1379 request.append(" RTSP/1.0\r\n"); 1380 1381 if (mTryTCPInterleaving) { 1382 size_t interleaveIndex = 2 * (mTracks.size() - 1); 1383 info->mUsingInterleavedTCP = true; 1384 info->mRTPSocket = interleaveIndex; 1385 info->mRTCPSocket = interleaveIndex + 1; 1386 1387 request.append("Transport: RTP/AVP/TCP;interleaved="); 1388 request.append(interleaveIndex); 1389 request.append("-"); 1390 request.append(interleaveIndex + 1); 1391 } else { 1392 unsigned rtpPort; 1393 ARTPConnection::MakePortPair( 1394 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 1395 1396 if (mUIDValid) { 1397 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID, 1398 (uint32_t)*(uint32_t*) "RTP_"); 1399 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID, 1400 (uint32_t)*(uint32_t*) "RTP_"); 1401 } 1402 1403 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 1404 request.append(rtpPort); 1405 request.append("-"); 1406 request.append(rtpPort + 1); 1407 } 1408 1409 request.append("\r\n"); 1410 1411 if (index > 1) { 1412 request.append("Session: "); 1413 request.append(mSessionID); 1414 request.append("\r\n"); 1415 } 1416 1417 request.append("\r\n"); 1418 1419 sp<AMessage> reply = new AMessage('setu', id()); 1420 reply->setSize("index", index); 1421 reply->setSize("track-index", mTracks.size() - 1); 1422 mConn->sendRequest(request.c_str(), reply); 1423 } 1424 1425 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 1426 out->clear(); 1427 1428 if (strncasecmp("rtsp://", baseURL, 7)) { 1429 // Base URL must be absolute 1430 return false; 1431 } 1432 1433 if (!strncasecmp("rtsp://", url, 7)) { 1434 // "url" is already an absolute URL, ignore base URL. 1435 out->setTo(url); 1436 return true; 1437 } 1438 1439 size_t n = strlen(baseURL); 1440 if (baseURL[n - 1] == '/') { 1441 out->setTo(baseURL); 1442 out->append(url); 1443 } else { 1444 const char *slashPos = strrchr(baseURL, '/'); 1445 1446 if (slashPos > &baseURL[6]) { 1447 out->setTo(baseURL, slashPos - baseURL); 1448 } else { 1449 out->setTo(baseURL); 1450 } 1451 1452 out->append("/"); 1453 out->append(url); 1454 } 1455 1456 return true; 1457 } 1458 1459 void fakeTimestamps() { 1460 mNTPAnchorUs = -1ll; 1461 for (size_t i = 0; i < mTracks.size(); ++i) { 1462 onTimeUpdate(i, 0, 0ll); 1463 } 1464 } 1465 1466 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) { 1467 ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx", 1468 trackIndex, rtpTime, ntpTime); 1469 1470 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); 1471 1472 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1473 1474 track->mRTPAnchor = rtpTime; 1475 track->mNTPAnchorUs = ntpTimeUs; 1476 1477 if (mNTPAnchorUs < 0) { 1478 mNTPAnchorUs = ntpTimeUs; 1479 mMediaAnchorUs = mLastMediaTimeUs; 1480 } 1481 1482 if (!mAllTracksHaveTime) { 1483 bool allTracksHaveTime = true; 1484 for (size_t i = 0; i < mTracks.size(); ++i) { 1485 TrackInfo *track = &mTracks.editItemAt(i); 1486 if (track->mNTPAnchorUs < 0) { 1487 allTracksHaveTime = false; 1488 break; 1489 } 1490 } 1491 if (allTracksHaveTime) { 1492 mAllTracksHaveTime = true; 1493 ALOGI("Time now established for all tracks."); 1494 } 1495 } 1496 } 1497 1498 void onAccessUnitComplete( 1499 int32_t trackIndex, const sp<ABuffer> &accessUnit) { 1500 ALOGV("onAccessUnitComplete track %d", trackIndex); 1501 1502 if (mFirstAccessUnit) { 1503 sp<AMessage> msg = mNotify->dup(); 1504 msg->setInt32("what", kWhatConnected); 1505 msg->post(); 1506 1507 if (mSeekable) { 1508 for (size_t i = 0; i < mTracks.size(); ++i) { 1509 TrackInfo *info = &mTracks.editItemAt(i); 1510 1511 postNormalPlayTimeMapping( 1512 i, 1513 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1514 } 1515 } 1516 1517 mFirstAccessUnit = false; 1518 } 1519 1520 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1521 1522 if (!mAllTracksHaveTime) { 1523 ALOGV("storing accessUnit, no time established yet"); 1524 track->mPackets.push_back(accessUnit); 1525 return; 1526 } 1527 1528 while (!track->mPackets.empty()) { 1529 sp<ABuffer> accessUnit = *track->mPackets.begin(); 1530 track->mPackets.erase(track->mPackets.begin()); 1531 1532 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1533 postQueueAccessUnit(trackIndex, accessUnit); 1534 } 1535 } 1536 1537 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1538 postQueueAccessUnit(trackIndex, accessUnit); 1539 } 1540 } 1541 1542 bool addMediaTimestamp( 1543 int32_t trackIndex, const TrackInfo *track, 1544 const sp<ABuffer> &accessUnit) { 1545 uint32_t rtpTime; 1546 CHECK(accessUnit->meta()->findInt32( 1547 "rtp-time", (int32_t *)&rtpTime)); 1548 1549 int64_t relRtpTimeUs = 1550 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll) 1551 / track->mTimeScale; 1552 1553 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs; 1554 1555 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs; 1556 1557 if (mediaTimeUs > mLastMediaTimeUs) { 1558 mLastMediaTimeUs = mediaTimeUs; 1559 } 1560 1561 if (mediaTimeUs < 0) { 1562 ALOGV("dropping early accessUnit."); 1563 return false; 1564 } 1565 1566 ALOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)", 1567 trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6); 1568 1569 accessUnit->meta()->setInt64("timeUs", mediaTimeUs); 1570 1571 return true; 1572 } 1573 1574 void postQueueAccessUnit( 1575 size_t trackIndex, const sp<ABuffer> &accessUnit) { 1576 sp<AMessage> msg = mNotify->dup(); 1577 msg->setInt32("what", kWhatAccessUnit); 1578 msg->setSize("trackIndex", trackIndex); 1579 msg->setBuffer("accessUnit", accessUnit); 1580 msg->post(); 1581 } 1582 1583 void postQueueEOS(size_t trackIndex, status_t finalResult) { 1584 sp<AMessage> msg = mNotify->dup(); 1585 msg->setInt32("what", kWhatEOS); 1586 msg->setSize("trackIndex", trackIndex); 1587 msg->setInt32("finalResult", finalResult); 1588 msg->post(); 1589 } 1590 1591 void postQueueSeekDiscontinuity(size_t trackIndex) { 1592 sp<AMessage> msg = mNotify->dup(); 1593 msg->setInt32("what", kWhatSeekDiscontinuity); 1594 msg->setSize("trackIndex", trackIndex); 1595 msg->post(); 1596 } 1597 1598 void postNormalPlayTimeMapping( 1599 size_t trackIndex, uint32_t rtpTime, int64_t nptUs) { 1600 sp<AMessage> msg = mNotify->dup(); 1601 msg->setInt32("what", kWhatNormalPlayTimeMapping); 1602 msg->setSize("trackIndex", trackIndex); 1603 msg->setInt32("rtpTime", rtpTime); 1604 msg->setInt64("nptUs", nptUs); 1605 msg->post(); 1606 } 1607 1608 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 1609}; 1610 1611} // namespace android 1612 1613#endif // MY_HANDLER_H_ 1614