RTPSender.cpp revision 2aea9552aeba92bbaf9e56c666049ea2d14057b5
1/* 2 * Copyright 2013, The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "RTPSender" 19#include <utils/Log.h> 20 21#include "RTPSender.h" 22 23#include "ANetworkSession.h" 24 25#include <media/stagefright/foundation/ABuffer.h> 26#include <media/stagefright/foundation/ADebug.h> 27#include <media/stagefright/foundation/AMessage.h> 28#include <media/stagefright/foundation/hexdump.h> 29#include <media/stagefright/MediaErrors.h> 30#include <media/stagefright/Utils.h> 31 32#include "include/avc_utils.h" 33 34namespace android { 35 36RTPSender::RTPSender( 37 const sp<ANetworkSession> &netSession, 38 const sp<AMessage> ¬ify) 39 : mNetSession(netSession), 40 mNotify(notify), 41 mRTPMode(TRANSPORT_UNDEFINED), 42 mRTCPMode(TRANSPORT_UNDEFINED), 43 mRTPSessionID(0), 44 mRTCPSessionID(0), 45 mRTPConnected(false), 46 mRTCPConnected(false), 47 mLastNTPTime(0), 48 mLastRTPTime(0), 49 mNumRTPSent(0), 50 mNumRTPOctetsSent(0), 51 mNumSRsSent(0), 52 mRTPSeqNo(0), 53 mHistorySize(0) { 54} 55 56RTPSender::~RTPSender() { 57 if (mRTCPSessionID != 0) { 58 mNetSession->destroySession(mRTCPSessionID); 59 mRTCPSessionID = 0; 60 } 61 62 if (mRTPSessionID != 0) { 63 mNetSession->destroySession(mRTPSessionID); 64 mRTPSessionID = 0; 65 } 66} 67 68// static 69int32_t RTPBase::PickRandomRTPPort() { 70 // Pick an even integer in range [1024, 65534) 71 72 static const size_t kRange = (65534 - 1024) / 2; 73 74 return (int32_t)(((float)(kRange + 1) * rand()) / RAND_MAX) * 2 + 1024; 75} 76 77status_t RTPSender::initAsync( 78 const char *remoteHost, 79 int32_t remoteRTPPort, 80 TransportMode rtpMode, 81 int32_t remoteRTCPPort, 82 TransportMode rtcpMode, 83 int32_t *outLocalRTPPort) { 84 if (mRTPMode != TRANSPORT_UNDEFINED 85 || rtpMode == TRANSPORT_UNDEFINED 86 || rtpMode == TRANSPORT_NONE 87 || rtcpMode == TRANSPORT_UNDEFINED) { 88 return INVALID_OPERATION; 89 } 90 91 CHECK_NE(rtpMode, TRANSPORT_TCP_INTERLEAVED); 92 CHECK_NE(rtcpMode, TRANSPORT_TCP_INTERLEAVED); 93 94 if ((rtcpMode == TRANSPORT_NONE && remoteRTCPPort >= 0) 95 || (rtcpMode != TRANSPORT_NONE && remoteRTCPPort < 0)) { 96 return INVALID_OPERATION; 97 } 98 99 sp<AMessage> rtpNotify = new AMessage(kWhatRTPNotify, id()); 100 101 sp<AMessage> rtcpNotify; 102 if (remoteRTCPPort >= 0) { 103 rtcpNotify = new AMessage(kWhatRTCPNotify, id()); 104 } 105 106 CHECK_EQ(mRTPSessionID, 0); 107 CHECK_EQ(mRTCPSessionID, 0); 108 109 int32_t localRTPPort; 110 111 for (;;) { 112 localRTPPort = PickRandomRTPPort(); 113 114 status_t err; 115 if (rtpMode == TRANSPORT_UDP) { 116 err = mNetSession->createUDPSession( 117 localRTPPort, 118 remoteHost, 119 remoteRTPPort, 120 rtpNotify, 121 &mRTPSessionID); 122 } else { 123 CHECK_EQ(rtpMode, TRANSPORT_TCP); 124 err = mNetSession->createTCPDatagramSession( 125 localRTPPort, 126 remoteHost, 127 remoteRTPPort, 128 rtpNotify, 129 &mRTPSessionID); 130 } 131 132 if (err != OK) { 133 continue; 134 } 135 136 if (remoteRTCPPort < 0) { 137 break; 138 } 139 140 if (rtcpMode == TRANSPORT_UDP) { 141 err = mNetSession->createUDPSession( 142 localRTPPort + 1, 143 remoteHost, 144 remoteRTCPPort, 145 rtcpNotify, 146 &mRTCPSessionID); 147 } else { 148 CHECK_EQ(rtcpMode, TRANSPORT_TCP); 149 err = mNetSession->createTCPDatagramSession( 150 localRTPPort + 1, 151 remoteHost, 152 remoteRTCPPort, 153 rtcpNotify, 154 &mRTCPSessionID); 155 } 156 157 if (err == OK) { 158 break; 159 } 160 161 mNetSession->destroySession(mRTPSessionID); 162 mRTPSessionID = 0; 163 } 164 165 if (rtpMode == TRANSPORT_UDP) { 166 mRTPConnected = true; 167 } 168 169 if (rtcpMode == TRANSPORT_UDP) { 170 mRTCPConnected = true; 171 } 172 173 mRTPMode = rtpMode; 174 mRTCPMode = rtcpMode; 175 *outLocalRTPPort = localRTPPort; 176 177 if (mRTPMode == TRANSPORT_UDP 178 && (mRTCPMode == TRANSPORT_UDP || mRTCPMode == TRANSPORT_NONE)) { 179 notifyInitDone(OK); 180 } 181 182 return OK; 183} 184 185status_t RTPSender::queueBuffer( 186 const sp<ABuffer> &buffer, uint8_t packetType, PacketizationMode mode) { 187 status_t err; 188 189 switch (mode) { 190 case PACKETIZATION_TRANSPORT_STREAM: 191 err = queueTSPackets(buffer, packetType); 192 break; 193 194 case PACKETIZATION_H264: 195 err = queueAVCBuffer(buffer, packetType); 196 break; 197 198 default: 199 TRESPASS(); 200 } 201 202 return err; 203} 204 205status_t RTPSender::queueTSPackets( 206 const sp<ABuffer> &tsPackets, uint8_t packetType) { 207 CHECK_EQ(0, tsPackets->size() % 188); 208 209 int64_t timeUs; 210 CHECK(tsPackets->meta()->findInt64("timeUs", &timeUs)); 211 212 const size_t numTSPackets = tsPackets->size() / 188; 213 214 size_t srcOffset = 0; 215 while (srcOffset < tsPackets->size()) { 216 sp<ABuffer> udpPacket = 217 new ABuffer(12 + kMaxNumTSPacketsPerRTPPacket * 188); 218 219 udpPacket->setInt32Data(mRTPSeqNo); 220 221 uint8_t *rtp = udpPacket->data(); 222 rtp[0] = 0x80; 223 rtp[1] = packetType; 224 225 rtp[2] = (mRTPSeqNo >> 8) & 0xff; 226 rtp[3] = mRTPSeqNo & 0xff; 227 ++mRTPSeqNo; 228 229 int64_t nowUs = ALooper::GetNowUs(); 230 uint32_t rtpTime = (nowUs * 9) / 100ll; 231 232 rtp[4] = rtpTime >> 24; 233 rtp[5] = (rtpTime >> 16) & 0xff; 234 rtp[6] = (rtpTime >> 8) & 0xff; 235 rtp[7] = rtpTime & 0xff; 236 237 rtp[8] = kSourceID >> 24; 238 rtp[9] = (kSourceID >> 16) & 0xff; 239 rtp[10] = (kSourceID >> 8) & 0xff; 240 rtp[11] = kSourceID & 0xff; 241 242 size_t numTSPackets = (tsPackets->size() - srcOffset) / 188; 243 if (numTSPackets > kMaxNumTSPacketsPerRTPPacket) { 244 numTSPackets = kMaxNumTSPacketsPerRTPPacket; 245 } 246 247 memcpy(&rtp[12], tsPackets->data() + srcOffset, numTSPackets * 188); 248 249 udpPacket->setRange(0, 12 + numTSPackets * 188); 250 251 srcOffset += numTSPackets * 188; 252 bool isLastPacket = (srcOffset == tsPackets->size()); 253 254 status_t err = sendRTPPacket( 255 udpPacket, 256 true /* storeInHistory */, 257 isLastPacket /* timeValid */, 258 timeUs); 259 260 if (err != OK) { 261 return err; 262 } 263 } 264 265 return OK; 266} 267 268status_t RTPSender::queueAVCBuffer( 269 const sp<ABuffer> &accessUnit, uint8_t packetType) { 270 int64_t timeUs; 271 CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); 272 273 uint32_t rtpTime = (timeUs * 9 / 100ll); 274 275 List<sp<ABuffer> > packets; 276 277 sp<ABuffer> out = new ABuffer(kMaxUDPPacketSize); 278 size_t outBytesUsed = 12; // Placeholder for RTP header. 279 280 const uint8_t *data = accessUnit->data(); 281 size_t size = accessUnit->size(); 282 const uint8_t *nalStart; 283 size_t nalSize; 284 while (getNextNALUnit( 285 &data, &size, &nalStart, &nalSize, 286 true /* startCodeFollows */) == OK) { 287 size_t bytesNeeded = nalSize + 2; 288 if (outBytesUsed == 12) { 289 ++bytesNeeded; 290 } 291 292 if (outBytesUsed + bytesNeeded > out->capacity()) { 293 bool emitSingleNALPacket = false; 294 295 if (outBytesUsed == 12 296 && outBytesUsed + nalSize <= out->capacity()) { 297 // We haven't emitted anything into the current packet yet and 298 // this NAL unit fits into a single-NAL-unit-packet while 299 // it wouldn't have fit as part of a STAP-A packet. 300 301 memcpy(out->data() + outBytesUsed, nalStart, nalSize); 302 outBytesUsed += nalSize; 303 304 emitSingleNALPacket = true; 305 } 306 307 if (outBytesUsed > 12) { 308 out->setRange(0, outBytesUsed); 309 packets.push_back(out); 310 out = new ABuffer(kMaxUDPPacketSize); 311 outBytesUsed = 12; // Placeholder for RTP header 312 } 313 314 if (emitSingleNALPacket) { 315 continue; 316 } 317 } 318 319 if (outBytesUsed + bytesNeeded <= out->capacity()) { 320 uint8_t *dst = out->data() + outBytesUsed; 321 322 if (outBytesUsed == 12) { 323 *dst++ = 24; // STAP-A header 324 } 325 326 *dst++ = (nalSize >> 8) & 0xff; 327 *dst++ = nalSize & 0xff; 328 memcpy(dst, nalStart, nalSize); 329 330 outBytesUsed += bytesNeeded; 331 continue; 332 } 333 334 // This single NAL unit does not fit into a single RTP packet, 335 // we need to emit an FU-A. 336 337 CHECK_EQ(outBytesUsed, 12u); 338 339 uint8_t nalType = nalStart[0] & 0x1f; 340 uint8_t nri = (nalStart[0] >> 5) & 3; 341 342 size_t srcOffset = 1; 343 while (srcOffset < nalSize) { 344 size_t copy = out->capacity() - outBytesUsed - 2; 345 if (copy > nalSize - srcOffset) { 346 copy = nalSize - srcOffset; 347 } 348 349 uint8_t *dst = out->data() + outBytesUsed; 350 dst[0] = (nri << 5) | 28; 351 352 dst[1] = nalType; 353 354 if (srcOffset == 1) { 355 dst[1] |= 0x80; 356 } 357 358 if (srcOffset + copy == nalSize) { 359 dst[1] |= 0x40; 360 } 361 362 memcpy(&dst[2], nalStart + srcOffset, copy); 363 srcOffset += copy; 364 365 out->setRange(0, outBytesUsed + copy + 2); 366 367 packets.push_back(out); 368 out = new ABuffer(kMaxUDPPacketSize); 369 outBytesUsed = 12; // Placeholder for RTP header 370 } 371 } 372 373 if (outBytesUsed > 12) { 374 out->setRange(0, outBytesUsed); 375 packets.push_back(out); 376 } 377 378 while (!packets.empty()) { 379 sp<ABuffer> out = *packets.begin(); 380 packets.erase(packets.begin()); 381 382 out->setInt32Data(mRTPSeqNo); 383 384 bool last = packets.empty(); 385 386 uint8_t *dst = out->data(); 387 388 dst[0] = 0x80; 389 390 dst[1] = packetType; 391 if (last) { 392 dst[1] |= 1 << 7; // M-bit 393 } 394 395 dst[2] = (mRTPSeqNo >> 8) & 0xff; 396 dst[3] = mRTPSeqNo & 0xff; 397 ++mRTPSeqNo; 398 399 dst[4] = rtpTime >> 24; 400 dst[5] = (rtpTime >> 16) & 0xff; 401 dst[6] = (rtpTime >> 8) & 0xff; 402 dst[7] = rtpTime & 0xff; 403 dst[8] = kSourceID >> 24; 404 dst[9] = (kSourceID >> 16) & 0xff; 405 dst[10] = (kSourceID >> 8) & 0xff; 406 dst[11] = kSourceID & 0xff; 407 408 status_t err = sendRTPPacket(out, true /* storeInHistory */); 409 410 if (err != OK) { 411 return err; 412 } 413 } 414 415 return OK; 416} 417 418status_t RTPSender::sendRTPPacket( 419 const sp<ABuffer> &buffer, bool storeInHistory, 420 bool timeValid, int64_t timeUs) { 421 CHECK(mRTPConnected); 422 423 status_t err = mNetSession->sendRequest( 424 mRTPSessionID, buffer->data(), buffer->size(), 425 timeValid, timeUs); 426 427 if (err != OK) { 428 return err; 429 } 430 431 mLastNTPTime = GetNowNTP(); 432 mLastRTPTime = U32_AT(buffer->data() + 4); 433 434 ++mNumRTPSent; 435 mNumRTPOctetsSent += buffer->size() - 12; 436 437 if (storeInHistory) { 438 if (mHistorySize == kMaxHistorySize) { 439 mHistory.erase(mHistory.begin()); 440 } else { 441 ++mHistorySize; 442 } 443 mHistory.push_back(buffer); 444 } 445 446 return OK; 447} 448 449// static 450uint64_t RTPSender::GetNowNTP() { 451 struct timeval tv; 452 gettimeofday(&tv, NULL /* timezone */); 453 454 uint64_t nowUs = tv.tv_sec * 1000000ll + tv.tv_usec; 455 456 nowUs += ((70ll * 365 + 17) * 24) * 60 * 60 * 1000000ll; 457 458 uint64_t hi = nowUs / 1000000ll; 459 uint64_t lo = ((1ll << 32) * (nowUs % 1000000ll)) / 1000000ll; 460 461 return (hi << 32) | lo; 462} 463 464void RTPSender::onMessageReceived(const sp<AMessage> &msg) { 465 switch (msg->what()) { 466 case kWhatRTPNotify: 467 case kWhatRTCPNotify: 468 onNetNotify(msg->what() == kWhatRTPNotify, msg); 469 break; 470 471 default: 472 TRESPASS(); 473 } 474} 475 476void RTPSender::onNetNotify(bool isRTP, const sp<AMessage> &msg) { 477 int32_t reason; 478 CHECK(msg->findInt32("reason", &reason)); 479 480 switch (reason) { 481 case ANetworkSession::kWhatError: 482 { 483 int32_t sessionID; 484 CHECK(msg->findInt32("sessionID", &sessionID)); 485 486 int32_t err; 487 CHECK(msg->findInt32("err", &err)); 488 489 int32_t errorOccuredDuringSend; 490 CHECK(msg->findInt32("send", &errorOccuredDuringSend)); 491 492 AString detail; 493 CHECK(msg->findString("detail", &detail)); 494 495 ALOGE("An error occurred during %s in session %d " 496 "(%d, '%s' (%s)).", 497 errorOccuredDuringSend ? "send" : "receive", 498 sessionID, 499 err, 500 detail.c_str(), 501 strerror(-err)); 502 503 mNetSession->destroySession(sessionID); 504 505 if (sessionID == mRTPSessionID) { 506 mRTPSessionID = 0; 507 } else if (sessionID == mRTCPSessionID) { 508 mRTCPSessionID = 0; 509 } 510 511 if (!mRTPConnected 512 || (mRTPMode != TRANSPORT_NONE && !mRTCPConnected)) { 513 // We haven't completed initialization, attach the error 514 // to the notification instead. 515 notifyInitDone(err); 516 break; 517 } 518 519 notifyError(err); 520 break; 521 } 522 523 case ANetworkSession::kWhatDatagram: 524 { 525 sp<ABuffer> data; 526 CHECK(msg->findBuffer("data", &data)); 527 528 if (isRTP) { 529 ALOGW("Huh? Received data on RTP connection..."); 530 } else { 531 onRTCPData(data); 532 } 533 break; 534 } 535 536 case ANetworkSession::kWhatConnected: 537 { 538 int32_t sessionID; 539 CHECK(msg->findInt32("sessionID", &sessionID)); 540 541 if (isRTP) { 542 CHECK_EQ(mRTPMode, TRANSPORT_TCP); 543 CHECK_EQ(sessionID, mRTPSessionID); 544 mRTPConnected = true; 545 } else { 546 CHECK_EQ(mRTCPMode, TRANSPORT_TCP); 547 CHECK_EQ(sessionID, mRTCPSessionID); 548 mRTCPConnected = true; 549 } 550 551 if (mRTPConnected 552 && (mRTCPMode == TRANSPORT_NONE || mRTCPConnected)) { 553 notifyInitDone(OK); 554 } 555 break; 556 } 557 558 case ANetworkSession::kWhatNetworkStall: 559 { 560 size_t numBytesQueued; 561 CHECK(msg->findSize("numBytesQueued", &numBytesQueued)); 562 563 notifyNetworkStall(numBytesQueued); 564 break; 565 } 566 567 default: 568 TRESPASS(); 569 } 570} 571 572status_t RTPSender::onRTCPData(const sp<ABuffer> &buffer) { 573 const uint8_t *data = buffer->data(); 574 size_t size = buffer->size(); 575 576 while (size > 0) { 577 if (size < 8) { 578 // Too short to be a valid RTCP header 579 return ERROR_MALFORMED; 580 } 581 582 if ((data[0] >> 6) != 2) { 583 // Unsupported version. 584 return ERROR_UNSUPPORTED; 585 } 586 587 if (data[0] & 0x20) { 588 // Padding present. 589 590 size_t paddingLength = data[size - 1]; 591 592 if (paddingLength + 12 > size) { 593 // If we removed this much padding we'd end up with something 594 // that's too short to be a valid RTP header. 595 return ERROR_MALFORMED; 596 } 597 598 size -= paddingLength; 599 } 600 601 size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4; 602 603 if (size < headerLength) { 604 // Only received a partial packet? 605 return ERROR_MALFORMED; 606 } 607 608 switch (data[1]) { 609 case 200: 610 case 201: // RR 611 parseReceiverReport(data, headerLength); 612 break; 613 614 case 202: // SDES 615 case 203: 616 break; 617 618 case 204: // APP 619 parseAPP(data, headerLength); 620 break; 621 622 case 205: // TSFB (transport layer specific feedback) 623 parseTSFB(data, headerLength); 624 break; 625 626 case 206: // PSFB (payload specific feedback) 627 // hexdump(data, headerLength); 628 break; 629 630 default: 631 { 632 ALOGW("Unknown RTCP packet type %u of size %d", 633 (unsigned)data[1], headerLength); 634 break; 635 } 636 } 637 638 data += headerLength; 639 size -= headerLength; 640 } 641 642 return OK; 643} 644 645status_t RTPSender::parseReceiverReport(const uint8_t *data, size_t size) { 646 // hexdump(data, size); 647 648 float fractionLost = data[12] / 256.0f; 649 650 ALOGI("lost %.2f %% of packets during report interval.", 651 100.0f * fractionLost); 652 653 return OK; 654} 655 656status_t RTPSender::parseTSFB(const uint8_t *data, size_t size) { 657 if ((data[0] & 0x1f) != 1) { 658 return ERROR_UNSUPPORTED; // We only support NACK for now. 659 } 660 661 uint32_t srcId = U32_AT(&data[8]); 662 if (srcId != kSourceID) { 663 return ERROR_MALFORMED; 664 } 665 666 for (size_t i = 12; i < size; i += 4) { 667 uint16_t seqNo = U16_AT(&data[i]); 668 uint16_t blp = U16_AT(&data[i + 2]); 669 670 List<sp<ABuffer> >::iterator it = mHistory.begin(); 671 bool foundSeqNo = false; 672 while (it != mHistory.end()) { 673 const sp<ABuffer> &buffer = *it; 674 675 uint16_t bufferSeqNo = buffer->int32Data() & 0xffff; 676 677 bool retransmit = false; 678 if (bufferSeqNo == seqNo) { 679 retransmit = true; 680 } else if (blp != 0) { 681 for (size_t i = 0; i < 16; ++i) { 682 if ((blp & (1 << i)) 683 && (bufferSeqNo == ((seqNo + i + 1) & 0xffff))) { 684 blp &= ~(1 << i); 685 retransmit = true; 686 } 687 } 688 } 689 690 if (retransmit) { 691 ALOGV("retransmitting seqNo %d", bufferSeqNo); 692 693 CHECK_EQ((status_t)OK, 694 sendRTPPacket(buffer, false /* storeInHistory */)); 695 696 if (bufferSeqNo == seqNo) { 697 foundSeqNo = true; 698 } 699 700 if (foundSeqNo && blp == 0) { 701 break; 702 } 703 } 704 705 ++it; 706 } 707 708 if (!foundSeqNo || blp != 0) { 709 ALOGI("Some sequence numbers were no longer available for " 710 "retransmission (seqNo = %d, foundSeqNo = %d, blp = 0x%04x)", 711 seqNo, foundSeqNo, blp); 712 713 if (!mHistory.empty()) { 714 int32_t earliest = (*mHistory.begin())->int32Data() & 0xffff; 715 int32_t latest = (*--mHistory.end())->int32Data() & 0xffff; 716 717 ALOGI("have seq numbers from %d - %d", earliest, latest); 718 } 719 } 720 } 721 722 return OK; 723} 724 725status_t RTPSender::parseAPP(const uint8_t *data, size_t size) { 726 if (!memcmp("late", &data[8], 4)) { 727 int64_t avgLatencyUs = (int64_t)U64_AT(&data[12]); 728 int64_t maxLatencyUs = (int64_t)U64_AT(&data[20]); 729 730 sp<AMessage> notify = mNotify->dup(); 731 notify->setInt32("what", kWhatInformSender); 732 notify->setInt64("avgLatencyUs", avgLatencyUs); 733 notify->setInt64("maxLatencyUs", maxLatencyUs); 734 notify->post(); 735 } 736 737 return OK; 738} 739 740void RTPSender::notifyInitDone(status_t err) { 741 sp<AMessage> notify = mNotify->dup(); 742 notify->setInt32("what", kWhatInitDone); 743 notify->setInt32("err", err); 744 notify->post(); 745} 746 747void RTPSender::notifyError(status_t err) { 748 sp<AMessage> notify = mNotify->dup(); 749 notify->setInt32("what", kWhatError); 750 notify->setInt32("err", err); 751 notify->post(); 752} 753 754void RTPSender::notifyNetworkStall(size_t numBytesQueued) { 755 sp<AMessage> notify = mNotify->dup(); 756 notify->setInt32("what", kWhatNetworkStall); 757 notify->setSize("numBytesQueued", numBytesQueued); 758 notify->post(); 759} 760 761} // namespace android 762 763