1/******************************************************************************
2 *
3 *  Copyright (C) 2014 Google, Inc.
4 *
5 *  Licensed under the Apache License, Version 2.0 (the "License");
6 *  you may not use this file except in compliance with the License.
7 *  You may obtain a copy of the License at:
8 *
9 *  http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *  Unless required by applicable law or agreed to in writing, software
12 *  distributed under the License is distributed on an "AS IS" BASIS,
13 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  See the License for the specific language governing permissions and
15 *  limitations under the License.
16 *
17 ******************************************************************************/
18
19#define LOG_TAG "bt_osi_reactor"
20
21#include "osi/include/reactor.h"
22
23#include <assert.h>
24#include <errno.h>
25#include <pthread.h>
26#include <stdlib.h>
27#include <string.h>
28#include <sys/epoll.h>
29#include <sys/eventfd.h>
30#include <unistd.h>
31
32#include "osi/include/allocator.h"
33#include "osi/include/list.h"
34#include "osi/include/log.h"
35
36#if !defined(EFD_SEMAPHORE)
37#  define EFD_SEMAPHORE (1 << 0)
38#endif
39
40struct reactor_t {
41  int epoll_fd;
42  int event_fd;
43  pthread_mutex_t list_lock;  // protects invalidation_list.
44  list_t *invalidation_list;  // reactor objects that have been unregistered.
45  pthread_t run_thread;       // the pthread on which reactor_run is executing.
46  bool is_running;            // indicates whether |run_thread| is valid.
47  bool object_removed;
48};
49
50struct reactor_object_t {
51  int fd;                              // the file descriptor to monitor for events.
52  void *context;                       // a context that's passed back to the *_ready functions.
53  reactor_t *reactor;                  // the reactor instance this object is registered with.
54  pthread_mutex_t lock;                // protects the lifetime of this object and all variables.
55
56  void (*read_ready)(void *context);   // function to call when the file descriptor becomes readable.
57  void (*write_ready)(void *context);  // function to call when the file descriptor becomes writeable.
58};
59
60static reactor_status_t run_reactor(reactor_t *reactor, int iterations);
61
62static const size_t MAX_EVENTS = 64;
63static const eventfd_t EVENT_REACTOR_STOP = 1;
64
65reactor_t *reactor_new(void) {
66  reactor_t *ret = (reactor_t *)osi_calloc(sizeof(reactor_t));
67
68  ret->epoll_fd = INVALID_FD;
69  ret->event_fd = INVALID_FD;
70
71  ret->epoll_fd = epoll_create(MAX_EVENTS);
72  if (ret->epoll_fd == INVALID_FD) {
73    LOG_ERROR(LOG_TAG, "%s unable to create epoll instance: %s", __func__, strerror(errno));
74    goto error;
75  }
76
77  ret->event_fd = eventfd(0, 0);
78  if (ret->event_fd == INVALID_FD) {
79    LOG_ERROR(LOG_TAG, "%s unable to create eventfd: %s", __func__, strerror(errno));
80    goto error;
81  }
82
83  pthread_mutex_init(&ret->list_lock, NULL);
84  ret->invalidation_list = list_new(NULL);
85  if (!ret->invalidation_list) {
86    LOG_ERROR(LOG_TAG, "%s unable to allocate object invalidation list.", __func__);
87    goto error;
88  }
89
90  struct epoll_event event;
91  memset(&event, 0, sizeof(event));
92  event.events = EPOLLIN;
93  event.data.ptr = NULL;
94  if (epoll_ctl(ret->epoll_fd, EPOLL_CTL_ADD, ret->event_fd, &event) == -1) {
95    LOG_ERROR(LOG_TAG, "%s unable to register eventfd with epoll set: %s", __func__, strerror(errno));
96    goto error;
97  }
98
99  return ret;
100
101error:;
102  reactor_free(ret);
103  return NULL;
104}
105
106void reactor_free(reactor_t *reactor) {
107  if (!reactor)
108    return;
109
110  list_free(reactor->invalidation_list);
111  close(reactor->event_fd);
112  close(reactor->epoll_fd);
113  osi_free(reactor);
114}
115
116reactor_status_t reactor_start(reactor_t *reactor) {
117  assert(reactor != NULL);
118  return run_reactor(reactor, 0);
119}
120
121reactor_status_t reactor_run_once(reactor_t *reactor) {
122  assert(reactor != NULL);
123  return run_reactor(reactor, 1);
124}
125
126void reactor_stop(reactor_t *reactor) {
127  assert(reactor != NULL);
128
129  eventfd_write(reactor->event_fd, EVENT_REACTOR_STOP);
130}
131
132reactor_object_t *reactor_register(reactor_t *reactor,
133    int fd, void *context,
134    void (*read_ready)(void *context),
135    void (*write_ready)(void *context)) {
136  assert(reactor != NULL);
137  assert(fd != INVALID_FD);
138
139  reactor_object_t *object =
140      (reactor_object_t *)osi_calloc(sizeof(reactor_object_t));
141
142  object->reactor = reactor;
143  object->fd = fd;
144  object->context = context;
145  object->read_ready = read_ready;
146  object->write_ready = write_ready;
147  pthread_mutex_init(&object->lock, NULL);
148
149  struct epoll_event event;
150  memset(&event, 0, sizeof(event));
151  if (read_ready)
152    event.events |= (EPOLLIN | EPOLLRDHUP);
153  if (write_ready)
154    event.events |= EPOLLOUT;
155  event.data.ptr = object;
156
157  if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
158    LOG_ERROR(LOG_TAG, "%s unable to register fd %d to epoll set: %s", __func__, fd, strerror(errno));
159    pthread_mutex_destroy(&object->lock);
160    osi_free(object);
161    return NULL;
162  }
163
164  return object;
165}
166
167bool reactor_change_registration(reactor_object_t *object,
168    void (*read_ready)(void *context),
169    void (*write_ready)(void *context)) {
170  assert(object != NULL);
171
172  struct epoll_event event;
173  memset(&event, 0, sizeof(event));
174  if (read_ready)
175    event.events |= (EPOLLIN | EPOLLRDHUP);
176  if (write_ready)
177    event.events |= EPOLLOUT;
178  event.data.ptr = object;
179
180  if (epoll_ctl(object->reactor->epoll_fd, EPOLL_CTL_MOD, object->fd, &event) == -1) {
181    LOG_ERROR(LOG_TAG, "%s unable to modify interest set for fd %d: %s", __func__, object->fd, strerror(errno));
182    return false;
183  }
184
185  pthread_mutex_lock(&object->lock);
186  object->read_ready = read_ready;
187  object->write_ready = write_ready;
188  pthread_mutex_unlock(&object->lock);
189
190  return true;
191}
192
193void reactor_unregister(reactor_object_t *obj) {
194  assert(obj != NULL);
195
196  reactor_t *reactor = obj->reactor;
197
198  if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_DEL, obj->fd, NULL) == -1)
199    LOG_ERROR(LOG_TAG, "%s unable to unregister fd %d from epoll set: %s", __func__, obj->fd, strerror(errno));
200
201  if (reactor->is_running && pthread_equal(pthread_self(), reactor->run_thread)) {
202    reactor->object_removed = true;
203    return;
204  }
205
206  pthread_mutex_lock(&reactor->list_lock);
207  list_append(reactor->invalidation_list, obj);
208  pthread_mutex_unlock(&reactor->list_lock);
209
210  // Taking the object lock here makes sure a callback for |obj| isn't
211  // currently executing. The reactor thread must then either be before
212  // the callbacks or after. If after, we know that the object won't be
213  // referenced because it has been taken out of the epoll set. If before,
214  // it won't be referenced because the reactor thread will check the
215  // invalidation_list and find it in there. So by taking this lock, we
216  // are waiting until the reactor thread drops all references to |obj|.
217  // One the wait completes, we can unlock and destroy |obj| safely.
218  pthread_mutex_lock(&obj->lock);
219  pthread_mutex_unlock(&obj->lock);
220  pthread_mutex_destroy(&obj->lock);
221  osi_free(obj);
222}
223
224// Runs the reactor loop for a maximum of |iterations|.
225// 0 |iterations| means loop forever.
226// |reactor| may not be NULL.
227static reactor_status_t run_reactor(reactor_t *reactor, int iterations) {
228  assert(reactor != NULL);
229
230  reactor->run_thread = pthread_self();
231  reactor->is_running = true;
232
233  struct epoll_event events[MAX_EVENTS];
234  for (int i = 0; iterations == 0 || i < iterations; ++i) {
235    pthread_mutex_lock(&reactor->list_lock);
236    list_clear(reactor->invalidation_list);
237    pthread_mutex_unlock(&reactor->list_lock);
238
239    int ret;
240    OSI_NO_INTR(ret = epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, -1));
241    if (ret == -1) {
242      LOG_ERROR(LOG_TAG, "%s error in epoll_wait: %s", __func__, strerror(errno));
243      reactor->is_running = false;
244      return REACTOR_STATUS_ERROR;
245    }
246
247    for (int j = 0; j < ret; ++j) {
248      // The event file descriptor is the only one that registers with
249      // a NULL data pointer. We use the NULL to identify it and break
250      // out of the reactor loop.
251      if (events[j].data.ptr == NULL) {
252        eventfd_t value;
253        eventfd_read(reactor->event_fd, &value);
254        reactor->is_running = false;
255        return REACTOR_STATUS_STOP;
256      }
257
258      reactor_object_t *object = (reactor_object_t *)events[j].data.ptr;
259
260      pthread_mutex_lock(&reactor->list_lock);
261      if (list_contains(reactor->invalidation_list, object)) {
262        pthread_mutex_unlock(&reactor->list_lock);
263        continue;
264      }
265
266      // Downgrade the list lock to an object lock.
267      pthread_mutex_lock(&object->lock);
268      pthread_mutex_unlock(&reactor->list_lock);
269
270      reactor->object_removed = false;
271      if (events[j].events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && object->read_ready)
272        object->read_ready(object->context);
273      if (!reactor->object_removed && events[j].events & EPOLLOUT && object->write_ready)
274        object->write_ready(object->context);
275      pthread_mutex_unlock(&object->lock);
276
277      if (reactor->object_removed) {
278        pthread_mutex_destroy(&object->lock);
279        osi_free(object);
280      }
281    }
282  }
283
284  reactor->is_running = false;
285  return REACTOR_STATUS_DONE;
286}
287