1/* http://frotznet.googlecode.com/svn/trunk/utils/fdevent.c
2**
3** Copyright 2006, Brian Swetland <swetland@frotz.net>
4**
5** Licensed under the Apache License, Version 2.0 (the "License");
6** you may not use this file except in compliance with the License.
7** You may obtain a copy of the License at
8**
9**     http://www.apache.org/licenses/LICENSE-2.0
10**
11** Unless required by applicable law or agreed to in writing, software
12** distributed under the License is distributed on an "AS IS" BASIS,
13** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14** See the License for the specific language governing permissions and
15** limitations under the License.
16*/
17
18#define TRACE_TAG FDEVENT
19
20#include "sysdeps.h"
21#include "fdevent.h"
22
23#include <fcntl.h>
24#include <stdlib.h>
25#include <string.h>
26#include <unistd.h>
27
28#include <atomic>
29#include <functional>
30#include <list>
31#include <mutex>
32#include <unordered_map>
33#include <vector>
34
35#include <android-base/logging.h>
36#include <android-base/stringprintf.h>
37#include <android-base/thread_annotations.h>
38
39#include "adb_io.h"
40#include "adb_trace.h"
41#include "adb_unique_fd.h"
42#include "adb_utils.h"
43
44#if !ADB_HOST
45// This socket is used when a subproc shell service exists.
46// It wakes up the fdevent_loop() and cause the correct handling
47// of the shell's pseudo-tty master. I.e. force close it.
48int SHELL_EXIT_NOTIFY_FD = -1;
49#endif // !ADB_HOST
50
51#define FDE_EVENTMASK  0x00ff
52#define FDE_STATEMASK  0xff00
53
54#define FDE_ACTIVE     0x0100
55#define FDE_PENDING    0x0200
56#define FDE_CREATED    0x0400
57
58struct PollNode {
59  fdevent* fde;
60  adb_pollfd pollfd;
61
62  explicit PollNode(fdevent* fde) : fde(fde) {
63      memset(&pollfd, 0, sizeof(pollfd));
64      pollfd.fd = fde->fd;
65
66#if defined(__linux__)
67      // Always enable POLLRDHUP, so the host server can take action when some clients disconnect.
68      // Then we can avoid leaving many sockets in CLOSE_WAIT state. See http://b/23314034.
69      pollfd.events = POLLRDHUP;
70#endif
71  }
72};
73
74// All operations to fdevent should happen only in the main thread.
75// That's why we don't need a lock for fdevent.
76static auto& g_poll_node_map = *new std::unordered_map<int, PollNode>();
77static auto& g_pending_list = *new std::list<fdevent*>();
78static std::atomic<bool> terminate_loop(false);
79static bool main_thread_valid;
80static unsigned long main_thread_id;
81
82static auto& run_queue_notify_fd = *new unique_fd();
83static auto& run_queue_mutex = *new std::mutex();
84static auto& run_queue GUARDED_BY(run_queue_mutex) = *new std::vector<std::function<void()>>();
85
86void check_main_thread() {
87    if (main_thread_valid) {
88        CHECK_EQ(main_thread_id, adb_thread_id());
89    }
90}
91
92void set_main_thread() {
93    main_thread_valid = true;
94    main_thread_id = adb_thread_id();
95}
96
97static std::string dump_fde(const fdevent* fde) {
98    std::string state;
99    if (fde->state & FDE_ACTIVE) {
100        state += "A";
101    }
102    if (fde->state & FDE_PENDING) {
103        state += "P";
104    }
105    if (fde->state & FDE_CREATED) {
106        state += "C";
107    }
108    if (fde->state & FDE_READ) {
109        state += "R";
110    }
111    if (fde->state & FDE_WRITE) {
112        state += "W";
113    }
114    if (fde->state & FDE_ERROR) {
115        state += "E";
116    }
117    if (fde->state & FDE_DONT_CLOSE) {
118        state += "D";
119    }
120    return android::base::StringPrintf("(fdevent %d %s)", fde->fd, state.c_str());
121}
122
123fdevent* fdevent_create(int fd, fd_func func, void* arg) {
124    check_main_thread();
125    fdevent *fde = (fdevent*) malloc(sizeof(fdevent));
126    if(fde == 0) return 0;
127    fdevent_install(fde, fd, func, arg);
128    fde->state |= FDE_CREATED;
129    return fde;
130}
131
132void fdevent_destroy(fdevent* fde) {
133    check_main_thread();
134    if(fde == 0) return;
135    if(!(fde->state & FDE_CREATED)) {
136        LOG(FATAL) << "destroying fde not created by fdevent_create(): " << dump_fde(fde);
137    }
138    fdevent_remove(fde);
139    free(fde);
140}
141
142void fdevent_install(fdevent* fde, int fd, fd_func func, void* arg) {
143    check_main_thread();
144    CHECK_GE(fd, 0);
145    memset(fde, 0, sizeof(fdevent));
146    fde->state = FDE_ACTIVE;
147    fde->fd = fd;
148    fde->func = func;
149    fde->arg = arg;
150    if (!set_file_block_mode(fd, false)) {
151        // Here is not proper to handle the error. If it fails here, some error is
152        // likely to be detected by poll(), then we can let the callback function
153        // to handle it.
154        LOG(ERROR) << "failed to set non-blocking mode for fd " << fd;
155    }
156    auto pair = g_poll_node_map.emplace(fde->fd, PollNode(fde));
157    CHECK(pair.second) << "install existing fd " << fd;
158    D("fdevent_install %s", dump_fde(fde).c_str());
159}
160
161void fdevent_remove(fdevent* fde) {
162    check_main_thread();
163    D("fdevent_remove %s", dump_fde(fde).c_str());
164    if (fde->state & FDE_ACTIVE) {
165        g_poll_node_map.erase(fde->fd);
166        if (fde->state & FDE_PENDING) {
167            g_pending_list.remove(fde);
168        }
169        if (!(fde->state & FDE_DONT_CLOSE)) {
170            adb_close(fde->fd);
171            fde->fd = -1;
172        }
173        fde->state = 0;
174        fde->events = 0;
175    }
176}
177
178static void fdevent_update(fdevent* fde, unsigned events) {
179    auto it = g_poll_node_map.find(fde->fd);
180    CHECK(it != g_poll_node_map.end());
181    PollNode& node = it->second;
182    if (events & FDE_READ) {
183        node.pollfd.events |= POLLIN;
184    } else {
185        node.pollfd.events &= ~POLLIN;
186    }
187
188    if (events & FDE_WRITE) {
189        node.pollfd.events |= POLLOUT;
190    } else {
191        node.pollfd.events &= ~POLLOUT;
192    }
193    fde->state = (fde->state & FDE_STATEMASK) | events;
194}
195
196void fdevent_set(fdevent* fde, unsigned events) {
197    check_main_thread();
198    events &= FDE_EVENTMASK;
199    if ((fde->state & FDE_EVENTMASK) == events) {
200        return;
201    }
202    CHECK(fde->state & FDE_ACTIVE);
203    fdevent_update(fde, events);
204    D("fdevent_set: %s, events = %u", dump_fde(fde).c_str(), events);
205
206    if (fde->state & FDE_PENDING) {
207        // If we are pending, make sure we don't signal an event that is no longer wanted.
208        fde->events &= events;
209        if (fde->events == 0) {
210            g_pending_list.remove(fde);
211            fde->state &= ~FDE_PENDING;
212        }
213    }
214}
215
216void fdevent_add(fdevent* fde, unsigned events) {
217    check_main_thread();
218    fdevent_set(fde, (fde->state & FDE_EVENTMASK) | events);
219}
220
221void fdevent_del(fdevent* fde, unsigned events) {
222    check_main_thread();
223    fdevent_set(fde, (fde->state & FDE_EVENTMASK) & ~events);
224}
225
226static std::string dump_pollfds(const std::vector<adb_pollfd>& pollfds) {
227    std::string result;
228    for (const auto& pollfd : pollfds) {
229        std::string op;
230        if (pollfd.events & POLLIN) {
231            op += "R";
232        }
233        if (pollfd.events & POLLOUT) {
234            op += "W";
235        }
236        android::base::StringAppendF(&result, " %d(%s)", pollfd.fd, op.c_str());
237    }
238    return result;
239}
240
241static void fdevent_process() {
242    std::vector<adb_pollfd> pollfds;
243    for (const auto& pair : g_poll_node_map) {
244        pollfds.push_back(pair.second.pollfd);
245    }
246    CHECK_GT(pollfds.size(), 0u);
247    D("poll(), pollfds = %s", dump_pollfds(pollfds).c_str());
248    int ret = adb_poll(&pollfds[0], pollfds.size(), -1);
249    if (ret == -1) {
250        PLOG(ERROR) << "poll(), ret = " << ret;
251        return;
252    }
253    for (const auto& pollfd : pollfds) {
254        if (pollfd.revents != 0) {
255            D("for fd %d, revents = %x", pollfd.fd, pollfd.revents);
256        }
257        unsigned events = 0;
258        if (pollfd.revents & POLLIN) {
259            events |= FDE_READ;
260        }
261        if (pollfd.revents & POLLOUT) {
262            events |= FDE_WRITE;
263        }
264        if (pollfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
265            // We fake a read, as the rest of the code assumes that errors will
266            // be detected at that point.
267            events |= FDE_READ | FDE_ERROR;
268        }
269#if defined(__linux__)
270        if (pollfd.revents & POLLRDHUP) {
271            events |= FDE_READ | FDE_ERROR;
272        }
273#endif
274        if (events != 0) {
275            auto it = g_poll_node_map.find(pollfd.fd);
276            CHECK(it != g_poll_node_map.end());
277            fdevent* fde = it->second.fde;
278            CHECK_EQ(fde->fd, pollfd.fd);
279            fde->events |= events;
280            D("%s got events %x", dump_fde(fde).c_str(), events);
281            fde->state |= FDE_PENDING;
282            g_pending_list.push_back(fde);
283        }
284    }
285}
286
287static void fdevent_call_fdfunc(fdevent* fde) {
288    unsigned events = fde->events;
289    fde->events = 0;
290    CHECK(fde->state & FDE_PENDING);
291    fde->state &= (~FDE_PENDING);
292    D("fdevent_call_fdfunc %s", dump_fde(fde).c_str());
293    fde->func(fde->fd, events, fde->arg);
294}
295
296#if !ADB_HOST
297
298#include <sys/ioctl.h>
299
300static void fdevent_subproc_event_func(int fd, unsigned ev, void* /* userdata */) {
301    D("subproc handling on fd = %d, ev = %x", fd, ev);
302
303    CHECK_GE(fd, 0);
304
305    if (ev & FDE_READ) {
306        int subproc_fd;
307
308        if(!ReadFdExactly(fd, &subproc_fd, sizeof(subproc_fd))) {
309            LOG(FATAL) << "Failed to read the subproc's fd from " << fd;
310        }
311        auto it = g_poll_node_map.find(subproc_fd);
312        if (it == g_poll_node_map.end()) {
313            D("subproc_fd %d cleared from fd_table", subproc_fd);
314            adb_close(subproc_fd);
315            return;
316        }
317        fdevent* subproc_fde = it->second.fde;
318        if(subproc_fde->fd != subproc_fd) {
319            // Already reallocated?
320            LOG(FATAL) << "subproc_fd(" << subproc_fd << ") != subproc_fde->fd(" << subproc_fde->fd
321                       << ")";
322            return;
323        }
324
325        subproc_fde->force_eof = 1;
326
327        int rcount = 0;
328        ioctl(subproc_fd, FIONREAD, &rcount);
329        D("subproc with fd %d has rcount=%d, err=%d", subproc_fd, rcount, errno);
330        if (rcount != 0) {
331            // If there is data left, it will show up in the select().
332            // This works because there is no other thread reading that
333            // data when in this fd_func().
334            return;
335        }
336
337        D("subproc_fde %s", dump_fde(subproc_fde).c_str());
338        subproc_fde->events |= FDE_READ;
339        if(subproc_fde->state & FDE_PENDING) {
340            return;
341        }
342        subproc_fde->state |= FDE_PENDING;
343        fdevent_call_fdfunc(subproc_fde);
344    }
345}
346
347static void fdevent_subproc_setup() {
348    int s[2];
349
350    if(adb_socketpair(s)) {
351        PLOG(FATAL) << "cannot create shell-exit socket-pair";
352    }
353    D("fdevent_subproc: socket pair (%d, %d)", s[0], s[1]);
354
355    SHELL_EXIT_NOTIFY_FD = s[0];
356    fdevent *fde = fdevent_create(s[1], fdevent_subproc_event_func, NULL);
357    CHECK(fde != nullptr) << "cannot create fdevent for shell-exit handler";
358    fdevent_add(fde, FDE_READ);
359}
360#endif // !ADB_HOST
361
362static void fdevent_run_flush() REQUIRES(run_queue_mutex) {
363    for (auto& f : run_queue) {
364        f();
365    }
366    run_queue.clear();
367}
368
369static void fdevent_run_func(int fd, unsigned ev, void* /* userdata */) {
370    CHECK_GE(fd, 0);
371    CHECK(ev & FDE_READ);
372
373    char buf[1024];
374
375    // Empty the fd.
376    if (adb_read(fd, buf, sizeof(buf)) == -1) {
377        PLOG(FATAL) << "failed to empty run queue notify fd";
378    }
379
380    std::lock_guard<std::mutex> lock(run_queue_mutex);
381    fdevent_run_flush();
382}
383
384static void fdevent_run_setup() {
385    std::lock_guard<std::mutex> lock(run_queue_mutex);
386    CHECK(run_queue_notify_fd.get() == -1);
387    int s[2];
388    if (adb_socketpair(s) != 0) {
389        PLOG(FATAL) << "failed to create run queue notify socketpair";
390    }
391
392    run_queue_notify_fd.reset(s[0]);
393    fdevent* fde = fdevent_create(s[1], fdevent_run_func, nullptr);
394    CHECK(fde != nullptr);
395    fdevent_add(fde, FDE_READ);
396
397    fdevent_run_flush();
398}
399
400void fdevent_run_on_main_thread(std::function<void()> fn) {
401    std::lock_guard<std::mutex> lock(run_queue_mutex);
402    run_queue.push_back(std::move(fn));
403
404    // run_queue_notify_fd could still be -1 if we're called before fdevent has finished setting up.
405    // In that case, rely on the setup code to flush the queue without a notification being needed.
406    if (run_queue_notify_fd != -1) {
407        if (adb_write(run_queue_notify_fd.get(), "", 1) != 1) {
408            PLOG(FATAL) << "failed to write to run queue notify fd";
409        }
410    }
411}
412
413void fdevent_loop() {
414    set_main_thread();
415#if !ADB_HOST
416    fdevent_subproc_setup();
417#endif // !ADB_HOST
418    fdevent_run_setup();
419
420    while (true) {
421        if (terminate_loop) {
422            return;
423        }
424
425        D("--- --- waiting for events");
426
427        fdevent_process();
428
429        while (!g_pending_list.empty()) {
430            fdevent* fde = g_pending_list.front();
431            g_pending_list.pop_front();
432            fdevent_call_fdfunc(fde);
433        }
434    }
435}
436
437void fdevent_terminate_loop() {
438    terminate_loop = true;
439}
440
441size_t fdevent_installed_count() {
442    return g_poll_node_map.size();
443}
444
445void fdevent_reset() {
446    g_poll_node_map.clear();
447    g_pending_list.clear();
448
449    std::lock_guard<std::mutex> lock(run_queue_mutex);
450    run_queue_notify_fd.reset();
451    run_queue.clear();
452
453    main_thread_valid = false;
454    terminate_loop = false;
455}
456