1/******************************************************************************
2 *
3 *  Copyright (C) 2014 Google, Inc.
4 *
5 *  Licensed under the Apache License, Version 2.0 (the "License");
6 *  you may not use this file except in compliance with the License.
7 *  You may obtain a copy of the License at:
8 *
9 *  http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *  Unless required by applicable law or agreed to in writing, software
12 *  distributed under the License is distributed on an "AS IS" BASIS,
13 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  See the License for the specific language governing permissions and
15 *  limitations under the License.
16 *
17 ******************************************************************************/
18
19#define LOG_TAG "bt_osi_eager_reader"
20
21#include <assert.h>
22#include <errno.h>
23#include <stddef.h>
24#include <string.h>
25#include <sys/eventfd.h>
26
27#include "osi/include/allocator.h"
28#include "osi/include/eager_reader.h"
29#include "osi/include/fixed_queue.h"
30#include "osi/include/osi.h"
31#include "osi/include/log.h"
32#include "osi/include/reactor.h"
33
// Fallback definition for build environments whose <sys/eventfd.h> does not
// provide EFD_SEMAPHORE.
// NOTE(review): nothing in the visible part of this file references this
// macro; the eventfd in eager_reader_new is created with flags 0.
#if !defined(EFD_SEMAPHORE)
#  define EFD_SEMAPHORE (1 << 0)
#endif
37
// One chunk of inbound data. Allocated by inbound_data_waiting with room for
// |buffer_size| payload bytes after the header, and consumed incrementally by
// eager_reader_read.
typedef struct {
  size_t length;   // number of valid bytes stored in |data|
  size_t offset;   // index of the next byte in |data| not yet handed to a consumer
  uint8_t data[];  // payload storage (flexible array member)
} data_buffer_t;
43
// State for one eager reader: a background thread reads |inbound_fd| into
// queued buffers, and an eventfd counter advertises how many buffered bytes
// are waiting for the consumer.
struct eager_reader_t {
  // eventfd whose counter holds the number of buffered bytes not yet consumed.
  // It is created with flags 0 (not EFD_SEMAPHORE): eager_reader_read reads the
  // whole counter and writes back whatever it did not consume.
  int bytes_available_fd;
  int inbound_fd;  // descriptor the reader drains (the fd_to_read given to eager_reader_new)

  const allocator_t *allocator;   // allocates and frees the data_buffer_t chunks
  size_t buffer_size;             // payload capacity of each chunk; also the maximum bytes per read()
  fixed_queue_t *buffers;         // filled chunks, enqueued by inbound_data_waiting, dequeued by eager_reader_read
  data_buffer_t *current_buffer;  // chunk currently being consumed by eager_reader_read; NULL when there is none

  thread_t *inbound_read_thread;          // thread whose reactor watches |inbound_fd|
  reactor_object_t *inbound_read_object;  // registration of |inbound_fd| on that reactor

  reactor_object_t *outbound_registration;  // registration of |bytes_available_fd| on the client's reactor; NULL when unregistered
  eager_reader_cb outbound_read_ready;      // client callback invoked by internal_outbound_read_ready
  void *outbound_context;                   // opaque pointer passed back to |outbound_read_ready|
};
60
// Forward declarations of the file-local helpers defined below.

