ARTPConnection.cpp revision 6e4c5c499999c04c2477b987f9e64f3ff2bf1a06
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "ARTPConnection" 19#include <utils/Log.h> 20 21#include "ARTPConnection.h" 22 23#include "ARTPSource.h" 24#include "ASessionDescription.h" 25 26#include <media/stagefright/foundation/ABuffer.h> 27#include <media/stagefright/foundation/ADebug.h> 28#include <media/stagefright/foundation/AMessage.h> 29#include <media/stagefright/foundation/AString.h> 30#include <media/stagefright/foundation/hexdump.h> 31 32#include <arpa/inet.h> 33#include <sys/socket.h> 34 35namespace android { 36 37static const size_t kMaxUDPSize = 1500; 38 39static uint16_t u16at(const uint8_t *data) { 40 return data[0] << 8 | data[1]; 41} 42 43static uint32_t u32at(const uint8_t *data) { 44 return u16at(data) << 16 | u16at(&data[2]); 45} 46 47static uint64_t u64at(const uint8_t *data) { 48 return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]); 49} 50 51// static 52const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll; 53 54struct ARTPConnection::StreamInfo { 55 int mRTPSocket; 56 int mRTCPSocket; 57 sp<ASessionDescription> mSessionDesc; 58 size_t mIndex; 59 sp<AMessage> mNotifyMsg; 60 KeyedVector<uint32_t, sp<ARTPSource> > mSources; 61 62 int32_t mNumRTCPPacketsReceived; 63 struct sockaddr_in mRemoteRTCPAddr; 64 65 bool mIsInjected; 66}; 67 68ARTPConnection::ARTPConnection(uint32_t flags) 69 : mFlags(flags), 70 mPollEventPending(false), 71 mLastReceiverReportTimeUs(-1) { 72} 73 74ARTPConnection::~ARTPConnection() { 75} 76 77void ARTPConnection::addStream( 78 int rtpSocket, int rtcpSocket, 79 const sp<ASessionDescription> &sessionDesc, 80 size_t index, 81 const sp<AMessage> ¬ify, 82 bool injected) { 83 sp<AMessage> msg = new AMessage(kWhatAddStream, id()); 84 msg->setInt32("rtp-socket", rtpSocket); 85 msg->setInt32("rtcp-socket", rtcpSocket); 86 msg->setObject("session-desc", sessionDesc); 87 msg->setSize("index", index); 88 msg->setMessage("notify", notify); 89 msg->setInt32("injected", injected); 90 msg->post(); 91} 92 93void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) { 94 sp<AMessage> msg = new AMessage(kWhatRemoveStream, id()); 95 msg->setInt32("rtp-socket", rtpSocket); 96 msg->setInt32("rtcp-socket", rtcpSocket); 97 msg->post(); 98} 99 100static void bumpSocketBufferSize(int s) { 101 int size = 256 * 1024; 102 CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0); 103} 104 105// static 106void ARTPConnection::MakePortPair( 107 int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) { 108 *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0); 109 CHECK_GE(*rtpSocket, 0); 110 111 bumpSocketBufferSize(*rtpSocket); 112 113 *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0); 114 CHECK_GE(*rtcpSocket, 0); 115 116 bumpSocketBufferSize(*rtcpSocket); 117 118 unsigned start = (rand() * 1000)/ RAND_MAX + 15550; 119 start &= ~1; 120 121 for (unsigned port = start; port < 65536; port += 2) { 122 struct sockaddr_in addr; 123 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 124 addr.sin_family = AF_INET; 125 addr.sin_addr.s_addr = INADDR_ANY; 126 addr.sin_port = htons(port); 127 128 if (bind(*rtpSocket, 129 (const struct sockaddr *)&addr, sizeof(addr)) < 0) { 130 continue; 131 } 132 133 addr.sin_port = htons(port + 1); 134 135 if (bind(*rtcpSocket, 136 (const struct sockaddr *)&addr, sizeof(addr)) == 0) { 137 *rtpPort = port; 138 return; 139 } 140 } 141 142 TRESPASS(); 143} 144 145void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) { 146 switch (msg->what()) { 147 case kWhatAddStream: 148 { 149 onAddStream(msg); 150 break; 151 } 152 153 case kWhatRemoveStream: 154 { 155 onRemoveStream(msg); 156 break; 157 } 158 159 case kWhatPollStreams: 160 { 161 onPollStreams(); 162 break; 163 } 164 165 case kWhatInjectPacket: 166 { 167 onInjectPacket(msg); 168 break; 169 } 170 171 default: 172 { 173 TRESPASS(); 174 break; 175 } 176 } 177} 178 179void ARTPConnection::onAddStream(const sp<AMessage> &msg) { 180 mStreams.push_back(StreamInfo()); 181 StreamInfo *info = &*--mStreams.end(); 182 183 int32_t s; 184 CHECK(msg->findInt32("rtp-socket", &s)); 185 info->mRTPSocket = s; 186 CHECK(msg->findInt32("rtcp-socket", &s)); 187 info->mRTCPSocket = s; 188 189 int32_t injected; 190 CHECK(msg->findInt32("injected", &injected)); 191 192 info->mIsInjected = injected; 193 194 sp<RefBase> obj; 195 CHECK(msg->findObject("session-desc", &obj)); 196 info->mSessionDesc = static_cast<ASessionDescription *>(obj.get()); 197 198 CHECK(msg->findSize("index", &info->mIndex)); 199 CHECK(msg->findMessage("notify", &info->mNotifyMsg)); 200 201 info->mNumRTCPPacketsReceived = 0; 202 memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr)); 203 204 if (!injected) { 205 postPollEvent(); 206 } 207} 208 209void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) { 210 int32_t rtpSocket, rtcpSocket; 211 CHECK(msg->findInt32("rtp-socket", &rtpSocket)); 212 CHECK(msg->findInt32("rtcp-socket", &rtcpSocket)); 213 214 List<StreamInfo>::iterator it = mStreams.begin(); 215 while (it != mStreams.end() 216 && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) { 217 ++it; 218 } 219 220 if (it == mStreams.end()) { 221 TRESPASS(); 222 } 223 224 mStreams.erase(it); 225} 226 227void ARTPConnection::postPollEvent() { 228 if (mPollEventPending) { 229 return; 230 } 231 232 sp<AMessage> msg = new AMessage(kWhatPollStreams, id()); 233 msg->post(); 234 235 mPollEventPending = true; 236} 237 238void ARTPConnection::onPollStreams() { 239 mPollEventPending = false; 240 241 if (mStreams.empty()) { 242 return; 243 } 244 245 struct timeval tv; 246 tv.tv_sec = 0; 247 tv.tv_usec = kSelectTimeoutUs; 248 249 fd_set rs; 250 FD_ZERO(&rs); 251 252 int maxSocket = -1; 253 for (List<StreamInfo>::iterator it = mStreams.begin(); 254 it != mStreams.end(); ++it) { 255 if ((*it).mIsInjected) { 256 continue; 257 } 258 259 FD_SET(it->mRTPSocket, &rs); 260 FD_SET(it->mRTCPSocket, &rs); 261 262 if (it->mRTPSocket > maxSocket) { 263 maxSocket = it->mRTPSocket; 264 } 265 if (it->mRTCPSocket > maxSocket) { 266 maxSocket = it->mRTCPSocket; 267 } 268 } 269 270 if (maxSocket == -1) { 271 return; 272 } 273 274 int res = select(maxSocket + 1, &rs, NULL, NULL, &tv); 275 CHECK_GE(res, 0); 276 277 if (res > 0) { 278 for (List<StreamInfo>::iterator it = mStreams.begin(); 279 it != mStreams.end(); ++it) { 280 if ((*it).mIsInjected) { 281 continue; 282 } 283 284 if (FD_ISSET(it->mRTPSocket, &rs)) { 285 receive(&*it, true); 286 } 287 if (FD_ISSET(it->mRTCPSocket, &rs)) { 288 receive(&*it, false); 289 } 290 } 291 } 292 293 postPollEvent(); 294 295 int64_t nowUs = ALooper::GetNowUs(); 296 if (mLastReceiverReportTimeUs <= 0 297 || mLastReceiverReportTimeUs + 5000000ll <= nowUs) { 298 sp<ABuffer> buffer = new ABuffer(kMaxUDPSize); 299 for (List<StreamInfo>::iterator it = mStreams.begin(); 300 it != mStreams.end(); ++it) { 301 StreamInfo *s = &*it; 302 303 if (s->mIsInjected) { 304 continue; 305 } 306 307 if (s->mNumRTCPPacketsReceived == 0) { 308 // We have never received any RTCP packets on this stream, 309 // we don't even know where to send a report. 310 continue; 311 } 312 313 buffer->setRange(0, 0); 314 315 for (size_t i = 0; i < s->mSources.size(); ++i) { 316 sp<ARTPSource> source = s->mSources.valueAt(i); 317 318 source->addReceiverReport(buffer); 319 320 if (mFlags & kRegularlyRequestFIR) { 321 source->addFIR(buffer); 322 } 323 } 324 325 if (buffer->size() > 0) { 326 LOGV("Sending RR..."); 327 328 ssize_t n = sendto( 329 s->mRTCPSocket, buffer->data(), buffer->size(), 0, 330 (const struct sockaddr *)&s->mRemoteRTCPAddr, 331 sizeof(s->mRemoteRTCPAddr)); 332 CHECK_EQ(n, (ssize_t)buffer->size()); 333 334 mLastReceiverReportTimeUs = nowUs; 335 } 336 } 337 } 338} 339 340status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) { 341 CHECK(!s->mIsInjected); 342 343 sp<ABuffer> buffer = new ABuffer(65536); 344 345 socklen_t remoteAddrLen = 346 (!receiveRTP && s->mNumRTCPPacketsReceived == 0) 347 ? sizeof(s->mRemoteRTCPAddr) : 0; 348 349 ssize_t nbytes = recvfrom( 350 receiveRTP ? s->mRTPSocket : s->mRTCPSocket, 351 buffer->data(), 352 buffer->capacity(), 353 0, 354 remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL, 355 remoteAddrLen > 0 ? &remoteAddrLen : NULL); 356 357 if (nbytes < 0) { 358 return -1; 359 } 360 361 buffer->setRange(0, nbytes); 362 363 // LOGI("received %d bytes.", buffer->size()); 364 365 status_t err; 366 if (receiveRTP) { 367 err = parseRTP(s, buffer); 368 } else { 369 err = parseRTCP(s, buffer); 370 } 371 372 return err; 373} 374 375status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) { 376 size_t size = buffer->size(); 377 378 if (size < 12) { 379 // Too short to be a valid RTP header. 380 return -1; 381 } 382 383 const uint8_t *data = buffer->data(); 384 385 if ((data[0] >> 6) != 2) { 386 // Unsupported version. 387 return -1; 388 } 389 390 if (data[0] & 0x20) { 391 // Padding present. 392 393 size_t paddingLength = data[size - 1]; 394 395 if (paddingLength + 12 > size) { 396 // If we removed this much padding we'd end up with something 397 // that's too short to be a valid RTP header. 398 return -1; 399 } 400 401 size -= paddingLength; 402 } 403 404 int numCSRCs = data[0] & 0x0f; 405 406 size_t payloadOffset = 12 + 4 * numCSRCs; 407 408 if (size < payloadOffset) { 409 // Not enough data to fit the basic header and all the CSRC entries. 410 return -1; 411 } 412 413 if (data[0] & 0x10) { 414 // Header eXtension present. 415 416 if (size < payloadOffset + 4) { 417 // Not enough data to fit the basic header, all CSRC entries 418 // and the first 4 bytes of the extension header. 419 420 return -1; 421 } 422 423 const uint8_t *extensionData = &data[payloadOffset]; 424 425 size_t extensionLength = 426 4 * (extensionData[2] << 8 | extensionData[3]); 427 428 if (size < payloadOffset + 4 + extensionLength) { 429 return -1; 430 } 431 432 payloadOffset += 4 + extensionLength; 433 } 434 435 uint32_t srcId = u32at(&data[8]); 436 437 sp<ARTPSource> source = findSource(s, srcId); 438 439 uint32_t rtpTime = u32at(&data[4]); 440 441 sp<AMessage> meta = buffer->meta(); 442 meta->setInt32("ssrc", srcId); 443 meta->setInt32("rtp-time", rtpTime); 444 meta->setInt32("PT", data[1] & 0x7f); 445 meta->setInt32("M", data[1] >> 7); 446 447 buffer->setInt32Data(u16at(&data[2])); 448 buffer->setRange(payloadOffset, size - payloadOffset); 449 450 if ((mFlags & kFakeTimestamps) && !source->timeEstablished()) { 451 source->timeUpdate(rtpTime, 0); 452 source->timeUpdate(rtpTime + 90000, 0x100000000ll); 453 CHECK(source->timeEstablished()); 454 } 455 456 source->processRTPPacket(buffer); 457 458 return OK; 459} 460 461status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) { 462 if (s->mNumRTCPPacketsReceived++ == 0) { 463 sp<AMessage> notify = s->mNotifyMsg->dup(); 464 notify->setInt32("first-rtcp", true); 465 notify->post(); 466 } 467 468 const uint8_t *data = buffer->data(); 469 size_t size = buffer->size(); 470 471 while (size > 0) { 472 if (size < 8) { 473 // Too short to be a valid RTCP header 474 return -1; 475 } 476 477 if ((data[0] >> 6) != 2) { 478 // Unsupported version. 479 return -1; 480 } 481 482 if (data[0] & 0x20) { 483 // Padding present. 484 485 size_t paddingLength = data[size - 1]; 486 487 if (paddingLength + 12 > size) { 488 // If we removed this much padding we'd end up with something 489 // that's too short to be a valid RTP header. 490 return -1; 491 } 492 493 size -= paddingLength; 494 } 495 496 size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4; 497 498 if (size < headerLength) { 499 // Only received a partial packet? 500 return -1; 501 } 502 503 switch (data[1]) { 504 case 200: 505 { 506 parseSR(s, data, headerLength); 507 break; 508 } 509 510 case 201: // RR 511 case 202: // SDES 512 case 204: // APP 513 break; 514 515 case 205: // TSFB (transport layer specific feedback) 516 case 206: // PSFB (payload specific feedback) 517 // hexdump(data, headerLength); 518 break; 519 520 case 203: 521 { 522 parseBYE(s, data, headerLength); 523 break; 524 } 525 526 default: 527 { 528 LOGW("Unknown RTCP packet type %u of size %d", 529 (unsigned)data[1], headerLength); 530 break; 531 } 532 } 533 534 data += headerLength; 535 size -= headerLength; 536 } 537 538 return OK; 539} 540 541status_t ARTPConnection::parseBYE( 542 StreamInfo *s, const uint8_t *data, size_t size) { 543 size_t SC = data[0] & 0x3f; 544 545 if (SC == 0 || size < (4 + SC * 4)) { 546 // Packet too short for the minimal BYE header. 547 return -1; 548 } 549 550 uint32_t id = u32at(&data[4]); 551 552 sp<ARTPSource> source = findSource(s, id); 553 554 source->byeReceived(); 555 556 return OK; 557} 558 559status_t ARTPConnection::parseSR( 560 StreamInfo *s, const uint8_t *data, size_t size) { 561 size_t RC = data[0] & 0x1f; 562 563 if (size < (7 + RC * 6) * 4) { 564 // Packet too short for the minimal SR header. 565 return -1; 566 } 567 568 uint32_t id = u32at(&data[4]); 569 uint64_t ntpTime = u64at(&data[8]); 570 uint32_t rtpTime = u32at(&data[16]); 571 572#if 0 573 LOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f", 574 id, 575 rtpTime, 576 (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32)); 577#endif 578 579 sp<ARTPSource> source = findSource(s, id); 580 581 if ((mFlags & kFakeTimestamps) == 0) { 582 source->timeUpdate(rtpTime, ntpTime); 583 } 584 585 return 0; 586} 587 588sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) { 589 sp<ARTPSource> source; 590 ssize_t index = info->mSources.indexOfKey(srcId); 591 if (index < 0) { 592 index = info->mSources.size(); 593 594 source = new ARTPSource( 595 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg); 596 597 info->mSources.add(srcId, source); 598 } else { 599 source = info->mSources.valueAt(index); 600 } 601 602 return source; 603} 604 605void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) { 606 sp<AMessage> msg = new AMessage(kWhatInjectPacket, id()); 607 msg->setInt32("index", index); 608 msg->setObject("buffer", buffer); 609 msg->post(); 610} 611 612void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) { 613 int32_t index; 614 CHECK(msg->findInt32("index", &index)); 615 616 sp<RefBase> obj; 617 CHECK(msg->findObject("buffer", &obj)); 618 619 sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get()); 620 621 List<StreamInfo>::iterator it = mStreams.begin(); 622 while (it != mStreams.end() 623 && it->mRTPSocket != index && it->mRTCPSocket != index) { 624 ++it; 625 } 626 627 if (it == mStreams.end()) { 628 TRESPASS(); 629 } 630 631 StreamInfo *s = &*it; 632 633 status_t err; 634 if (it->mRTPSocket == index) { 635 err = parseRTP(s, buffer); 636 } else { 637 err = parseRTCP(s, buffer); 638 } 639} 640 641} // namespace android 642 643