ARTSPConnection.cpp revision 6e3fa444c5b3970666707bb2b6d25e2615dafe80
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "ARTSPConnection" 19#include <utils/Log.h> 20 21#include "ARTSPConnection.h" 22 23#include <media/stagefright/foundation/ABuffer.h> 24#include <media/stagefright/foundation/ADebug.h> 25#include <media/stagefright/foundation/AMessage.h> 26#include <media/stagefright/MediaErrors.h> 27 28#include <arpa/inet.h> 29#include <fcntl.h> 30#include <netdb.h> 31#include <sys/socket.h> 32 33namespace android { 34 35// static 36const int64_t ARTSPConnection::kSelectTimeoutUs = 1000ll; 37 38ARTSPConnection::ARTSPConnection() 39 : mState(DISCONNECTED), 40 mSocket(-1), 41 mConnectionID(0), 42 mNextCSeq(0), 43 mReceiveResponseEventPending(false) { 44} 45 46ARTSPConnection::~ARTSPConnection() { 47 if (mSocket >= 0) { 48 LOGE("Connection is still open, closing the socket."); 49 close(mSocket); 50 mSocket = -1; 51 } 52} 53 54void ARTSPConnection::connect(const char *url, const sp<AMessage> &reply) { 55 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 56 msg->setString("url", url); 57 msg->setMessage("reply", reply); 58 msg->post(); 59} 60 61void ARTSPConnection::disconnect(const sp<AMessage> &reply) { 62 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 63 msg->setMessage("reply", reply); 64 msg->post(); 65} 66 67void ARTSPConnection::sendRequest( 68 const char *request, const sp<AMessage> &reply) { 69 sp<AMessage> msg = new AMessage(kWhatSendRequest, id()); 70 msg->setString("request", request); 71 msg->setMessage("reply", reply); 72 msg->post(); 73} 74 75void ARTSPConnection::observeBinaryData(const sp<AMessage> &reply) { 76 sp<AMessage> msg = new AMessage(kWhatObserveBinaryData, id()); 77 msg->setMessage("reply", reply); 78 msg->post(); 79} 80 81void ARTSPConnection::onMessageReceived(const sp<AMessage> &msg) { 82 switch (msg->what()) { 83 case kWhatConnect: 84 onConnect(msg); 85 break; 86 87 case kWhatDisconnect: 88 onDisconnect(msg); 89 break; 90 91 case kWhatCompleteConnection: 92 onCompleteConnection(msg); 93 break; 94 95 case kWhatSendRequest: 96 onSendRequest(msg); 97 break; 98 99 case kWhatReceiveResponse: 100 onReceiveResponse(); 101 break; 102 103 case kWhatObserveBinaryData: 104 { 105 CHECK(msg->findMessage("reply", &mObserveBinaryMessage)); 106 break; 107 } 108 109 default: 110 TRESPASS(); 111 break; 112 } 113} 114 115// static 116bool ARTSPConnection::ParseURL( 117 const char *url, AString *host, unsigned *port, AString *path) { 118 host->clear(); 119 *port = 0; 120 path->clear(); 121 122 if (strncasecmp("rtsp://", url, 7)) { 123 return false; 124 } 125 126 const char *slashPos = strchr(&url[7], '/'); 127 128 if (slashPos == NULL) { 129 host->setTo(&url[7]); 130 path->setTo("/"); 131 } else { 132 host->setTo(&url[7], slashPos - &url[7]); 133 path->setTo(slashPos); 134 } 135 136 char *colonPos = strchr(host->c_str(), ':'); 137 138 if (colonPos != NULL) { 139 unsigned long x; 140 if (!ParseSingleUnsignedLong(colonPos + 1, &x) || x >= 65536) { 141 return false; 142 } 143 144 *port = x; 145 146 size_t colonOffset = colonPos - host->c_str(); 147 size_t trailing = host->size() - colonOffset; 148 host->erase(colonOffset, trailing); 149 } else { 150 *port = 554; 151 } 152 153 return true; 154} 155 156static void MakeSocketBlocking(int s, bool blocking) { 157 // Make socket non-blocking. 158 int flags = fcntl(s, F_GETFL, 0); 159 CHECK_NE(flags, -1); 160 161 if (blocking) { 162 flags &= ~O_NONBLOCK; 163 } else { 164 flags |= O_NONBLOCK; 165 } 166 167 CHECK_NE(fcntl(s, F_SETFL, flags), -1); 168} 169 170void ARTSPConnection::onConnect(const sp<AMessage> &msg) { 171 ++mConnectionID; 172 173 if (mState != DISCONNECTED) { 174 close(mSocket); 175 mSocket = -1; 176 177 flushPendingRequests(); 178 } 179 180 mState = CONNECTING; 181 182 AString url; 183 CHECK(msg->findString("url", &url)); 184 185 sp<AMessage> reply; 186 CHECK(msg->findMessage("reply", &reply)); 187 188 AString host, path; 189 unsigned port; 190 if (!ParseURL(url.c_str(), &host, &port, &path)) { 191 LOGE("Malformed rtsp url %s", url.c_str()); 192 193 reply->setInt32("result", ERROR_MALFORMED); 194 reply->post(); 195 196 mState = DISCONNECTED; 197 return; 198 } 199 200 struct hostent *ent = gethostbyname(host.c_str()); 201 if (ent == NULL) { 202 LOGE("Unknown host %s", host.c_str()); 203 204 reply->setInt32("result", -ENOENT); 205 reply->post(); 206 207 mState = DISCONNECTED; 208 return; 209 } 210 211 mSocket = socket(AF_INET, SOCK_STREAM, 0); 212 213 MakeSocketBlocking(mSocket, false); 214 215 struct sockaddr_in remote; 216 memset(remote.sin_zero, 0, sizeof(remote.sin_zero)); 217 remote.sin_family = AF_INET; 218 remote.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 219 remote.sin_port = htons(port); 220 221 int err = ::connect( 222 mSocket, (const struct sockaddr *)&remote, sizeof(remote)); 223 224 reply->setInt32("server-ip", ntohl(remote.sin_addr.s_addr)); 225 226 if (err < 0) { 227 if (errno == EINPROGRESS) { 228 sp<AMessage> msg = new AMessage(kWhatCompleteConnection, id()); 229 msg->setMessage("reply", reply); 230 msg->setInt32("connection-id", mConnectionID); 231 msg->post(); 232 return; 233 } 234 235 reply->setInt32("result", -errno); 236 mState = DISCONNECTED; 237 238 close(mSocket); 239 mSocket = -1; 240 } else { 241 reply->setInt32("result", OK); 242 mState = CONNECTED; 243 mNextCSeq = 1; 244 245 postReceiveReponseEvent(); 246 } 247 248 reply->post(); 249} 250 251void ARTSPConnection::onDisconnect(const sp<AMessage> &msg) { 252 if (mState == CONNECTED || mState == CONNECTING) { 253 close(mSocket); 254 mSocket = -1; 255 256 flushPendingRequests(); 257 } 258 259 sp<AMessage> reply; 260 CHECK(msg->findMessage("reply", &reply)); 261 262 reply->setInt32("result", OK); 263 mState = DISCONNECTED; 264 265 reply->post(); 266} 267 268void ARTSPConnection::onCompleteConnection(const sp<AMessage> &msg) { 269 sp<AMessage> reply; 270 CHECK(msg->findMessage("reply", &reply)); 271 272 int32_t connectionID; 273 CHECK(msg->findInt32("connection-id", &connectionID)); 274 275 if ((connectionID != mConnectionID) || mState != CONNECTING) { 276 // While we were attempting to connect, the attempt was 277 // cancelled. 278 reply->setInt32("result", -ECONNABORTED); 279 reply->post(); 280 return; 281 } 282 283 struct timeval tv; 284 tv.tv_sec = 0; 285 tv.tv_usec = kSelectTimeoutUs; 286 287 fd_set ws; 288 FD_ZERO(&ws); 289 FD_SET(mSocket, &ws); 290 291 int res = select(mSocket + 1, NULL, &ws, NULL, &tv); 292 CHECK_GE(res, 0); 293 294 if (res == 0) { 295 // Timed out. Not yet connected. 296 297 msg->post(); 298 return; 299 } 300 301 int err; 302 socklen_t optionLen = sizeof(err); 303 CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0); 304 CHECK_EQ(optionLen, (socklen_t)sizeof(err)); 305 306 if (err != 0) { 307 LOGE("err = %d (%s)", err, strerror(err)); 308 309 reply->setInt32("result", -err); 310 311 mState = DISCONNECTED; 312 close(mSocket); 313 mSocket = -1; 314 } else { 315 reply->setInt32("result", OK); 316 mState = CONNECTED; 317 mNextCSeq = 1; 318 319 postReceiveReponseEvent(); 320 } 321 322 reply->post(); 323} 324 325void ARTSPConnection::onSendRequest(const sp<AMessage> &msg) { 326 sp<AMessage> reply; 327 CHECK(msg->findMessage("reply", &reply)); 328 329 if (mState != CONNECTED) { 330 reply->setInt32("result", -ENOTCONN); 331 reply->post(); 332 return; 333 } 334 335 AString request; 336 CHECK(msg->findString("request", &request)); 337 338 // Find the boundary between headers and the body. 339 ssize_t i = request.find("\r\n\r\n"); 340 CHECK_GE(i, 0); 341 342 int32_t cseq = mNextCSeq++; 343 344 AString cseqHeader = "CSeq: "; 345 cseqHeader.append(cseq); 346 cseqHeader.append("\r\n"); 347 348 request.insert(cseqHeader, i + 2); 349 350 LOGV("%s", request.c_str()); 351 352 size_t numBytesSent = 0; 353 while (numBytesSent < request.size()) { 354 ssize_t n = 355 send(mSocket, request.c_str() + numBytesSent, 356 request.size() - numBytesSent, 0); 357 358 if (n == 0) { 359 // Server closed the connection. 360 LOGE("Server unexpectedly closed the connection."); 361 362 reply->setInt32("result", ERROR_IO); 363 reply->post(); 364 return; 365 } else if (n < 0) { 366 if (errno == EINTR) { 367 continue; 368 } 369 370 LOGE("Error sending rtsp request."); 371 reply->setInt32("result", -errno); 372 reply->post(); 373 return; 374 } 375 376 numBytesSent += (size_t)n; 377 } 378 379 mPendingRequests.add(cseq, reply); 380} 381 382void ARTSPConnection::onReceiveResponse() { 383 mReceiveResponseEventPending = false; 384 385 if (mState != CONNECTED) { 386 return; 387 } 388 389 struct timeval tv; 390 tv.tv_sec = 0; 391 tv.tv_usec = kSelectTimeoutUs; 392 393 fd_set rs; 394 FD_ZERO(&rs); 395 FD_SET(mSocket, &rs); 396 397 int res = select(mSocket + 1, &rs, NULL, NULL, &tv); 398 CHECK_GE(res, 0); 399 400 if (res == 1) { 401 MakeSocketBlocking(mSocket, true); 402 403 bool success = receiveRTSPReponse(); 404 405 MakeSocketBlocking(mSocket, false); 406 407 if (!success) { 408 // Something horrible, irreparable has happened. 409 flushPendingRequests(); 410 return; 411 } 412 } 413 414 postReceiveReponseEvent(); 415} 416 417void ARTSPConnection::flushPendingRequests() { 418 for (size_t i = 0; i < mPendingRequests.size(); ++i) { 419 sp<AMessage> reply = mPendingRequests.valueAt(i); 420 421 reply->setInt32("result", -ECONNABORTED); 422 reply->post(); 423 } 424 425 mPendingRequests.clear(); 426} 427 428void ARTSPConnection::postReceiveReponseEvent() { 429 if (mReceiveResponseEventPending) { 430 return; 431 } 432 433 sp<AMessage> msg = new AMessage(kWhatReceiveResponse, id()); 434 msg->post(); 435 436 mReceiveResponseEventPending = true; 437} 438 439status_t ARTSPConnection::receive(void *data, size_t size) { 440 size_t offset = 0; 441 while (offset < size) { 442 ssize_t n = recv(mSocket, (uint8_t *)data + offset, size - offset, 0); 443 if (n == 0) { 444 // Server closed the connection. 445 LOGE("Server unexpectedly closed the connection."); 446 return ERROR_IO; 447 } else if (n < 0) { 448 if (errno == EINTR) { 449 continue; 450 } 451 452 LOGE("Error reading rtsp response."); 453 return -errno; 454 } 455 456 offset += (size_t)n; 457 } 458 459 return OK; 460} 461 462bool ARTSPConnection::receiveLine(AString *line) { 463 line->clear(); 464 465 bool sawCR = false; 466 for (;;) { 467 char c; 468 if (receive(&c, 1) != OK) { 469 return false; 470 } 471 472 if (sawCR && c == '\n') { 473 line->erase(line->size() - 1, 1); 474 return true; 475 } 476 477 line->append(&c, 1); 478 479 if (c == '$' && line->size() == 1) { 480 // Special-case for interleaved binary data. 481 return true; 482 } 483 484 sawCR = (c == '\r'); 485 } 486} 487 488sp<ABuffer> ARTSPConnection::receiveBinaryData() { 489 uint8_t x[3]; 490 if (receive(x, 3) != OK) { 491 return NULL; 492 } 493 494 sp<ABuffer> buffer = new ABuffer((x[1] << 8) | x[2]); 495 if (receive(buffer->data(), buffer->size()) != OK) { 496 return NULL; 497 } 498 499 buffer->meta()->setInt32("index", (int32_t)x[0]); 500 501 return buffer; 502} 503 504bool ARTSPConnection::receiveRTSPReponse() { 505 AString statusLine; 506 507 if (!receiveLine(&statusLine)) { 508 return false; 509 } 510 511 if (statusLine == "$") { 512 sp<ABuffer> buffer = receiveBinaryData(); 513 514 if (buffer == NULL) { 515 return false; 516 } 517 518 if (mObserveBinaryMessage != NULL) { 519 sp<AMessage> notify = mObserveBinaryMessage->dup(); 520 notify->setObject("buffer", buffer); 521 notify->post(); 522 } else { 523 LOGW("received binary data, but no one cares."); 524 } 525 526 return true; 527 } 528 529 sp<ARTSPResponse> response = new ARTSPResponse; 530 response->mStatusLine = statusLine; 531 532 LOGI("status: %s", response->mStatusLine.c_str()); 533 534 ssize_t space1 = response->mStatusLine.find(" "); 535 if (space1 < 0) { 536 return false; 537 } 538 ssize_t space2 = response->mStatusLine.find(" ", space1 + 1); 539 if (space2 < 0) { 540 return false; 541 } 542 543 AString statusCodeStr( 544 response->mStatusLine, space1 + 1, space2 - space1 - 1); 545 546 if (!ParseSingleUnsignedLong( 547 statusCodeStr.c_str(), &response->mStatusCode) 548 || response->mStatusCode < 100 || response->mStatusCode > 999) { 549 return false; 550 } 551 552 AString line; 553 for (;;) { 554 if (!receiveLine(&line)) { 555 break; 556 } 557 558 if (line.empty()) { 559 break; 560 } 561 562 LOGV("line: %s", line.c_str()); 563 564 ssize_t colonPos = line.find(":"); 565 if (colonPos < 0) { 566 // Malformed header line. 567 return false; 568 } 569 570 AString key(line, 0, colonPos); 571 key.trim(); 572 key.tolower(); 573 574 line.erase(0, colonPos + 1); 575 line.trim(); 576 577 response->mHeaders.add(key, line); 578 } 579 580 unsigned long contentLength = 0; 581 582 ssize_t i = response->mHeaders.indexOfKey("content-length"); 583 584 if (i >= 0) { 585 AString value = response->mHeaders.valueAt(i); 586 if (!ParseSingleUnsignedLong(value.c_str(), &contentLength)) { 587 return false; 588 } 589 } 590 591 if (contentLength > 0) { 592 response->mContent = new ABuffer(contentLength); 593 594 size_t numBytesRead = 0; 595 while (numBytesRead < contentLength) { 596 ssize_t n = recv( 597 mSocket, response->mContent->data() + numBytesRead, 598 contentLength - numBytesRead, 0); 599 600 if (n == 0) { 601 // Server closed the connection. 602 TRESPASS(); 603 } else if (n < 0) { 604 if (errno == EINTR) { 605 continue; 606 } 607 608 TRESPASS(); 609 } 610 611 numBytesRead += (size_t)n; 612 } 613 } 614 615 return notifyResponseListener(response); 616} 617 618// static 619bool ARTSPConnection::ParseSingleUnsignedLong( 620 const char *from, unsigned long *x) { 621 char *end; 622 *x = strtoul(from, &end, 10); 623 624 if (end == from || *end != '\0') { 625 return false; 626 } 627 628 return true; 629} 630 631bool ARTSPConnection::notifyResponseListener( 632 const sp<ARTSPResponse> &response) { 633 ssize_t i = response->mHeaders.indexOfKey("cseq"); 634 635 if (i < 0) { 636 return true; 637 } 638 639 AString value = response->mHeaders.valueAt(i); 640 641 unsigned long cseq; 642 if (!ParseSingleUnsignedLong(value.c_str(), &cseq)) { 643 return false; 644 } 645 646 i = mPendingRequests.indexOfKey(cseq); 647 648 if (i < 0) { 649 // Unsolicited response? 650 TRESPASS(); 651 } 652 653 sp<AMessage> reply = mPendingRequests.valueAt(i); 654 mPendingRequests.removeItemsAt(i); 655 656 reply->setInt32("result", OK); 657 reply->setObject("response", response); 658 reply->post(); 659 660 return true; 661} 662 663} // namespace android 664