1/******************************************************************************
2 *
3 *  Copyright (C) 2014 Google, Inc.
4 *
5 *  Licensed under the Apache License, Version 2.0 (the "License");
6 *  you may not use this file except in compliance with the License.
7 *  You may obtain a copy of the License at:
8 *
9 *  http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *  Unless required by applicable law or agreed to in writing, software
12 *  distributed under the License is distributed on an "AS IS" BASIS,
13 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  See the License for the specific language governing permissions and
15 *  limitations under the License.
16 *
17 ******************************************************************************/
18
19#define LOG_TAG "bt_osi_reactor"
20
21#include "osi/include/reactor.h"
22
23#include <base/logging.h>
24#include <errno.h>
25#include <pthread.h>
26#include <stdlib.h>
27#include <string.h>
28#include <sys/epoll.h>
29#include <sys/eventfd.h>
30#include <unistd.h>
31
32#include <mutex>
33
34#include "osi/include/allocator.h"
35#include "osi/include/list.h"
36#include "osi/include/log.h"
37
// Fallback for build environments whose headers do not provide EFD_SEMAPHORE:
// define it here as bit 0 so the name is always available to this file.
#if !defined(EFD_SEMAPHORE)
#define EFD_SEMAPHORE (1 << 0)
#endif
41
// Internal state of one reactor instance: an epoll set, the eventfd used to
// stop the loop, and the bookkeeping that lets objects be unregistered while
// events are being dispatched.
struct reactor_t {
  // epoll instance that registered file descriptors are added to.
  int epoll_fd;
  // eventfd registered in |epoll_fd| with a NULL data pointer; reactor_stop
  // writes to it to make the run loop return.
  int event_fd;
  // Guards |invalidation_list|. Allocated with new in reactor_new.
  std::mutex* list_mutex;
  // Objects unregistered from a thread other than |run_thread|. The run loop
  // skips events for anything listed here and empties the list at the start
  // of every pass.
  list_t* invalidation_list;  // reactor objects that have been unregistered.
  pthread_t run_thread;       // the pthread on which run_reactor is executing.
  bool is_running;            // indicates whether |run_thread| is valid.
  // Set by reactor_unregister when it is called on |run_thread|; the run loop
  // then frees the object it was dispatching once its callbacks return.
  bool object_removed;
};
51
// One registration of a file descriptor with a reactor. Created by
// reactor_register; released either by reactor_unregister or, when the
// unregister call happens on the reactor thread, by the run loop itself.
struct reactor_object_t {
  int fd;              // the file descriptor to monitor for events.
  void* context;       // a context that's passed back to the *_ready functions.
  reactor_t* reactor;  // the reactor instance this object is registered with.
  // Allocated with new in reactor_register and deleted together with the
  // object. The run loop holds it for the duration of the callbacks.
  std::mutex* mutex;  // protects the lifetime of this object and all variables.

  void (*read_ready)(void* context);   // function to call when the file
                                       // descriptor becomes readable.
  void (*write_ready)(void* context);  // function to call when the file
                                       // descriptor becomes writeable.
};
63
// Shared loop implementation behind reactor_start and reactor_run_once;
// defined later in this file.
static reactor_status_t run_reactor(reactor_t* reactor, int iterations);

