1/* http://frotznet.googlecode.com/svn/trunk/utils/fdevent.c 2** 3** Copyright 2006, Brian Swetland <swetland@frotz.net> 4** 5** Licensed under the Apache License, Version 2.0 (the "License"); 6** you may not use this file except in compliance with the License. 7** You may obtain a copy of the License at 8** 9** http://www.apache.org/licenses/LICENSE-2.0 10** 11** Unless required by applicable law or agreed to in writing, software 12** distributed under the License is distributed on an "AS IS" BASIS, 13** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14** See the License for the specific language governing permissions and 15** limitations under the License. 16*/ 17 18#define TRACE_TAG FDEVENT 19 20#include "sysdeps.h" 21#include "fdevent.h" 22 23#include <fcntl.h> 24#include <stdlib.h> 25#include <string.h> 26#include <unistd.h> 27 28#include <atomic> 29#include <functional> 30#include <list> 31#include <mutex> 32#include <unordered_map> 33#include <vector> 34 35#include <android-base/logging.h> 36#include <android-base/stringprintf.h> 37#include <android-base/thread_annotations.h> 38 39#include "adb_io.h" 40#include "adb_trace.h" 41#include "adb_unique_fd.h" 42#include "adb_utils.h" 43 44#if !ADB_HOST 45// This socket is used when a subproc shell service exists. 46// It wakes up the fdevent_loop() and cause the correct handling 47// of the shell's pseudo-tty master. I.e. force close it. 48int SHELL_EXIT_NOTIFY_FD = -1; 49#endif // !ADB_HOST 50 51#define FDE_EVENTMASK 0x00ff 52#define FDE_STATEMASK 0xff00 53 54#define FDE_ACTIVE 0x0100 55#define FDE_PENDING 0x0200 56#define FDE_CREATED 0x0400 57 58struct PollNode { 59 fdevent* fde; 60 adb_pollfd pollfd; 61 62 explicit PollNode(fdevent* fde) : fde(fde) { 63 memset(&pollfd, 0, sizeof(pollfd)); 64 pollfd.fd = fde->fd; 65 66#if defined(__linux__) 67 // Always enable POLLRDHUP, so the host server can take action when some clients disconnect. 68 // Then we can avoid leaving many sockets in CLOSE_WAIT state. See http://b/23314034. 69 pollfd.events = POLLRDHUP; 70#endif 71 } 72}; 73 74// All operations to fdevent should happen only in the main thread. 75// That's why we don't need a lock for fdevent. 76static auto& g_poll_node_map = *new std::unordered_map<int, PollNode>(); 77static auto& g_pending_list = *new std::list<fdevent*>(); 78static std::atomic<bool> terminate_loop(false); 79static bool main_thread_valid; 80static unsigned long main_thread_id; 81 82static auto& run_queue_notify_fd = *new unique_fd(); 83static auto& run_queue_mutex = *new std::mutex(); 84static auto& run_queue GUARDED_BY(run_queue_mutex) = *new std::vector<std::function<void()>>(); 85 86void check_main_thread() { 87 if (main_thread_valid) { 88 CHECK_EQ(main_thread_id, adb_thread_id()); 89 } 90} 91 92void set_main_thread() { 93 main_thread_valid = true; 94 main_thread_id = adb_thread_id(); 95} 96 97static std::string dump_fde(const fdevent* fde) { 98 std::string state; 99 if (fde->state & FDE_ACTIVE) { 100 state += "A"; 101 } 102 if (fde->state & FDE_PENDING) { 103 state += "P"; 104 } 105 if (fde->state & FDE_CREATED) { 106 state += "C"; 107 } 108 if (fde->state & FDE_READ) { 109 state += "R"; 110 } 111 if (fde->state & FDE_WRITE) { 112 state += "W"; 113 } 114 if (fde->state & FDE_ERROR) { 115 state += "E"; 116 } 117 if (fde->state & FDE_DONT_CLOSE) { 118 state += "D"; 119 } 120 return android::base::StringPrintf("(fdevent %d %s)", fde->fd, state.c_str()); 121} 122 123fdevent* fdevent_create(int fd, fd_func func, void* arg) { 124 check_main_thread(); 125 fdevent *fde = (fdevent*) malloc(sizeof(fdevent)); 126 if(fde == 0) return 0; 127 fdevent_install(fde, fd, func, arg); 128 fde->state |= FDE_CREATED; 129 return fde; 130} 131 132void fdevent_destroy(fdevent* fde) { 133 check_main_thread(); 134 if(fde == 0) return; 135 if(!(fde->state & FDE_CREATED)) { 136 LOG(FATAL) << "destroying fde not created by fdevent_create(): " << dump_fde(fde); 137 } 138 fdevent_remove(fde); 139 free(fde); 140} 141 142void fdevent_install(fdevent* fde, int fd, fd_func func, void* arg) { 143 check_main_thread(); 144 CHECK_GE(fd, 0); 145 memset(fde, 0, sizeof(fdevent)); 146 fde->state = FDE_ACTIVE; 147 fde->fd = fd; 148 fde->func = func; 149 fde->arg = arg; 150 if (!set_file_block_mode(fd, false)) { 151 // Here is not proper to handle the error. If it fails here, some error is 152 // likely to be detected by poll(), then we can let the callback function 153 // to handle it. 154 LOG(ERROR) << "failed to set non-blocking mode for fd " << fd; 155 } 156 auto pair = g_poll_node_map.emplace(fde->fd, PollNode(fde)); 157 CHECK(pair.second) << "install existing fd " << fd; 158 D("fdevent_install %s", dump_fde(fde).c_str()); 159} 160 161void fdevent_remove(fdevent* fde) { 162 check_main_thread(); 163 D("fdevent_remove %s", dump_fde(fde).c_str()); 164 if (fde->state & FDE_ACTIVE) { 165 g_poll_node_map.erase(fde->fd); 166 if (fde->state & FDE_PENDING) { 167 g_pending_list.remove(fde); 168 } 169 if (!(fde->state & FDE_DONT_CLOSE)) { 170 adb_close(fde->fd); 171 fde->fd = -1; 172 } 173 fde->state = 0; 174 fde->events = 0; 175 } 176} 177 178static void fdevent_update(fdevent* fde, unsigned events) { 179 auto it = g_poll_node_map.find(fde->fd); 180 CHECK(it != g_poll_node_map.end()); 181 PollNode& node = it->second; 182 if (events & FDE_READ) { 183 node.pollfd.events |= POLLIN; 184 } else { 185 node.pollfd.events &= ~POLLIN; 186 } 187 188 if (events & FDE_WRITE) { 189 node.pollfd.events |= POLLOUT; 190 } else { 191 node.pollfd.events &= ~POLLOUT; 192 } 193 fde->state = (fde->state & FDE_STATEMASK) | events; 194} 195 196void fdevent_set(fdevent* fde, unsigned events) { 197 check_main_thread(); 198 events &= FDE_EVENTMASK; 199 if ((fde->state & FDE_EVENTMASK) == events) { 200 return; 201 } 202 CHECK(fde->state & FDE_ACTIVE); 203 fdevent_update(fde, events); 204 D("fdevent_set: %s, events = %u", dump_fde(fde).c_str(), events); 205 206 if (fde->state & FDE_PENDING) { 207 // If we are pending, make sure we don't signal an event that is no longer wanted. 208 fde->events &= events; 209 if (fde->events == 0) { 210 g_pending_list.remove(fde); 211 fde->state &= ~FDE_PENDING; 212 } 213 } 214} 215 216void fdevent_add(fdevent* fde, unsigned events) { 217 check_main_thread(); 218 fdevent_set(fde, (fde->state & FDE_EVENTMASK) | events); 219} 220 221void fdevent_del(fdevent* fde, unsigned events) { 222 check_main_thread(); 223 fdevent_set(fde, (fde->state & FDE_EVENTMASK) & ~events); 224} 225 226static std::string dump_pollfds(const std::vector<adb_pollfd>& pollfds) { 227 std::string result; 228 for (const auto& pollfd : pollfds) { 229 std::string op; 230 if (pollfd.events & POLLIN) { 231 op += "R"; 232 } 233 if (pollfd.events & POLLOUT) { 234 op += "W"; 235 } 236 android::base::StringAppendF(&result, " %d(%s)", pollfd.fd, op.c_str()); 237 } 238 return result; 239} 240 241static void fdevent_process() { 242 std::vector<adb_pollfd> pollfds; 243 for (const auto& pair : g_poll_node_map) { 244 pollfds.push_back(pair.second.pollfd); 245 } 246 CHECK_GT(pollfds.size(), 0u); 247 D("poll(), pollfds = %s", dump_pollfds(pollfds).c_str()); 248 int ret = adb_poll(&pollfds[0], pollfds.size(), -1); 249 if (ret == -1) { 250 PLOG(ERROR) << "poll(), ret = " << ret; 251 return; 252 } 253 for (const auto& pollfd : pollfds) { 254 if (pollfd.revents != 0) { 255 D("for fd %d, revents = %x", pollfd.fd, pollfd.revents); 256 } 257 unsigned events = 0; 258 if (pollfd.revents & POLLIN) { 259 events |= FDE_READ; 260 } 261 if (pollfd.revents & POLLOUT) { 262 events |= FDE_WRITE; 263 } 264 if (pollfd.revents & (POLLERR | POLLHUP | POLLNVAL)) { 265 // We fake a read, as the rest of the code assumes that errors will 266 // be detected at that point. 267 events |= FDE_READ | FDE_ERROR; 268 } 269#if defined(__linux__) 270 if (pollfd.revents & POLLRDHUP) { 271 events |= FDE_READ | FDE_ERROR; 272 } 273#endif 274 if (events != 0) { 275 auto it = g_poll_node_map.find(pollfd.fd); 276 CHECK(it != g_poll_node_map.end()); 277 fdevent* fde = it->second.fde; 278 CHECK_EQ(fde->fd, pollfd.fd); 279 fde->events |= events; 280 D("%s got events %x", dump_fde(fde).c_str(), events); 281 fde->state |= FDE_PENDING; 282 g_pending_list.push_back(fde); 283 } 284 } 285} 286 287static void fdevent_call_fdfunc(fdevent* fde) { 288 unsigned events = fde->events; 289 fde->events = 0; 290 CHECK(fde->state & FDE_PENDING); 291 fde->state &= (~FDE_PENDING); 292 D("fdevent_call_fdfunc %s", dump_fde(fde).c_str()); 293 fde->func(fde->fd, events, fde->arg); 294} 295 296#if !ADB_HOST 297 298#include <sys/ioctl.h> 299 300static void fdevent_subproc_event_func(int fd, unsigned ev, void* /* userdata */) { 301 D("subproc handling on fd = %d, ev = %x", fd, ev); 302 303 CHECK_GE(fd, 0); 304 305 if (ev & FDE_READ) { 306 int subproc_fd; 307 308 if(!ReadFdExactly(fd, &subproc_fd, sizeof(subproc_fd))) { 309 LOG(FATAL) << "Failed to read the subproc's fd from " << fd; 310 } 311 auto it = g_poll_node_map.find(subproc_fd); 312 if (it == g_poll_node_map.end()) { 313 D("subproc_fd %d cleared from fd_table", subproc_fd); 314 adb_close(subproc_fd); 315 return; 316 } 317 fdevent* subproc_fde = it->second.fde; 318 if(subproc_fde->fd != subproc_fd) { 319 // Already reallocated? 320 LOG(FATAL) << "subproc_fd(" << subproc_fd << ") != subproc_fde->fd(" << subproc_fde->fd 321 << ")"; 322 return; 323 } 324 325 subproc_fde->force_eof = 1; 326 327 int rcount = 0; 328 ioctl(subproc_fd, FIONREAD, &rcount); 329 D("subproc with fd %d has rcount=%d, err=%d", subproc_fd, rcount, errno); 330 if (rcount != 0) { 331 // If there is data left, it will show up in the select(). 332 // This works because there is no other thread reading that 333 // data when in this fd_func(). 334 return; 335 } 336 337 D("subproc_fde %s", dump_fde(subproc_fde).c_str()); 338 subproc_fde->events |= FDE_READ; 339 if(subproc_fde->state & FDE_PENDING) { 340 return; 341 } 342 subproc_fde->state |= FDE_PENDING; 343 fdevent_call_fdfunc(subproc_fde); 344 } 345} 346 347static void fdevent_subproc_setup() { 348 int s[2]; 349 350 if(adb_socketpair(s)) { 351 PLOG(FATAL) << "cannot create shell-exit socket-pair"; 352 } 353 D("fdevent_subproc: socket pair (%d, %d)", s[0], s[1]); 354 355 SHELL_EXIT_NOTIFY_FD = s[0]; 356 fdevent *fde = fdevent_create(s[1], fdevent_subproc_event_func, NULL); 357 CHECK(fde != nullptr) << "cannot create fdevent for shell-exit handler"; 358 fdevent_add(fde, FDE_READ); 359} 360#endif // !ADB_HOST 361 362static void fdevent_run_flush() REQUIRES(run_queue_mutex) { 363 for (auto& f : run_queue) { 364 f(); 365 } 366 run_queue.clear(); 367} 368 369static void fdevent_run_func(int fd, unsigned ev, void* /* userdata */) { 370 CHECK_GE(fd, 0); 371 CHECK(ev & FDE_READ); 372 373 char buf[1024]; 374 375 // Empty the fd. 376 if (adb_read(fd, buf, sizeof(buf)) == -1) { 377 PLOG(FATAL) << "failed to empty run queue notify fd"; 378 } 379 380 std::lock_guard<std::mutex> lock(run_queue_mutex); 381 fdevent_run_flush(); 382} 383 384static void fdevent_run_setup() { 385 std::lock_guard<std::mutex> lock(run_queue_mutex); 386 CHECK(run_queue_notify_fd.get() == -1); 387 int s[2]; 388 if (adb_socketpair(s) != 0) { 389 PLOG(FATAL) << "failed to create run queue notify socketpair"; 390 } 391 392 run_queue_notify_fd.reset(s[0]); 393 fdevent* fde = fdevent_create(s[1], fdevent_run_func, nullptr); 394 CHECK(fde != nullptr); 395 fdevent_add(fde, FDE_READ); 396 397 fdevent_run_flush(); 398} 399 400void fdevent_run_on_main_thread(std::function<void()> fn) { 401 std::lock_guard<std::mutex> lock(run_queue_mutex); 402 run_queue.push_back(std::move(fn)); 403 404 // run_queue_notify_fd could still be -1 if we're called before fdevent has finished setting up. 405 // In that case, rely on the setup code to flush the queue without a notification being needed. 406 if (run_queue_notify_fd != -1) { 407 if (adb_write(run_queue_notify_fd.get(), "", 1) != 1) { 408 PLOG(FATAL) << "failed to write to run queue notify fd"; 409 } 410 } 411} 412 413void fdevent_loop() { 414 set_main_thread(); 415#if !ADB_HOST 416 fdevent_subproc_setup(); 417#endif // !ADB_HOST 418 fdevent_run_setup(); 419 420 while (true) { 421 if (terminate_loop) { 422 return; 423 } 424 425 D("--- --- waiting for events"); 426 427 fdevent_process(); 428 429 while (!g_pending_list.empty()) { 430 fdevent* fde = g_pending_list.front(); 431 g_pending_list.pop_front(); 432 fdevent_call_fdfunc(fde); 433 } 434 } 435} 436 437void fdevent_terminate_loop() { 438 terminate_loop = true; 439} 440 441size_t fdevent_installed_count() { 442 return g_poll_node_map.size(); 443} 444 445void fdevent_reset() { 446 g_poll_node_map.clear(); 447 g_pending_list.clear(); 448 449 std::lock_guard<std::mutex> lock(run_queue_mutex); 450 run_queue_notify_fd.reset(); 451 run_queue.clear(); 452 453 main_thread_valid = false; 454 terminate_loop = false; 455} 456