1/* 2 * Copyright 2012, The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "Sender" 19#include <utils/Log.h> 20 21#include "Sender.h" 22 23#include "ANetworkSession.h" 24 25#include <media/stagefright/foundation/ABuffer.h> 26#include <media/stagefright/foundation/ADebug.h> 27#include <media/stagefright/foundation/AMessage.h> 28#include <media/stagefright/foundation/hexdump.h> 29#include <media/stagefright/MediaErrors.h> 30#include <media/stagefright/Utils.h> 31 32#include <math.h> 33 34#define DEBUG_JITTER 0 35 36namespace android { 37 38//////////////////////////////////////////////////////////////////////////////// 39 40#if DEBUG_JITTER 41struct TimeSeries { 42 TimeSeries(); 43 44 void add(double val); 45 46 double mean() const; 47 double sdev() const; 48 49private: 50 enum { 51 kHistorySize = 20 52 }; 53 double mValues[kHistorySize]; 54 55 size_t mCount; 56 double mSum; 57}; 58 59TimeSeries::TimeSeries() 60 : mCount(0), 61 mSum(0.0) { 62} 63 64void TimeSeries::add(double val) { 65 if (mCount < kHistorySize) { 66 mValues[mCount++] = val; 67 mSum += val; 68 } else { 69 mSum -= mValues[0]; 70 memmove(&mValues[0], &mValues[1], (kHistorySize - 1) * sizeof(double)); 71 mValues[kHistorySize - 1] = val; 72 mSum += val; 73 } 74} 75 76double TimeSeries::mean() const { 77 if (mCount < 1) { 78 return 0.0; 79 } 80 81 return mSum / mCount; 82} 83 84double TimeSeries::sdev() const { 85 if (mCount < 1) { 86 return 0.0; 87 } 88 89 double m = mean(); 90 91 double sum = 0.0; 92 for (size_t i = 0; i < mCount; ++i) { 93 double tmp = mValues[i] - m; 94 tmp *= tmp; 95 96 sum += tmp; 97 } 98 99 return sqrt(sum / mCount); 100} 101#endif // DEBUG_JITTER 102 103//////////////////////////////////////////////////////////////////////////////// 104 105static size_t kMaxRTPPacketSize = 1500; 106static size_t kMaxNumTSPacketsPerRTPPacket = (kMaxRTPPacketSize - 12) / 188; 107 108Sender::Sender( 109 const sp<ANetworkSession> &netSession, 110 const sp<AMessage> ¬ify) 111 : mNetSession(netSession), 112 mNotify(notify), 113 mTSQueue(new ABuffer(12 + kMaxNumTSPacketsPerRTPPacket * 188)), 114 mTransportMode(TRANSPORT_UDP), 115 mRTPChannel(0), 116 mRTCPChannel(0), 117 mRTPPort(0), 118 mRTPSessionID(0), 119 mRTCPSessionID(0), 120#if ENABLE_RETRANSMISSION 121 mRTPRetransmissionSessionID(0), 122 mRTCPRetransmissionSessionID(0), 123#endif 124 mClientRTPPort(0), 125 mClientRTCPPort(0), 126 mRTPConnected(false), 127 mRTCPConnected(false), 128 mFirstOutputBufferReadyTimeUs(-1ll), 129 mFirstOutputBufferSentTimeUs(-1ll), 130 mRTPSeqNo(0), 131#if ENABLE_RETRANSMISSION 132 mRTPRetransmissionSeqNo(0), 133#endif 134 mLastNTPTime(0), 135 mLastRTPTime(0), 136 mNumRTPSent(0), 137 mNumRTPOctetsSent(0), 138 mNumSRsSent(0), 139 mSendSRPending(false) 140#if ENABLE_RETRANSMISSION 141 ,mHistoryLength(0) 142#endif 143#if TRACK_BANDWIDTH 144 ,mFirstPacketTimeUs(-1ll) 145 ,mTotalBytesSent(0ll) 146#endif 147#if LOG_TRANSPORT_STREAM 148 ,mLogFile(NULL) 149#endif 150{ 151 mTSQueue->setRange(0, 12); 152 153#if LOG_TRANSPORT_STREAM 154 mLogFile = fopen("/system/etc/log.ts", "wb"); 155#endif 156} 157 158Sender::~Sender() { 159#if ENABLE_RETRANSMISSION 160 if (mRTCPRetransmissionSessionID != 0) { 161 mNetSession->destroySession(mRTCPRetransmissionSessionID); 162 } 163 164 if (mRTPRetransmissionSessionID != 0) { 165 mNetSession->destroySession(mRTPRetransmissionSessionID); 166 } 167#endif 168 169 if (mRTCPSessionID != 0) { 170 mNetSession->destroySession(mRTCPSessionID); 171 } 172 173 if (mRTPSessionID != 0) { 174 mNetSession->destroySession(mRTPSessionID); 175 } 176 177#if LOG_TRANSPORT_STREAM 178 if (mLogFile != NULL) { 179 fclose(mLogFile); 180 mLogFile = NULL; 181 } 182#endif 183} 184 185status_t Sender::init( 186 const char *clientIP, int32_t clientRtp, int32_t clientRtcp, 187 TransportMode transportMode) { 188 mClientIP = clientIP; 189 mTransportMode = transportMode; 190 191 if (transportMode == TRANSPORT_TCP_INTERLEAVED) { 192 mRTPChannel = clientRtp; 193 mRTCPChannel = clientRtcp; 194 mRTPPort = 0; 195 mRTPSessionID = 0; 196 mRTCPSessionID = 0; 197 return OK; 198 } 199 200 mRTPChannel = 0; 201 mRTCPChannel = 0; 202 203 if (mTransportMode == TRANSPORT_TCP) { 204 // XXX This is wrong, we need to allocate sockets here, we only 205 // need to do this because the dongles are not establishing their 206 // end until after PLAY instead of before SETUP. 207 mRTPPort = 20000; 208 mRTPSessionID = 0; 209 mRTCPSessionID = 0; 210 mClientRTPPort = clientRtp; 211 mClientRTCPPort = clientRtcp; 212 return OK; 213 } 214 215 int serverRtp; 216 217 sp<AMessage> rtpNotify = new AMessage(kWhatRTPNotify, id()); 218 sp<AMessage> rtcpNotify = new AMessage(kWhatRTCPNotify, id()); 219 220#if ENABLE_RETRANSMISSION 221 sp<AMessage> rtpRetransmissionNotify = 222 new AMessage(kWhatRTPRetransmissionNotify, id()); 223 224 sp<AMessage> rtcpRetransmissionNotify = 225 new AMessage(kWhatRTCPRetransmissionNotify, id()); 226#endif 227 228 status_t err; 229 for (serverRtp = 15550;; serverRtp += 2) { 230 int32_t rtpSession; 231 if (mTransportMode == TRANSPORT_UDP) { 232 err = mNetSession->createUDPSession( 233 serverRtp, clientIP, clientRtp, 234 rtpNotify, &rtpSession); 235 } else { 236 err = mNetSession->createTCPDatagramSession( 237 serverRtp, clientIP, clientRtp, 238 rtpNotify, &rtpSession); 239 } 240 241 if (err != OK) { 242 ALOGI("failed to create RTP socket on port %d", serverRtp); 243 continue; 244 } 245 246 int32_t rtcpSession = 0; 247 248 if (clientRtcp >= 0) { 249 if (mTransportMode == TRANSPORT_UDP) { 250 err = mNetSession->createUDPSession( 251 serverRtp + 1, clientIP, clientRtcp, 252 rtcpNotify, &rtcpSession); 253 } else { 254 err = mNetSession->createTCPDatagramSession( 255 serverRtp + 1, clientIP, clientRtcp, 256 rtcpNotify, &rtcpSession); 257 } 258 259 if (err != OK) { 260 ALOGI("failed to create RTCP socket on port %d", serverRtp + 1); 261 262 mNetSession->destroySession(rtpSession); 263 continue; 264 } 265 } 266 267#if ENABLE_RETRANSMISSION 268 if (mTransportMode == TRANSPORT_UDP) { 269 int32_t rtpRetransmissionSession; 270 271 err = mNetSession->createUDPSession( 272 serverRtp + kRetransmissionPortOffset, 273 clientIP, 274 clientRtp + kRetransmissionPortOffset, 275 rtpRetransmissionNotify, 276 &rtpRetransmissionSession); 277 278 if (err != OK) { 279 mNetSession->destroySession(rtcpSession); 280 mNetSession->destroySession(rtpSession); 281 continue; 282 } 283 284 CHECK_GE(clientRtcp, 0); 285 286 int32_t rtcpRetransmissionSession; 287 err = mNetSession->createUDPSession( 288 serverRtp + 1 + kRetransmissionPortOffset, 289 clientIP, 290 clientRtp + 1 + kRetransmissionPortOffset, 291 rtcpRetransmissionNotify, 292 &rtcpRetransmissionSession); 293 294 if (err != OK) { 295 mNetSession->destroySession(rtpRetransmissionSession); 296 mNetSession->destroySession(rtcpSession); 297 mNetSession->destroySession(rtpSession); 298 continue; 299 } 300 301 mRTPRetransmissionSessionID = rtpRetransmissionSession; 302 mRTCPRetransmissionSessionID = rtcpRetransmissionSession; 303 304 ALOGI("rtpRetransmissionSessionID = %d, " 305 "rtcpRetransmissionSessionID = %d", 306 rtpRetransmissionSession, rtcpRetransmissionSession); 307 } 308#endif 309 310 mRTPPort = serverRtp; 311 mRTPSessionID = rtpSession; 312 mRTCPSessionID = rtcpSession; 313 314 ALOGI("rtpSessionID = %d, rtcpSessionID = %d", rtpSession, rtcpSession); 315 break; 316 } 317 318 if (mRTPPort == 0) { 319 return UNKNOWN_ERROR; 320 } 321 322 return OK; 323} 324 325status_t Sender::finishInit() { 326 if (mTransportMode != TRANSPORT_TCP) { 327 notifyInitDone(); 328 return OK; 329 } 330 331 sp<AMessage> rtpNotify = new AMessage(kWhatRTPNotify, id()); 332 333 status_t err = mNetSession->createTCPDatagramSession( 334 mRTPPort, mClientIP.c_str(), mClientRTPPort, 335 rtpNotify, &mRTPSessionID); 336 337 if (err != OK) { 338 return err; 339 } 340 341 if (mClientRTCPPort >= 0) { 342 sp<AMessage> rtcpNotify = new AMessage(kWhatRTCPNotify, id()); 343 344 err = mNetSession->createTCPDatagramSession( 345 mRTPPort + 1, mClientIP.c_str(), mClientRTCPPort, 346 rtcpNotify, &mRTCPSessionID); 347 348 if (err != OK) { 349 return err; 350 } 351 } 352 353 return OK; 354} 355 356int32_t Sender::getRTPPort() const { 357 return mRTPPort; 358} 359 360void Sender::queuePackets( 361 int64_t timeUs, const sp<ABuffer> &packets) { 362 bool isVideo = false; 363 364 int32_t dummy; 365 if (packets->meta()->findInt32("isVideo", &dummy)) { 366 isVideo = true; 367 } 368 369 int64_t delayUs; 370 int64_t whenUs; 371 372 if (mFirstOutputBufferReadyTimeUs < 0ll) { 373 mFirstOutputBufferReadyTimeUs = timeUs; 374 mFirstOutputBufferSentTimeUs = whenUs = ALooper::GetNowUs(); 375 delayUs = 0ll; 376 } else { 377 int64_t nowUs = ALooper::GetNowUs(); 378 379 whenUs = (timeUs - mFirstOutputBufferReadyTimeUs) 380 + mFirstOutputBufferSentTimeUs; 381 382 delayUs = whenUs - nowUs; 383 } 384 385 sp<AMessage> msg = new AMessage(kWhatQueuePackets, id()); 386 msg->setBuffer("packets", packets); 387 388 packets->meta()->setInt64("timeUs", timeUs); 389 packets->meta()->setInt64("whenUs", whenUs); 390 packets->meta()->setInt64("delayUs", delayUs); 391 msg->post(delayUs > 0 ? delayUs : 0); 392} 393 394void Sender::onMessageReceived(const sp<AMessage> &msg) { 395 switch (msg->what()) { 396 case kWhatRTPNotify: 397 case kWhatRTCPNotify: 398#if ENABLE_RETRANSMISSION 399 case kWhatRTPRetransmissionNotify: 400 case kWhatRTCPRetransmissionNotify: 401#endif 402 { 403 int32_t reason; 404 CHECK(msg->findInt32("reason", &reason)); 405 406 switch (reason) { 407 case ANetworkSession::kWhatError: 408 { 409 int32_t sessionID; 410 CHECK(msg->findInt32("sessionID", &sessionID)); 411 412 int32_t err; 413 CHECK(msg->findInt32("err", &err)); 414 415 int32_t errorOccuredDuringSend; 416 CHECK(msg->findInt32("send", &errorOccuredDuringSend)); 417 418 AString detail; 419 CHECK(msg->findString("detail", &detail)); 420 421 if ((msg->what() == kWhatRTPNotify 422#if ENABLE_RETRANSMISSION 423 || msg->what() == kWhatRTPRetransmissionNotify 424#endif 425 ) && !errorOccuredDuringSend) { 426 // This is ok, we don't expect to receive anything on 427 // the RTP socket. 428 break; 429 } 430 431 ALOGE("An error occurred during %s in session %d " 432 "(%d, '%s' (%s)).", 433 errorOccuredDuringSend ? "send" : "receive", 434 sessionID, 435 err, 436 detail.c_str(), 437 strerror(-err)); 438 439 mNetSession->destroySession(sessionID); 440 441 if (sessionID == mRTPSessionID) { 442 mRTPSessionID = 0; 443 } else if (sessionID == mRTCPSessionID) { 444 mRTCPSessionID = 0; 445 } 446#if ENABLE_RETRANSMISSION 447 else if (sessionID == mRTPRetransmissionSessionID) { 448 mRTPRetransmissionSessionID = 0; 449 } else if (sessionID == mRTCPRetransmissionSessionID) { 450 mRTCPRetransmissionSessionID = 0; 451 } 452#endif 453 454 notifySessionDead(); 455 break; 456 } 457 458 case ANetworkSession::kWhatDatagram: 459 { 460 int32_t sessionID; 461 CHECK(msg->findInt32("sessionID", &sessionID)); 462 463 sp<ABuffer> data; 464 CHECK(msg->findBuffer("data", &data)); 465 466 status_t err; 467 if (msg->what() == kWhatRTCPNotify 468#if ENABLE_RETRANSMISSION 469 || msg->what() == kWhatRTCPRetransmissionNotify 470#endif 471 ) 472 { 473 err = parseRTCP(data); 474 } 475 break; 476 } 477 478 case ANetworkSession::kWhatConnected: 479 { 480 CHECK_EQ(mTransportMode, TRANSPORT_TCP); 481 482 int32_t sessionID; 483 CHECK(msg->findInt32("sessionID", &sessionID)); 484 485 if (sessionID == mRTPSessionID) { 486 CHECK(!mRTPConnected); 487 mRTPConnected = true; 488 ALOGI("RTP Session now connected."); 489 } else if (sessionID == mRTCPSessionID) { 490 CHECK(!mRTCPConnected); 491 mRTCPConnected = true; 492 ALOGI("RTCP Session now connected."); 493 } else { 494 TRESPASS(); 495 } 496 497 if (mRTPConnected 498 && (mClientRTCPPort < 0 || mRTCPConnected)) { 499 notifyInitDone(); 500 } 501 break; 502 } 503 504 default: 505 TRESPASS(); 506 } 507 break; 508 } 509 510 case kWhatQueuePackets: 511 { 512 sp<ABuffer> packets; 513 CHECK(msg->findBuffer("packets", &packets)); 514 515 onQueuePackets(packets); 516 break; 517 } 518 519 case kWhatSendSR: 520 { 521 mSendSRPending = false; 522 523 if (mRTCPSessionID == 0) { 524 break; 525 } 526 527 onSendSR(); 528 529 scheduleSendSR(); 530 break; 531 } 532 } 533} 534 535void Sender::onQueuePackets(const sp<ABuffer> &packets) { 536#if DEBUG_JITTER 537 int32_t dummy; 538 if (packets->meta()->findInt32("isVideo", &dummy)) { 539 static int64_t lastTimeUs = 0ll; 540 int64_t nowUs = ALooper::GetNowUs(); 541 542 static TimeSeries series; 543 series.add((double)(nowUs - lastTimeUs)); 544 545 ALOGI("deltaTimeUs = %lld us, mean %.2f, sdev %.2f", 546 nowUs - lastTimeUs, series.mean(), series.sdev()); 547 548 lastTimeUs = nowUs; 549 } 550#endif 551 552 int64_t startTimeUs = ALooper::GetNowUs(); 553 554 for (size_t offset = 0; 555 offset < packets->size(); offset += 188) { 556 bool lastTSPacket = (offset + 188 >= packets->size()); 557 558 appendTSData( 559 packets->data() + offset, 560 188, 561 true /* timeDiscontinuity */, 562 lastTSPacket /* flush */); 563 } 564 565#if 0 566 int64_t netTimeUs = ALooper::GetNowUs() - startTimeUs; 567 568 int64_t whenUs; 569 CHECK(packets->meta()->findInt64("whenUs", &whenUs)); 570 571 int64_t delayUs; 572 CHECK(packets->meta()->findInt64("delayUs", &delayUs)); 573 574 bool isVideo = false; 575 int32_t dummy; 576 if (packets->meta()->findInt32("isVideo", &dummy)) { 577 isVideo = true; 578 } 579 580 int64_t nowUs = ALooper::GetNowUs(); 581 582 if (nowUs - whenUs > 2000) { 583 ALOGI("[%s] delayUs = %lld us, delta = %lld us", 584 isVideo ? "video" : "audio", delayUs, nowUs - netTimeUs - whenUs); 585 } 586#endif 587 588#if LOG_TRANSPORT_STREAM 589 if (mLogFile != NULL) { 590 fwrite(packets->data(), 1, packets->size(), mLogFile); 591 } 592#endif 593} 594 595ssize_t Sender::appendTSData( 596 const void *data, size_t size, bool timeDiscontinuity, bool flush) { 597 CHECK_EQ(size, 188); 598 599 CHECK_LE(mTSQueue->size() + size, mTSQueue->capacity()); 600 601 memcpy(mTSQueue->data() + mTSQueue->size(), data, size); 602 mTSQueue->setRange(0, mTSQueue->size() + size); 603 604 if (flush || mTSQueue->size() == mTSQueue->capacity()) { 605 // flush 606 607 int64_t nowUs = ALooper::GetNowUs(); 608 609#if TRACK_BANDWIDTH 610 if (mFirstPacketTimeUs < 0ll) { 611 mFirstPacketTimeUs = nowUs; 612 } 613#endif 614 615 // 90kHz time scale 616 uint32_t rtpTime = (nowUs * 9ll) / 100ll; 617 618 uint8_t *rtp = mTSQueue->data(); 619 rtp[0] = 0x80; 620 rtp[1] = 33 | (timeDiscontinuity ? (1 << 7) : 0); // M-bit 621 rtp[2] = (mRTPSeqNo >> 8) & 0xff; 622 rtp[3] = mRTPSeqNo & 0xff; 623 rtp[4] = rtpTime >> 24; 624 rtp[5] = (rtpTime >> 16) & 0xff; 625 rtp[6] = (rtpTime >> 8) & 0xff; 626 rtp[7] = rtpTime & 0xff; 627 rtp[8] = kSourceID >> 24; 628 rtp[9] = (kSourceID >> 16) & 0xff; 629 rtp[10] = (kSourceID >> 8) & 0xff; 630 rtp[11] = kSourceID & 0xff; 631 632 ++mRTPSeqNo; 633 ++mNumRTPSent; 634 mNumRTPOctetsSent += mTSQueue->size() - 12; 635 636 mLastRTPTime = rtpTime; 637 mLastNTPTime = GetNowNTP(); 638 639 if (mTransportMode == TRANSPORT_TCP_INTERLEAVED) { 640 sp<AMessage> notify = mNotify->dup(); 641 notify->setInt32("what", kWhatBinaryData); 642 643 sp<ABuffer> data = new ABuffer(mTSQueue->size()); 644 memcpy(data->data(), rtp, mTSQueue->size()); 645 646 notify->setInt32("channel", mRTPChannel); 647 notify->setBuffer("data", data); 648 notify->post(); 649 } else { 650 sendPacket(mRTPSessionID, rtp, mTSQueue->size()); 651 652#if TRACK_BANDWIDTH 653 mTotalBytesSent += mTSQueue->size(); 654 int64_t delayUs = ALooper::GetNowUs() - mFirstPacketTimeUs; 655 656 if (delayUs > 0ll) { 657 ALOGI("approx. net bandwidth used: %.2f Mbit/sec", 658 mTotalBytesSent * 8.0 / delayUs); 659 } 660#endif 661 } 662 663#if ENABLE_RETRANSMISSION 664 mTSQueue->setInt32Data(mRTPSeqNo - 1); 665 666 mHistory.push_back(mTSQueue); 667 ++mHistoryLength; 668 669 if (mHistoryLength > kMaxHistoryLength) { 670 mTSQueue = *mHistory.begin(); 671 mHistory.erase(mHistory.begin()); 672 673 --mHistoryLength; 674 } else { 675 mTSQueue = new ABuffer(12 + kMaxNumTSPacketsPerRTPPacket * 188); 676 } 677#endif 678 679 mTSQueue->setRange(0, 12); 680 } 681 682 return size; 683} 684 685void Sender::scheduleSendSR() { 686 if (mSendSRPending || mRTCPSessionID == 0) { 687 return; 688 } 689 690 mSendSRPending = true; 691 (new AMessage(kWhatSendSR, id()))->post(kSendSRIntervalUs); 692} 693 694void Sender::addSR(const sp<ABuffer> &buffer) { 695 uint8_t *data = buffer->data() + buffer->size(); 696 697 // TODO: Use macros/utility functions to clean up all the bitshifts below. 698 699 data[0] = 0x80 | 0; 700 data[1] = 200; // SR 701 data[2] = 0; 702 data[3] = 6; 703 data[4] = kSourceID >> 24; 704 data[5] = (kSourceID >> 16) & 0xff; 705 data[6] = (kSourceID >> 8) & 0xff; 706 data[7] = kSourceID & 0xff; 707 708 data[8] = mLastNTPTime >> (64 - 8); 709 data[9] = (mLastNTPTime >> (64 - 16)) & 0xff; 710 data[10] = (mLastNTPTime >> (64 - 24)) & 0xff; 711 data[11] = (mLastNTPTime >> 32) & 0xff; 712 data[12] = (mLastNTPTime >> 24) & 0xff; 713 data[13] = (mLastNTPTime >> 16) & 0xff; 714 data[14] = (mLastNTPTime >> 8) & 0xff; 715 data[15] = mLastNTPTime & 0xff; 716 717 data[16] = (mLastRTPTime >> 24) & 0xff; 718 data[17] = (mLastRTPTime >> 16) & 0xff; 719 data[18] = (mLastRTPTime >> 8) & 0xff; 720 data[19] = mLastRTPTime & 0xff; 721 722 data[20] = mNumRTPSent >> 24; 723 data[21] = (mNumRTPSent >> 16) & 0xff; 724 data[22] = (mNumRTPSent >> 8) & 0xff; 725 data[23] = mNumRTPSent & 0xff; 726 727 data[24] = mNumRTPOctetsSent >> 24; 728 data[25] = (mNumRTPOctetsSent >> 16) & 0xff; 729 data[26] = (mNumRTPOctetsSent >> 8) & 0xff; 730 data[27] = mNumRTPOctetsSent & 0xff; 731 732 buffer->setRange(buffer->offset(), buffer->size() + 28); 733} 734 735void Sender::addSDES(const sp<ABuffer> &buffer) { 736 uint8_t *data = buffer->data() + buffer->size(); 737 data[0] = 0x80 | 1; 738 data[1] = 202; // SDES 739 data[4] = kSourceID >> 24; 740 data[5] = (kSourceID >> 16) & 0xff; 741 data[6] = (kSourceID >> 8) & 0xff; 742 data[7] = kSourceID & 0xff; 743 744 size_t offset = 8; 745 746 data[offset++] = 1; // CNAME 747 748 static const char *kCNAME = "someone@somewhere"; 749 data[offset++] = strlen(kCNAME); 750 751 memcpy(&data[offset], kCNAME, strlen(kCNAME)); 752 offset += strlen(kCNAME); 753 754 data[offset++] = 7; // NOTE 755 756 static const char *kNOTE = "Hell's frozen over."; 757 data[offset++] = strlen(kNOTE); 758 759 memcpy(&data[offset], kNOTE, strlen(kNOTE)); 760 offset += strlen(kNOTE); 761 762 data[offset++] = 0; 763 764 if ((offset % 4) > 0) { 765 size_t count = 4 - (offset % 4); 766 switch (count) { 767 case 3: 768 data[offset++] = 0; 769 case 2: 770 data[offset++] = 0; 771 case 1: 772 data[offset++] = 0; 773 } 774 } 775 776 size_t numWords = (offset / 4) - 1; 777 data[2] = numWords >> 8; 778 data[3] = numWords & 0xff; 779 780 buffer->setRange(buffer->offset(), buffer->size() + offset); 781} 782 783// static 784uint64_t Sender::GetNowNTP() { 785 uint64_t nowUs = ALooper::GetNowUs(); 786 787 nowUs += ((70ll * 365 + 17) * 24) * 60 * 60 * 1000000ll; 788 789 uint64_t hi = nowUs / 1000000ll; 790 uint64_t lo = ((1ll << 32) * (nowUs % 1000000ll)) / 1000000ll; 791 792 return (hi << 32) | lo; 793} 794 795void Sender::onSendSR() { 796 sp<ABuffer> buffer = new ABuffer(1500); 797 buffer->setRange(0, 0); 798 799 addSR(buffer); 800 addSDES(buffer); 801 802 if (mTransportMode == TRANSPORT_TCP_INTERLEAVED) { 803 sp<AMessage> notify = mNotify->dup(); 804 notify->setInt32("what", kWhatBinaryData); 805 notify->setInt32("channel", mRTCPChannel); 806 notify->setBuffer("data", buffer); 807 notify->post(); 808 } else { 809 sendPacket(mRTCPSessionID, buffer->data(), buffer->size()); 810 } 811 812 ++mNumSRsSent; 813} 814 815#if ENABLE_RETRANSMISSION 816status_t Sender::parseTSFB( 817 const uint8_t *data, size_t size) { 818 if ((data[0] & 0x1f) != 1) { 819 return ERROR_UNSUPPORTED; // We only support NACK for now. 820 } 821 822 uint32_t srcId = U32_AT(&data[8]); 823 if (srcId != kSourceID) { 824 return ERROR_MALFORMED; 825 } 826 827 for (size_t i = 12; i < size; i += 4) { 828 uint16_t seqNo = U16_AT(&data[i]); 829 uint16_t blp = U16_AT(&data[i + 2]); 830 831 List<sp<ABuffer> >::iterator it = mHistory.begin(); 832 bool foundSeqNo = false; 833 while (it != mHistory.end()) { 834 const sp<ABuffer> &buffer = *it; 835 836 uint16_t bufferSeqNo = buffer->int32Data() & 0xffff; 837 838 bool retransmit = false; 839 if (bufferSeqNo == seqNo) { 840 retransmit = true; 841 } else if (blp != 0) { 842 for (size_t i = 0; i < 16; ++i) { 843 if ((blp & (1 << i)) 844 && (bufferSeqNo == ((seqNo + i + 1) & 0xffff))) { 845 blp &= ~(1 << i); 846 retransmit = true; 847 } 848 } 849 } 850 851 if (retransmit) { 852 ALOGI("retransmitting seqNo %d", bufferSeqNo); 853 854 sp<ABuffer> retransRTP = new ABuffer(2 + buffer->size()); 855 uint8_t *rtp = retransRTP->data(); 856 memcpy(rtp, buffer->data(), 12); 857 rtp[2] = (mRTPRetransmissionSeqNo >> 8) & 0xff; 858 rtp[3] = mRTPRetransmissionSeqNo & 0xff; 859 rtp[12] = (bufferSeqNo >> 8) & 0xff; 860 rtp[13] = bufferSeqNo & 0xff; 861 memcpy(&rtp[14], buffer->data() + 12, buffer->size() - 12); 862 863 ++mRTPRetransmissionSeqNo; 864 865 sendPacket( 866 mRTPRetransmissionSessionID, 867 retransRTP->data(), retransRTP->size()); 868 869 if (bufferSeqNo == seqNo) { 870 foundSeqNo = true; 871 } 872 873 if (foundSeqNo && blp == 0) { 874 break; 875 } 876 } 877 878 ++it; 879 } 880 881 if (!foundSeqNo || blp != 0) { 882 ALOGI("Some sequence numbers were no longer available for " 883 "retransmission"); 884 } 885 } 886 887 return OK; 888} 889#endif 890 891status_t Sender::parseRTCP( 892 const sp<ABuffer> &buffer) { 893 const uint8_t *data = buffer->data(); 894 size_t size = buffer->size(); 895 896 while (size > 0) { 897 if (size < 8) { 898 // Too short to be a valid RTCP header 899 return ERROR_MALFORMED; 900 } 901 902 if ((data[0] >> 6) != 2) { 903 // Unsupported version. 904 return ERROR_UNSUPPORTED; 905 } 906 907 if (data[0] & 0x20) { 908 // Padding present. 909 910 size_t paddingLength = data[size - 1]; 911 912 if (paddingLength + 12 > size) { 913 // If we removed this much padding we'd end up with something 914 // that's too short to be a valid RTP header. 915 return ERROR_MALFORMED; 916 } 917 918 size -= paddingLength; 919 } 920 921 size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4; 922 923 if (size < headerLength) { 924 // Only received a partial packet? 925 return ERROR_MALFORMED; 926 } 927 928 switch (data[1]) { 929 case 200: 930 case 201: // RR 931 case 202: // SDES 932 case 203: 933 case 204: // APP 934 break; 935 936#if ENABLE_RETRANSMISSION 937 case 205: // TSFB (transport layer specific feedback) 938 parseTSFB(data, headerLength); 939 break; 940#endif 941 942 case 206: // PSFB (payload specific feedback) 943 hexdump(data, headerLength); 944 break; 945 946 default: 947 { 948 ALOGW("Unknown RTCP packet type %u of size %d", 949 (unsigned)data[1], headerLength); 950 break; 951 } 952 } 953 954 data += headerLength; 955 size -= headerLength; 956 } 957 958 return OK; 959} 960 961status_t Sender::sendPacket( 962 int32_t sessionID, const void *data, size_t size) { 963 return mNetSession->sendRequest(sessionID, data, size); 964} 965 966void Sender::notifyInitDone() { 967 sp<AMessage> notify = mNotify->dup(); 968 notify->setInt32("what", kWhatInitDone); 969 notify->post(); 970} 971 972void Sender::notifySessionDead() { 973 sp<AMessage> notify = mNotify->dup(); 974 notify->setInt32("what", kWhatSessionDead); 975 notify->post(); 976} 977 978} // namespace android 979 980