Lines Matching refs:reader
61 static bool has_byte(const eager_reader_t *reader);
122 void eager_reader_free(eager_reader_t *reader) {
123 if (!reader)
126 eager_reader_unregister(reader);
129 if (reader->inbound_read_object)
130 reactor_unregister(reader->inbound_read_object);
132 if (reader->bytes_available_fd != INVALID_FD)
133 close(reader->bytes_available_fd);
137 if (reader->current_buffer)
138 reader->allocator->free(reader->current_buffer);
140 fixed_queue_free(reader->buffers, reader->allocator->free);
141 thread_free(reader->inbound_read_thread);
142 osi_free(reader);
145 void eager_reader_register(eager_reader_t *reader, reactor_t *reactor, eager_reader_cb read_cb, void *context) {
146 assert(reader != NULL);
150 // Make sure the reader isn't currently registered.
151 eager_reader_unregister(reader);
153 reader->outbound_read_ready = read_cb;
154 reader->outbound_context = context;
155 reader->outbound_registration = reactor_register(reactor, reader->bytes_available_fd, reader, internal_outbound_read_ready, NULL);
158 void eager_reader_unregister(eager_reader_t *reader) {
159 assert(reader != NULL);
161 if (reader->outbound_registration) {
162 reactor_unregister(reader->outbound_registration);
163 reader->outbound_registration = NULL;
168 size_t eager_reader_read(eager_reader_t *reader, uint8_t *buffer, size_t max_size, bool block) {
169 assert(reader != NULL);
174 if (!block && !has_byte(reader))
179 if (eventfd_read(reader->bytes_available_fd, &bytes_available) == -1) {
189 if (!reader->current_buffer)
190 reader->current_buffer = fixed_queue_dequeue(reader->buffers);
192 size_t bytes_to_copy = reader->current_buffer->length - reader->current_buffer->offset;
196 memcpy(&buffer[bytes_consumed], &reader->current_buffer->data[reader->current_buffer->offset], bytes_to_copy);
198 reader->current_buffer->offset += bytes_to_copy;
200 if (reader->current_buffer->offset >= reader->current_buffer->length) {
201 reader->allocator->free(reader->current_buffer);
202 reader->current_buffer = NULL;
207 if (eventfd_write(reader->bytes_available_fd, bytes_available) == -1) {
214 thread_t* eager_reader_get_read_thread(const eager_reader_t *reader) {
215 assert(reader != NULL);
216 return reader->inbound_read_thread;
219 static bool has_byte(const eager_reader_t *reader) {
220 assert(reader != NULL);
224 FD_SET(reader->bytes_available_fd, &read_fds);
231 select(reader->bytes_available_fd + 1, &read_fds, NULL, NULL, &timeout);
232 return FD_ISSET(reader->bytes_available_fd, &read_fds);
236 eager_reader_t *reader = (eager_reader_t *)context;
238 data_buffer_t *buffer = (data_buffer_t *)reader->allocator->alloc(reader->buffer_size + sizeof(data_buffer_t));
247 int bytes_read = read(reader->inbound_fd, buffer->data, reader->buffer_size);
251 fixed_queue_enqueue(reader->buffers, buffer);
255 eventfd_write(reader->bytes_available_fd, bytes_read);
262 reader->allocator->free(buffer);
269 eager_reader_t *reader = (eager_reader_t *)context;
270 reader->outbound_read_ready(reader, reader->outbound_context);