1/* 2 * Copyright 2012, The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "NetworkSession" 19#include <utils/Log.h> 20 21#include "ANetworkSession.h" 22#include "ParsedMessage.h" 23 24#include <arpa/inet.h> 25#include <fcntl.h> 26#include <net/if.h> 27#include <netdb.h> 28#include <netinet/in.h> 29#include <sys/socket.h> 30 31#include <media/stagefright/foundation/ABuffer.h> 32#include <media/stagefright/foundation/ADebug.h> 33#include <media/stagefright/foundation/AMessage.h> 34#include <media/stagefright/foundation/hexdump.h> 35#include <media/stagefright/Utils.h> 36 37namespace android { 38 39static const size_t kMaxUDPSize = 1500; 40 41struct ANetworkSession::NetworkThread : public Thread { 42 NetworkThread(ANetworkSession *session); 43 44protected: 45 virtual ~NetworkThread(); 46 47private: 48 ANetworkSession *mSession; 49 50 virtual bool threadLoop(); 51 52 DISALLOW_EVIL_CONSTRUCTORS(NetworkThread); 53}; 54 55struct ANetworkSession::Session : public RefBase { 56 enum State { 57 CONNECTING, 58 CONNECTED, 59 LISTENING_RTSP, 60 LISTENING_TCP_DGRAMS, 61 DATAGRAM, 62 }; 63 64 Session(int32_t sessionID, 65 State state, 66 int s, 67 const sp<AMessage> ¬ify); 68 69 int32_t sessionID() const; 70 int socket() const; 71 sp<AMessage> getNotificationMessage() const; 72 73 bool isRTSPServer() const; 74 bool isTCPDatagramServer() const; 75 76 bool wantsToRead(); 77 bool wantsToWrite(); 78 79 status_t readMore(); 80 status_t writeMore(); 81 82 status_t sendRequest(const void *data, ssize_t size); 83 84 void setIsRTSPConnection(bool yesno); 85 86protected: 87 virtual ~Session(); 88 89private: 90 int32_t mSessionID; 91 State mState; 92 bool mIsRTSPConnection; 93 int mSocket; 94 sp<AMessage> mNotify; 95 bool mSawReceiveFailure, mSawSendFailure; 96 97 // for TCP / stream data 98 AString mOutBuffer; 99 100 // for UDP / datagrams 101 List<sp<ABuffer> > mOutDatagrams; 102 103 AString mInBuffer; 104 105 void notifyError(bool send, status_t err, const char *detail); 106 void notify(NotificationReason reason); 107 108 DISALLOW_EVIL_CONSTRUCTORS(Session); 109}; 110//////////////////////////////////////////////////////////////////////////////// 111 112ANetworkSession::NetworkThread::NetworkThread(ANetworkSession *session) 113 : mSession(session) { 114} 115 116ANetworkSession::NetworkThread::~NetworkThread() { 117} 118 119bool ANetworkSession::NetworkThread::threadLoop() { 120 mSession->threadLoop(); 121 122 return true; 123} 124 125//////////////////////////////////////////////////////////////////////////////// 126 127ANetworkSession::Session::Session( 128 int32_t sessionID, 129 State state, 130 int s, 131 const sp<AMessage> ¬ify) 132 : mSessionID(sessionID), 133 mState(state), 134 mIsRTSPConnection(false), 135 mSocket(s), 136 mNotify(notify), 137 mSawReceiveFailure(false), 138 mSawSendFailure(false) { 139 if (mState == CONNECTED) { 140 struct sockaddr_in localAddr; 141 socklen_t localAddrLen = sizeof(localAddr); 142 143 int res = getsockname( 144 mSocket, (struct sockaddr *)&localAddr, &localAddrLen); 145 CHECK_GE(res, 0); 146 147 struct sockaddr_in remoteAddr; 148 socklen_t remoteAddrLen = sizeof(remoteAddr); 149 150 res = getpeername( 151 mSocket, (struct sockaddr *)&remoteAddr, &remoteAddrLen); 152 CHECK_GE(res, 0); 153 154 in_addr_t addr = ntohl(localAddr.sin_addr.s_addr); 155 AString localAddrString = StringPrintf( 156 "%d.%d.%d.%d", 157 (addr >> 24), 158 (addr >> 16) & 0xff, 159 (addr >> 8) & 0xff, 160 addr & 0xff); 161 162 addr = ntohl(remoteAddr.sin_addr.s_addr); 163 AString remoteAddrString = StringPrintf( 164 "%d.%d.%d.%d", 165 (addr >> 24), 166 (addr >> 16) & 0xff, 167 (addr >> 8) & 0xff, 168 addr & 0xff); 169 170 sp<AMessage> msg = mNotify->dup(); 171 msg->setInt32("sessionID", mSessionID); 172 msg->setInt32("reason", kWhatClientConnected); 173 msg->setString("server-ip", localAddrString.c_str()); 174 msg->setInt32("server-port", ntohs(localAddr.sin_port)); 175 msg->setString("client-ip", remoteAddrString.c_str()); 176 msg->setInt32("client-port", ntohs(remoteAddr.sin_port)); 177 msg->post(); 178 } 179} 180 181ANetworkSession::Session::~Session() { 182 ALOGV("Session %d gone", mSessionID); 183 184 close(mSocket); 185 mSocket = -1; 186} 187 188int32_t ANetworkSession::Session::sessionID() const { 189 return mSessionID; 190} 191 192int ANetworkSession::Session::socket() const { 193 return mSocket; 194} 195 196void ANetworkSession::Session::setIsRTSPConnection(bool yesno) { 197 mIsRTSPConnection = yesno; 198} 199 200sp<AMessage> ANetworkSession::Session::getNotificationMessage() const { 201 return mNotify; 202} 203 204bool ANetworkSession::Session::isRTSPServer() const { 205 return mState == LISTENING_RTSP; 206} 207 208bool ANetworkSession::Session::isTCPDatagramServer() const { 209 return mState == LISTENING_TCP_DGRAMS; 210} 211 212bool ANetworkSession::Session::wantsToRead() { 213 return !mSawReceiveFailure && mState != CONNECTING; 214} 215 216bool ANetworkSession::Session::wantsToWrite() { 217 return !mSawSendFailure 218 && (mState == CONNECTING 219 || (mState == CONNECTED && !mOutBuffer.empty()) 220 || (mState == DATAGRAM && !mOutDatagrams.empty())); 221} 222 223status_t ANetworkSession::Session::readMore() { 224 if (mState == DATAGRAM) { 225 status_t err; 226 do { 227 sp<ABuffer> buf = new ABuffer(kMaxUDPSize); 228 229 struct sockaddr_in remoteAddr; 230 socklen_t remoteAddrLen = sizeof(remoteAddr); 231 232 ssize_t n; 233 do { 234 n = recvfrom( 235 mSocket, buf->data(), buf->capacity(), 0, 236 (struct sockaddr *)&remoteAddr, &remoteAddrLen); 237 } while (n < 0 && errno == EINTR); 238 239 err = OK; 240 if (n < 0) { 241 err = -errno; 242 } else if (n == 0) { 243 err = -ECONNRESET; 244 } else { 245 buf->setRange(0, n); 246 247 int64_t nowUs = ALooper::GetNowUs(); 248 buf->meta()->setInt64("arrivalTimeUs", nowUs); 249 250 sp<AMessage> notify = mNotify->dup(); 251 notify->setInt32("sessionID", mSessionID); 252 notify->setInt32("reason", kWhatDatagram); 253 254 uint32_t ip = ntohl(remoteAddr.sin_addr.s_addr); 255 notify->setString( 256 "fromAddr", 257 StringPrintf( 258 "%u.%u.%u.%u", 259 ip >> 24, 260 (ip >> 16) & 0xff, 261 (ip >> 8) & 0xff, 262 ip & 0xff).c_str()); 263 264 notify->setInt32("fromPort", ntohs(remoteAddr.sin_port)); 265 266 notify->setBuffer("data", buf); 267 notify->post(); 268 } 269 } while (err == OK); 270 271 if (err == -EAGAIN) { 272 err = OK; 273 } 274 275 if (err != OK) { 276 notifyError(false /* send */, err, "Recvfrom failed."); 277 mSawReceiveFailure = true; 278 } 279 280 return err; 281 } 282 283 char tmp[512]; 284 ssize_t n; 285 do { 286 n = recv(mSocket, tmp, sizeof(tmp), 0); 287 } while (n < 0 && errno == EINTR); 288 289 status_t err = OK; 290 291 if (n > 0) { 292 mInBuffer.append(tmp, n); 293 294#if 0 295 ALOGI("in:"); 296 hexdump(tmp, n); 297#endif 298 } else if (n < 0) { 299 err = -errno; 300 } else { 301 err = -ECONNRESET; 302 } 303 304 if (!mIsRTSPConnection) { 305 // TCP stream carrying 16-bit length-prefixed datagrams. 306 307 while (mInBuffer.size() >= 2) { 308 size_t packetSize = U16_AT((const uint8_t *)mInBuffer.c_str()); 309 310 if (mInBuffer.size() < packetSize + 2) { 311 break; 312 } 313 314 sp<ABuffer> packet = new ABuffer(packetSize); 315 memcpy(packet->data(), mInBuffer.c_str() + 2, packetSize); 316 317 sp<AMessage> notify = mNotify->dup(); 318 notify->setInt32("sessionID", mSessionID); 319 notify->setInt32("reason", kWhatDatagram); 320 notify->setBuffer("data", packet); 321 notify->post(); 322 323 mInBuffer.erase(0, packetSize + 2); 324 } 325 } else { 326 for (;;) { 327 size_t length; 328 329 if (mInBuffer.size() > 0 && mInBuffer.c_str()[0] == '$') { 330 if (mInBuffer.size() < 4) { 331 break; 332 } 333 334 length = U16_AT((const uint8_t *)mInBuffer.c_str() + 2); 335 336 if (mInBuffer.size() < 4 + length) { 337 break; 338 } 339 340 sp<AMessage> notify = mNotify->dup(); 341 notify->setInt32("sessionID", mSessionID); 342 notify->setInt32("reason", kWhatBinaryData); 343 notify->setInt32("channel", mInBuffer.c_str()[1]); 344 345 sp<ABuffer> data = new ABuffer(length); 346 memcpy(data->data(), mInBuffer.c_str() + 4, length); 347 348 int64_t nowUs = ALooper::GetNowUs(); 349 data->meta()->setInt64("arrivalTimeUs", nowUs); 350 351 notify->setBuffer("data", data); 352 notify->post(); 353 354 mInBuffer.erase(0, 4 + length); 355 continue; 356 } 357 358 sp<ParsedMessage> msg = 359 ParsedMessage::Parse( 360 mInBuffer.c_str(), mInBuffer.size(), err != OK, &length); 361 362 if (msg == NULL) { 363 break; 364 } 365 366 sp<AMessage> notify = mNotify->dup(); 367 notify->setInt32("sessionID", mSessionID); 368 notify->setInt32("reason", kWhatData); 369 notify->setObject("data", msg); 370 notify->post(); 371 372#if 1 373 // XXX The (old) dongle sends the wrong content length header on a 374 // SET_PARAMETER request that signals a "wfd_idr_request". 375 // (17 instead of 19). 376 const char *content = msg->getContent(); 377 if (content 378 && !memcmp(content, "wfd_idr_request\r\n", 17) 379 && length >= 19 380 && mInBuffer.c_str()[length] == '\r' 381 && mInBuffer.c_str()[length + 1] == '\n') { 382 length += 2; 383 } 384#endif 385 386 mInBuffer.erase(0, length); 387 388 if (err != OK) { 389 break; 390 } 391 } 392 } 393 394 if (err != OK) { 395 notifyError(false /* send */, err, "Recv failed."); 396 mSawReceiveFailure = true; 397 } 398 399 return err; 400} 401 402status_t ANetworkSession::Session::writeMore() { 403 if (mState == DATAGRAM) { 404 CHECK(!mOutDatagrams.empty()); 405 406 status_t err; 407 do { 408 const sp<ABuffer> &datagram = *mOutDatagrams.begin(); 409 410 uint8_t *data = datagram->data(); 411 if (data[0] == 0x80 && (data[1] & 0x7f) == 33) { 412 int64_t nowUs = ALooper::GetNowUs(); 413 414 uint32_t prevRtpTime = U32_AT(&data[4]); 415 416 // 90kHz time scale 417 uint32_t rtpTime = (nowUs * 9ll) / 100ll; 418 int32_t diffTime = (int32_t)rtpTime - (int32_t)prevRtpTime; 419 420 ALOGV("correcting rtpTime by %.0f ms", diffTime / 90.0); 421 422 data[4] = rtpTime >> 24; 423 data[5] = (rtpTime >> 16) & 0xff; 424 data[6] = (rtpTime >> 8) & 0xff; 425 data[7] = rtpTime & 0xff; 426 } 427 428 int n; 429 do { 430 n = send(mSocket, datagram->data(), datagram->size(), 0); 431 } while (n < 0 && errno == EINTR); 432 433 err = OK; 434 435 if (n > 0) { 436 mOutDatagrams.erase(mOutDatagrams.begin()); 437 } else if (n < 0) { 438 err = -errno; 439 } else if (n == 0) { 440 err = -ECONNRESET; 441 } 442 } while (err == OK && !mOutDatagrams.empty()); 443 444 if (err == -EAGAIN) { 445 if (!mOutDatagrams.empty()) { 446 ALOGI("%d datagrams remain queued.", mOutDatagrams.size()); 447 } 448 err = OK; 449 } 450 451 if (err != OK) { 452 notifyError(true /* send */, err, "Send datagram failed."); 453 mSawSendFailure = true; 454 } 455 456 return err; 457 } 458 459 if (mState == CONNECTING) { 460 int err; 461 socklen_t optionLen = sizeof(err); 462 CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0); 463 CHECK_EQ(optionLen, (socklen_t)sizeof(err)); 464 465 if (err != 0) { 466 notifyError(kWhatError, -err, "Connection failed"); 467 mSawSendFailure = true; 468 469 return -err; 470 } 471 472 mState = CONNECTED; 473 notify(kWhatConnected); 474 475 return OK; 476 } 477 478 CHECK_EQ(mState, CONNECTED); 479 CHECK(!mOutBuffer.empty()); 480 481 ssize_t n; 482 do { 483 n = send(mSocket, mOutBuffer.c_str(), mOutBuffer.size(), 0); 484 } while (n < 0 && errno == EINTR); 485 486 status_t err = OK; 487 488 if (n > 0) { 489#if 0 490 ALOGI("out:"); 491 hexdump(mOutBuffer.c_str(), n); 492#endif 493 494 mOutBuffer.erase(0, n); 495 } else if (n < 0) { 496 err = -errno; 497 } else if (n == 0) { 498 err = -ECONNRESET; 499 } 500 501 if (err != OK) { 502 notifyError(true /* send */, err, "Send failed."); 503 mSawSendFailure = true; 504 } 505 506 return err; 507} 508 509status_t ANetworkSession::Session::sendRequest(const void *data, ssize_t size) { 510 CHECK(mState == CONNECTED || mState == DATAGRAM); 511 512 if (mState == DATAGRAM) { 513 CHECK_GE(size, 0); 514 515 sp<ABuffer> datagram = new ABuffer(size); 516 memcpy(datagram->data(), data, size); 517 518 mOutDatagrams.push_back(datagram); 519 return OK; 520 } 521 522 if (mState == CONNECTED && !mIsRTSPConnection) { 523 CHECK_LE(size, 65535); 524 525 uint8_t prefix[2]; 526 prefix[0] = size >> 8; 527 prefix[1] = size & 0xff; 528 529 mOutBuffer.append((const char *)prefix, sizeof(prefix)); 530 } 531 532 mOutBuffer.append( 533 (const char *)data, 534 (size >= 0) ? size : strlen((const char *)data)); 535 536 return OK; 537} 538 539void ANetworkSession::Session::notifyError( 540 bool send, status_t err, const char *detail) { 541 sp<AMessage> msg = mNotify->dup(); 542 msg->setInt32("sessionID", mSessionID); 543 msg->setInt32("reason", kWhatError); 544 msg->setInt32("send", send); 545 msg->setInt32("err", err); 546 msg->setString("detail", detail); 547 msg->post(); 548} 549 550void ANetworkSession::Session::notify(NotificationReason reason) { 551 sp<AMessage> msg = mNotify->dup(); 552 msg->setInt32("sessionID", mSessionID); 553 msg->setInt32("reason", reason); 554 msg->post(); 555} 556 557//////////////////////////////////////////////////////////////////////////////// 558 559ANetworkSession::ANetworkSession() 560 : mNextSessionID(1) { 561 mPipeFd[0] = mPipeFd[1] = -1; 562} 563 564ANetworkSession::~ANetworkSession() { 565 stop(); 566} 567 568status_t ANetworkSession::start() { 569 if (mThread != NULL) { 570 return INVALID_OPERATION; 571 } 572 573 int res = pipe(mPipeFd); 574 if (res != 0) { 575 mPipeFd[0] = mPipeFd[1] = -1; 576 return -errno; 577 } 578 579 mThread = new NetworkThread(this); 580 581 status_t err = mThread->run("ANetworkSession", ANDROID_PRIORITY_AUDIO); 582 583 if (err != OK) { 584 mThread.clear(); 585 586 close(mPipeFd[0]); 587 close(mPipeFd[1]); 588 mPipeFd[0] = mPipeFd[1] = -1; 589 590 return err; 591 } 592 593 return OK; 594} 595 596status_t ANetworkSession::stop() { 597 if (mThread == NULL) { 598 return INVALID_OPERATION; 599 } 600 601 mThread->requestExit(); 602 interrupt(); 603 mThread->requestExitAndWait(); 604 605 mThread.clear(); 606 607 close(mPipeFd[0]); 608 close(mPipeFd[1]); 609 mPipeFd[0] = mPipeFd[1] = -1; 610 611 return OK; 612} 613 614status_t ANetworkSession::createRTSPClient( 615 const char *host, unsigned port, const sp<AMessage> ¬ify, 616 int32_t *sessionID) { 617 return createClientOrServer( 618 kModeCreateRTSPClient, 619 NULL /* addr */, 620 0 /* port */, 621 host, 622 port, 623 notify, 624 sessionID); 625} 626 627status_t ANetworkSession::createRTSPServer( 628 const struct in_addr &addr, unsigned port, 629 const sp<AMessage> ¬ify, int32_t *sessionID) { 630 return createClientOrServer( 631 kModeCreateRTSPServer, 632 &addr, 633 port, 634 NULL /* remoteHost */, 635 0 /* remotePort */, 636 notify, 637 sessionID); 638} 639 640status_t ANetworkSession::createUDPSession( 641 unsigned localPort, const sp<AMessage> ¬ify, int32_t *sessionID) { 642 return createUDPSession(localPort, NULL, 0, notify, sessionID); 643} 644 645status_t ANetworkSession::createUDPSession( 646 unsigned localPort, 647 const char *remoteHost, 648 unsigned remotePort, 649 const sp<AMessage> ¬ify, 650 int32_t *sessionID) { 651 return createClientOrServer( 652 kModeCreateUDPSession, 653 NULL /* addr */, 654 localPort, 655 remoteHost, 656 remotePort, 657 notify, 658 sessionID); 659} 660 661status_t ANetworkSession::createTCPDatagramSession( 662 const struct in_addr &addr, unsigned port, 663 const sp<AMessage> ¬ify, int32_t *sessionID) { 664 return createClientOrServer( 665 kModeCreateTCPDatagramSessionPassive, 666 &addr, 667 port, 668 NULL /* remoteHost */, 669 0 /* remotePort */, 670 notify, 671 sessionID); 672} 673 674status_t ANetworkSession::createTCPDatagramSession( 675 unsigned localPort, 676 const char *remoteHost, 677 unsigned remotePort, 678 const sp<AMessage> ¬ify, 679 int32_t *sessionID) { 680 return createClientOrServer( 681 kModeCreateTCPDatagramSessionActive, 682 NULL /* addr */, 683 localPort, 684 remoteHost, 685 remotePort, 686 notify, 687 sessionID); 688} 689 690status_t ANetworkSession::destroySession(int32_t sessionID) { 691 Mutex::Autolock autoLock(mLock); 692 693 ssize_t index = mSessions.indexOfKey(sessionID); 694 695 if (index < 0) { 696 return -ENOENT; 697 } 698 699 mSessions.removeItemsAt(index); 700 701 interrupt(); 702 703 return OK; 704} 705 706// static 707status_t ANetworkSession::MakeSocketNonBlocking(int s) { 708 int flags = fcntl(s, F_GETFL, 0); 709 if (flags < 0) { 710 flags = 0; 711 } 712 713 int res = fcntl(s, F_SETFL, flags | O_NONBLOCK); 714 if (res < 0) { 715 return -errno; 716 } 717 718 return OK; 719} 720 721status_t ANetworkSession::createClientOrServer( 722 Mode mode, 723 const struct in_addr *localAddr, 724 unsigned port, 725 const char *remoteHost, 726 unsigned remotePort, 727 const sp<AMessage> ¬ify, 728 int32_t *sessionID) { 729 Mutex::Autolock autoLock(mLock); 730 731 *sessionID = 0; 732 status_t err = OK; 733 int s, res; 734 sp<Session> session; 735 736 s = socket( 737 AF_INET, 738 (mode == kModeCreateUDPSession) ? SOCK_DGRAM : SOCK_STREAM, 739 0); 740 741 if (s < 0) { 742 err = -errno; 743 goto bail; 744 } 745 746 if (mode == kModeCreateRTSPServer 747 || mode == kModeCreateTCPDatagramSessionPassive) { 748 const int yes = 1; 749 res = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); 750 751 if (res < 0) { 752 err = -errno; 753 goto bail2; 754 } 755 } 756 757 if (mode == kModeCreateUDPSession) { 758 int size = 256 * 1024; 759 760 res = setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)); 761 762 if (res < 0) { 763 err = -errno; 764 goto bail2; 765 } 766 767 res = setsockopt(s, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)); 768 769 if (res < 0) { 770 err = -errno; 771 goto bail2; 772 } 773 } 774 775 err = MakeSocketNonBlocking(s); 776 777 if (err != OK) { 778 goto bail2; 779 } 780 781 struct sockaddr_in addr; 782 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 783 addr.sin_family = AF_INET; 784 785 if (mode == kModeCreateRTSPClient 786 || mode == kModeCreateTCPDatagramSessionActive) { 787 struct hostent *ent= gethostbyname(remoteHost); 788 if (ent == NULL) { 789 err = -h_errno; 790 goto bail2; 791 } 792 793 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 794 addr.sin_port = htons(remotePort); 795 } else if (localAddr != NULL) { 796 addr.sin_addr = *localAddr; 797 addr.sin_port = htons(port); 798 } else { 799 addr.sin_addr.s_addr = htonl(INADDR_ANY); 800 addr.sin_port = htons(port); 801 } 802 803 if (mode == kModeCreateRTSPClient 804 || mode == kModeCreateTCPDatagramSessionActive) { 805 in_addr_t x = ntohl(addr.sin_addr.s_addr); 806 ALOGI("connecting socket %d to %d.%d.%d.%d:%d", 807 s, 808 (x >> 24), 809 (x >> 16) & 0xff, 810 (x >> 8) & 0xff, 811 x & 0xff, 812 ntohs(addr.sin_port)); 813 814 res = connect(s, (const struct sockaddr *)&addr, sizeof(addr)); 815 816 CHECK_LT(res, 0); 817 if (errno == EINPROGRESS) { 818 res = 0; 819 } 820 } else { 821 res = bind(s, (const struct sockaddr *)&addr, sizeof(addr)); 822 823 if (res == 0) { 824 if (mode == kModeCreateRTSPServer 825 || mode == kModeCreateTCPDatagramSessionPassive) { 826 res = listen(s, 4); 827 } else { 828 CHECK_EQ(mode, kModeCreateUDPSession); 829 830 if (remoteHost != NULL) { 831 struct sockaddr_in remoteAddr; 832 memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero)); 833 remoteAddr.sin_family = AF_INET; 834 remoteAddr.sin_port = htons(remotePort); 835 836 struct hostent *ent= gethostbyname(remoteHost); 837 if (ent == NULL) { 838 err = -h_errno; 839 goto bail2; 840 } 841 842 remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 843 844 res = connect( 845 s, 846 (const struct sockaddr *)&remoteAddr, 847 sizeof(remoteAddr)); 848 } 849 } 850 } 851 } 852 853 if (res < 0) { 854 err = -errno; 855 goto bail2; 856 } 857 858 Session::State state; 859 switch (mode) { 860 case kModeCreateRTSPClient: 861 state = Session::CONNECTING; 862 break; 863 864 case kModeCreateTCPDatagramSessionActive: 865 state = Session::CONNECTING; 866 break; 867 868 case kModeCreateTCPDatagramSessionPassive: 869 state = Session::LISTENING_TCP_DGRAMS; 870 break; 871 872 case kModeCreateRTSPServer: 873 state = Session::LISTENING_RTSP; 874 break; 875 876 default: 877 CHECK_EQ(mode, kModeCreateUDPSession); 878 state = Session::DATAGRAM; 879 break; 880 } 881 882 session = new Session( 883 mNextSessionID++, 884 state, 885 s, 886 notify); 887 888 if (mode == kModeCreateTCPDatagramSessionActive) { 889 session->setIsRTSPConnection(false); 890 } else if (mode == kModeCreateRTSPClient) { 891 session->setIsRTSPConnection(true); 892 } 893 894 mSessions.add(session->sessionID(), session); 895 896 interrupt(); 897 898 *sessionID = session->sessionID(); 899 900 goto bail; 901 902bail2: 903 close(s); 904 s = -1; 905 906bail: 907 return err; 908} 909 910status_t ANetworkSession::connectUDPSession( 911 int32_t sessionID, const char *remoteHost, unsigned remotePort) { 912 Mutex::Autolock autoLock(mLock); 913 914 ssize_t index = mSessions.indexOfKey(sessionID); 915 916 if (index < 0) { 917 return -ENOENT; 918 } 919 920 const sp<Session> session = mSessions.valueAt(index); 921 int s = session->socket(); 922 923 struct sockaddr_in remoteAddr; 924 memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero)); 925 remoteAddr.sin_family = AF_INET; 926 remoteAddr.sin_port = htons(remotePort); 927 928 status_t err = OK; 929 struct hostent *ent = gethostbyname(remoteHost); 930 if (ent == NULL) { 931 err = -h_errno; 932 } else { 933 remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 934 935 int res = connect( 936 s, 937 (const struct sockaddr *)&remoteAddr, 938 sizeof(remoteAddr)); 939 940 if (res < 0) { 941 err = -errno; 942 } 943 } 944 945 return err; 946} 947 948status_t ANetworkSession::sendRequest( 949 int32_t sessionID, const void *data, ssize_t size) { 950 Mutex::Autolock autoLock(mLock); 951 952 ssize_t index = mSessions.indexOfKey(sessionID); 953 954 if (index < 0) { 955 return -ENOENT; 956 } 957 958 const sp<Session> session = mSessions.valueAt(index); 959 960 status_t err = session->sendRequest(data, size); 961 962 interrupt(); 963 964 return err; 965} 966 967void ANetworkSession::interrupt() { 968 static const char dummy = 0; 969 970 ssize_t n; 971 do { 972 n = write(mPipeFd[1], &dummy, 1); 973 } while (n < 0 && errno == EINTR); 974 975 if (n < 0) { 976 ALOGW("Error writing to pipe (%s)", strerror(errno)); 977 } 978} 979 980void ANetworkSession::threadLoop() { 981 fd_set rs, ws; 982 FD_ZERO(&rs); 983 FD_ZERO(&ws); 984 985 FD_SET(mPipeFd[0], &rs); 986 int maxFd = mPipeFd[0]; 987 988 { 989 Mutex::Autolock autoLock(mLock); 990 991 for (size_t i = 0; i < mSessions.size(); ++i) { 992 const sp<Session> &session = mSessions.valueAt(i); 993 994 int s = session->socket(); 995 996 if (s < 0) { 997 continue; 998 } 999 1000 if (session->wantsToRead()) { 1001 FD_SET(s, &rs); 1002 if (s > maxFd) { 1003 maxFd = s; 1004 } 1005 } 1006 1007 if (session->wantsToWrite()) { 1008 FD_SET(s, &ws); 1009 if (s > maxFd) { 1010 maxFd = s; 1011 } 1012 } 1013 } 1014 } 1015 1016 int res = select(maxFd + 1, &rs, &ws, NULL, NULL /* tv */); 1017 1018 if (res == 0) { 1019 return; 1020 } 1021 1022 if (res < 0) { 1023 if (errno == EINTR) { 1024 return; 1025 } 1026 1027 ALOGE("select failed w/ error %d (%s)", errno, strerror(errno)); 1028 return; 1029 } 1030 1031 if (FD_ISSET(mPipeFd[0], &rs)) { 1032 char c; 1033 ssize_t n; 1034 do { 1035 n = read(mPipeFd[0], &c, 1); 1036 } while (n < 0 && errno == EINTR); 1037 1038 if (n < 0) { 1039 ALOGW("Error reading from pipe (%s)", strerror(errno)); 1040 } 1041 1042 --res; 1043 } 1044 1045 { 1046 Mutex::Autolock autoLock(mLock); 1047 1048 List<sp<Session> > sessionsToAdd; 1049 1050 for (size_t i = mSessions.size(); res > 0 && i-- > 0;) { 1051 const sp<Session> &session = mSessions.valueAt(i); 1052 1053 int s = session->socket(); 1054 1055 if (s < 0) { 1056 continue; 1057 } 1058 1059 if (FD_ISSET(s, &rs) || FD_ISSET(s, &ws)) { 1060 --res; 1061 } 1062 1063 if (FD_ISSET(s, &rs)) { 1064 if (session->isRTSPServer() || session->isTCPDatagramServer()) { 1065 struct sockaddr_in remoteAddr; 1066 socklen_t remoteAddrLen = sizeof(remoteAddr); 1067 1068 int clientSocket = accept( 1069 s, (struct sockaddr *)&remoteAddr, &remoteAddrLen); 1070 1071 if (clientSocket >= 0) { 1072 status_t err = MakeSocketNonBlocking(clientSocket); 1073 1074 if (err != OK) { 1075 ALOGE("Unable to make client socket non blocking, " 1076 "failed w/ error %d (%s)", 1077 err, strerror(-err)); 1078 1079 close(clientSocket); 1080 clientSocket = -1; 1081 } else { 1082 in_addr_t addr = ntohl(remoteAddr.sin_addr.s_addr); 1083 1084 ALOGI("incoming connection from %d.%d.%d.%d:%d " 1085 "(socket %d)", 1086 (addr >> 24), 1087 (addr >> 16) & 0xff, 1088 (addr >> 8) & 0xff, 1089 addr & 0xff, 1090 ntohs(remoteAddr.sin_port), 1091 clientSocket); 1092 1093 sp<Session> clientSession = 1094 // using socket sd as sessionID 1095 new Session( 1096 mNextSessionID++, 1097 Session::CONNECTED, 1098 clientSocket, 1099 session->getNotificationMessage()); 1100 1101 clientSession->setIsRTSPConnection( 1102 session->isRTSPServer()); 1103 1104 sessionsToAdd.push_back(clientSession); 1105 } 1106 } else { 1107 ALOGE("accept returned error %d (%s)", 1108 errno, strerror(errno)); 1109 } 1110 } else { 1111 status_t err = session->readMore(); 1112 if (err != OK) { 1113 ALOGE("readMore on socket %d failed w/ error %d (%s)", 1114 s, err, strerror(-err)); 1115 } 1116 } 1117 } 1118 1119 if (FD_ISSET(s, &ws)) { 1120 status_t err = session->writeMore(); 1121 if (err != OK) { 1122 ALOGE("writeMore on socket %d failed w/ error %d (%s)", 1123 s, err, strerror(-err)); 1124 } 1125 } 1126 } 1127 1128 while (!sessionsToAdd.empty()) { 1129 sp<Session> session = *sessionsToAdd.begin(); 1130 sessionsToAdd.erase(sessionsToAdd.begin()); 1131 1132 mSessions.add(session->sessionID(), session); 1133 1134 ALOGI("added clientSession %d", session->sessionID()); 1135 } 1136 } 1137} 1138 1139} // namespace android 1140 1141