// Size of the event array passed to epoll_wait on each pass; also handed to
// epoll_create as its size argument.
static const size_t MAX_EVENTS = 64;
// Value reactor_stop writes to the reactor's eventfd to wake the run loop.
static const eventfd_t EVENT_REACTOR_STOP = 1;
68
69reactor_t* reactor_new(void) {
70  reactor_t* ret = (reactor_t*)osi_calloc(sizeof(reactor_t));
71
72  ret->epoll_fd = INVALID_FD;
73  ret->event_fd = INVALID_FD;
74
75  ret->epoll_fd = epoll_create(MAX_EVENTS);
76  if (ret->epoll_fd == INVALID_FD) {
77    LOG_ERROR(LOG_TAG, "%s unable to create epoll instance: %s", __func__,
78              strerror(errno));
79    goto error;
80  }
81
82  ret->event_fd = eventfd(0, 0);
83  if (ret->event_fd == INVALID_FD) {
84    LOG_ERROR(LOG_TAG, "%s unable to create eventfd: %s", __func__,
85              strerror(errno));
86    goto error;
87  }
88
89  ret->list_mutex = new std::mutex;
90  ret->invalidation_list = list_new(NULL);
91  if (!ret->invalidation_list) {
92    LOG_ERROR(LOG_TAG, "%s unable to allocate object invalidation list.",
93              __func__);
94    goto error;
95  }
96
97  struct epoll_event event;
98  memset(&event, 0, sizeof(event));
99  event.events = EPOLLIN;
100  event.data.ptr = NULL;
101  if (epoll_ctl(ret->epoll_fd, EPOLL_CTL_ADD, ret->event_fd, &event) == -1) {
102    LOG_ERROR(LOG_TAG, "%s unable to register eventfd with epoll set: %s",
103              __func__, strerror(errno));
104    goto error;
105  }
106
107  return ret;
108
109error:;
110  reactor_free(ret);
111  return NULL;
112}
113
114void reactor_free(reactor_t* reactor) {
115  if (!reactor) return;
116
117  list_free(reactor->invalidation_list);
118  close(reactor->event_fd);
119  close(reactor->epoll_fd);
120  osi_free(reactor);
121}
122
123reactor_status_t reactor_start(reactor_t* reactor) {
124  CHECK(reactor != NULL);
125  return run_reactor(reactor, 0);
126}
127
128reactor_status_t reactor_run_once(reactor_t* reactor) {
129  CHECK(reactor != NULL);
130  return run_reactor(reactor, 1);
131}
132
133void reactor_stop(reactor_t* reactor) {
134  CHECK(reactor != NULL);
135
136  eventfd_write(reactor->event_fd, EVENT_REACTOR_STOP);
137}
138
139reactor_object_t* reactor_register(reactor_t* reactor, int fd, void* context,
140                                   void (*read_ready)(void* context),
141                                   void (*write_ready)(void* context)) {
142  CHECK(reactor != NULL);
143  CHECK(fd != INVALID_FD);
144
145  reactor_object_t* object =
146      (reactor_object_t*)osi_calloc(sizeof(reactor_object_t));
147
148  object->reactor = reactor;
149  object->fd = fd;
150  object->context = context;
151  object->read_ready = read_ready;
152  object->write_ready = write_ready;
153  object->mutex = new std::mutex;
154
155  struct epoll_event event;
156  memset(&event, 0, sizeof(event));
157  if (read_ready) event.events |= (EPOLLIN | EPOLLRDHUP);
158  if (write_ready) event.events |= EPOLLOUT;
159  event.data.ptr = object;
160
161  if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
162    LOG_ERROR(LOG_TAG, "%s unable to register fd %d to epoll set: %s", __func__,
163              fd, strerror(errno));
164    delete object->mutex;
165    osi_free(object);
166    return NULL;
167  }
168
169  return object;
170}
171
172bool reactor_change_registration(reactor_object_t* object,
173                                 void (*read_ready)(void* context),
174                                 void (*write_ready)(void* context)) {
175  CHECK(object != NULL);
176
177  struct epoll_event event;
178  memset(&event, 0, sizeof(event));
179  if (read_ready) event.events |= (EPOLLIN | EPOLLRDHUP);
180  if (write_ready) event.events |= EPOLLOUT;
181  event.data.ptr = object;
182
183  if (epoll_ctl(object->reactor->epoll_fd, EPOLL_CTL_MOD, object->fd, &event) ==
184      -1) {
185    LOG_ERROR(LOG_TAG, "%s unable to modify interest set for fd %d: %s",
186              __func__, object->fd, strerror(errno));
187    return false;
188  }
189
190  std::lock_guard<std::mutex> lock(*object->mutex);
191  object->read_ready = read_ready;
192  object->write_ready = write_ready;
193
194  return true;
195}
196
197void reactor_unregister(reactor_object_t* obj) {
198  CHECK(obj != NULL);
199
200  reactor_t* reactor = obj->reactor;
201
202  if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_DEL, obj->fd, NULL) == -1)
203    LOG_ERROR(LOG_TAG, "%s unable to unregister fd %d from epoll set: %s",
204              __func__, obj->fd, strerror(errno));
205
206  if (reactor->is_running &&
207      pthread_equal(pthread_self(), reactor->run_thread)) {
208    reactor->object_removed = true;
209    return;
210  }
211
212  {
213    std::unique_lock<std::mutex> lock(*reactor->list_mutex);
214    list_append(reactor->invalidation_list, obj);
215  }
216
217  // Taking the object lock here makes sure a callback for |obj| isn't
218  // currently executing. The reactor thread must then either be before
219  // the callbacks or after. If after, we know that the object won't be
220  // referenced because it has been taken out of the epoll set. If before,
221  // it won't be referenced because the reactor thread will check the
222  // invalidation_list and find it in there. So by taking this lock, we
223  // are waiting until the reactor thread drops all references to |obj|.
224  // One the wait completes, we can unlock and destroy |obj| safely.
225  obj->mutex->lock();
226  obj->mutex->unlock();
227  delete obj->mutex;
228  osi_free(obj);
229}
230
231// Runs the reactor loop for a maximum of |iterations|.
232// 0 |iterations| means loop forever.
233// |reactor| may not be NULL.
234static reactor_status_t run_reactor(reactor_t* reactor, int iterations) {
235  CHECK(reactor != NULL);
236
237  reactor->run_thread = pthread_self();
238  reactor->is_running = true;
239
240  struct epoll_event events[MAX_EVENTS];
241  for (int i = 0; iterations == 0 || i < iterations; ++i) {
242    {
243      std::lock_guard<std::mutex> lock(*reactor->list_mutex);
244      list_clear(reactor->invalidation_list);
245    }
246
247    int ret;
248    OSI_NO_INTR(ret = epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, -1));
249    if (ret == -1) {
250      LOG_ERROR(LOG_TAG, "%s error in epoll_wait: %s", __func__,
251                strerror(errno));
252      reactor->is_running = false;
253      return REACTOR_STATUS_ERROR;
254    }
255
256    for (int j = 0; j < ret; ++j) {
257      // The event file descriptor is the only one that registers with
258      // a NULL data pointer. We use the NULL to identify it and break
259      // out of the reactor loop.
260      if (events[j].data.ptr == NULL) {
261        eventfd_t value;
262        eventfd_read(reactor->event_fd, &value);
263        reactor->is_running = false;
264        return REACTOR_STATUS_STOP;
265      }
266
267      reactor_object_t* object = (reactor_object_t*)events[j].data.ptr;
268
269      std::unique_lock<std::mutex> lock(*reactor->list_mutex);
270      if (list_contains(reactor->invalidation_list, object)) {
271        continue;
272      }
273
274      // Downgrade the list lock to an object lock.
275      {
276        std::lock_guard<std::mutex> obj_lock(*object->mutex);
277        lock.unlock();
278
279        reactor->object_removed = false;
280        if (events[j].events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) &&
281            object->read_ready)
282          object->read_ready(object->context);
283        if (!reactor->object_removed && events[j].events & EPOLLOUT &&
284            object->write_ready)
285          object->write_ready(object->context);
286      }
287
288      if (reactor->object_removed) {
289        delete object->mutex;
290        osi_free(object);
291      }
292    }
293  }
294
295  reactor->is_running = false;
296  return REACTOR_STATUS_DONE;
297}
298