// ARTPConnection.cpp revision a979ad6739d573b3823b0fe7321f554ef5544753
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#include "ARTPConnection.h" 18 19#include "ARTPSource.h" 20#include "ASessionDescription.h" 21 22#include <media/stagefright/foundation/ABuffer.h> 23#include <media/stagefright/foundation/ADebug.h> 24#include <media/stagefright/foundation/AMessage.h> 25#include <media/stagefright/foundation/AString.h> 26#include <media/stagefright/foundation/hexdump.h> 27 28#include <arpa/inet.h> 29#include <sys/socket.h> 30 31namespace android { 32 33static const size_t kMaxUDPSize = 1500; 34 35static uint16_t u16at(const uint8_t *data) { 36 return data[0] << 8 | data[1]; 37} 38 39static uint32_t u32at(const uint8_t *data) { 40 return u16at(data) << 16 | u16at(&data[2]); 41} 42 43static uint64_t u64at(const uint8_t *data) { 44 return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]); 45} 46 47// static 48const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll; 49 50struct ARTPConnection::StreamInfo { 51 int mRTPSocket; 52 int mRTCPSocket; 53 sp<ASessionDescription> mSessionDesc; 54 size_t mIndex; 55 sp<AMessage> mNotifyMsg; 56 KeyedVector<uint32_t, sp<ARTPSource> > mSources; 57 58 int32_t mNumRTCPPacketsReceived; 59 struct sockaddr_in mRemoteRTCPAddr; 60}; 61 62ARTPConnection::ARTPConnection(uint32_t flags) 63 : mFlags(flags), 64 mPollEventPending(false), 65 mLastReceiverReportTimeUs(-1) { 66} 67 68ARTPConnection::~ARTPConnection() { 69} 70 71void 
ARTPConnection::addStream( 72 int rtpSocket, int rtcpSocket, 73 const sp<ASessionDescription> &sessionDesc, 74 size_t index, 75 const sp<AMessage> ¬ify) { 76 sp<AMessage> msg = new AMessage(kWhatAddStream, id()); 77 msg->setInt32("rtp-socket", rtpSocket); 78 msg->setInt32("rtcp-socket", rtcpSocket); 79 msg->setObject("session-desc", sessionDesc); 80 msg->setSize("index", index); 81 msg->setMessage("notify", notify); 82 msg->post(); 83} 84 85void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) { 86 sp<AMessage> msg = new AMessage(kWhatRemoveStream, id()); 87 msg->setInt32("rtp-socket", rtpSocket); 88 msg->setInt32("rtcp-socket", rtcpSocket); 89 msg->post(); 90} 91 92static void bumpSocketBufferSize(int s) { 93 int size = 256 * 1024; 94 CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0); 95} 96 97// static 98void ARTPConnection::MakePortPair( 99 int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) { 100 *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0); 101 CHECK_GE(*rtpSocket, 0); 102 103 bumpSocketBufferSize(*rtpSocket); 104 105 *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0); 106 CHECK_GE(*rtcpSocket, 0); 107 108 bumpSocketBufferSize(*rtcpSocket); 109 110 unsigned start = (rand() * 1000)/ RAND_MAX + 15550; 111 start &= ~1; 112 113 for (unsigned port = start; port < 65536; port += 2) { 114 struct sockaddr_in addr; 115 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 116 addr.sin_family = AF_INET; 117 addr.sin_addr.s_addr = INADDR_ANY; 118 addr.sin_port = htons(port); 119 120 if (bind(*rtpSocket, 121 (const struct sockaddr *)&addr, sizeof(addr)) < 0) { 122 continue; 123 } 124 125 addr.sin_port = htons(port + 1); 126 127 if (bind(*rtcpSocket, 128 (const struct sockaddr *)&addr, sizeof(addr)) == 0) { 129 *rtpPort = port; 130 return; 131 } 132 } 133 134 TRESPASS(); 135} 136 137void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) { 138 switch (msg->what()) { 139 case kWhatAddStream: 140 { 141 onAddStream(msg); 142 break; 143 } 144 145 
case kWhatRemoveStream: 146 { 147 onRemoveStream(msg); 148 break; 149 } 150 151 case kWhatPollStreams: 152 { 153 onPollStreams(); 154 break; 155 } 156 157 default: 158 { 159 TRESPASS(); 160 break; 161 } 162 } 163} 164 165void ARTPConnection::onAddStream(const sp<AMessage> &msg) { 166 mStreams.push_back(StreamInfo()); 167 StreamInfo *info = &*--mStreams.end(); 168 169 int32_t s; 170 CHECK(msg->findInt32("rtp-socket", &s)); 171 info->mRTPSocket = s; 172 CHECK(msg->findInt32("rtcp-socket", &s)); 173 info->mRTCPSocket = s; 174 175 sp<RefBase> obj; 176 CHECK(msg->findObject("session-desc", &obj)); 177 info->mSessionDesc = static_cast<ASessionDescription *>(obj.get()); 178 179 CHECK(msg->findSize("index", &info->mIndex)); 180 CHECK(msg->findMessage("notify", &info->mNotifyMsg)); 181 182 info->mNumRTCPPacketsReceived = 0; 183 memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr)); 184 185 postPollEvent(); 186} 187 188void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) { 189 int32_t rtpSocket, rtcpSocket; 190 CHECK(msg->findInt32("rtp-socket", &rtpSocket)); 191 CHECK(msg->findInt32("rtcp-socket", &rtcpSocket)); 192 193 List<StreamInfo>::iterator it = mStreams.begin(); 194 while (it != mStreams.end() 195 && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) { 196 ++it; 197 } 198 199 if (it == mStreams.end()) { 200 TRESPASS(); 201 } 202 203 mStreams.erase(it); 204} 205 206void ARTPConnection::postPollEvent() { 207 if (mPollEventPending) { 208 return; 209 } 210 211 sp<AMessage> msg = new AMessage(kWhatPollStreams, id()); 212 msg->post(); 213 214 mPollEventPending = true; 215} 216 217void ARTPConnection::onPollStreams() { 218 mPollEventPending = false; 219 220 if (mStreams.empty()) { 221 return; 222 } 223 224 struct timeval tv; 225 tv.tv_sec = 0; 226 tv.tv_usec = kSelectTimeoutUs; 227 228 fd_set rs; 229 FD_ZERO(&rs); 230 231 int maxSocket = -1; 232 for (List<StreamInfo>::iterator it = mStreams.begin(); 233 it != mStreams.end(); ++it) { 234 
FD_SET(it->mRTPSocket, &rs); 235 FD_SET(it->mRTCPSocket, &rs); 236 237 if (it->mRTPSocket > maxSocket) { 238 maxSocket = it->mRTPSocket; 239 } 240 if (it->mRTCPSocket > maxSocket) { 241 maxSocket = it->mRTCPSocket; 242 } 243 } 244 245 int res = select(maxSocket + 1, &rs, NULL, NULL, &tv); 246 CHECK_GE(res, 0); 247 248 if (res > 0) { 249 for (List<StreamInfo>::iterator it = mStreams.begin(); 250 it != mStreams.end(); ++it) { 251 if (FD_ISSET(it->mRTPSocket, &rs)) { 252 receive(&*it, true); 253 } 254 if (FD_ISSET(it->mRTCPSocket, &rs)) { 255 receive(&*it, false); 256 } 257 } 258 } 259 260 postPollEvent(); 261 262 int64_t nowUs = ALooper::GetNowUs(); 263 if (mLastReceiverReportTimeUs <= 0 264 || mLastReceiverReportTimeUs + 5000000ll <= nowUs) { 265 sp<ABuffer> buffer = new ABuffer(kMaxUDPSize); 266 for (List<StreamInfo>::iterator it = mStreams.begin(); 267 it != mStreams.end(); ++it) { 268 StreamInfo *s = &*it; 269 270 if (s->mNumRTCPPacketsReceived == 0) { 271 // We have never received any RTCP packets on this stream, 272 // we don't even know where to send a report. 273 continue; 274 } 275 276 buffer->setRange(0, 0); 277 278 for (size_t i = 0; i < s->mSources.size(); ++i) { 279 sp<ARTPSource> source = s->mSources.valueAt(i); 280 281 source->addReceiverReport(buffer); 282 283 if (mFlags & kRegularlyRequestFIR) { 284 source->addFIR(buffer); 285 } 286 } 287 288 if (buffer->size() > 0) { 289 LOG(VERBOSE) << "Sending RR..."; 290 291 ssize_t n = sendto( 292 s->mRTCPSocket, buffer->data(), buffer->size(), 0, 293 (const struct sockaddr *)&s->mRemoteRTCPAddr, 294 sizeof(s->mRemoteRTCPAddr)); 295 CHECK_EQ(n, (ssize_t)buffer->size()); 296 297 mLastReceiverReportTimeUs = nowUs; 298 } 299 } 300 } 301} 302 303status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) { 304 sp<ABuffer> buffer = new ABuffer(65536); 305 306 socklen_t remoteAddrLen = 307 (!receiveRTP && s->mNumRTCPPacketsReceived == 0) 308 ? 
sizeof(s->mRemoteRTCPAddr) : 0; 309 310 ssize_t nbytes = recvfrom( 311 receiveRTP ? s->mRTPSocket : s->mRTCPSocket, 312 buffer->data(), 313 buffer->capacity(), 314 0, 315 remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL, 316 remoteAddrLen > 0 ? &remoteAddrLen : NULL); 317 318 if (nbytes < 0) { 319 return -1; 320 } 321 322 buffer->setRange(0, nbytes); 323 324 // LOG(INFO) << "received " << buffer->size() << " bytes."; 325 326 status_t err; 327 if (receiveRTP) { 328 err = parseRTP(s, buffer); 329 } else { 330 ++s->mNumRTCPPacketsReceived; 331 err = parseRTCP(s, buffer); 332 } 333 334 return err; 335} 336 337status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) { 338 size_t size = buffer->size(); 339 340 if (size < 12) { 341 // Too short to be a valid RTP header. 342 return -1; 343 } 344 345 const uint8_t *data = buffer->data(); 346 347 if ((data[0] >> 6) != 2) { 348 // Unsupported version. 349 return -1; 350 } 351 352 if (data[0] & 0x20) { 353 // Padding present. 354 355 size_t paddingLength = data[size - 1]; 356 357 if (paddingLength + 12 > size) { 358 // If we removed this much padding we'd end up with something 359 // that's too short to be a valid RTP header. 360 return -1; 361 } 362 363 size -= paddingLength; 364 } 365 366 int numCSRCs = data[0] & 0x0f; 367 368 size_t payloadOffset = 12 + 4 * numCSRCs; 369 370 if (size < payloadOffset) { 371 // Not enough data to fit the basic header and all the CSRC entries. 372 return -1; 373 } 374 375 if (data[0] & 0x10) { 376 // Header eXtension present. 377 378 if (size < payloadOffset + 4) { 379 // Not enough data to fit the basic header, all CSRC entries 380 // and the first 4 bytes of the extension header. 
381 382 return -1; 383 } 384 385 const uint8_t *extensionData = &data[payloadOffset]; 386 387 size_t extensionLength = 388 4 * (extensionData[2] << 8 | extensionData[3]); 389 390 if (size < payloadOffset + 4 + extensionLength) { 391 return -1; 392 } 393 394 payloadOffset += 4 + extensionLength; 395 } 396 397 uint32_t srcId = u32at(&data[8]); 398 399 sp<ARTPSource> source = findSource(s, srcId); 400 401 uint32_t rtpTime = u32at(&data[4]); 402 403 sp<AMessage> meta = buffer->meta(); 404 meta->setInt32("ssrc", srcId); 405 meta->setInt32("rtp-time", rtpTime); 406 meta->setInt32("PT", data[1] & 0x7f); 407 meta->setInt32("M", data[1] >> 7); 408 409 buffer->setInt32Data(u16at(&data[2])); 410 buffer->setRange(payloadOffset, size - payloadOffset); 411 412 if ((mFlags & kFakeTimestamps) && !source->timeEstablished()) { 413 source->timeUpdate(rtpTime, 0); 414 source->timeUpdate(rtpTime + 90000, 0x100000000ll); 415 CHECK(source->timeEstablished()); 416 } 417 418 source->processRTPPacket(buffer); 419 420 return OK; 421} 422 423status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) { 424 const uint8_t *data = buffer->data(); 425 size_t size = buffer->size(); 426 427 while (size > 0) { 428 if (size < 8) { 429 // Too short to be a valid RTCP header 430 return -1; 431 } 432 433 if ((data[0] >> 6) != 2) { 434 // Unsupported version. 435 return -1; 436 } 437 438 if (data[0] & 0x20) { 439 // Padding present. 440 441 size_t paddingLength = data[size - 1]; 442 443 if (paddingLength + 12 > size) { 444 // If we removed this much padding we'd end up with something 445 // that's too short to be a valid RTP header. 446 return -1; 447 } 448 449 size -= paddingLength; 450 } 451 452 size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4; 453 454 if (size < headerLength) { 455 // Only received a partial packet? 
456 return -1; 457 } 458 459 switch (data[1]) { 460 case 200: 461 { 462 parseSR(s, data, headerLength); 463 break; 464 } 465 466 case 201: // RR 467 case 202: // SDES 468 case 204: // APP 469 break; 470 471 case 205: // TSFB (transport layer specific feedback) 472 case 206: // PSFB (payload specific feedback) 473 // hexdump(data, headerLength); 474 break; 475 476 case 203: 477 { 478 parseBYE(s, data, headerLength); 479 break; 480 } 481 482 default: 483 { 484 LOG(WARNING) << "Unknown RTCP packet type " 485 << (unsigned)data[1] 486 << " of size " << headerLength; 487 break; 488 } 489 } 490 491 data += headerLength; 492 size -= headerLength; 493 } 494 495 return OK; 496} 497 498status_t ARTPConnection::parseBYE( 499 StreamInfo *s, const uint8_t *data, size_t size) { 500 size_t SC = data[0] & 0x3f; 501 502 if (SC == 0 || size < (4 + SC * 4)) { 503 // Packet too short for the minimal BYE header. 504 return -1; 505 } 506 507 uint32_t id = u32at(&data[4]); 508 509 sp<ARTPSource> source = findSource(s, id); 510 511 source->byeReceived(); 512 513 return OK; 514} 515 516status_t ARTPConnection::parseSR( 517 StreamInfo *s, const uint8_t *data, size_t size) { 518 size_t RC = data[0] & 0x1f; 519 520 if (size < (7 + RC * 6) * 4) { 521 // Packet too short for the minimal SR header. 
522 return -1; 523 } 524 525 uint32_t id = u32at(&data[4]); 526 uint64_t ntpTime = u64at(&data[8]); 527 uint32_t rtpTime = u32at(&data[16]); 528 529#if 0 530 LOG(INFO) << StringPrintf( 531 "XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f", 532 id, 533 rtpTime, (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32)); 534#endif 535 536 sp<ARTPSource> source = findSource(s, id); 537 538 if ((mFlags & kFakeTimestamps) == 0) { 539 source->timeUpdate(rtpTime, ntpTime); 540 } 541 542 return 0; 543} 544 545sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) { 546 sp<ARTPSource> source; 547 ssize_t index = info->mSources.indexOfKey(srcId); 548 if (index < 0) { 549 index = info->mSources.size(); 550 551 source = new ARTPSource( 552 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg); 553 554 info->mSources.add(srcId, source); 555 } else { 556 source = info->mSources.valueAt(index); 557 } 558 559 return source; 560} 561 562} // namespace android 563 564