ARTPConnection.cpp revision cf7b9c7aae758ac0b99833915053c63c2ac46e09
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#include "ARTPConnection.h" 18 19#include "ARTPSource.h" 20#include "ASessionDescription.h" 21 22#include <media/stagefright/foundation/ABuffer.h> 23#include <media/stagefright/foundation/ADebug.h> 24#include <media/stagefright/foundation/AMessage.h> 25#include <media/stagefright/foundation/AString.h> 26 27#include <arpa/inet.h> 28#include <sys/socket.h> 29 30#define VERBOSE 0 31 32#if VERBOSE 33#include "hexdump.h" 34#endif 35 36namespace android { 37 38static uint16_t u16at(const uint8_t *data) { 39 return data[0] << 8 | data[1]; 40} 41 42static uint32_t u32at(const uint8_t *data) { 43 return u16at(data) << 16 | u16at(&data[2]); 44} 45 46static uint64_t u64at(const uint8_t *data) { 47 return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]); 48} 49 50// static 51const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll; 52 53struct ARTPConnection::StreamInfo { 54 int mRTPSocket; 55 int mRTCPSocket; 56 sp<ASessionDescription> mSessionDesc; 57 size_t mIndex; 58 sp<AMessage> mNotifyMsg; 59}; 60 61ARTPConnection::ARTPConnection() 62 : mPollEventPending(false) { 63} 64 65ARTPConnection::~ARTPConnection() { 66} 67 68void ARTPConnection::addStream( 69 int rtpSocket, int rtcpSocket, 70 const sp<ASessionDescription> &sessionDesc, 71 size_t index, 72 const sp<AMessage> ¬ify) { 73 sp<AMessage> msg = new AMessage(kWhatAddStream, id()); 74 msg->setInt32("rtp-socket", rtpSocket); 75 msg->setInt32("rtcp-socket", rtcpSocket); 76 msg->setObject("session-desc", sessionDesc); 77 msg->setSize("index", index); 78 msg->setMessage("notify", notify); 79 msg->post(); 80} 81 82void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) { 83 sp<AMessage> msg = new AMessage(kWhatRemoveStream, id()); 84 msg->setInt32("rtp-socket", rtpSocket); 85 msg->setInt32("rtcp-socket", rtcpSocket); 86 msg->post(); 87} 88 89static void bumpSocketBufferSize(int s) { 90 int size = 256 * 1024; 91 CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0); 92} 93 94// static 95void ARTPConnection::MakePortPair( 96 int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) { 97 *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0); 98 CHECK_GE(*rtpSocket, 0); 99 100 bumpSocketBufferSize(*rtpSocket); 101 102 *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0); 103 CHECK_GE(*rtcpSocket, 0); 104 105 bumpSocketBufferSize(*rtcpSocket); 106 107 unsigned start = (rand() * 1000)/ RAND_MAX + 15550; 108 start &= ~1; 109 110 for (unsigned port = start; port < 65536; port += 2) { 111 struct sockaddr_in addr; 112 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 113 addr.sin_family = AF_INET; 114 addr.sin_addr.s_addr = INADDR_ANY; 115 addr.sin_port = htons(port); 116 117 if (bind(*rtpSocket, 118 (const struct sockaddr *)&addr, sizeof(addr)) < 0) { 119 continue; 120 } 121 122 addr.sin_port = htons(port + 1); 123 124 if (bind(*rtcpSocket, 125 (const struct sockaddr *)&addr, sizeof(addr)) == 0) { 126 *rtpPort = port; 127 return; 128 } 129 } 130 131 TRESPASS(); 132} 133 134void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) { 135 switch (msg->what()) { 136 case kWhatAddStream: 137 { 138 onAddStream(msg); 139 break; 140 } 141 142 case kWhatRemoveStream: 143 { 144 onRemoveStream(msg); 145 break; 146 } 147 148 case kWhatPollStreams: 149 { 150 onPollStreams(); 151 break; 152 } 153 154 default: 155 { 156 TRESPASS(); 157 break; 158 } 159 } 160} 161 162void ARTPConnection::onAddStream(const sp<AMessage> &msg) { 163 mStreams.push_back(StreamInfo()); 164 StreamInfo *info = &*--mStreams.end(); 165 166 int32_t s; 167 CHECK(msg->findInt32("rtp-socket", &s)); 168 info->mRTPSocket = s; 169 CHECK(msg->findInt32("rtcp-socket", &s)); 170 info->mRTCPSocket = s; 171 172 sp<RefBase> obj; 173 CHECK(msg->findObject("session-desc", &obj)); 174 info->mSessionDesc = static_cast<ASessionDescription *>(obj.get()); 175 176 CHECK(msg->findSize("index", &info->mIndex)); 177 CHECK(msg->findMessage("notify", &info->mNotifyMsg)); 178 179 postPollEvent(); 180} 181 182void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) { 183 int32_t rtpSocket, rtcpSocket; 184 CHECK(msg->findInt32("rtp-socket", &rtpSocket)); 185 CHECK(msg->findInt32("rtcp-socket", &rtcpSocket)); 186 187 List<StreamInfo>::iterator it = mStreams.begin(); 188 while (it != mStreams.end() 189 && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) { 190 ++it; 191 } 192 193 if (it == mStreams.end()) { 194 TRESPASS(); 195 } 196 197 mStreams.erase(it); 198} 199 200void ARTPConnection::postPollEvent() { 201 if (mPollEventPending) { 202 return; 203 } 204 205 sp<AMessage> msg = new AMessage(kWhatPollStreams, id()); 206 msg->post(); 207 208 mPollEventPending = true; 209} 210 211void ARTPConnection::onPollStreams() { 212 mPollEventPending = false; 213 214 if (mStreams.empty()) { 215 return; 216 } 217 218 struct timeval tv; 219 tv.tv_sec = 0; 220 tv.tv_usec = kSelectTimeoutUs; 221 222 fd_set rs; 223 FD_ZERO(&rs); 224 225 int maxSocket = -1; 226 for (List<StreamInfo>::iterator it = mStreams.begin(); 227 it != mStreams.end(); ++it) { 228 FD_SET(it->mRTPSocket, &rs); 229 FD_SET(it->mRTCPSocket, &rs); 230 231 if (it->mRTPSocket > maxSocket) { 232 maxSocket = it->mRTPSocket; 233 } 234 if (it->mRTCPSocket > maxSocket) { 235 maxSocket = it->mRTCPSocket; 236 } 237 } 238 239 int res = select(maxSocket + 1, &rs, NULL, NULL, &tv); 240 CHECK_GE(res, 0); 241 242 if (res > 0) { 243 for (List<StreamInfo>::iterator it = mStreams.begin(); 244 it != mStreams.end(); ++it) { 245 if (FD_ISSET(it->mRTPSocket, &rs)) { 246 receive(&*it, true); 247 } 248 if (FD_ISSET(it->mRTCPSocket, &rs)) { 249 receive(&*it, false); 250 } 251 } 252 } 253 254 postPollEvent(); 255} 256 257status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) { 258 sp<ABuffer> buffer = new ABuffer(65536); 259 260 struct sockaddr_in from; 261 socklen_t fromSize = sizeof(from); 262 263 ssize_t nbytes = recvfrom( 264 receiveRTP ? s->mRTPSocket : s->mRTCPSocket, 265 buffer->data(), 266 buffer->capacity(), 267 0, 268 (struct sockaddr *)&from, 269 &fromSize); 270 271 if (nbytes < 0) { 272 return -1; 273 } 274 275 buffer->setRange(0, nbytes); 276 277 status_t err; 278 if (receiveRTP) { 279 err = parseRTP(s, buffer); 280 } else { 281 err = parseRTCP(s, buffer); 282 } 283 284 return err; 285} 286 287status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) { 288 size_t size = buffer->size(); 289 290 if (size < 12) { 291 // Too short to be a valid RTP header. 292 return -1; 293 } 294 295 const uint8_t *data = buffer->data(); 296 297 if ((data[0] >> 6) != 2) { 298 // Unsupported version. 299 return -1; 300 } 301 302 if (data[0] & 0x20) { 303 // Padding present. 304 305 size_t paddingLength = data[size - 1]; 306 307 if (paddingLength + 12 > size) { 308 // If we removed this much padding we'd end up with something 309 // that's too short to be a valid RTP header. 310 return -1; 311 } 312 313 size -= paddingLength; 314 } 315 316 int numCSRCs = data[0] & 0x0f; 317 318 size_t payloadOffset = 12 + 4 * numCSRCs; 319 320 if (size < payloadOffset) { 321 // Not enough data to fit the basic header and all the CSRC entries. 322 return -1; 323 } 324 325 if (data[0] & 0x10) { 326 // Header eXtension present. 327 328 if (size < payloadOffset + 4) { 329 // Not enough data to fit the basic header, all CSRC entries 330 // and the first 4 bytes of the extension header. 331 332 return -1; 333 } 334 335 const uint8_t *extensionData = &data[payloadOffset]; 336 337 size_t extensionLength = 338 4 * (extensionData[2] << 8 | extensionData[3]); 339 340 if (size < payloadOffset + 4 + extensionLength) { 341 return -1; 342 } 343 344 payloadOffset += 4 + extensionLength; 345 } 346 347 uint32_t srcId = u32at(&data[8]); 348 349 sp<ARTPSource> source; 350 ssize_t index = mSources.indexOfKey(srcId); 351 if (index < 0) { 352 index = mSources.size(); 353 354 source = new ARTPSource( 355 srcId, s->mSessionDesc, s->mIndex, s->mNotifyMsg); 356 357 mSources.add(srcId, source); 358 } else { 359 source = mSources.valueAt(index); 360 } 361 362 uint32_t rtpTime = u32at(&data[4]); 363 364 sp<AMessage> meta = buffer->meta(); 365 meta->setInt32("ssrc", srcId); 366 meta->setInt32("rtp-time", rtpTime); 367 meta->setInt32("PT", data[1] & 0x7f); 368 meta->setInt32("M", data[1] >> 7); 369 370 buffer->setInt32Data(u16at(&data[2])); 371 372#if VERBOSE 373 printf("RTP = {\n" 374 " PT: %d\n" 375 " sequence number: %d\n" 376 " RTP-time: 0x%08x\n" 377 " M: %d\n" 378 " SSRC: 0x%08x\n" 379 "}\n", 380 data[1] & 0x7f, 381 u16at(&data[2]), 382 rtpTime, 383 data[1] >> 7, 384 srcId); 385 386 // hexdump(&data[payloadOffset], size - payloadOffset); 387#endif 388 389 buffer->setRange(payloadOffset, size - payloadOffset); 390 391 source->processRTPPacket(buffer); 392 393 return OK; 394} 395 396status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) { 397 const uint8_t *data = buffer->data(); 398 size_t size = buffer->size(); 399 400 while (size > 0) { 401 if (size < 8) { 402 // Too short to be a valid RTCP header 403 return -1; 404 } 405 406 if ((data[0] >> 6) != 2) { 407 // Unsupported version. 408 return -1; 409 } 410 411 if (data[0] & 0x20) { 412 // Padding present. 413 414 size_t paddingLength = data[size - 1]; 415 416 if (paddingLength + 12 > size) { 417 // If we removed this much padding we'd end up with something 418 // that's too short to be a valid RTP header. 419 return -1; 420 } 421 422 size -= paddingLength; 423 } 424 425 size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4; 426 427 if (size < headerLength) { 428 // Only received a partial packet? 429 return -1; 430 } 431 432 switch (data[1]) { 433 case 200: 434 { 435 parseSR(s, data, headerLength); 436 break; 437 } 438 439 default: 440 { 441#if VERBOSE 442 printf("Unknown RTCP packet type %d of size %ld\n", 443 data[1], headerLength); 444 445 hexdump(data, headerLength); 446#endif 447 break; 448 } 449 } 450 451 data += headerLength; 452 size -= headerLength; 453 } 454 455 return OK; 456} 457 458status_t ARTPConnection::parseSR( 459 StreamInfo *s, const uint8_t *data, size_t size) { 460 size_t RC = data[0] & 0x1f; 461 462 if (size < (7 + RC * 6) * 4) { 463 // Packet too short for the minimal SR header. 464 return -1; 465 } 466 467 uint32_t id = u32at(&data[4]); 468 uint64_t ntpTime = u64at(&data[8]); 469 uint32_t rtpTime = u32at(&data[16]); 470 471#if VERBOSE 472 printf("SR = {\n" 473 " SSRC: 0x%08x\n" 474 " NTP-time: 0x%016llx\n" 475 " RTP-time: 0x%08x\n" 476 "}\n", 477 id, ntpTime, rtpTime); 478#endif 479 480 sp<ARTPSource> source; 481 ssize_t index = mSources.indexOfKey(id); 482 if (index < 0) { 483 index = mSources.size(); 484 485 source = new ARTPSource( 486 id, s->mSessionDesc, s->mIndex, s->mNotifyMsg); 487 488 mSources.add(id, source); 489 } else { 490 source = mSources.valueAt(index); 491 } 492 493 source->timeUpdate(rtpTime, ntpTime); 494 495 return 0; 496} 497 498} // namespace android 499 500