// Reports whether |bytes_available_fd| is readable, using a zero-timeout select().
static bool has_byte(const eager_reader_t *reader);
// Reactor callback for |inbound_fd|: reads one chunk and queues it for consumers.
static void inbound_data_waiting(void *context);
// Reactor callback for |bytes_available_fd|: forwards to the client's read-ready callback.
static void internal_outbound_read_ready(void *context);
64
65eager_reader_t *eager_reader_new(
66    int fd_to_read,
67    const allocator_t *allocator,
68    size_t buffer_size,
69    size_t max_buffer_count,
70    const char *thread_name) {
71
72  assert(fd_to_read != INVALID_FD);
73  assert(allocator != NULL);
74  assert(buffer_size > 0);
75  assert(max_buffer_count > 0);
76  assert(thread_name != NULL && *thread_name != '\0');
77
78  eager_reader_t *ret = osi_calloc(sizeof(eager_reader_t));
79  if (!ret) {
80    LOG_ERROR("%s unable to allocate memory for new eager_reader.", __func__);
81    goto error;
82  }
83
84  ret->allocator = allocator;
85  ret->inbound_fd = fd_to_read;
86
87  ret->bytes_available_fd = eventfd(0, 0);
88  if (ret->bytes_available_fd == INVALID_FD) {
89    LOG_ERROR("%s unable to create output reading semaphore.", __func__);
90    goto error;
91  }
92
93  ret->buffer_size = buffer_size;
94
95  ret->buffers = fixed_queue_new(max_buffer_count);
96  if (!ret->buffers) {
97    LOG_ERROR("%s unable to create buffers queue.", __func__);
98    goto error;
99  }
100
101  ret->inbound_read_thread = thread_new(thread_name);
102  if (!ret->inbound_read_thread) {
103    LOG_ERROR("%s unable to make reading thread.", __func__);
104    goto error;
105  }
106
107  ret->inbound_read_object = reactor_register(
108    thread_get_reactor(ret->inbound_read_thread),
109    fd_to_read,
110    ret,
111    inbound_data_waiting,
112    NULL
113  );
114
115  return ret;
116
117error:;
118  eager_reader_free(ret);
119  return NULL;
120}
121
122void eager_reader_free(eager_reader_t *reader) {
123  if (!reader)
124    return;
125
126  eager_reader_unregister(reader);
127
128  // Only unregister from the input if we actually did register
129  if (reader->inbound_read_object)
130    reactor_unregister(reader->inbound_read_object);
131
132  if (reader->bytes_available_fd != INVALID_FD)
133    close(reader->bytes_available_fd);
134
135  // Free the current buffer, because it's not in the queue
136  // and won't be freed below
137  if (reader->current_buffer)
138    reader->allocator->free(reader->current_buffer);
139
140  fixed_queue_free(reader->buffers, reader->allocator->free);
141  thread_free(reader->inbound_read_thread);
142  osi_free(reader);
143}
144
145void eager_reader_register(eager_reader_t *reader, reactor_t *reactor, eager_reader_cb read_cb, void *context) {
146  assert(reader != NULL);
147  assert(reactor != NULL);
148  assert(read_cb != NULL);
149
150  // Make sure the reader isn't currently registered.
151  eager_reader_unregister(reader);
152
153  reader->outbound_read_ready = read_cb;
154  reader->outbound_context = context;
155  reader->outbound_registration = reactor_register(reactor, reader->bytes_available_fd, reader, internal_outbound_read_ready, NULL);
156}
157
158void eager_reader_unregister(eager_reader_t *reader) {
159  assert(reader != NULL);
160
161  if (reader->outbound_registration) {
162    reactor_unregister(reader->outbound_registration);
163    reader->outbound_registration = NULL;
164  }
165}
166
167// SEE HEADER FOR THREAD SAFETY NOTE
168size_t eager_reader_read(eager_reader_t *reader, uint8_t *buffer, size_t max_size, bool block) {
169  assert(reader != NULL);
170  assert(buffer != NULL);
171
172  // If the caller wants nonblocking behavior, poll to see if we have
173  // any bytes available before reading.
174  if (!block && !has_byte(reader))
175    return 0;
176
177  // Find out how many bytes we have available in our various buffers.
178  eventfd_t bytes_available;
179  if (eventfd_read(reader->bytes_available_fd, &bytes_available) == -1) {
180    LOG_ERROR("%s unable to read semaphore for output data.", __func__);
181    return 0;
182  }
183
184  if (max_size > bytes_available)
185    max_size = bytes_available;
186
187  size_t bytes_consumed = 0;
188  while (bytes_consumed < max_size) {
189    if (!reader->current_buffer)
190      reader->current_buffer = fixed_queue_dequeue(reader->buffers);
191
192    size_t bytes_to_copy = reader->current_buffer->length - reader->current_buffer->offset;
193    if (bytes_to_copy > (max_size - bytes_consumed))
194      bytes_to_copy = max_size - bytes_consumed;
195
196    memcpy(&buffer[bytes_consumed], &reader->current_buffer->data[reader->current_buffer->offset], bytes_to_copy);
197    bytes_consumed += bytes_to_copy;
198    reader->current_buffer->offset += bytes_to_copy;
199
200    if (reader->current_buffer->offset >= reader->current_buffer->length) {
201      reader->allocator->free(reader->current_buffer);
202      reader->current_buffer = NULL;
203    }
204  }
205
206  bytes_available -= bytes_consumed;
207  if (eventfd_write(reader->bytes_available_fd, bytes_available) == -1) {
208    LOG_ERROR("%s unable to write back bytes available for output data.", __func__);
209  }
210
211  return bytes_consumed;
212}
213
214thread_t* eager_reader_get_read_thread(const eager_reader_t *reader) {
215  assert(reader != NULL);
216  return reader->inbound_read_thread;
217}
218
219static bool has_byte(const eager_reader_t *reader) {
220  assert(reader != NULL);
221
222  fd_set read_fds;
223  FD_ZERO(&read_fds);
224  FD_SET(reader->bytes_available_fd, &read_fds);
225
226  // Immediate timeout
227  struct timeval timeout;
228  timeout.tv_sec = 0;
229  timeout.tv_usec = 0;
230
231  select(reader->bytes_available_fd + 1, &read_fds, NULL, NULL, &timeout);
232  return FD_ISSET(reader->bytes_available_fd, &read_fds);
233}
234
235static void inbound_data_waiting(void *context) {
236  eager_reader_t *reader = (eager_reader_t *)context;
237
238  data_buffer_t *buffer = (data_buffer_t *)reader->allocator->alloc(reader->buffer_size + sizeof(data_buffer_t));
239  if (!buffer) {
240    LOG_ERROR("%s couldn't aquire memory for inbound data buffer.", __func__);
241    return;
242  }
243
244  buffer->length = 0;
245  buffer->offset = 0;
246
247  int bytes_read = read(reader->inbound_fd, buffer->data, reader->buffer_size);
248  if (bytes_read > 0) {
249    // Save the data for later
250    buffer->length = bytes_read;
251    fixed_queue_enqueue(reader->buffers, buffer);
252
253    // Tell consumers data is available by incrementing
254    // the semaphore by the number of bytes we just read
255    eventfd_write(reader->bytes_available_fd, bytes_read);
256  } else {
257    if (bytes_read == 0)
258      LOG_WARN("%s fd said bytes existed, but none were found.", __func__);
259    else
260      LOG_WARN("%s unable to read from file descriptor: %s", __func__, strerror(errno));
261
262    reader->allocator->free(buffer);
263  }
264}
265
266static void internal_outbound_read_ready(void *context) {
267  assert(context != NULL);
268
269  eager_reader_t *reader = (eager_reader_t *)context;
270  reader->outbound_read_ready(reader, reader->outbound_context);
271}
272