1/****************************************************************************** 2 * 3 * Copyright (C) 2014 Google, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at: 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 ******************************************************************************/ 18 19#define LOG_TAG "bt_osi_reactor" 20 21#include <assert.h> 22#include <errno.h> 23#include <pthread.h> 24#include <stdlib.h> 25#include <string.h> 26#include <sys/epoll.h> 27#include <sys/eventfd.h> 28 29#include "osi/include/allocator.h" 30#include "osi/include/list.h" 31#include "osi/include/log.h" 32#include "osi/include/reactor.h" 33 34#if !defined(EFD_SEMAPHORE) 35# define EFD_SEMAPHORE (1 << 0) 36#endif 37 38struct reactor_t { 39 int epoll_fd; 40 int event_fd; 41 pthread_mutex_t list_lock; // protects invalidation_list. 42 list_t *invalidation_list; // reactor objects that have been unregistered. 43 pthread_t run_thread; // the pthread on which reactor_run is executing. 44 bool is_running; // indicates whether |run_thread| is valid. 45 bool object_removed; 46}; 47 48struct reactor_object_t { 49 int fd; // the file descriptor to monitor for events. 50 void *context; // a context that's passed back to the *_ready functions. 51 reactor_t *reactor; // the reactor instance this object is registered with. 52 pthread_mutex_t lock; // protects the lifetime of this object and all variables. 53 54 void (*read_ready)(void *context); // function to call when the file descriptor becomes readable. 55 void (*write_ready)(void *context); // function to call when the file descriptor becomes writeable. 56}; 57 58static reactor_status_t run_reactor(reactor_t *reactor, int iterations); 59 60static const size_t MAX_EVENTS = 64; 61static const eventfd_t EVENT_REACTOR_STOP = 1; 62 63reactor_t *reactor_new(void) { 64 reactor_t *ret = (reactor_t *)osi_calloc(sizeof(reactor_t)); 65 if (!ret) 66 return NULL; 67 68 ret->epoll_fd = INVALID_FD; 69 ret->event_fd = INVALID_FD; 70 71 ret->epoll_fd = epoll_create(MAX_EVENTS); 72 if (ret->epoll_fd == INVALID_FD) { 73 LOG_ERROR("%s unable to create epoll instance: %s", __func__, strerror(errno)); 74 goto error; 75 } 76 77 ret->event_fd = eventfd(0, 0); 78 if (ret->event_fd == INVALID_FD) { 79 LOG_ERROR("%s unable to create eventfd: %s", __func__, strerror(errno)); 80 goto error; 81 } 82 83 pthread_mutex_init(&ret->list_lock, NULL); 84 ret->invalidation_list = list_new(NULL); 85 if (!ret->invalidation_list) { 86 LOG_ERROR("%s unable to allocate object invalidation list.", __func__); 87 goto error; 88 } 89 90 struct epoll_event event; 91 memset(&event, 0, sizeof(event)); 92 event.events = EPOLLIN; 93 event.data.ptr = NULL; 94 if (epoll_ctl(ret->epoll_fd, EPOLL_CTL_ADD, ret->event_fd, &event) == -1) { 95 LOG_ERROR("%s unable to register eventfd with epoll set: %s", __func__, strerror(errno)); 96 goto error; 97 } 98 99 return ret; 100 101error:; 102 reactor_free(ret); 103 return NULL; 104} 105 106void reactor_free(reactor_t *reactor) { 107 if (!reactor) 108 return; 109 110 list_free(reactor->invalidation_list); 111 close(reactor->event_fd); 112 close(reactor->epoll_fd); 113 osi_free(reactor); 114} 115 116reactor_status_t reactor_start(reactor_t *reactor) { 117 assert(reactor != NULL); 118 return run_reactor(reactor, 0); 119} 120 121reactor_status_t reactor_run_once(reactor_t *reactor) { 122 assert(reactor != NULL); 123 return run_reactor(reactor, 1); 124} 125 126void reactor_stop(reactor_t *reactor) { 127 assert(reactor != NULL); 128 129 eventfd_write(reactor->event_fd, EVENT_REACTOR_STOP); 130} 131 132reactor_object_t *reactor_register(reactor_t *reactor, 133 int fd, void *context, 134 void (*read_ready)(void *context), 135 void (*write_ready)(void *context)) { 136 assert(reactor != NULL); 137 assert(fd != INVALID_FD); 138 139 reactor_object_t *object = (reactor_object_t *)osi_calloc(sizeof(reactor_object_t)); 140 if (!object) { 141 LOG_ERROR("%s unable to allocate reactor object: %s", __func__, strerror(errno)); 142 return NULL; 143 } 144 145 object->reactor = reactor; 146 object->fd = fd; 147 object->context = context; 148 object->read_ready = read_ready; 149 object->write_ready = write_ready; 150 pthread_mutex_init(&object->lock, NULL); 151 152 struct epoll_event event; 153 memset(&event, 0, sizeof(event)); 154 if (read_ready) 155 event.events |= (EPOLLIN | EPOLLRDHUP); 156 if (write_ready) 157 event.events |= EPOLLOUT; 158 event.data.ptr = object; 159 160 if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) { 161 LOG_ERROR("%s unable to register fd %d to epoll set: %s", __func__, fd, strerror(errno)); 162 pthread_mutex_destroy(&object->lock); 163 osi_free(object); 164 return NULL; 165 } 166 167 return object; 168} 169 170bool reactor_change_registration(reactor_object_t *object, 171 void (*read_ready)(void *context), 172 void (*write_ready)(void *context)) { 173 assert(object != NULL); 174 175 struct epoll_event event; 176 memset(&event, 0, sizeof(event)); 177 if (read_ready) 178 event.events |= (EPOLLIN | EPOLLRDHUP); 179 if (write_ready) 180 event.events |= EPOLLOUT; 181 event.data.ptr = object; 182 183 if (epoll_ctl(object->reactor->epoll_fd, EPOLL_CTL_MOD, object->fd, &event) == -1) { 184 LOG_ERROR("%s unable to modify interest set for fd %d: %s", __func__, object->fd, strerror(errno)); 185 return false; 186 } 187 188 pthread_mutex_lock(&object->lock); 189 object->read_ready = read_ready; 190 object->write_ready = write_ready; 191 pthread_mutex_unlock(&object->lock); 192 193 return true; 194} 195 196void reactor_unregister(reactor_object_t *obj) { 197 assert(obj != NULL); 198 199 reactor_t *reactor = obj->reactor; 200 201 if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_DEL, obj->fd, NULL) == -1) 202 LOG_ERROR("%s unable to unregister fd %d from epoll set: %s", __func__, obj->fd, strerror(errno)); 203 204 if (reactor->is_running && pthread_equal(pthread_self(), reactor->run_thread)) { 205 reactor->object_removed = true; 206 return; 207 } 208 209 pthread_mutex_lock(&reactor->list_lock); 210 list_append(reactor->invalidation_list, obj); 211 pthread_mutex_unlock(&reactor->list_lock); 212 213 // Taking the object lock here makes sure a callback for |obj| isn't 214 // currently executing. The reactor thread must then either be before 215 // the callbacks or after. If after, we know that the object won't be 216 // referenced because it has been taken out of the epoll set. If before, 217 // it won't be referenced because the reactor thread will check the 218 // invalidation_list and find it in there. So by taking this lock, we 219 // are waiting until the reactor thread drops all references to |obj|. 220 // One the wait completes, we can unlock and destroy |obj| safely. 221 pthread_mutex_lock(&obj->lock); 222 pthread_mutex_unlock(&obj->lock); 223 pthread_mutex_destroy(&obj->lock); 224 osi_free(obj); 225} 226 227// Runs the reactor loop for a maximum of |iterations|. 228// 0 |iterations| means loop forever. 229// |reactor| may not be NULL. 230static reactor_status_t run_reactor(reactor_t *reactor, int iterations) { 231 assert(reactor != NULL); 232 233 reactor->run_thread = pthread_self(); 234 reactor->is_running = true; 235 236 struct epoll_event events[MAX_EVENTS]; 237 for (int i = 0; iterations == 0 || i < iterations; ++i) { 238 pthread_mutex_lock(&reactor->list_lock); 239 list_clear(reactor->invalidation_list); 240 pthread_mutex_unlock(&reactor->list_lock); 241 242 int ret; 243 do { 244 ret = epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, -1); 245 } while (ret == -1 && errno == EINTR); 246 247 if (ret == -1) { 248 LOG_ERROR("%s error in epoll_wait: %s", __func__, strerror(errno)); 249 reactor->is_running = false; 250 return REACTOR_STATUS_ERROR; 251 } 252 253 for (int j = 0; j < ret; ++j) { 254 // The event file descriptor is the only one that registers with 255 // a NULL data pointer. We use the NULL to identify it and break 256 // out of the reactor loop. 257 if (events[j].data.ptr == NULL) { 258 eventfd_t value; 259 eventfd_read(reactor->event_fd, &value); 260 reactor->is_running = false; 261 return REACTOR_STATUS_STOP; 262 } 263 264 reactor_object_t *object = (reactor_object_t *)events[j].data.ptr; 265 266 pthread_mutex_lock(&reactor->list_lock); 267 if (list_contains(reactor->invalidation_list, object)) { 268 pthread_mutex_unlock(&reactor->list_lock); 269 continue; 270 } 271 272 // Downgrade the list lock to an object lock. 273 pthread_mutex_lock(&object->lock); 274 pthread_mutex_unlock(&reactor->list_lock); 275 276 reactor->object_removed = false; 277 if (events[j].events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && object->read_ready) 278 object->read_ready(object->context); 279 if (!reactor->object_removed && events[j].events & EPOLLOUT && object->write_ready) 280 object->write_ready(object->context); 281 pthread_mutex_unlock(&object->lock); 282 283 if (reactor->object_removed) { 284 pthread_mutex_destroy(&object->lock); 285 osi_free(object); 286 } 287 } 288 } 289 290 reactor->is_running = false; 291 return REACTOR_STATUS_DONE; 292} 293