// ARTPConnection.cpp revision 39ddf8e0f18766f7ba1e3246b774aa6ebd93eea8
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#include "ARTPConnection.h" 18 19#include "ARTPSource.h" 20#include "ASessionDescription.h" 21 22#include <media/stagefright/foundation/ABuffer.h> 23#include <media/stagefright/foundation/ADebug.h> 24#include <media/stagefright/foundation/AMessage.h> 25#include <media/stagefright/foundation/AString.h> 26#include <media/stagefright/foundation/hexdump.h> 27 28#include <arpa/inet.h> 29#include <sys/socket.h> 30 31#define IGNORE_RTCP_TIME 0 32 33namespace android { 34 35static const size_t kMaxUDPSize = 1500; 36 37static uint16_t u16at(const uint8_t *data) { 38 return data[0] << 8 | data[1]; 39} 40 41static uint32_t u32at(const uint8_t *data) { 42 return u16at(data) << 16 | u16at(&data[2]); 43} 44 45static uint64_t u64at(const uint8_t *data) { 46 return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]); 47} 48 49// static 50const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll; 51 52struct ARTPConnection::StreamInfo { 53 int mRTPSocket; 54 int mRTCPSocket; 55 sp<ASessionDescription> mSessionDesc; 56 size_t mIndex; 57 sp<AMessage> mNotifyMsg; 58 KeyedVector<uint32_t, sp<ARTPSource> > mSources; 59 60 int32_t mNumRTCPPacketsReceived; 61 struct sockaddr_in mRemoteRTCPAddr; 62}; 63 64ARTPConnection::ARTPConnection() 65 : mPollEventPending(false), 66 mLastReceiverReportTimeUs(-1) { 67} 68 69ARTPConnection::~ARTPConnection() { 70} 71 72void 
ARTPConnection::addStream( 73 int rtpSocket, int rtcpSocket, 74 const sp<ASessionDescription> &sessionDesc, 75 size_t index, 76 const sp<AMessage> ¬ify) { 77 sp<AMessage> msg = new AMessage(kWhatAddStream, id()); 78 msg->setInt32("rtp-socket", rtpSocket); 79 msg->setInt32("rtcp-socket", rtcpSocket); 80 msg->setObject("session-desc", sessionDesc); 81 msg->setSize("index", index); 82 msg->setMessage("notify", notify); 83 msg->post(); 84} 85 86void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) { 87 sp<AMessage> msg = new AMessage(kWhatRemoveStream, id()); 88 msg->setInt32("rtp-socket", rtpSocket); 89 msg->setInt32("rtcp-socket", rtcpSocket); 90 msg->post(); 91} 92 93static void bumpSocketBufferSize(int s) { 94 int size = 256 * 1024; 95 CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0); 96} 97 98// static 99void ARTPConnection::MakePortPair( 100 int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) { 101 *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0); 102 CHECK_GE(*rtpSocket, 0); 103 104 bumpSocketBufferSize(*rtpSocket); 105 106 *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0); 107 CHECK_GE(*rtcpSocket, 0); 108 109 bumpSocketBufferSize(*rtcpSocket); 110 111 unsigned start = (rand() * 1000)/ RAND_MAX + 15550; 112 start &= ~1; 113 114 for (unsigned port = start; port < 65536; port += 2) { 115 struct sockaddr_in addr; 116 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 117 addr.sin_family = AF_INET; 118 addr.sin_addr.s_addr = INADDR_ANY; 119 addr.sin_port = htons(port); 120 121 if (bind(*rtpSocket, 122 (const struct sockaddr *)&addr, sizeof(addr)) < 0) { 123 continue; 124 } 125 126 addr.sin_port = htons(port + 1); 127 128 if (bind(*rtcpSocket, 129 (const struct sockaddr *)&addr, sizeof(addr)) == 0) { 130 *rtpPort = port; 131 return; 132 } 133 } 134 135 TRESPASS(); 136} 137 138void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) { 139 switch (msg->what()) { 140 case kWhatAddStream: 141 { 142 onAddStream(msg); 143 break; 144 } 145 146 
case kWhatRemoveStream: 147 { 148 onRemoveStream(msg); 149 break; 150 } 151 152 case kWhatPollStreams: 153 { 154 onPollStreams(); 155 break; 156 } 157 158 default: 159 { 160 TRESPASS(); 161 break; 162 } 163 } 164} 165 166void ARTPConnection::onAddStream(const sp<AMessage> &msg) { 167 mStreams.push_back(StreamInfo()); 168 StreamInfo *info = &*--mStreams.end(); 169 170 int32_t s; 171 CHECK(msg->findInt32("rtp-socket", &s)); 172 info->mRTPSocket = s; 173 CHECK(msg->findInt32("rtcp-socket", &s)); 174 info->mRTCPSocket = s; 175 176 sp<RefBase> obj; 177 CHECK(msg->findObject("session-desc", &obj)); 178 info->mSessionDesc = static_cast<ASessionDescription *>(obj.get()); 179 180 CHECK(msg->findSize("index", &info->mIndex)); 181 CHECK(msg->findMessage("notify", &info->mNotifyMsg)); 182 183 info->mNumRTCPPacketsReceived = 0; 184 memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr)); 185 186 postPollEvent(); 187} 188 189void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) { 190 int32_t rtpSocket, rtcpSocket; 191 CHECK(msg->findInt32("rtp-socket", &rtpSocket)); 192 CHECK(msg->findInt32("rtcp-socket", &rtcpSocket)); 193 194 List<StreamInfo>::iterator it = mStreams.begin(); 195 while (it != mStreams.end() 196 && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) { 197 ++it; 198 } 199 200 if (it == mStreams.end()) { 201 TRESPASS(); 202 } 203 204 mStreams.erase(it); 205} 206 207void ARTPConnection::postPollEvent() { 208 if (mPollEventPending) { 209 return; 210 } 211 212 sp<AMessage> msg = new AMessage(kWhatPollStreams, id()); 213 msg->post(); 214 215 mPollEventPending = true; 216} 217 218void ARTPConnection::onPollStreams() { 219 mPollEventPending = false; 220 221 if (mStreams.empty()) { 222 return; 223 } 224 225 struct timeval tv; 226 tv.tv_sec = 0; 227 tv.tv_usec = kSelectTimeoutUs; 228 229 fd_set rs; 230 FD_ZERO(&rs); 231 232 int maxSocket = -1; 233 for (List<StreamInfo>::iterator it = mStreams.begin(); 234 it != mStreams.end(); ++it) { 235 
FD_SET(it->mRTPSocket, &rs); 236 FD_SET(it->mRTCPSocket, &rs); 237 238 if (it->mRTPSocket > maxSocket) { 239 maxSocket = it->mRTPSocket; 240 } 241 if (it->mRTCPSocket > maxSocket) { 242 maxSocket = it->mRTCPSocket; 243 } 244 } 245 246 int res = select(maxSocket + 1, &rs, NULL, NULL, &tv); 247 CHECK_GE(res, 0); 248 249 if (res > 0) { 250 for (List<StreamInfo>::iterator it = mStreams.begin(); 251 it != mStreams.end(); ++it) { 252 if (FD_ISSET(it->mRTPSocket, &rs)) { 253 receive(&*it, true); 254 } 255 if (FD_ISSET(it->mRTCPSocket, &rs)) { 256 receive(&*it, false); 257 } 258 } 259 } 260 261 postPollEvent(); 262 263 int64_t nowUs = ALooper::GetNowUs(); 264 if (mLastReceiverReportTimeUs <= 0 265 || mLastReceiverReportTimeUs + 5000000ll <= nowUs) { 266 sp<ABuffer> buffer = new ABuffer(kMaxUDPSize); 267 for (List<StreamInfo>::iterator it = mStreams.begin(); 268 it != mStreams.end(); ++it) { 269 StreamInfo *s = &*it; 270 271 if (s->mNumRTCPPacketsReceived == 0) { 272 // We have never received any RTCP packets on this stream, 273 // we don't even know where to send a report. 274 continue; 275 } 276 277 buffer->setRange(0, 0); 278 279 for (size_t i = 0; i < s->mSources.size(); ++i) { 280 sp<ARTPSource> source = s->mSources.valueAt(i); 281 282 source->addReceiverReport(buffer); 283 source->addFIR(buffer); 284 } 285 286 if (buffer->size() > 0) { 287 LOG(VERBOSE) << "Sending RR..."; 288 289 ssize_t n = sendto( 290 s->mRTCPSocket, buffer->data(), buffer->size(), 0, 291 (const struct sockaddr *)&s->mRemoteRTCPAddr, 292 sizeof(s->mRemoteRTCPAddr)); 293 CHECK_EQ(n, (ssize_t)buffer->size()); 294 295 mLastReceiverReportTimeUs = nowUs; 296 } 297 } 298 } 299} 300 301status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) { 302 sp<ABuffer> buffer = new ABuffer(65536); 303 304 socklen_t remoteAddrLen = 305 (!receiveRTP && s->mNumRTCPPacketsReceived == 0) 306 ? sizeof(s->mRemoteRTCPAddr) : 0; 307 308 ssize_t nbytes = recvfrom( 309 receiveRTP ? 
s->mRTPSocket : s->mRTCPSocket, 310 buffer->data(), 311 buffer->capacity(), 312 0, 313 remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL, 314 remoteAddrLen > 0 ? &remoteAddrLen : NULL); 315 316 if (nbytes < 0) { 317 return -1; 318 } 319 320 buffer->setRange(0, nbytes); 321 322 status_t err; 323 if (receiveRTP) { 324 err = parseRTP(s, buffer); 325 } else { 326 ++s->mNumRTCPPacketsReceived; 327 err = parseRTCP(s, buffer); 328 } 329 330 return err; 331} 332 333status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) { 334 size_t size = buffer->size(); 335 336 if (size < 12) { 337 // Too short to be a valid RTP header. 338 return -1; 339 } 340 341 const uint8_t *data = buffer->data(); 342 343 if ((data[0] >> 6) != 2) { 344 // Unsupported version. 345 return -1; 346 } 347 348 if (data[0] & 0x20) { 349 // Padding present. 350 351 size_t paddingLength = data[size - 1]; 352 353 if (paddingLength + 12 > size) { 354 // If we removed this much padding we'd end up with something 355 // that's too short to be a valid RTP header. 356 return -1; 357 } 358 359 size -= paddingLength; 360 } 361 362 int numCSRCs = data[0] & 0x0f; 363 364 size_t payloadOffset = 12 + 4 * numCSRCs; 365 366 if (size < payloadOffset) { 367 // Not enough data to fit the basic header and all the CSRC entries. 368 return -1; 369 } 370 371 if (data[0] & 0x10) { 372 // Header eXtension present. 373 374 if (size < payloadOffset + 4) { 375 // Not enough data to fit the basic header, all CSRC entries 376 // and the first 4 bytes of the extension header. 
377 378 return -1; 379 } 380 381 const uint8_t *extensionData = &data[payloadOffset]; 382 383 size_t extensionLength = 384 4 * (extensionData[2] << 8 | extensionData[3]); 385 386 if (size < payloadOffset + 4 + extensionLength) { 387 return -1; 388 } 389 390 payloadOffset += 4 + extensionLength; 391 } 392 393 uint32_t srcId = u32at(&data[8]); 394 395 sp<ARTPSource> source = findSource(s, srcId); 396 397 uint32_t rtpTime = u32at(&data[4]); 398 399 sp<AMessage> meta = buffer->meta(); 400 meta->setInt32("ssrc", srcId); 401 meta->setInt32("rtp-time", rtpTime); 402 meta->setInt32("PT", data[1] & 0x7f); 403 meta->setInt32("M", data[1] >> 7); 404 405 buffer->setInt32Data(u16at(&data[2])); 406 buffer->setRange(payloadOffset, size - payloadOffset); 407 408 source->processRTPPacket(buffer); 409 410 return OK; 411} 412 413status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) { 414 const uint8_t *data = buffer->data(); 415 size_t size = buffer->size(); 416 417 while (size > 0) { 418 if (size < 8) { 419 // Too short to be a valid RTCP header 420 return -1; 421 } 422 423 if ((data[0] >> 6) != 2) { 424 // Unsupported version. 425 return -1; 426 } 427 428 if (data[0] & 0x20) { 429 // Padding present. 430 431 size_t paddingLength = data[size - 1]; 432 433 if (paddingLength + 12 > size) { 434 // If we removed this much padding we'd end up with something 435 // that's too short to be a valid RTP header. 436 return -1; 437 } 438 439 size -= paddingLength; 440 } 441 442 size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4; 443 444 if (size < headerLength) { 445 // Only received a partial packet? 
446 return -1; 447 } 448 449 switch (data[1]) { 450 case 200: 451 { 452 parseSR(s, data, headerLength); 453 break; 454 } 455 456 case 201: // RR 457 case 202: // SDES 458 case 204: // APP 459 break; 460 461 case 205: // TSFB (transport layer specific feedback) 462 case 206: // PSFB (payload specific feedback) 463 // hexdump(data, headerLength); 464 break; 465 466 case 203: 467 { 468 parseBYE(s, data, headerLength); 469 break; 470 } 471 472 default: 473 { 474 LOG(WARNING) << "Unknown RTCP packet type " 475 << (unsigned)data[1] 476 << " of size " << headerLength; 477 break; 478 } 479 } 480 481 data += headerLength; 482 size -= headerLength; 483 } 484 485 return OK; 486} 487 488status_t ARTPConnection::parseBYE( 489 StreamInfo *s, const uint8_t *data, size_t size) { 490 size_t SC = data[0] & 0x3f; 491 492 if (SC == 0 || size < (4 + SC * 4)) { 493 // Packet too short for the minimal BYE header. 494 return -1; 495 } 496 497 uint32_t id = u32at(&data[4]); 498 499 sp<ARTPSource> source = findSource(s, id); 500 501 source->byeReceived(); 502 503 return OK; 504} 505 506status_t ARTPConnection::parseSR( 507 StreamInfo *s, const uint8_t *data, size_t size) { 508 size_t RC = data[0] & 0x1f; 509 510 if (size < (7 + RC * 6) * 4) { 511 // Packet too short for the minimal SR header. 
512 return -1; 513 } 514 515 uint32_t id = u32at(&data[4]); 516 uint64_t ntpTime = u64at(&data[8]); 517 uint32_t rtpTime = u32at(&data[16]); 518 519#if 0 520 LOG(INFO) << StringPrintf( 521 "XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f", 522 id, 523 rtpTime, (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32)); 524#endif 525 526 sp<ARTPSource> source = findSource(s, id); 527 528#if !IGNORE_RTCP_TIME 529 source->timeUpdate(rtpTime, ntpTime); 530#endif 531 532 return 0; 533} 534 535sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) { 536 sp<ARTPSource> source; 537 ssize_t index = info->mSources.indexOfKey(srcId); 538 if (index < 0) { 539 index = info->mSources.size(); 540 541 source = new ARTPSource( 542 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg); 543 544#if IGNORE_RTCP_TIME 545 // For H.263 gtalk to work... 546 source->timeUpdate(0, 0); 547 source->timeUpdate(30, 0x100000000ll); 548#endif 549 550 info->mSources.add(srcId, source); 551 } else { 552 source = info->mSources.valueAt(index); 553 } 554 555 return source; 556} 557 558} // namespace android 559 560