ARTPConnection.cpp revision 0792ce7e0924ebb0dbe7b7cfcd79d12cbdb03ed2
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#include "ARTPConnection.h" 18 19#include "ARTPSource.h" 20#include "ASessionDescription.h" 21 22#include <media/stagefright/foundation/ABuffer.h> 23#include <media/stagefright/foundation/ADebug.h> 24#include <media/stagefright/foundation/AMessage.h> 25#include <media/stagefright/foundation/AString.h> 26#include <media/stagefright/foundation/hexdump.h> 27 28#include <arpa/inet.h> 29#include <sys/socket.h> 30 31namespace android { 32 33static const size_t kMaxUDPSize = 1500; 34 35static uint16_t u16at(const uint8_t *data) { 36 return data[0] << 8 | data[1]; 37} 38 39static uint32_t u32at(const uint8_t *data) { 40 return u16at(data) << 16 | u16at(&data[2]); 41} 42 43static uint64_t u64at(const uint8_t *data) { 44 return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]); 45} 46 47// static 48const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll; 49 50struct ARTPConnection::StreamInfo { 51 int mRTPSocket; 52 int mRTCPSocket; 53 sp<ASessionDescription> mSessionDesc; 54 size_t mIndex; 55 sp<AMessage> mNotifyMsg; 56 KeyedVector<uint32_t, sp<ARTPSource> > mSources; 57 58 int32_t mNumRTCPPacketsReceived; 59 struct sockaddr_in mRemoteRTCPAddr; 60 61 bool mIsInjected; 62}; 63 64ARTPConnection::ARTPConnection(uint32_t flags) 65 : mFlags(flags), 66 mPollEventPending(false), 67 mLastReceiverReportTimeUs(-1) { 68} 69 70ARTPConnection::~ARTPConnection() { 71} 72 73void ARTPConnection::addStream( 74 int rtpSocket, int rtcpSocket, 75 const sp<ASessionDescription> &sessionDesc, 76 size_t index, 77 const sp<AMessage> ¬ify, 78 bool injected) { 79 sp<AMessage> msg = new AMessage(kWhatAddStream, id()); 80 msg->setInt32("rtp-socket", rtpSocket); 81 msg->setInt32("rtcp-socket", rtcpSocket); 82 msg->setObject("session-desc", sessionDesc); 83 msg->setSize("index", index); 84 msg->setMessage("notify", notify); 85 msg->setInt32("injected", injected); 86 msg->post(); 87} 88 89void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) { 90 sp<AMessage> msg = new AMessage(kWhatRemoveStream, id()); 91 msg->setInt32("rtp-socket", rtpSocket); 92 msg->setInt32("rtcp-socket", rtcpSocket); 93 msg->post(); 94} 95 96static void bumpSocketBufferSize(int s) { 97 int size = 256 * 1024; 98 CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0); 99} 100 101// static 102void ARTPConnection::MakePortPair( 103 int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) { 104 *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0); 105 CHECK_GE(*rtpSocket, 0); 106 107 bumpSocketBufferSize(*rtpSocket); 108 109 *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0); 110 CHECK_GE(*rtcpSocket, 0); 111 112 bumpSocketBufferSize(*rtcpSocket); 113 114 unsigned start = (rand() * 1000)/ RAND_MAX + 15550; 115 start &= ~1; 116 117 for (unsigned port = start; port < 65536; port += 2) { 118 struct sockaddr_in addr; 119 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 120 addr.sin_family = AF_INET; 121 addr.sin_addr.s_addr = INADDR_ANY; 122 addr.sin_port = htons(port); 123 124 if (bind(*rtpSocket, 125 (const struct sockaddr *)&addr, sizeof(addr)) < 0) { 126 continue; 127 } 128 129 addr.sin_port = htons(port + 1); 130 131 if (bind(*rtcpSocket, 132 (const struct sockaddr *)&addr, sizeof(addr)) == 0) { 133 *rtpPort = port; 134 return; 135 } 136 } 137 138 TRESPASS(); 139} 140 141void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) { 142 switch (msg->what()) { 143 case kWhatAddStream: 144 { 145 onAddStream(msg); 146 break; 147 } 148 149 case kWhatRemoveStream: 150 { 151 onRemoveStream(msg); 152 break; 153 } 154 155 case kWhatPollStreams: 156 { 157 onPollStreams(); 158 break; 159 } 160 161 case kWhatInjectPacket: 162 { 163 onInjectPacket(msg); 164 break; 165 } 166 167 default: 168 { 169 TRESPASS(); 170 break; 171 } 172 } 173} 174 175void ARTPConnection::onAddStream(const sp<AMessage> &msg) { 176 mStreams.push_back(StreamInfo()); 177 StreamInfo *info = &*--mStreams.end(); 178 179 int32_t s; 180 CHECK(msg->findInt32("rtp-socket", &s)); 181 info->mRTPSocket = s; 182 CHECK(msg->findInt32("rtcp-socket", &s)); 183 info->mRTCPSocket = s; 184 185 int32_t injected; 186 CHECK(msg->findInt32("injected", &injected)); 187 188 info->mIsInjected = injected; 189 190 sp<RefBase> obj; 191 CHECK(msg->findObject("session-desc", &obj)); 192 info->mSessionDesc = static_cast<ASessionDescription *>(obj.get()); 193 194 CHECK(msg->findSize("index", &info->mIndex)); 195 CHECK(msg->findMessage("notify", &info->mNotifyMsg)); 196 197 info->mNumRTCPPacketsReceived = 0; 198 memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr)); 199 200 if (!injected) { 201 postPollEvent(); 202 } 203} 204 205void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) { 206 int32_t rtpSocket, rtcpSocket; 207 CHECK(msg->findInt32("rtp-socket", &rtpSocket)); 208 CHECK(msg->findInt32("rtcp-socket", &rtcpSocket)); 209 210 List<StreamInfo>::iterator it = mStreams.begin(); 211 while (it != mStreams.end() 212 && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) { 213 ++it; 214 } 215 216 if (it == mStreams.end()) { 217 TRESPASS(); 218 } 219 220 mStreams.erase(it); 221} 222 223void ARTPConnection::postPollEvent() { 224 if (mPollEventPending) { 225 return; 226 } 227 228 sp<AMessage> msg = new AMessage(kWhatPollStreams, id()); 229 msg->post(); 230 231 mPollEventPending = true; 232} 233 234void ARTPConnection::onPollStreams() { 235 mPollEventPending = false; 236 237 if (mStreams.empty()) { 238 return; 239 } 240 241 struct timeval tv; 242 tv.tv_sec = 0; 243 tv.tv_usec = kSelectTimeoutUs; 244 245 fd_set rs; 246 FD_ZERO(&rs); 247 248 int maxSocket = -1; 249 for (List<StreamInfo>::iterator it = mStreams.begin(); 250 it != mStreams.end(); ++it) { 251 if ((*it).mIsInjected) { 252 continue; 253 } 254 255 FD_SET(it->mRTPSocket, &rs); 256 FD_SET(it->mRTCPSocket, &rs); 257 258 if (it->mRTPSocket > maxSocket) { 259 maxSocket = it->mRTPSocket; 260 } 261 if (it->mRTCPSocket > maxSocket) { 262 maxSocket = it->mRTCPSocket; 263 } 264 } 265 266 int res = select(maxSocket + 1, &rs, NULL, NULL, &tv); 267 CHECK_GE(res, 0); 268 269 if (res > 0) { 270 for (List<StreamInfo>::iterator it = mStreams.begin(); 271 it != mStreams.end(); ++it) { 272 if ((*it).mIsInjected) { 273 continue; 274 } 275 276 if (FD_ISSET(it->mRTPSocket, &rs)) { 277 receive(&*it, true); 278 } 279 if (FD_ISSET(it->mRTCPSocket, &rs)) { 280 receive(&*it, false); 281 } 282 } 283 } 284 285 postPollEvent(); 286 287 int64_t nowUs = ALooper::GetNowUs(); 288 if (mLastReceiverReportTimeUs <= 0 289 || mLastReceiverReportTimeUs + 5000000ll <= nowUs) { 290 sp<ABuffer> buffer = new ABuffer(kMaxUDPSize); 291 for (List<StreamInfo>::iterator it = mStreams.begin(); 292 it != mStreams.end(); ++it) { 293 StreamInfo *s = &*it; 294 295 if (s->mNumRTCPPacketsReceived == 0) { 296 // We have never received any RTCP packets on this stream, 297 // we don't even know where to send a report. 298 continue; 299 } 300 301 buffer->setRange(0, 0); 302 303 for (size_t i = 0; i < s->mSources.size(); ++i) { 304 sp<ARTPSource> source = s->mSources.valueAt(i); 305 306 source->addReceiverReport(buffer); 307 308 if (mFlags & kRegularlyRequestFIR) { 309 source->addFIR(buffer); 310 } 311 } 312 313 if (buffer->size() > 0) { 314 LOG(VERBOSE) << "Sending RR..."; 315 316 ssize_t n = sendto( 317 s->mRTCPSocket, buffer->data(), buffer->size(), 0, 318 (const struct sockaddr *)&s->mRemoteRTCPAddr, 319 sizeof(s->mRemoteRTCPAddr)); 320 CHECK_EQ(n, (ssize_t)buffer->size()); 321 322 mLastReceiverReportTimeUs = nowUs; 323 } 324 } 325 } 326} 327 328status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) { 329 CHECK(!s->mIsInjected); 330 331 sp<ABuffer> buffer = new ABuffer(65536); 332 333 socklen_t remoteAddrLen = 334 (!receiveRTP && s->mNumRTCPPacketsReceived == 0) 335 ? sizeof(s->mRemoteRTCPAddr) : 0; 336 337 ssize_t nbytes = recvfrom( 338 receiveRTP ? s->mRTPSocket : s->mRTCPSocket, 339 buffer->data(), 340 buffer->capacity(), 341 0, 342 remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL, 343 remoteAddrLen > 0 ? &remoteAddrLen : NULL); 344 345 if (nbytes < 0) { 346 return -1; 347 } 348 349 buffer->setRange(0, nbytes); 350 351 // LOG(INFO) << "received " << buffer->size() << " bytes."; 352 353 status_t err; 354 if (receiveRTP) { 355 err = parseRTP(s, buffer); 356 } else { 357 ++s->mNumRTCPPacketsReceived; 358 err = parseRTCP(s, buffer); 359 } 360 361 return err; 362} 363 364status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) { 365 size_t size = buffer->size(); 366 367 if (size < 12) { 368 // Too short to be a valid RTP header. 369 return -1; 370 } 371 372 const uint8_t *data = buffer->data(); 373 374 if ((data[0] >> 6) != 2) { 375 // Unsupported version. 376 return -1; 377 } 378 379 if (data[0] & 0x20) { 380 // Padding present. 381 382 size_t paddingLength = data[size - 1]; 383 384 if (paddingLength + 12 > size) { 385 // If we removed this much padding we'd end up with something 386 // that's too short to be a valid RTP header. 387 return -1; 388 } 389 390 size -= paddingLength; 391 } 392 393 int numCSRCs = data[0] & 0x0f; 394 395 size_t payloadOffset = 12 + 4 * numCSRCs; 396 397 if (size < payloadOffset) { 398 // Not enough data to fit the basic header and all the CSRC entries. 399 return -1; 400 } 401 402 if (data[0] & 0x10) { 403 // Header eXtension present. 404 405 if (size < payloadOffset + 4) { 406 // Not enough data to fit the basic header, all CSRC entries 407 // and the first 4 bytes of the extension header. 408 409 return -1; 410 } 411 412 const uint8_t *extensionData = &data[payloadOffset]; 413 414 size_t extensionLength = 415 4 * (extensionData[2] << 8 | extensionData[3]); 416 417 if (size < payloadOffset + 4 + extensionLength) { 418 return -1; 419 } 420 421 payloadOffset += 4 + extensionLength; 422 } 423 424 uint32_t srcId = u32at(&data[8]); 425 426 sp<ARTPSource> source = findSource(s, srcId); 427 428 uint32_t rtpTime = u32at(&data[4]); 429 430 sp<AMessage> meta = buffer->meta(); 431 meta->setInt32("ssrc", srcId); 432 meta->setInt32("rtp-time", rtpTime); 433 meta->setInt32("PT", data[1] & 0x7f); 434 meta->setInt32("M", data[1] >> 7); 435 436 buffer->setInt32Data(u16at(&data[2])); 437 buffer->setRange(payloadOffset, size - payloadOffset); 438 439 if ((mFlags & kFakeTimestamps) && !source->timeEstablished()) { 440 source->timeUpdate(rtpTime, 0); 441 source->timeUpdate(rtpTime + 90000, 0x100000000ll); 442 CHECK(source->timeEstablished()); 443 } 444 445 source->processRTPPacket(buffer); 446 447 return OK; 448} 449 450status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) { 451 const uint8_t *data = buffer->data(); 452 size_t size = buffer->size(); 453 454 while (size > 0) { 455 if (size < 8) { 456 // Too short to be a valid RTCP header 457 return -1; 458 } 459 460 if ((data[0] >> 6) != 2) { 461 // Unsupported version. 462 return -1; 463 } 464 465 if (data[0] & 0x20) { 466 // Padding present. 467 468 size_t paddingLength = data[size - 1]; 469 470 if (paddingLength + 12 > size) { 471 // If we removed this much padding we'd end up with something 472 // that's too short to be a valid RTP header. 473 return -1; 474 } 475 476 size -= paddingLength; 477 } 478 479 size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4; 480 481 if (size < headerLength) { 482 // Only received a partial packet? 483 return -1; 484 } 485 486 switch (data[1]) { 487 case 200: 488 { 489 parseSR(s, data, headerLength); 490 break; 491 } 492 493 case 201: // RR 494 case 202: // SDES 495 case 204: // APP 496 break; 497 498 case 205: // TSFB (transport layer specific feedback) 499 case 206: // PSFB (payload specific feedback) 500 // hexdump(data, headerLength); 501 break; 502 503 case 203: 504 { 505 parseBYE(s, data, headerLength); 506 break; 507 } 508 509 default: 510 { 511 LOG(WARNING) << "Unknown RTCP packet type " 512 << (unsigned)data[1] 513 << " of size " << headerLength; 514 break; 515 } 516 } 517 518 data += headerLength; 519 size -= headerLength; 520 } 521 522 return OK; 523} 524 525status_t ARTPConnection::parseBYE( 526 StreamInfo *s, const uint8_t *data, size_t size) { 527 size_t SC = data[0] & 0x3f; 528 529 if (SC == 0 || size < (4 + SC * 4)) { 530 // Packet too short for the minimal BYE header. 531 return -1; 532 } 533 534 uint32_t id = u32at(&data[4]); 535 536 sp<ARTPSource> source = findSource(s, id); 537 538 source->byeReceived(); 539 540 return OK; 541} 542 543status_t ARTPConnection::parseSR( 544 StreamInfo *s, const uint8_t *data, size_t size) { 545 size_t RC = data[0] & 0x1f; 546 547 if (size < (7 + RC * 6) * 4) { 548 // Packet too short for the minimal SR header. 549 return -1; 550 } 551 552 uint32_t id = u32at(&data[4]); 553 uint64_t ntpTime = u64at(&data[8]); 554 uint32_t rtpTime = u32at(&data[16]); 555 556#if 0 557 LOG(INFO) << StringPrintf( 558 "XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f", 559 id, 560 rtpTime, (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32)); 561#endif 562 563 sp<ARTPSource> source = findSource(s, id); 564 565 if ((mFlags & kFakeTimestamps) == 0) { 566 source->timeUpdate(rtpTime, ntpTime); 567 } 568 569 return 0; 570} 571 572sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) { 573 sp<ARTPSource> source; 574 ssize_t index = info->mSources.indexOfKey(srcId); 575 if (index < 0) { 576 index = info->mSources.size(); 577 578 source = new ARTPSource( 579 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg); 580 581 info->mSources.add(srcId, source); 582 } else { 583 source = info->mSources.valueAt(index); 584 } 585 586 return source; 587} 588 589void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) { 590 sp<AMessage> msg = new AMessage(kWhatInjectPacket, id()); 591 msg->setInt32("index", index); 592 msg->setObject("buffer", buffer); 593 msg->post(); 594} 595 596void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) { 597 int32_t index; 598 CHECK(msg->findInt32("index", &index)); 599 600 sp<RefBase> obj; 601 CHECK(msg->findObject("buffer", &obj)); 602 603 sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get()); 604 605 List<StreamInfo>::iterator it = mStreams.begin(); 606 while (it != mStreams.end() 607 && it->mRTPSocket != index && it->mRTCPSocket != index) { 608 ++it; 609 } 610 611 if (it == mStreams.end()) { 612 TRESPASS(); 613 } 614 615 StreamInfo *s = &*it; 616 617 status_t err; 618 if (it->mRTPSocket == index) { 619 err = parseRTP(s, buffer); 620 } else { 621 ++s->mNumRTCPPacketsReceived; 622 err = parseRTCP(s, buffer); 623 } 624} 625 626} // namespace android 627 628