1/****************************************************************************** 2 * 3 * Copyright (C) 2014 Google, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at: 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 ******************************************************************************/ 18 19#define LOG_TAG "bt_osi_eager_reader" 20 21#include <assert.h> 22#include <errno.h> 23#include <stddef.h> 24#include <string.h> 25#include <sys/eventfd.h> 26 27#include "osi/include/allocator.h" 28#include "osi/include/eager_reader.h" 29#include "osi/include/fixed_queue.h" 30#include "osi/include/osi.h" 31#include "osi/include/log.h" 32#include "osi/include/reactor.h" 33 34#if !defined(EFD_SEMAPHORE) 35# define EFD_SEMAPHORE (1 << 0) 36#endif 37 38typedef struct { 39 size_t length; 40 size_t offset; 41 uint8_t data[]; 42} data_buffer_t; 43 44struct eager_reader_t { 45 int bytes_available_fd; // semaphore mode eventfd which counts the number of available bytes 46 int inbound_fd; 47 48 const allocator_t *allocator; 49 size_t buffer_size; 50 fixed_queue_t *buffers; 51 data_buffer_t *current_buffer; 52 53 thread_t *inbound_read_thread; 54 reactor_object_t *inbound_read_object; 55 56 reactor_object_t *outbound_registration; 57 eager_reader_cb outbound_read_ready; 58 void *outbound_context; 59}; 60 61static bool has_byte(const eager_reader_t *reader); 62static void inbound_data_waiting(void *context); 63static void internal_outbound_read_ready(void *context); 64 65eager_reader_t *eager_reader_new( 66 int fd_to_read, 67 const allocator_t *allocator, 68 size_t buffer_size, 69 size_t max_buffer_count, 70 const char *thread_name) { 71 72 assert(fd_to_read != INVALID_FD); 73 assert(allocator != NULL); 74 assert(buffer_size > 0); 75 assert(max_buffer_count > 0); 76 assert(thread_name != NULL && *thread_name != '\0'); 77 78 eager_reader_t *ret = osi_calloc(sizeof(eager_reader_t)); 79 if (!ret) { 80 LOG_ERROR("%s unable to allocate memory for new eager_reader.", __func__); 81 goto error; 82 } 83 84 ret->allocator = allocator; 85 ret->inbound_fd = fd_to_read; 86 87 ret->bytes_available_fd = eventfd(0, 0); 88 if (ret->bytes_available_fd == INVALID_FD) { 89 LOG_ERROR("%s unable to create output reading semaphore.", __func__); 90 goto error; 91 } 92 93 ret->buffer_size = buffer_size; 94 95 ret->buffers = fixed_queue_new(max_buffer_count); 96 if (!ret->buffers) { 97 LOG_ERROR("%s unable to create buffers queue.", __func__); 98 goto error; 99 } 100 101 ret->inbound_read_thread = thread_new(thread_name); 102 if (!ret->inbound_read_thread) { 103 LOG_ERROR("%s unable to make reading thread.", __func__); 104 goto error; 105 } 106 107 ret->inbound_read_object = reactor_register( 108 thread_get_reactor(ret->inbound_read_thread), 109 fd_to_read, 110 ret, 111 inbound_data_waiting, 112 NULL 113 ); 114 115 return ret; 116 117error:; 118 eager_reader_free(ret); 119 return NULL; 120} 121 122void eager_reader_free(eager_reader_t *reader) { 123 if (!reader) 124 return; 125 126 eager_reader_unregister(reader); 127 128 // Only unregister from the input if we actually did register 129 if (reader->inbound_read_object) 130 reactor_unregister(reader->inbound_read_object); 131 132 if (reader->bytes_available_fd != INVALID_FD) 133 close(reader->bytes_available_fd); 134 135 // Free the current buffer, because it's not in the queue 136 // and won't be freed below 137 if (reader->current_buffer) 138 reader->allocator->free(reader->current_buffer); 139 140 fixed_queue_free(reader->buffers, reader->allocator->free); 141 thread_free(reader->inbound_read_thread); 142 osi_free(reader); 143} 144 145void eager_reader_register(eager_reader_t *reader, reactor_t *reactor, eager_reader_cb read_cb, void *context) { 146 assert(reader != NULL); 147 assert(reactor != NULL); 148 assert(read_cb != NULL); 149 150 // Make sure the reader isn't currently registered. 151 eager_reader_unregister(reader); 152 153 reader->outbound_read_ready = read_cb; 154 reader->outbound_context = context; 155 reader->outbound_registration = reactor_register(reactor, reader->bytes_available_fd, reader, internal_outbound_read_ready, NULL); 156} 157 158void eager_reader_unregister(eager_reader_t *reader) { 159 assert(reader != NULL); 160 161 if (reader->outbound_registration) { 162 reactor_unregister(reader->outbound_registration); 163 reader->outbound_registration = NULL; 164 } 165} 166 167// SEE HEADER FOR THREAD SAFETY NOTE 168size_t eager_reader_read(eager_reader_t *reader, uint8_t *buffer, size_t max_size, bool block) { 169 assert(reader != NULL); 170 assert(buffer != NULL); 171 172 // If the caller wants nonblocking behavior, poll to see if we have 173 // any bytes available before reading. 174 if (!block && !has_byte(reader)) 175 return 0; 176 177 // Find out how many bytes we have available in our various buffers. 178 eventfd_t bytes_available; 179 if (eventfd_read(reader->bytes_available_fd, &bytes_available) == -1) { 180 LOG_ERROR("%s unable to read semaphore for output data.", __func__); 181 return 0; 182 } 183 184 if (max_size > bytes_available) 185 max_size = bytes_available; 186 187 size_t bytes_consumed = 0; 188 while (bytes_consumed < max_size) { 189 if (!reader->current_buffer) 190 reader->current_buffer = fixed_queue_dequeue(reader->buffers); 191 192 size_t bytes_to_copy = reader->current_buffer->length - reader->current_buffer->offset; 193 if (bytes_to_copy > (max_size - bytes_consumed)) 194 bytes_to_copy = max_size - bytes_consumed; 195 196 memcpy(&buffer[bytes_consumed], &reader->current_buffer->data[reader->current_buffer->offset], bytes_to_copy); 197 bytes_consumed += bytes_to_copy; 198 reader->current_buffer->offset += bytes_to_copy; 199 200 if (reader->current_buffer->offset >= reader->current_buffer->length) { 201 reader->allocator->free(reader->current_buffer); 202 reader->current_buffer = NULL; 203 } 204 } 205 206 bytes_available -= bytes_consumed; 207 if (eventfd_write(reader->bytes_available_fd, bytes_available) == -1) { 208 LOG_ERROR("%s unable to write back bytes available for output data.", __func__); 209 } 210 211 return bytes_consumed; 212} 213 214thread_t* eager_reader_get_read_thread(const eager_reader_t *reader) { 215 assert(reader != NULL); 216 return reader->inbound_read_thread; 217} 218 219static bool has_byte(const eager_reader_t *reader) { 220 assert(reader != NULL); 221 222 fd_set read_fds; 223 FD_ZERO(&read_fds); 224 FD_SET(reader->bytes_available_fd, &read_fds); 225 226 // Immediate timeout 227 struct timeval timeout; 228 timeout.tv_sec = 0; 229 timeout.tv_usec = 0; 230 231 select(reader->bytes_available_fd + 1, &read_fds, NULL, NULL, &timeout); 232 return FD_ISSET(reader->bytes_available_fd, &read_fds); 233} 234 235static void inbound_data_waiting(void *context) { 236 eager_reader_t *reader = (eager_reader_t *)context; 237 238 data_buffer_t *buffer = (data_buffer_t *)reader->allocator->alloc(reader->buffer_size + sizeof(data_buffer_t)); 239 if (!buffer) { 240 LOG_ERROR("%s couldn't aquire memory for inbound data buffer.", __func__); 241 return; 242 } 243 244 buffer->length = 0; 245 buffer->offset = 0; 246 247 int bytes_read = read(reader->inbound_fd, buffer->data, reader->buffer_size); 248 if (bytes_read > 0) { 249 // Save the data for later 250 buffer->length = bytes_read; 251 fixed_queue_enqueue(reader->buffers, buffer); 252 253 // Tell consumers data is available by incrementing 254 // the semaphore by the number of bytes we just read 255 eventfd_write(reader->bytes_available_fd, bytes_read); 256 } else { 257 if (bytes_read == 0) 258 LOG_WARN("%s fd said bytes existed, but none were found.", __func__); 259 else 260 LOG_WARN("%s unable to read from file descriptor: %s", __func__, strerror(errno)); 261 262 reader->allocator->free(buffer); 263 } 264} 265 266static void internal_outbound_read_ready(void *context) { 267 assert(context != NULL); 268 269 eager_reader_t *reader = (eager_reader_t *)context; 270 reader->outbound_read_ready(reader, reader->outbound_context); 271} 272