// ARTPConnection.cpp revision 7aef03379179c109c2547c33c410bfc93c8db576
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#include "ARTPConnection.h" 18 19#include "ARTPSource.h" 20#include "ASessionDescription.h" 21 22#include <media/stagefright/foundation/ABuffer.h> 23#include <media/stagefright/foundation/ADebug.h> 24#include <media/stagefright/foundation/AMessage.h> 25#include <media/stagefright/foundation/AString.h> 26#include <media/stagefright/foundation/hexdump.h> 27 28#include <arpa/inet.h> 29#include <sys/socket.h> 30 31namespace android { 32 33static const size_t kMaxUDPSize = 1500; 34 35static uint16_t u16at(const uint8_t *data) { 36 return data[0] << 8 | data[1]; 37} 38 39static uint32_t u32at(const uint8_t *data) { 40 return u16at(data) << 16 | u16at(&data[2]); 41} 42 43static uint64_t u64at(const uint8_t *data) { 44 return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]); 45} 46 47// static 48const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll; 49 50struct ARTPConnection::StreamInfo { 51 int mRTPSocket; 52 int mRTCPSocket; 53 sp<ASessionDescription> mSessionDesc; 54 size_t mIndex; 55 sp<AMessage> mNotifyMsg; 56 KeyedVector<uint32_t, sp<ARTPSource> > mSources; 57 58 int32_t mNumRTCPPacketsReceived; 59 struct sockaddr_in mRemoteRTCPAddr; 60 61 bool mIsInjected; 62}; 63 64ARTPConnection::ARTPConnection(uint32_t flags) 65 : mFlags(flags), 66 mPollEventPending(false), 67 mLastReceiverReportTimeUs(-1) { 68} 69 
70ARTPConnection::~ARTPConnection() { 71} 72 73void ARTPConnection::addStream( 74 int rtpSocket, int rtcpSocket, 75 const sp<ASessionDescription> &sessionDesc, 76 size_t index, 77 const sp<AMessage> ¬ify, 78 bool injected) { 79 sp<AMessage> msg = new AMessage(kWhatAddStream, id()); 80 msg->setInt32("rtp-socket", rtpSocket); 81 msg->setInt32("rtcp-socket", rtcpSocket); 82 msg->setObject("session-desc", sessionDesc); 83 msg->setSize("index", index); 84 msg->setMessage("notify", notify); 85 msg->setInt32("injected", injected); 86 msg->post(); 87} 88 89void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) { 90 sp<AMessage> msg = new AMessage(kWhatRemoveStream, id()); 91 msg->setInt32("rtp-socket", rtpSocket); 92 msg->setInt32("rtcp-socket", rtcpSocket); 93 msg->post(); 94} 95 96static void bumpSocketBufferSize(int s) { 97 int size = 256 * 1024; 98 CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0); 99} 100 101// static 102void ARTPConnection::MakePortPair( 103 int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) { 104 *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0); 105 CHECK_GE(*rtpSocket, 0); 106 107 bumpSocketBufferSize(*rtpSocket); 108 109 *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0); 110 CHECK_GE(*rtcpSocket, 0); 111 112 bumpSocketBufferSize(*rtcpSocket); 113 114 unsigned start = (rand() * 1000)/ RAND_MAX + 15550; 115 start &= ~1; 116 117 for (unsigned port = start; port < 65536; port += 2) { 118 struct sockaddr_in addr; 119 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 120 addr.sin_family = AF_INET; 121 addr.sin_addr.s_addr = INADDR_ANY; 122 addr.sin_port = htons(port); 123 124 if (bind(*rtpSocket, 125 (const struct sockaddr *)&addr, sizeof(addr)) < 0) { 126 continue; 127 } 128 129 addr.sin_port = htons(port + 1); 130 131 if (bind(*rtcpSocket, 132 (const struct sockaddr *)&addr, sizeof(addr)) == 0) { 133 *rtpPort = port; 134 return; 135 } 136 } 137 138 TRESPASS(); 139} 140 141void ARTPConnection::onMessageReceived(const sp<AMessage> 
&msg) { 142 switch (msg->what()) { 143 case kWhatAddStream: 144 { 145 onAddStream(msg); 146 break; 147 } 148 149 case kWhatRemoveStream: 150 { 151 onRemoveStream(msg); 152 break; 153 } 154 155 case kWhatPollStreams: 156 { 157 onPollStreams(); 158 break; 159 } 160 161 case kWhatInjectPacket: 162 { 163 onInjectPacket(msg); 164 break; 165 } 166 167 default: 168 { 169 TRESPASS(); 170 break; 171 } 172 } 173} 174 175void ARTPConnection::onAddStream(const sp<AMessage> &msg) { 176 mStreams.push_back(StreamInfo()); 177 StreamInfo *info = &*--mStreams.end(); 178 179 int32_t s; 180 CHECK(msg->findInt32("rtp-socket", &s)); 181 info->mRTPSocket = s; 182 CHECK(msg->findInt32("rtcp-socket", &s)); 183 info->mRTCPSocket = s; 184 185 int32_t injected; 186 CHECK(msg->findInt32("injected", &injected)); 187 188 info->mIsInjected = injected; 189 190 sp<RefBase> obj; 191 CHECK(msg->findObject("session-desc", &obj)); 192 info->mSessionDesc = static_cast<ASessionDescription *>(obj.get()); 193 194 CHECK(msg->findSize("index", &info->mIndex)); 195 CHECK(msg->findMessage("notify", &info->mNotifyMsg)); 196 197 info->mNumRTCPPacketsReceived = 0; 198 memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr)); 199 200 if (!injected) { 201 postPollEvent(); 202 } 203} 204 205void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) { 206 int32_t rtpSocket, rtcpSocket; 207 CHECK(msg->findInt32("rtp-socket", &rtpSocket)); 208 CHECK(msg->findInt32("rtcp-socket", &rtcpSocket)); 209 210 List<StreamInfo>::iterator it = mStreams.begin(); 211 while (it != mStreams.end() 212 && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) { 213 ++it; 214 } 215 216 if (it == mStreams.end()) { 217 TRESPASS(); 218 } 219 220 mStreams.erase(it); 221} 222 223void ARTPConnection::postPollEvent() { 224 if (mPollEventPending) { 225 return; 226 } 227 228 sp<AMessage> msg = new AMessage(kWhatPollStreams, id()); 229 msg->post(); 230 231 mPollEventPending = true; 232} 233 234void 
ARTPConnection::onPollStreams() { 235 mPollEventPending = false; 236 237 if (mStreams.empty()) { 238 return; 239 } 240 241 struct timeval tv; 242 tv.tv_sec = 0; 243 tv.tv_usec = kSelectTimeoutUs; 244 245 fd_set rs; 246 FD_ZERO(&rs); 247 248 int maxSocket = -1; 249 for (List<StreamInfo>::iterator it = mStreams.begin(); 250 it != mStreams.end(); ++it) { 251 if ((*it).mIsInjected) { 252 continue; 253 } 254 255 FD_SET(it->mRTPSocket, &rs); 256 FD_SET(it->mRTCPSocket, &rs); 257 258 if (it->mRTPSocket > maxSocket) { 259 maxSocket = it->mRTPSocket; 260 } 261 if (it->mRTCPSocket > maxSocket) { 262 maxSocket = it->mRTCPSocket; 263 } 264 } 265 266 if (maxSocket == -1) { 267 return; 268 } 269 270 int res = select(maxSocket + 1, &rs, NULL, NULL, &tv); 271 CHECK_GE(res, 0); 272 273 if (res > 0) { 274 for (List<StreamInfo>::iterator it = mStreams.begin(); 275 it != mStreams.end(); ++it) { 276 if ((*it).mIsInjected) { 277 continue; 278 } 279 280 if (FD_ISSET(it->mRTPSocket, &rs)) { 281 receive(&*it, true); 282 } 283 if (FD_ISSET(it->mRTCPSocket, &rs)) { 284 receive(&*it, false); 285 } 286 } 287 } 288 289 postPollEvent(); 290 291 int64_t nowUs = ALooper::GetNowUs(); 292 if (mLastReceiverReportTimeUs <= 0 293 || mLastReceiverReportTimeUs + 5000000ll <= nowUs) { 294 sp<ABuffer> buffer = new ABuffer(kMaxUDPSize); 295 for (List<StreamInfo>::iterator it = mStreams.begin(); 296 it != mStreams.end(); ++it) { 297 StreamInfo *s = &*it; 298 299 if (s->mIsInjected) { 300 continue; 301 } 302 303 if (s->mNumRTCPPacketsReceived == 0) { 304 // We have never received any RTCP packets on this stream, 305 // we don't even know where to send a report. 
306 continue; 307 } 308 309 buffer->setRange(0, 0); 310 311 for (size_t i = 0; i < s->mSources.size(); ++i) { 312 sp<ARTPSource> source = s->mSources.valueAt(i); 313 314 source->addReceiverReport(buffer); 315 316 if (mFlags & kRegularlyRequestFIR) { 317 source->addFIR(buffer); 318 } 319 } 320 321 if (buffer->size() > 0) { 322 LOG(VERBOSE) << "Sending RR..."; 323 324 ssize_t n = sendto( 325 s->mRTCPSocket, buffer->data(), buffer->size(), 0, 326 (const struct sockaddr *)&s->mRemoteRTCPAddr, 327 sizeof(s->mRemoteRTCPAddr)); 328 CHECK_EQ(n, (ssize_t)buffer->size()); 329 330 mLastReceiverReportTimeUs = nowUs; 331 } 332 } 333 } 334} 335 336status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) { 337 CHECK(!s->mIsInjected); 338 339 sp<ABuffer> buffer = new ABuffer(65536); 340 341 socklen_t remoteAddrLen = 342 (!receiveRTP && s->mNumRTCPPacketsReceived == 0) 343 ? sizeof(s->mRemoteRTCPAddr) : 0; 344 345 ssize_t nbytes = recvfrom( 346 receiveRTP ? s->mRTPSocket : s->mRTCPSocket, 347 buffer->data(), 348 buffer->capacity(), 349 0, 350 remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL, 351 remoteAddrLen > 0 ? &remoteAddrLen : NULL); 352 353 if (nbytes < 0) { 354 return -1; 355 } 356 357 buffer->setRange(0, nbytes); 358 359 // LOG(INFO) << "received " << buffer->size() << " bytes."; 360 361 status_t err; 362 if (receiveRTP) { 363 err = parseRTP(s, buffer); 364 } else { 365 ++s->mNumRTCPPacketsReceived; 366 err = parseRTCP(s, buffer); 367 } 368 369 return err; 370} 371 372status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) { 373 size_t size = buffer->size(); 374 375 if (size < 12) { 376 // Too short to be a valid RTP header. 377 return -1; 378 } 379 380 const uint8_t *data = buffer->data(); 381 382 if ((data[0] >> 6) != 2) { 383 // Unsupported version. 384 return -1; 385 } 386 387 if (data[0] & 0x20) { 388 // Padding present. 
389 390 size_t paddingLength = data[size - 1]; 391 392 if (paddingLength + 12 > size) { 393 // If we removed this much padding we'd end up with something 394 // that's too short to be a valid RTP header. 395 return -1; 396 } 397 398 size -= paddingLength; 399 } 400 401 int numCSRCs = data[0] & 0x0f; 402 403 size_t payloadOffset = 12 + 4 * numCSRCs; 404 405 if (size < payloadOffset) { 406 // Not enough data to fit the basic header and all the CSRC entries. 407 return -1; 408 } 409 410 if (data[0] & 0x10) { 411 // Header eXtension present. 412 413 if (size < payloadOffset + 4) { 414 // Not enough data to fit the basic header, all CSRC entries 415 // and the first 4 bytes of the extension header. 416 417 return -1; 418 } 419 420 const uint8_t *extensionData = &data[payloadOffset]; 421 422 size_t extensionLength = 423 4 * (extensionData[2] << 8 | extensionData[3]); 424 425 if (size < payloadOffset + 4 + extensionLength) { 426 return -1; 427 } 428 429 payloadOffset += 4 + extensionLength; 430 } 431 432 uint32_t srcId = u32at(&data[8]); 433 434 sp<ARTPSource> source = findSource(s, srcId); 435 436 uint32_t rtpTime = u32at(&data[4]); 437 438 sp<AMessage> meta = buffer->meta(); 439 meta->setInt32("ssrc", srcId); 440 meta->setInt32("rtp-time", rtpTime); 441 meta->setInt32("PT", data[1] & 0x7f); 442 meta->setInt32("M", data[1] >> 7); 443 444 buffer->setInt32Data(u16at(&data[2])); 445 buffer->setRange(payloadOffset, size - payloadOffset); 446 447 if ((mFlags & kFakeTimestamps) && !source->timeEstablished()) { 448 source->timeUpdate(rtpTime, 0); 449 source->timeUpdate(rtpTime + 90000, 0x100000000ll); 450 CHECK(source->timeEstablished()); 451 } 452 453 source->processRTPPacket(buffer); 454 455 return OK; 456} 457 458status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) { 459 const uint8_t *data = buffer->data(); 460 size_t size = buffer->size(); 461 462 while (size > 0) { 463 if (size < 8) { 464 // Too short to be a valid RTCP header 465 return -1; 466 } 
467 468 if ((data[0] >> 6) != 2) { 469 // Unsupported version. 470 return -1; 471 } 472 473 if (data[0] & 0x20) { 474 // Padding present. 475 476 size_t paddingLength = data[size - 1]; 477 478 if (paddingLength + 12 > size) { 479 // If we removed this much padding we'd end up with something 480 // that's too short to be a valid RTP header. 481 return -1; 482 } 483 484 size -= paddingLength; 485 } 486 487 size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4; 488 489 if (size < headerLength) { 490 // Only received a partial packet? 491 return -1; 492 } 493 494 switch (data[1]) { 495 case 200: 496 { 497 parseSR(s, data, headerLength); 498 break; 499 } 500 501 case 201: // RR 502 case 202: // SDES 503 case 204: // APP 504 break; 505 506 case 205: // TSFB (transport layer specific feedback) 507 case 206: // PSFB (payload specific feedback) 508 // hexdump(data, headerLength); 509 break; 510 511 case 203: 512 { 513 parseBYE(s, data, headerLength); 514 break; 515 } 516 517 default: 518 { 519 LOG(WARNING) << "Unknown RTCP packet type " 520 << (unsigned)data[1] 521 << " of size " << headerLength; 522 break; 523 } 524 } 525 526 data += headerLength; 527 size -= headerLength; 528 } 529 530 return OK; 531} 532 533status_t ARTPConnection::parseBYE( 534 StreamInfo *s, const uint8_t *data, size_t size) { 535 size_t SC = data[0] & 0x3f; 536 537 if (SC == 0 || size < (4 + SC * 4)) { 538 // Packet too short for the minimal BYE header. 539 return -1; 540 } 541 542 uint32_t id = u32at(&data[4]); 543 544 sp<ARTPSource> source = findSource(s, id); 545 546 source->byeReceived(); 547 548 return OK; 549} 550 551status_t ARTPConnection::parseSR( 552 StreamInfo *s, const uint8_t *data, size_t size) { 553 size_t RC = data[0] & 0x1f; 554 555 if (size < (7 + RC * 6) * 4) { 556 // Packet too short for the minimal SR header. 
557 return -1; 558 } 559 560 uint32_t id = u32at(&data[4]); 561 uint64_t ntpTime = u64at(&data[8]); 562 uint32_t rtpTime = u32at(&data[16]); 563 564#if 0 565 LOG(INFO) << StringPrintf( 566 "XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f", 567 id, 568 rtpTime, (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32)); 569#endif 570 571 sp<ARTPSource> source = findSource(s, id); 572 573 if ((mFlags & kFakeTimestamps) == 0) { 574 source->timeUpdate(rtpTime, ntpTime); 575 } 576 577 return 0; 578} 579 580sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) { 581 sp<ARTPSource> source; 582 ssize_t index = info->mSources.indexOfKey(srcId); 583 if (index < 0) { 584 index = info->mSources.size(); 585 586 source = new ARTPSource( 587 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg); 588 589 info->mSources.add(srcId, source); 590 } else { 591 source = info->mSources.valueAt(index); 592 } 593 594 return source; 595} 596 597void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) { 598 sp<AMessage> msg = new AMessage(kWhatInjectPacket, id()); 599 msg->setInt32("index", index); 600 msg->setObject("buffer", buffer); 601 msg->post(); 602} 603 604void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) { 605 int32_t index; 606 CHECK(msg->findInt32("index", &index)); 607 608 sp<RefBase> obj; 609 CHECK(msg->findObject("buffer", &obj)); 610 611 sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get()); 612 613 List<StreamInfo>::iterator it = mStreams.begin(); 614 while (it != mStreams.end() 615 && it->mRTPSocket != index && it->mRTCPSocket != index) { 616 ++it; 617 } 618 619 if (it == mStreams.end()) { 620 TRESPASS(); 621 } 622 623 StreamInfo *s = &*it; 624 625 status_t err; 626 if (it->mRTPSocket == index) { 627 err = parseRTP(s, buffer); 628 } else { 629 ++s->mNumRTCPPacketsReceived; 630 err = parseRTCP(s, buffer); 631 } 632} 633 634} // namespace android 635 636