1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "ARTPConnection" 19#include <utils/Log.h> 20 21#include "ARTPConnection.h" 22 23#include "ARTPSource.h" 24#include "ASessionDescription.h" 25 26#include <media/stagefright/foundation/ABuffer.h> 27#include <media/stagefright/foundation/ADebug.h> 28#include <media/stagefright/foundation/AMessage.h> 29#include <media/stagefright/foundation/AString.h> 30#include <media/stagefright/foundation/hexdump.h> 31 32#include <arpa/inet.h> 33#include <sys/socket.h> 34 35namespace android { 36 37static const size_t kMaxUDPSize = 1500; 38 39static uint16_t u16at(const uint8_t *data) { 40 return data[0] << 8 | data[1]; 41} 42 43static uint32_t u32at(const uint8_t *data) { 44 return u16at(data) << 16 | u16at(&data[2]); 45} 46 47static uint64_t u64at(const uint8_t *data) { 48 return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]); 49} 50 51// static 52const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll; 53 54struct ARTPConnection::StreamInfo { 55 int mRTPSocket; 56 int mRTCPSocket; 57 sp<ASessionDescription> mSessionDesc; 58 size_t mIndex; 59 sp<AMessage> mNotifyMsg; 60 KeyedVector<uint32_t, sp<ARTPSource> > mSources; 61 62 int64_t mNumRTCPPacketsReceived; 63 int64_t mNumRTPPacketsReceived; 64 struct sockaddr_in mRemoteRTCPAddr; 65 66 bool mIsInjected; 67}; 68 69ARTPConnection::ARTPConnection(uint32_t flags) 70 : mFlags(flags), 71 mPollEventPending(false), 72 mLastReceiverReportTimeUs(-1) { 73} 74 75ARTPConnection::~ARTPConnection() { 76} 77 78void ARTPConnection::addStream( 79 int rtpSocket, int rtcpSocket, 80 const sp<ASessionDescription> &sessionDesc, 81 size_t index, 82 const sp<AMessage> ¬ify, 83 bool injected) { 84 sp<AMessage> msg = new AMessage(kWhatAddStream, id()); 85 msg->setInt32("rtp-socket", rtpSocket); 86 msg->setInt32("rtcp-socket", rtcpSocket); 87 msg->setObject("session-desc", sessionDesc); 88 msg->setSize("index", index); 89 msg->setMessage("notify", notify); 90 msg->setInt32("injected", injected); 91 msg->post(); 92} 93 94void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) { 95 sp<AMessage> msg = new AMessage(kWhatRemoveStream, id()); 96 msg->setInt32("rtp-socket", rtpSocket); 97 msg->setInt32("rtcp-socket", rtcpSocket); 98 msg->post(); 99} 100 101static void bumpSocketBufferSize(int s) { 102 int size = 256 * 1024; 103 CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0); 104} 105 106// static 107void ARTPConnection::MakePortPair( 108 int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) { 109 *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0); 110 CHECK_GE(*rtpSocket, 0); 111 112 bumpSocketBufferSize(*rtpSocket); 113 114 *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0); 115 CHECK_GE(*rtcpSocket, 0); 116 117 bumpSocketBufferSize(*rtcpSocket); 118 119 unsigned start = (rand() * 1000)/ RAND_MAX + 15550; 120 start &= ~1; 121 122 for (unsigned port = start; port < 65536; port += 2) { 123 struct sockaddr_in addr; 124 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 125 addr.sin_family = AF_INET; 126 addr.sin_addr.s_addr = htonl(INADDR_ANY); 127 addr.sin_port = htons(port); 128 129 if (bind(*rtpSocket, 130 (const struct sockaddr *)&addr, sizeof(addr)) < 0) { 131 continue; 132 } 133 134 addr.sin_port = htons(port + 1); 135 136 if (bind(*rtcpSocket, 137 (const struct sockaddr *)&addr, sizeof(addr)) == 0) { 138 *rtpPort = port; 139 return; 140 } 141 } 142 143 TRESPASS(); 144} 145 146void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) { 147 switch (msg->what()) { 148 case kWhatAddStream: 149 { 150 onAddStream(msg); 151 break; 152 } 153 154 case kWhatRemoveStream: 155 { 156 onRemoveStream(msg); 157 break; 158 } 159 160 case kWhatPollStreams: 161 { 162 onPollStreams(); 163 break; 164 } 165 166 case kWhatInjectPacket: 167 { 168 onInjectPacket(msg); 169 break; 170 } 171 172 default: 173 { 174 TRESPASS(); 175 break; 176 } 177 } 178} 179 180void ARTPConnection::onAddStream(const sp<AMessage> &msg) { 181 mStreams.push_back(StreamInfo()); 182 StreamInfo *info = &*--mStreams.end(); 183 184 int32_t s; 185 CHECK(msg->findInt32("rtp-socket", &s)); 186 info->mRTPSocket = s; 187 CHECK(msg->findInt32("rtcp-socket", &s)); 188 info->mRTCPSocket = s; 189 190 int32_t injected; 191 CHECK(msg->findInt32("injected", &injected)); 192 193 info->mIsInjected = injected; 194 195 sp<RefBase> obj; 196 CHECK(msg->findObject("session-desc", &obj)); 197 info->mSessionDesc = static_cast<ASessionDescription *>(obj.get()); 198 199 CHECK(msg->findSize("index", &info->mIndex)); 200 CHECK(msg->findMessage("notify", &info->mNotifyMsg)); 201 202 info->mNumRTCPPacketsReceived = 0; 203 info->mNumRTPPacketsReceived = 0; 204 memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr)); 205 206 if (!injected) { 207 postPollEvent(); 208 } 209} 210 211void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) { 212 int32_t rtpSocket, rtcpSocket; 213 CHECK(msg->findInt32("rtp-socket", &rtpSocket)); 214 CHECK(msg->findInt32("rtcp-socket", &rtcpSocket)); 215 216 List<StreamInfo>::iterator it = mStreams.begin(); 217 while (it != mStreams.end() 218 && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) { 219 ++it; 220 } 221 222 if (it == mStreams.end()) { 223 return; 224 } 225 226 mStreams.erase(it); 227} 228 229void ARTPConnection::postPollEvent() { 230 if (mPollEventPending) { 231 return; 232 } 233 234 sp<AMessage> msg = new AMessage(kWhatPollStreams, id()); 235 msg->post(); 236 237 mPollEventPending = true; 238} 239 240void ARTPConnection::onPollStreams() { 241 mPollEventPending = false; 242 243 if (mStreams.empty()) { 244 return; 245 } 246 247 struct timeval tv; 248 tv.tv_sec = 0; 249 tv.tv_usec = kSelectTimeoutUs; 250 251 fd_set rs; 252 FD_ZERO(&rs); 253 254 int maxSocket = -1; 255 for (List<StreamInfo>::iterator it = mStreams.begin(); 256 it != mStreams.end(); ++it) { 257 if ((*it).mIsInjected) { 258 continue; 259 } 260 261 FD_SET(it->mRTPSocket, &rs); 262 FD_SET(it->mRTCPSocket, &rs); 263 264 if (it->mRTPSocket > maxSocket) { 265 maxSocket = it->mRTPSocket; 266 } 267 if (it->mRTCPSocket > maxSocket) { 268 maxSocket = it->mRTCPSocket; 269 } 270 } 271 272 if (maxSocket == -1) { 273 return; 274 } 275 276 int res = select(maxSocket + 1, &rs, NULL, NULL, &tv); 277 278 if (res > 0) { 279 List<StreamInfo>::iterator it = mStreams.begin(); 280 while (it != mStreams.end()) { 281 if ((*it).mIsInjected) { 282 ++it; 283 continue; 284 } 285 286 status_t err = OK; 287 if (FD_ISSET(it->mRTPSocket, &rs)) { 288 err = receive(&*it, true); 289 } 290 if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) { 291 err = receive(&*it, false); 292 } 293 294 if (err == -ECONNRESET) { 295 // socket failure, this stream is dead, Jim. 296 297 LOGW("failed to receive RTP/RTCP datagram."); 298 it = mStreams.erase(it); 299 continue; 300 } 301 302 ++it; 303 } 304 } 305 306 int64_t nowUs = ALooper::GetNowUs(); 307 if (mLastReceiverReportTimeUs <= 0 308 || mLastReceiverReportTimeUs + 5000000ll <= nowUs) { 309 sp<ABuffer> buffer = new ABuffer(kMaxUDPSize); 310 List<StreamInfo>::iterator it = mStreams.begin(); 311 while (it != mStreams.end()) { 312 StreamInfo *s = &*it; 313 314 if (s->mIsInjected) { 315 ++it; 316 continue; 317 } 318 319 if (s->mNumRTCPPacketsReceived == 0) { 320 // We have never received any RTCP packets on this stream, 321 // we don't even know where to send a report. 322 ++it; 323 continue; 324 } 325 326 buffer->setRange(0, 0); 327 328 for (size_t i = 0; i < s->mSources.size(); ++i) { 329 sp<ARTPSource> source = s->mSources.valueAt(i); 330 331 source->addReceiverReport(buffer); 332 333 if (mFlags & kRegularlyRequestFIR) { 334 source->addFIR(buffer); 335 } 336 } 337 338 if (buffer->size() > 0) { 339 LOGV("Sending RR..."); 340 341 ssize_t n; 342 do { 343 n = sendto( 344 s->mRTCPSocket, buffer->data(), buffer->size(), 0, 345 (const struct sockaddr *)&s->mRemoteRTCPAddr, 346 sizeof(s->mRemoteRTCPAddr)); 347 } while (n < 0 && errno == EINTR); 348 349 if (n <= 0) { 350 LOGW("failed to send RTCP receiver report (%s).", 351 n == 0 ? "connection gone" : strerror(errno)); 352 353 it = mStreams.erase(it); 354 continue; 355 } 356 357 CHECK_EQ(n, (ssize_t)buffer->size()); 358 359 mLastReceiverReportTimeUs = nowUs; 360 } 361 362 ++it; 363 } 364 } 365 366 if (!mStreams.empty()) { 367 postPollEvent(); 368 } 369} 370 371status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) { 372 LOGV("receiving %s", receiveRTP ? "RTP" : "RTCP"); 373 374 CHECK(!s->mIsInjected); 375 376 sp<ABuffer> buffer = new ABuffer(65536); 377 378 socklen_t remoteAddrLen = 379 (!receiveRTP && s->mNumRTCPPacketsReceived == 0) 380 ? sizeof(s->mRemoteRTCPAddr) : 0; 381 382 ssize_t nbytes; 383 do { 384 nbytes = recvfrom( 385 receiveRTP ? s->mRTPSocket : s->mRTCPSocket, 386 buffer->data(), 387 buffer->capacity(), 388 0, 389 remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL, 390 remoteAddrLen > 0 ? &remoteAddrLen : NULL); 391 } while (nbytes < 0 && errno == EINTR); 392 393 if (nbytes <= 0) { 394 return -ECONNRESET; 395 } 396 397 buffer->setRange(0, nbytes); 398 399 // LOGI("received %d bytes.", buffer->size()); 400 401 status_t err; 402 if (receiveRTP) { 403 err = parseRTP(s, buffer); 404 } else { 405 err = parseRTCP(s, buffer); 406 } 407 408 return err; 409} 410 411status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) { 412 if (s->mNumRTPPacketsReceived++ == 0) { 413 sp<AMessage> notify = s->mNotifyMsg->dup(); 414 notify->setInt32("first-rtp", true); 415 notify->post(); 416 } 417 418 size_t size = buffer->size(); 419 420 if (size < 12) { 421 // Too short to be a valid RTP header. 422 return -1; 423 } 424 425 const uint8_t *data = buffer->data(); 426 427 if ((data[0] >> 6) != 2) { 428 // Unsupported version. 429 return -1; 430 } 431 432 if (data[0] & 0x20) { 433 // Padding present. 434 435 size_t paddingLength = data[size - 1]; 436 437 if (paddingLength + 12 > size) { 438 // If we removed this much padding we'd end up with something 439 // that's too short to be a valid RTP header. 440 return -1; 441 } 442 443 size -= paddingLength; 444 } 445 446 int numCSRCs = data[0] & 0x0f; 447 448 size_t payloadOffset = 12 + 4 * numCSRCs; 449 450 if (size < payloadOffset) { 451 // Not enough data to fit the basic header and all the CSRC entries. 452 return -1; 453 } 454 455 if (data[0] & 0x10) { 456 // Header eXtension present. 457 458 if (size < payloadOffset + 4) { 459 // Not enough data to fit the basic header, all CSRC entries 460 // and the first 4 bytes of the extension header. 461 462 return -1; 463 } 464 465 const uint8_t *extensionData = &data[payloadOffset]; 466 467 size_t extensionLength = 468 4 * (extensionData[2] << 8 | extensionData[3]); 469 470 if (size < payloadOffset + 4 + extensionLength) { 471 return -1; 472 } 473 474 payloadOffset += 4 + extensionLength; 475 } 476 477 uint32_t srcId = u32at(&data[8]); 478 479 sp<ARTPSource> source = findSource(s, srcId); 480 481 uint32_t rtpTime = u32at(&data[4]); 482 483 sp<AMessage> meta = buffer->meta(); 484 meta->setInt32("ssrc", srcId); 485 meta->setInt32("rtp-time", rtpTime); 486 meta->setInt32("PT", data[1] & 0x7f); 487 meta->setInt32("M", data[1] >> 7); 488 489 buffer->setInt32Data(u16at(&data[2])); 490 buffer->setRange(payloadOffset, size - payloadOffset); 491 492 source->processRTPPacket(buffer); 493 494 return OK; 495} 496 497status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) { 498 if (s->mNumRTCPPacketsReceived++ == 0) { 499 sp<AMessage> notify = s->mNotifyMsg->dup(); 500 notify->setInt32("first-rtcp", true); 501 notify->post(); 502 } 503 504 const uint8_t *data = buffer->data(); 505 size_t size = buffer->size(); 506 507 while (size > 0) { 508 if (size < 8) { 509 // Too short to be a valid RTCP header 510 return -1; 511 } 512 513 if ((data[0] >> 6) != 2) { 514 // Unsupported version. 515 return -1; 516 } 517 518 if (data[0] & 0x20) { 519 // Padding present. 520 521 size_t paddingLength = data[size - 1]; 522 523 if (paddingLength + 12 > size) { 524 // If we removed this much padding we'd end up with something 525 // that's too short to be a valid RTP header. 526 return -1; 527 } 528 529 size -= paddingLength; 530 } 531 532 size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4; 533 534 if (size < headerLength) { 535 // Only received a partial packet? 536 return -1; 537 } 538 539 switch (data[1]) { 540 case 200: 541 { 542 parseSR(s, data, headerLength); 543 break; 544 } 545 546 case 201: // RR 547 case 202: // SDES 548 case 204: // APP 549 break; 550 551 case 205: // TSFB (transport layer specific feedback) 552 case 206: // PSFB (payload specific feedback) 553 // hexdump(data, headerLength); 554 break; 555 556 case 203: 557 { 558 parseBYE(s, data, headerLength); 559 break; 560 } 561 562 default: 563 { 564 LOGW("Unknown RTCP packet type %u of size %d", 565 (unsigned)data[1], headerLength); 566 break; 567 } 568 } 569 570 data += headerLength; 571 size -= headerLength; 572 } 573 574 return OK; 575} 576 577status_t ARTPConnection::parseBYE( 578 StreamInfo *s, const uint8_t *data, size_t size) { 579 size_t SC = data[0] & 0x3f; 580 581 if (SC == 0 || size < (4 + SC * 4)) { 582 // Packet too short for the minimal BYE header. 583 return -1; 584 } 585 586 uint32_t id = u32at(&data[4]); 587 588 sp<ARTPSource> source = findSource(s, id); 589 590 source->byeReceived(); 591 592 return OK; 593} 594 595status_t ARTPConnection::parseSR( 596 StreamInfo *s, const uint8_t *data, size_t size) { 597 size_t RC = data[0] & 0x1f; 598 599 if (size < (7 + RC * 6) * 4) { 600 // Packet too short for the minimal SR header. 601 return -1; 602 } 603 604 uint32_t id = u32at(&data[4]); 605 uint64_t ntpTime = u64at(&data[8]); 606 uint32_t rtpTime = u32at(&data[16]); 607 608#if 0 609 LOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f", 610 id, 611 rtpTime, 612 (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32)); 613#endif 614 615 sp<ARTPSource> source = findSource(s, id); 616 617 source->timeUpdate(rtpTime, ntpTime); 618 619 return 0; 620} 621 622sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) { 623 sp<ARTPSource> source; 624 ssize_t index = info->mSources.indexOfKey(srcId); 625 if (index < 0) { 626 index = info->mSources.size(); 627 628 source = new ARTPSource( 629 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg); 630 631 info->mSources.add(srcId, source); 632 } else { 633 source = info->mSources.valueAt(index); 634 } 635 636 return source; 637} 638 639void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) { 640 sp<AMessage> msg = new AMessage(kWhatInjectPacket, id()); 641 msg->setInt32("index", index); 642 msg->setObject("buffer", buffer); 643 msg->post(); 644} 645 646void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) { 647 int32_t index; 648 CHECK(msg->findInt32("index", &index)); 649 650 sp<RefBase> obj; 651 CHECK(msg->findObject("buffer", &obj)); 652 653 sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get()); 654 655 List<StreamInfo>::iterator it = mStreams.begin(); 656 while (it != mStreams.end() 657 && it->mRTPSocket != index && it->mRTCPSocket != index) { 658 ++it; 659 } 660 661 if (it == mStreams.end()) { 662 TRESPASS(); 663 } 664 665 StreamInfo *s = &*it; 666 667 status_t err; 668 if (it->mRTPSocket == index) { 669 err = parseRTP(s, buffer); 670 } else { 671 err = parseRTCP(s, buffer); 672 } 673} 674 675} // namespace android 676 677