1/* 2 * Copyright (C) 2007 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#define LOG_TAG "mq" 18 19#include <assert.h> 20#include <errno.h> 21#include <fcntl.h> 22#include <pthread.h> 23#include <stdlib.h> 24#include <string.h> 25#include <unistd.h> 26 27#include <sys/socket.h> 28#include <sys/types.h> 29#include <sys/un.h> 30#include <sys/uio.h> 31 32#include <cutils/array.h> 33#include <cutils/hashmap.h> 34#include <cutils/selector.h> 35 36#include "loghack.h" 37#include "buffer.h" 38 39/** Number of dead peers to remember. */ 40#define PEER_HISTORY (16) 41 42typedef struct sockaddr SocketAddress; 43typedef struct sockaddr_un UnixAddress; 44 45/** 46 * Process/user/group ID. We don't use ucred directly because it's only 47 * available on Linux. 48 */ 49typedef struct { 50 pid_t pid; 51 uid_t uid; 52 gid_t gid; 53} Credentials; 54 55/** Listens for bytes coming from remote peers. */ 56typedef void BytesListener(Credentials credentials, char* bytes, size_t size); 57 58/** Listens for the deaths of remote peers. */ 59typedef void DeathListener(pid_t pid); 60 61/** Types of packets. */ 62typedef enum { 63 /** Request for a connection to another peer. */ 64 CONNECTION_REQUEST, 65 66 /** A connection to another peer. */ 67 CONNECTION, 68 69 /** Reports a failed connection attempt. */ 70 CONNECTION_ERROR, 71 72 /** A generic packet of bytes. */ 73 BYTES, 74} PacketType; 75 76typedef enum { 77 /** Reading a packet header. */ 78 READING_HEADER, 79 80 /** Waiting for a connection from the master. */ 81 ACCEPTING_CONNECTION, 82 83 /** Reading bytes. */ 84 READING_BYTES, 85} InputState; 86 87/** A packet header. */ 88// TODO: Use custom headers for master->peer, peer->master, peer->peer. 89typedef struct { 90 PacketType type; 91 union { 92 /** Packet size. Used for BYTES. */ 93 size_t size; 94 95 /** Credentials. Used for CONNECTION and CONNECTION_REQUEST. */ 96 Credentials credentials; 97 }; 98} Header; 99 100/** A packet which will be sent to a peer. */ 101typedef struct OutgoingPacket OutgoingPacket; 102struct OutgoingPacket { 103 /** Packet header. */ 104 Header header; 105 106 union { 107 /** Connection to peer. Used with CONNECTION. */ 108 int socket; 109 110 /** Buffer of bytes. Used with BYTES. */ 111 Buffer* bytes; 112 }; 113 114 /** Frees all resources associated with this packet. */ 115 void (*free)(OutgoingPacket* packet); 116 117 /** Optional context. */ 118 void* context; 119 120 /** Next packet in the queue. */ 121 OutgoingPacket* nextPacket; 122}; 123 124/** Represents a remote peer. */ 125typedef struct PeerProxy PeerProxy; 126 127/** Local peer state. You typically have one peer per process. */ 128typedef struct { 129 /** This peer's PID. */ 130 pid_t pid; 131 132 /** 133 * Map from pid to peer proxy. The peer has a peer proxy for each remote 134 * peer it's connected to. 135 * 136 * Acquire mutex before use. 137 */ 138 Hashmap* peerProxies; 139 140 /** Manages I/O. */ 141 Selector* selector; 142 143 /** Used to synchronize operations with the selector thread. */ 144 pthread_mutex_t mutex; 145 146 /** Is this peer the master? */ 147 bool master; 148 149 /** Peer proxy for the master. */ 150 PeerProxy* masterProxy; 151 152 /** Listens for packets from remote peers. */ 153 BytesListener* onBytes; 154 155 /** Listens for deaths of remote peers. */ 156 DeathListener* onDeath; 157 158 /** Keeps track of recently dead peers. Requires mutex. */ 159 pid_t deadPeers[PEER_HISTORY]; 160 size_t deadPeerCursor; 161} Peer; 162 163struct PeerProxy { 164 /** Credentials of the remote process. */ 165 Credentials credentials; 166 167 /** Keeps track of data coming in from the remote peer. */ 168 InputState inputState; 169 Buffer* inputBuffer; 170 PeerProxy* connecting; 171 172 /** File descriptor for this peer. */ 173 SelectableFd* fd; 174 175 /** 176 * Queue of packets to be written out to the remote peer. 177 * 178 * Requires mutex. 179 */ 180 // TODO: Limit queue length. 181 OutgoingPacket* currentPacket; 182 OutgoingPacket* lastPacket; 183 184 /** Used to write outgoing header. */ 185 Buffer outgoingHeader; 186 187 /** True if this is the master's proxy. */ 188 bool master; 189 190 /** Reference back to the local peer. */ 191 Peer* peer; 192 193 /** 194 * Used in master only. Maps this peer proxy to other peer proxies to 195 * which the peer has been connected to. Maps pid to PeerProxy. Helps 196 * keep track of which connections we've sent to whom. 197 */ 198 Hashmap* connections; 199}; 200 201/** Server socket path. */ 202static const char* MASTER_PATH = "/master.peer"; 203 204/** Credentials of the master peer. */ 205static const Credentials MASTER_CREDENTIALS = {0, 0, 0}; 206 207/** Creates a peer proxy and adds it to the peer proxy map. */ 208static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials); 209 210/** Sets the non-blocking flag on a descriptor. */ 211static void setNonBlocking(int fd) { 212 int flags; 213 if ((flags = fcntl(fd, F_GETFL, 0)) < 0) { 214 LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno)); 215 } 216 if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) { 217 LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno)); 218 } 219} 220 221/** Closes a fd and logs a warning if the close fails. */ 222static void closeWithWarning(int fd) { 223 int result = close(fd); 224 if (result == -1) { 225 LOGW("close() error: %s", strerror(errno)); 226 } 227} 228 229/** Hashes pid_t keys. */ 230static int pidHash(void* key) { 231 pid_t* pid = (pid_t*) key; 232 return (int) (*pid); 233} 234 235/** Compares pid_t keys. */ 236static bool pidEquals(void* keyA, void* keyB) { 237 pid_t* a = (pid_t*) keyA; 238 pid_t* b = (pid_t*) keyB; 239 return *a == *b; 240} 241 242/** Gets the master address. Not thread safe. */ 243static UnixAddress* getMasterAddress() { 244 static UnixAddress masterAddress; 245 static bool initialized = false; 246 if (initialized == false) { 247 masterAddress.sun_family = AF_LOCAL; 248 strcpy(masterAddress.sun_path, MASTER_PATH); 249 initialized = true; 250 } 251 return &masterAddress; 252} 253 254/** Gets exclusive access to the peer for this thread. */ 255static void peerLock(Peer* peer) { 256 pthread_mutex_lock(&peer->mutex); 257} 258 259/** Releases exclusive access to the peer. */ 260static void peerUnlock(Peer* peer) { 261 pthread_mutex_unlock(&peer->mutex); 262} 263 264/** Frees a simple, i.e. header-only, outgoing packet. */ 265static void outgoingPacketFree(OutgoingPacket* packet) { 266 LOGD("Freeing outgoing packet."); 267 free(packet); 268} 269 270/** 271 * Prepare to read a new packet from the peer. 272 */ 273static void peerProxyExpectHeader(PeerProxy* peerProxy) { 274 peerProxy->inputState = READING_HEADER; 275 bufferPrepareForRead(peerProxy->inputBuffer, sizeof(Header)); 276} 277 278/** Sets up the buffer for the outgoing header. */ 279static void peerProxyPrepareOutgoingHeader(PeerProxy* peerProxy) { 280 peerProxy->outgoingHeader.data 281 = (char*) &(peerProxy->currentPacket->header); 282 peerProxy->outgoingHeader.size = sizeof(Header); 283 bufferPrepareForWrite(&peerProxy->outgoingHeader); 284} 285 286/** Adds a packet to the end of the queue. Callers must have the mutex. */ 287static void peerProxyEnqueueOutgoingPacket(PeerProxy* peerProxy, 288 OutgoingPacket* newPacket) { 289 newPacket->nextPacket = NULL; // Just in case. 290 if (peerProxy->currentPacket == NULL) { 291 // The queue is empty. 292 peerProxy->currentPacket = newPacket; 293 peerProxy->lastPacket = newPacket; 294 295 peerProxyPrepareOutgoingHeader(peerProxy); 296 } else { 297 peerProxy->lastPacket->nextPacket = newPacket; 298 } 299} 300 301/** Takes the peer lock and enqueues the given packet. */ 302static void peerProxyLockAndEnqueueOutgoingPacket(PeerProxy* peerProxy, 303 OutgoingPacket* newPacket) { 304 Peer* peer = peerProxy->peer; 305 peerLock(peer); 306 peerProxyEnqueueOutgoingPacket(peerProxy, newPacket); 307 peerUnlock(peer); 308} 309 310/** 311 * Frees current packet and moves to the next one. Returns true if there is 312 * a next packet or false if the queue is empty. 313 */ 314static bool peerProxyNextPacket(PeerProxy* peerProxy) { 315 Peer* peer = peerProxy->peer; 316 peerLock(peer); 317 318 OutgoingPacket* current = peerProxy->currentPacket; 319 320 if (current == NULL) { 321 // The queue is already empty. 322 peerUnlock(peer); 323 return false; 324 } 325 326 OutgoingPacket* next = current->nextPacket; 327 peerProxy->currentPacket = next; 328 current->nextPacket = NULL; 329 current->free(current); 330 if (next == NULL) { 331 // The queue is empty. 332 peerProxy->lastPacket = NULL; 333 peerUnlock(peer); 334 return false; 335 } else { 336 peerUnlock(peer); 337 peerProxyPrepareOutgoingHeader(peerProxy); 338 339 // TODO: Start writing next packet? It would reduce the number of 340 // system calls, but we could also starve other peers. 341 return true; 342 } 343} 344 345/** 346 * Checks whether a peer died recently. 347 */ 348static bool peerIsDead(Peer* peer, pid_t pid) { 349 size_t i; 350 for (i = 0; i < PEER_HISTORY; i++) { 351 pid_t deadPeer = peer->deadPeers[i]; 352 if (deadPeer == 0) { 353 return false; 354 } 355 if (deadPeer == pid) { 356 return true; 357 } 358 } 359 return false; 360} 361 362/** 363 * Cleans up connection information. 364 */ 365static bool peerProxyRemoveConnection(void* key, void* value, void* context) { 366 PeerProxy* deadPeer = (PeerProxy*) context; 367 PeerProxy* otherPeer = (PeerProxy*) value; 368 hashmapRemove(otherPeer->connections, &(deadPeer->credentials.pid)); 369 return true; 370} 371 372/** 373 * Called when the peer dies. 374 */ 375static void peerProxyKill(PeerProxy* peerProxy, bool errnoIsSet) { 376 if (errnoIsSet) { 377 LOGI("Peer %d died. errno: %s", peerProxy->credentials.pid, 378 strerror(errno)); 379 } else { 380 LOGI("Peer %d died.", peerProxy->credentials.pid); 381 } 382 383 // If we lost the master, we're up a creek. We can't let this happen. 384 if (peerProxy->master) { 385 LOG_ALWAYS_FATAL("Lost connection to master."); 386 } 387 388 Peer* localPeer = peerProxy->peer; 389 pid_t pid = peerProxy->credentials.pid; 390 391 peerLock(localPeer); 392 393 // Remember for awhile that the peer died. 394 localPeer->deadPeers[localPeer->deadPeerCursor] 395 = peerProxy->credentials.pid; 396 localPeer->deadPeerCursor++; 397 if (localPeer->deadPeerCursor == PEER_HISTORY) { 398 localPeer->deadPeerCursor = 0; 399 } 400 401 // Remove from peer map. 402 hashmapRemove(localPeer->peerProxies, &pid); 403 404 // External threads can no longer get to this peer proxy, so we don't 405 // need the lock anymore. 406 peerUnlock(localPeer); 407 408 // Remove the fd from the selector. 409 if (peerProxy->fd != NULL) { 410 peerProxy->fd->remove = true; 411 } 412 413 // Clear outgoing packet queue. 414 while (peerProxyNextPacket(peerProxy)) {} 415 416 bufferFree(peerProxy->inputBuffer); 417 418 // This only applies to the master. 419 if (peerProxy->connections != NULL) { 420 // We can't leave these other maps pointing to freed memory. 421 hashmapForEach(peerProxy->connections, &peerProxyRemoveConnection, 422 peerProxy); 423 hashmapFree(peerProxy->connections); 424 } 425 426 // Invoke death listener. 427 localPeer->onDeath(pid); 428 429 // Free the peer proxy itself. 430 free(peerProxy); 431} 432 433static void peerProxyHandleError(PeerProxy* peerProxy, char* functionName) { 434 if (errno == EINTR) { 435 // Log interruptions but otherwise ignore them. 436 LOGW("%s() interrupted.", functionName); 437 } else if (errno == EAGAIN) { 438 LOGD("EWOULDBLOCK"); 439 // Ignore. 440 } else { 441 LOGW("Error returned by %s().", functionName); 442 peerProxyKill(peerProxy, true); 443 } 444} 445 446/** 447 * Buffers output sent to a peer. May be called multiple times until the entire 448 * buffer is filled. Returns true when the buffer is empty. 449 */ 450static bool peerProxyWriteFromBuffer(PeerProxy* peerProxy, Buffer* outgoing) { 451 ssize_t size = bufferWrite(outgoing, peerProxy->fd->fd); 452 if (size < 0) { 453 peerProxyHandleError(peerProxy, "write"); 454 return false; 455 } else { 456 return bufferWriteComplete(outgoing); 457 } 458} 459 460/** Writes packet bytes to peer. */ 461static void peerProxyWriteBytes(PeerProxy* peerProxy) { 462 Buffer* buffer = peerProxy->currentPacket->bytes; 463 if (peerProxyWriteFromBuffer(peerProxy, buffer)) { 464 LOGD("Bytes written."); 465 peerProxyNextPacket(peerProxy); 466 } 467} 468 469/** Sends a socket to the peer. */ 470static void peerProxyWriteConnection(PeerProxy* peerProxy) { 471 int socket = peerProxy->currentPacket->socket; 472 473 // Why does sending and receiving fds have to be such a PITA? 474 struct msghdr msg; 475 struct iovec iov[1]; 476 477 union { 478 struct cmsghdr cm; 479 char control[CMSG_SPACE(sizeof(int))]; 480 } control_un; 481 482 struct cmsghdr *cmptr; 483 484 msg.msg_control = control_un.control; 485 msg.msg_controllen = sizeof(control_un.control); 486 cmptr = CMSG_FIRSTHDR(&msg); 487 cmptr->cmsg_len = CMSG_LEN(sizeof(int)); 488 cmptr->cmsg_level = SOL_SOCKET; 489 cmptr->cmsg_type = SCM_RIGHTS; 490 491 // Store the socket in the message. 492 *((int *) CMSG_DATA(cmptr)) = peerProxy->currentPacket->socket; 493 494 msg.msg_name = NULL; 495 msg.msg_namelen = 0; 496 iov[0].iov_base = ""; 497 iov[0].iov_len = 1; 498 msg.msg_iov = iov; 499 msg.msg_iovlen = 1; 500 501 ssize_t result = sendmsg(peerProxy->fd->fd, &msg, 0); 502 503 if (result < 0) { 504 peerProxyHandleError(peerProxy, "sendmsg"); 505 } else { 506 // Success. Queue up the next packet. 507 peerProxyNextPacket(peerProxy); 508 509 } 510} 511 512/** 513 * Writes some outgoing data. 514 */ 515static void peerProxyWrite(SelectableFd* fd) { 516 // TODO: Try to write header and body with one system call. 517 518 PeerProxy* peerProxy = (PeerProxy*) fd->data; 519 OutgoingPacket* current = peerProxy->currentPacket; 520 521 if (current == NULL) { 522 // We have nothing left to write. 523 return; 524 } 525 526 // Write the header. 527 Buffer* outgoingHeader = &peerProxy->outgoingHeader; 528 bool headerWritten = bufferWriteComplete(outgoingHeader); 529 if (!headerWritten) { 530 LOGD("Writing header..."); 531 headerWritten = peerProxyWriteFromBuffer(peerProxy, outgoingHeader); 532 if (headerWritten) { 533 LOGD("Header written."); 534 } 535 } 536 537 // Write body. 538 if (headerWritten) { 539 PacketType type = current->header.type; 540 switch (type) { 541 case CONNECTION: 542 peerProxyWriteConnection(peerProxy); 543 break; 544 case BYTES: 545 peerProxyWriteBytes(peerProxy); 546 break; 547 case CONNECTION_REQUEST: 548 case CONNECTION_ERROR: 549 // These packets consist solely of a header. 550 peerProxyNextPacket(peerProxy); 551 break; 552 default: 553 LOG_ALWAYS_FATAL("Unknown packet type: %d", type); 554 } 555 } 556} 557 558/** 559 * Sets up a peer proxy's fd before we try to select() it. 560 */ 561static void peerProxyBeforeSelect(SelectableFd* fd) { 562 LOGD("Before select..."); 563 564 PeerProxy* peerProxy = (PeerProxy*) fd->data; 565 566 peerLock(peerProxy->peer); 567 bool hasPackets = peerProxy->currentPacket != NULL; 568 peerUnlock(peerProxy->peer); 569 570 if (hasPackets) { 571 LOGD("Packets found. Setting onWritable()."); 572 573 fd->onWritable = &peerProxyWrite; 574 } else { 575 // We have nothing to write. 576 fd->onWritable = NULL; 577 } 578} 579 580/** Prepare to read bytes from the peer. */ 581static void peerProxyExpectBytes(PeerProxy* peerProxy, Header* header) { 582 LOGD("Expecting %d bytes.", header->size); 583 584 peerProxy->inputState = READING_BYTES; 585 if (bufferPrepareForRead(peerProxy->inputBuffer, header->size) == -1) { 586 LOGW("Couldn't allocate memory for incoming data. Size: %u", 587 (unsigned int) header->size); 588 589 // TODO: Ignore the packet and log a warning? 590 peerProxyKill(peerProxy, false); 591 } 592} 593 594/** 595 * Gets a peer proxy for the given ID. Creates a peer proxy if necessary. 596 * Sends a connection request to the master if desired. 597 * 598 * Returns NULL if an error occurs. Sets errno to EHOSTDOWN if the peer died 599 * or ENOMEM if memory couldn't be allocated. 600 */ 601static PeerProxy* peerProxyGetOrCreate(Peer* peer, pid_t pid, 602 bool requestConnection) { 603 if (pid == peer->pid) { 604 errno = EINVAL; 605 return NULL; 606 } 607 608 if (peerIsDead(peer, pid)) { 609 errno = EHOSTDOWN; 610 return NULL; 611 } 612 613 PeerProxy* peerProxy = hashmapGet(peer->peerProxies, &pid); 614 if (peerProxy != NULL) { 615 return peerProxy; 616 } 617 618 // If this is the master peer, we already know about all peers. 619 if (peer->master) { 620 errno = EHOSTDOWN; 621 return NULL; 622 } 623 624 // Try to create a peer proxy. 625 Credentials credentials; 626 credentials.pid = pid; 627 628 // Fake gid and uid until we have the real thing. The real creds are 629 // filled in by masterProxyExpectConnection(). These fake creds will 630 // never be exposed to the user. 631 credentials.uid = 0; 632 credentials.gid = 0; 633 634 // Make sure we can allocate the connection request packet. 635 OutgoingPacket* packet = NULL; 636 if (requestConnection) { 637 packet = calloc(1, sizeof(OutgoingPacket)); 638 if (packet == NULL) { 639 errno = ENOMEM; 640 return NULL; 641 } 642 643 packet->header.type = CONNECTION_REQUEST; 644 packet->header.credentials = credentials; 645 packet->free = &outgoingPacketFree; 646 } 647 648 peerProxy = peerProxyCreate(peer, credentials); 649 if (peerProxy == NULL) { 650 free(packet); 651 errno = ENOMEM; 652 return NULL; 653 } else { 654 // Send a connection request to the master. 655 if (requestConnection) { 656 PeerProxy* masterProxy = peer->masterProxy; 657 peerProxyEnqueueOutgoingPacket(masterProxy, packet); 658 } 659 660 return peerProxy; 661 } 662} 663 664/** 665 * Switches the master peer proxy into a state where it's waiting for a 666 * connection from the master. 667 */ 668static void masterProxyExpectConnection(PeerProxy* masterProxy, 669 Header* header) { 670 // TODO: Restructure things so we don't need this check. 671 // Verify that this really is the master. 672 if (!masterProxy->master) { 673 LOGW("Non-master process %d tried to send us a connection.", 674 masterProxy->credentials.pid); 675 // Kill off the evil peer. 676 peerProxyKill(masterProxy, false); 677 return; 678 } 679 680 masterProxy->inputState = ACCEPTING_CONNECTION; 681 Peer* localPeer = masterProxy->peer; 682 683 // Create a peer proxy so we have somewhere to stash the creds. 684 // See if we already have a proxy set up. 685 pid_t pid = header->credentials.pid; 686 peerLock(localPeer); 687 PeerProxy* peerProxy = peerProxyGetOrCreate(localPeer, pid, false); 688 if (peerProxy == NULL) { 689 LOGW("Peer proxy creation failed: %s", strerror(errno)); 690 } else { 691 // Fill in full credentials. 692 peerProxy->credentials = header->credentials; 693 } 694 peerUnlock(localPeer); 695 696 // Keep track of which peer proxy we're accepting a connection for. 697 masterProxy->connecting = peerProxy; 698} 699 700/** 701 * Reads input from a peer process. 702 */ 703static void peerProxyRead(SelectableFd* fd); 704 705/** Sets up fd callbacks. */ 706static void peerProxySetFd(PeerProxy* peerProxy, SelectableFd* fd) { 707 peerProxy->fd = fd; 708 fd->data = peerProxy; 709 fd->onReadable = &peerProxyRead; 710 fd->beforeSelect = &peerProxyBeforeSelect; 711 712 // Make the socket non-blocking. 713 setNonBlocking(fd->fd); 714} 715 716/** 717 * Accepts a connection sent by the master proxy. 718 */ 719static void masterProxyAcceptConnection(PeerProxy* masterProxy) { 720 struct msghdr msg; 721 struct iovec iov[1]; 722 ssize_t size; 723 char ignored; 724 int incomingFd; 725 726 // TODO: Reuse code which writes the connection. Who the heck designed 727 // this API anyway? 728 union { 729 struct cmsghdr cm; 730 char control[CMSG_SPACE(sizeof(int))]; 731 } control_un; 732 struct cmsghdr *cmptr; 733 msg.msg_control = control_un.control; 734 msg.msg_controllen = sizeof(control_un.control); 735 736 msg.msg_name = NULL; 737 msg.msg_namelen = 0; 738 739 // We sent 1 byte of data so we can detect EOF. 740 iov[0].iov_base = &ignored; 741 iov[0].iov_len = 1; 742 msg.msg_iov = iov; 743 msg.msg_iovlen = 1; 744 745 size = recvmsg(masterProxy->fd->fd, &msg, 0); 746 if (size < 0) { 747 if (errno == EINTR) { 748 // Log interruptions but otherwise ignore them. 749 LOGW("recvmsg() interrupted."); 750 return; 751 } else if (errno == EAGAIN) { 752 // Keep waiting for the connection. 753 return; 754 } else { 755 LOG_ALWAYS_FATAL("Error reading connection from master: %s", 756 strerror(errno)); 757 } 758 } else if (size == 0) { 759 // EOF. 760 LOG_ALWAYS_FATAL("Received EOF from master."); 761 } 762 763 // Extract fd from message. 764 if ((cmptr = CMSG_FIRSTHDR(&msg)) != NULL 765 && cmptr->cmsg_len == CMSG_LEN(sizeof(int))) { 766 if (cmptr->cmsg_level != SOL_SOCKET) { 767 LOG_ALWAYS_FATAL("Expected SOL_SOCKET."); 768 } 769 if (cmptr->cmsg_type != SCM_RIGHTS) { 770 LOG_ALWAYS_FATAL("Expected SCM_RIGHTS."); 771 } 772 incomingFd = *((int*) CMSG_DATA(cmptr)); 773 } else { 774 LOG_ALWAYS_FATAL("Expected fd."); 775 } 776 777 // The peer proxy this connection is for. 778 PeerProxy* peerProxy = masterProxy->connecting; 779 if (peerProxy == NULL) { 780 LOGW("Received connection for unknown peer."); 781 closeWithWarning(incomingFd); 782 } else { 783 Peer* peer = masterProxy->peer; 784 785 SelectableFd* selectableFd = selectorAdd(peer->selector, incomingFd); 786 if (selectableFd == NULL) { 787 LOGW("Error adding fd to selector for %d.", 788 peerProxy->credentials.pid); 789 closeWithWarning(incomingFd); 790 peerProxyKill(peerProxy, false); 791 } 792 793 peerProxySetFd(peerProxy, selectableFd); 794 } 795 796 peerProxyExpectHeader(masterProxy); 797} 798 799/** 800 * Frees an outgoing packet containing a connection. 801 */ 802static void outgoingPacketFreeSocket(OutgoingPacket* packet) { 803 closeWithWarning(packet->socket); 804 outgoingPacketFree(packet); 805} 806 807/** 808 * Connects two known peers. 809 */ 810static void masterConnectPeers(PeerProxy* peerA, PeerProxy* peerB) { 811 int sockets[2]; 812 int result = socketpair(AF_LOCAL, SOCK_STREAM, 0, sockets); 813 if (result == -1) { 814 LOGW("socketpair() error: %s", strerror(errno)); 815 // TODO: Send CONNECTION_FAILED packets to peers. 816 return; 817 } 818 819 OutgoingPacket* packetA = calloc(1, sizeof(OutgoingPacket)); 820 OutgoingPacket* packetB = calloc(1, sizeof(OutgoingPacket)); 821 if (packetA == NULL || packetB == NULL) { 822 free(packetA); 823 free(packetB); 824 LOGW("malloc() error. Failed to tell process %d that process %d is" 825 " dead.", peerA->credentials.pid, peerB->credentials.pid); 826 return; 827 } 828 829 packetA->header.type = CONNECTION; 830 packetB->header.type = CONNECTION; 831 832 packetA->header.credentials = peerB->credentials; 833 packetB->header.credentials = peerA->credentials; 834 835 packetA->socket = sockets[0]; 836 packetB->socket = sockets[1]; 837 838 packetA->free = &outgoingPacketFreeSocket; 839 packetB->free = &outgoingPacketFreeSocket; 840 841 peerLock(peerA->peer); 842 peerProxyEnqueueOutgoingPacket(peerA, packetA); 843 peerProxyEnqueueOutgoingPacket(peerB, packetB); 844 peerUnlock(peerA->peer); 845} 846 847/** 848 * Informs a peer that the peer they're trying to connect to couldn't be 849 * found. 850 */ 851static void masterReportConnectionError(PeerProxy* peerProxy, 852 Credentials credentials) { 853 OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket)); 854 if (packet == NULL) { 855 LOGW("malloc() error. Failed to tell process %d that process %d is" 856 " dead.", peerProxy->credentials.pid, credentials.pid); 857 return; 858 } 859 860 packet->header.type = CONNECTION_ERROR; 861 packet->header.credentials = credentials; 862 packet->free = &outgoingPacketFree; 863 864 peerProxyLockAndEnqueueOutgoingPacket(peerProxy, packet); 865} 866 867/** 868 * Handles a request to be connected to another peer. 869 */ 870static void masterHandleConnectionRequest(PeerProxy* peerProxy, 871 Header* header) { 872 Peer* master = peerProxy->peer; 873 pid_t targetPid = header->credentials.pid; 874 if (!hashmapContainsKey(peerProxy->connections, &targetPid)) { 875 // We haven't connected these peers yet. 876 PeerProxy* targetPeer 877 = (PeerProxy*) hashmapGet(master->peerProxies, &targetPid); 878 if (targetPeer == NULL) { 879 // Unknown process. 880 masterReportConnectionError(peerProxy, header->credentials); 881 } else { 882 masterConnectPeers(peerProxy, targetPeer); 883 } 884 } 885 886 // This packet is complete. Get ready for the next one. 887 peerProxyExpectHeader(peerProxy); 888} 889 890/** 891 * The master told us this peer is dead. 892 */ 893static void masterProxyHandleConnectionError(PeerProxy* masterProxy, 894 Header* header) { 895 Peer* peer = masterProxy->peer; 896 897 // Look up the peer proxy. 898 pid_t pid = header->credentials.pid; 899 PeerProxy* peerProxy = NULL; 900 peerLock(peer); 901 peerProxy = hashmapGet(peer->peerProxies, &pid); 902 peerUnlock(peer); 903 904 if (peerProxy != NULL) { 905 LOGI("Couldn't connect to %d.", pid); 906 peerProxyKill(peerProxy, false); 907 } else { 908 LOGW("Peer proxy for %d not found. This shouldn't happen.", pid); 909 } 910 911 peerProxyExpectHeader(masterProxy); 912} 913 914/** 915 * Handles a packet header. 916 */ 917static void peerProxyHandleHeader(PeerProxy* peerProxy, Header* header) { 918 switch (header->type) { 919 case CONNECTION_REQUEST: 920 masterHandleConnectionRequest(peerProxy, header); 921 break; 922 case CONNECTION: 923 masterProxyExpectConnection(peerProxy, header); 924 break; 925 case CONNECTION_ERROR: 926 masterProxyHandleConnectionError(peerProxy, header); 927 break; 928 case BYTES: 929 peerProxyExpectBytes(peerProxy, header); 930 break; 931 default: 932 LOGW("Invalid packet type from %d: %d", peerProxy->credentials.pid, 933 header->type); 934 peerProxyKill(peerProxy, false); 935 } 936} 937 938/** 939 * Buffers input sent by peer. May be called multiple times until the entire 940 * buffer is filled. Returns true when the buffer is full. 941 */ 942static bool peerProxyBufferInput(PeerProxy* peerProxy) { 943 Buffer* in = peerProxy->inputBuffer; 944 ssize_t size = bufferRead(in, peerProxy->fd->fd); 945 if (size < 0) { 946 peerProxyHandleError(peerProxy, "read"); 947 return false; 948 } else if (size == 0) { 949 // EOF. 950 LOGI("EOF"); 951 peerProxyKill(peerProxy, false); 952 return false; 953 } else if (bufferReadComplete(in)) { 954 // We're done! 955 return true; 956 } else { 957 // Continue reading. 958 return false; 959 } 960} 961 962/** 963 * Reads input from a peer process. 964 */ 965static void peerProxyRead(SelectableFd* fd) { 966 LOGD("Reading..."); 967 PeerProxy* peerProxy = (PeerProxy*) fd->data; 968 int state = peerProxy->inputState; 969 Buffer* in = peerProxy->inputBuffer; 970 switch (state) { 971 case READING_HEADER: 972 if (peerProxyBufferInput(peerProxy)) { 973 LOGD("Header read."); 974 // We've read the complete header. 975 Header* header = (Header*) in->data; 976 peerProxyHandleHeader(peerProxy, header); 977 } 978 break; 979 case READING_BYTES: 980 LOGD("Reading bytes..."); 981 if (peerProxyBufferInput(peerProxy)) { 982 LOGD("Bytes read."); 983 // We have the complete packet. Notify bytes listener. 984 peerProxy->peer->onBytes(peerProxy->credentials, 985 in->data, in->size); 986 987 // Get ready for the next packet. 988 peerProxyExpectHeader(peerProxy); 989 } 990 break; 991 case ACCEPTING_CONNECTION: 992 masterProxyAcceptConnection(peerProxy); 993 break; 994 default: 995 LOG_ALWAYS_FATAL("Unknown state: %d", state); 996 } 997} 998 999static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials) { 1000 PeerProxy* peerProxy = calloc(1, sizeof(PeerProxy)); 1001 if (peerProxy == NULL) { 1002 return NULL; 1003 } 1004 1005 peerProxy->inputBuffer = bufferCreate(sizeof(Header)); 1006 if (peerProxy->inputBuffer == NULL) { 1007 free(peerProxy); 1008 return NULL; 1009 } 1010 1011 peerProxy->peer = peer; 1012 peerProxy->credentials = credentials; 1013 1014 // Initial state == expecting a header. 1015 peerProxyExpectHeader(peerProxy); 1016 1017 // Add this proxy to the map. Make sure the key points to the stable memory 1018 // inside of the peer proxy itself. 1019 pid_t* pid = &(peerProxy->credentials.pid); 1020 hashmapPut(peer->peerProxies, pid, peerProxy); 1021 return peerProxy; 1022} 1023 1024/** Accepts a connection to the master peer. */ 1025static void masterAcceptConnection(SelectableFd* listenerFd) { 1026 // Accept connection. 1027 int socket = accept(listenerFd->fd, NULL, NULL); 1028 if (socket == -1) { 1029 LOGW("accept() error: %s", strerror(errno)); 1030 return; 1031 } 1032 1033 LOGD("Accepted connection as fd %d.", socket); 1034 1035 // Get credentials. 1036 Credentials credentials; 1037 struct ucred ucredentials; 1038 socklen_t credentialsSize = sizeof(struct ucred); 1039 int result = getsockopt(socket, SOL_SOCKET, SO_PEERCRED, 1040 &ucredentials, &credentialsSize); 1041 // We might want to verify credentialsSize. 1042 if (result == -1) { 1043 LOGW("getsockopt() error: %s", strerror(errno)); 1044 closeWithWarning(socket); 1045 return; 1046 } 1047 1048 // Copy values into our own structure so we know we have the types right. 1049 credentials.pid = ucredentials.pid; 1050 credentials.uid = ucredentials.uid; 1051 credentials.gid = ucredentials.gid; 1052 1053 LOGI("Accepted connection from process %d.", credentials.pid); 1054 1055 Peer* masterPeer = (Peer*) listenerFd->data; 1056 1057 peerLock(masterPeer); 1058 1059 // Make sure we don't already have a connection from that process. 1060 PeerProxy* peerProxy 1061 = hashmapGet(masterPeer->peerProxies, &credentials.pid); 1062 if (peerProxy != NULL) { 1063 peerUnlock(masterPeer); 1064 LOGW("Alread connected to process %d.", credentials.pid); 1065 closeWithWarning(socket); 1066 return; 1067 } 1068 1069 // Add connection to the selector. 1070 SelectableFd* socketFd = selectorAdd(masterPeer->selector, socket); 1071 if (socketFd == NULL) { 1072 peerUnlock(masterPeer); 1073 LOGW("malloc() failed."); 1074 closeWithWarning(socket); 1075 return; 1076 } 1077 1078 // Create a peer proxy. 1079 peerProxy = peerProxyCreate(masterPeer, credentials); 1080 peerUnlock(masterPeer); 1081 if (peerProxy == NULL) { 1082 LOGW("malloc() failed."); 1083 socketFd->remove = true; 1084 closeWithWarning(socket); 1085 } 1086 peerProxy->connections = hashmapCreate(10, &pidHash, &pidEquals); 1087 peerProxySetFd(peerProxy, socketFd); 1088} 1089 1090/** 1091 * Creates the local peer. 1092 */ 1093static Peer* peerCreate() { 1094 Peer* peer = calloc(1, sizeof(Peer)); 1095 if (peer == NULL) { 1096 LOG_ALWAYS_FATAL("malloc() error."); 1097 } 1098 peer->peerProxies = hashmapCreate(10, &pidHash, &pidEquals); 1099 peer->selector = selectorCreate(); 1100 1101 pthread_mutexattr_t attributes; 1102 if (pthread_mutexattr_init(&attributes) != 0) { 1103 LOG_ALWAYS_FATAL("pthread_mutexattr_init() error."); 1104 } 1105 if (pthread_mutexattr_settype(&attributes, PTHREAD_MUTEX_RECURSIVE) != 0) { 1106 LOG_ALWAYS_FATAL("pthread_mutexattr_settype() error."); 1107 } 1108 if (pthread_mutex_init(&peer->mutex, &attributes) != 0) { 1109 LOG_ALWAYS_FATAL("pthread_mutex_init() error."); 1110 } 1111 1112 peer->pid = getpid(); 1113 return peer; 1114} 1115 1116/** The local peer. */ 1117static Peer* localPeer; 1118 1119/** Frees a packet of bytes. */ 1120static void outgoingPacketFreeBytes(OutgoingPacket* packet) { 1121 LOGD("Freeing outgoing packet."); 1122 bufferFree(packet->bytes); 1123 free(packet); 1124} 1125 1126/** 1127 * Sends a packet of bytes to a remote peer. Returns 0 on success. 1128 * 1129 * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be 1130 * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno 1131 * to EINVAL if pid is the same as the local pid. 1132 */ 1133int peerSendBytes(pid_t pid, const char* bytes, size_t size) { 1134 Peer* peer = localPeer; 1135 assert(peer != NULL); 1136 1137 OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket)); 1138 if (packet == NULL) { 1139 errno = ENOMEM; 1140 return -1; 1141 } 1142 1143 Buffer* copy = bufferCreate(size); 1144 if (copy == NULL) { 1145 free(packet); 1146 errno = ENOMEM; 1147 return -1; 1148 } 1149 1150 // Copy data. 1151 memcpy(copy->data, bytes, size); 1152 copy->size = size; 1153 1154 packet->bytes = copy; 1155 packet->header.type = BYTES; 1156 packet->header.size = size; 1157 packet->free = outgoingPacketFreeBytes; 1158 bufferPrepareForWrite(packet->bytes); 1159 1160 peerLock(peer); 1161 1162 PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true); 1163 if (peerProxy == NULL) { 1164 // The peer is already dead or we couldn't alloc memory. Either way, 1165 // errno is set. 1166 peerUnlock(peer); 1167 packet->free(packet); 1168 return -1; 1169 } else { 1170 peerProxyEnqueueOutgoingPacket(peerProxy, packet); 1171 peerUnlock(peer); 1172 selectorWakeUp(peer->selector); 1173 return 0; 1174 } 1175} 1176 1177/** Keeps track of how to free shared bytes. */ 1178typedef struct { 1179 void (*free)(void* context); 1180 void* context; 1181} SharedBytesFreer; 1182 1183/** Frees shared bytes. */ 1184static void outgoingPacketFreeSharedBytes(OutgoingPacket* packet) { 1185 SharedBytesFreer* sharedBytesFreer 1186 = (SharedBytesFreer*) packet->context; 1187 sharedBytesFreer->free(sharedBytesFreer->context); 1188 free(sharedBytesFreer); 1189 free(packet); 1190} 1191 1192/** 1193 * Sends a packet of bytes to a remote peer without copying the bytes. Calls 1194 * free() with context after the bytes have been sent. 1195 * 1196 * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be 1197 * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno 1198 * to EINVAL if pid is the same as the local pid. 1199 */ 1200int peerSendSharedBytes(pid_t pid, char* bytes, size_t size, 1201 void (*free)(void* context), void* context) { 1202 Peer* peer = localPeer; 1203 assert(peer != NULL); 1204 1205 OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket)); 1206 if (packet == NULL) { 1207 errno = ENOMEM; 1208 return -1; 1209 } 1210 1211 Buffer* wrapper = bufferWrap(bytes, size, size); 1212 if (wrapper == NULL) { 1213 free(packet); 1214 errno = ENOMEM; 1215 return -1; 1216 } 1217 1218 SharedBytesFreer* sharedBytesFreer = malloc(sizeof(SharedBytesFreer)); 1219 if (sharedBytesFreer == NULL) { 1220 free(packet); 1221 free(wrapper); 1222 errno = ENOMEM; 1223 return -1; 1224 } 1225 sharedBytesFreer->free = free; 1226 sharedBytesFreer->context = context; 1227 1228 packet->bytes = wrapper; 1229 packet->context = sharedBytesFreer; 1230 packet->header.type = BYTES; 1231 packet->header.size = size; 1232 packet->free = &outgoingPacketFreeSharedBytes; 1233 bufferPrepareForWrite(packet->bytes); 1234 1235 peerLock(peer); 1236 1237 PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true); 1238 if (peerProxy == NULL) { 1239 // The peer is already dead or we couldn't alloc memory. Either way, 1240 // errno is set. 1241 peerUnlock(peer); 1242 packet->free(packet); 1243 return -1; 1244 } else { 1245 peerProxyEnqueueOutgoingPacket(peerProxy, packet); 1246 peerUnlock(peer); 1247 selectorWakeUp(peer->selector); 1248 return 0; 1249 } 1250} 1251 1252/** 1253 * Starts the master peer. The master peer differs from other peers in that 1254 * it is responsible for connecting the other peers. You can only have one 1255 * master peer. 1256 * 1257 * Goes into an I/O loop and does not return. 1258 */ 1259void masterPeerInitialize(BytesListener* bytesListener, 1260 DeathListener* deathListener) { 1261 // Create and bind socket. 1262 int listenerSocket = socket(AF_LOCAL, SOCK_STREAM, 0); 1263 if (listenerSocket == -1) { 1264 LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno)); 1265 } 1266 unlink(MASTER_PATH); 1267 int result = bind(listenerSocket, (SocketAddress*) getMasterAddress(), 1268 sizeof(UnixAddress)); 1269 if (result == -1) { 1270 LOG_ALWAYS_FATAL("bind() error: %s", strerror(errno)); 1271 } 1272 1273 LOGD("Listener socket: %d", listenerSocket); 1274 1275 // Queue up to 16 connections. 1276 result = listen(listenerSocket, 16); 1277 if (result != 0) { 1278 LOG_ALWAYS_FATAL("listen() error: %s", strerror(errno)); 1279 } 1280 1281 // Make socket non-blocking. 1282 setNonBlocking(listenerSocket); 1283 1284 // Create the peer for this process. Fail if we already have one. 1285 if (localPeer != NULL) { 1286 LOG_ALWAYS_FATAL("Peer is already initialized."); 1287 } 1288 localPeer = peerCreate(); 1289 if (localPeer == NULL) { 1290 LOG_ALWAYS_FATAL("malloc() failed."); 1291 } 1292 localPeer->master = true; 1293 localPeer->onBytes = bytesListener; 1294 localPeer->onDeath = deathListener; 1295 1296 // Make listener socket selectable. 1297 SelectableFd* listenerFd = selectorAdd(localPeer->selector, listenerSocket); 1298 if (listenerFd == NULL) { 1299 LOG_ALWAYS_FATAL("malloc() error."); 1300 } 1301 listenerFd->data = localPeer; 1302 listenerFd->onReadable = &masterAcceptConnection; 1303} 1304 1305/** 1306 * Starts a local peer. 1307 * 1308 * Goes into an I/O loop and does not return. 1309 */ 1310void peerInitialize(BytesListener* bytesListener, 1311 DeathListener* deathListener) { 1312 // Connect to master peer. 1313 int masterSocket = socket(AF_LOCAL, SOCK_STREAM, 0); 1314 if (masterSocket == -1) { 1315 LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno)); 1316 } 1317 int result = connect(masterSocket, (SocketAddress*) getMasterAddress(), 1318 sizeof(UnixAddress)); 1319 if (result != 0) { 1320 LOG_ALWAYS_FATAL("connect() error: %s", strerror(errno)); 1321 } 1322 1323 // Create the peer for this process. Fail if we already have one. 1324 if (localPeer != NULL) { 1325 LOG_ALWAYS_FATAL("Peer is already initialized."); 1326 } 1327 localPeer = peerCreate(); 1328 if (localPeer == NULL) { 1329 LOG_ALWAYS_FATAL("malloc() failed."); 1330 } 1331 localPeer->onBytes = bytesListener; 1332 localPeer->onDeath = deathListener; 1333 1334 // Make connection selectable. 1335 SelectableFd* masterFd = selectorAdd(localPeer->selector, masterSocket); 1336 if (masterFd == NULL) { 1337 LOG_ALWAYS_FATAL("malloc() error."); 1338 } 1339 1340 // Create a peer proxy for the master peer. 1341 PeerProxy* masterProxy = peerProxyCreate(localPeer, MASTER_CREDENTIALS); 1342 if (masterProxy == NULL) { 1343 LOG_ALWAYS_FATAL("malloc() error."); 1344 } 1345 peerProxySetFd(masterProxy, masterFd); 1346 masterProxy->master = true; 1347 localPeer->masterProxy = masterProxy; 1348} 1349 1350/** Starts the master peer I/O loop. Doesn't return. */ 1351void peerLoop() { 1352 assert(localPeer != NULL); 1353 1354 // Start selector. 1355 selectorLoop(localPeer->selector); 1356} 1357 1358