Lines Matching refs:queue

69 void fixed_queue_free(fixed_queue_t *queue, fixed_queue_free_cb free_cb) {
70 if (!queue)
73 fixed_queue_unregister_dequeue(queue);
76 for (const list_node_t *node = list_begin(queue->list); node != list_end(queue->list); node = list_next(node))
79 list_free(queue->list);
80 semaphore_free(queue->enqueue_sem);
81 semaphore_free(queue->dequeue_sem);
82 pthread_mutex_destroy(&queue->lock);
83 osi_free(queue);
86 bool fixed_queue_is_empty(fixed_queue_t *queue) {
87 if (queue == NULL)
90 pthread_mutex_lock(&queue->lock);
91 bool is_empty = list_is_empty(queue->list);
92 pthread_mutex_unlock(&queue->lock);
97 size_t fixed_queue_length(fixed_queue_t *queue) {
98 if (queue == NULL)
101 pthread_mutex_lock(&queue->lock);
102 size_t length = list_length(queue->list);
103 pthread_mutex_unlock(&queue->lock);
108 size_t fixed_queue_capacity(fixed_queue_t *queue) {
109 assert(queue != NULL);
111 return queue->capacity;
114 void fixed_queue_enqueue(fixed_queue_t *queue, void *data) {
115 assert(queue != NULL);
118 semaphore_wait(queue->enqueue_sem);
120 pthread_mutex_lock(&queue->lock);
121 list_append(queue->list, data);
122 pthread_mutex_unlock(&queue->lock);
124 semaphore_post(queue->dequeue_sem);
127 void *fixed_queue_dequeue(fixed_queue_t *queue) {
128 assert(queue != NULL);
130 semaphore_wait(queue->dequeue_sem);
132 pthread_mutex_lock(&queue->lock);
133 void *ret = list_front(queue->list);
134 list_remove(queue->list, ret);
135 pthread_mutex_unlock(&queue->lock);
137 semaphore_post(queue->enqueue_sem);
142 bool fixed_queue_try_enqueue(fixed_queue_t *queue, void *data) {
143 assert(queue != NULL);
146 if (!semaphore_try_wait(queue->enqueue_sem))
149 pthread_mutex_lock(&queue->lock);
150 list_append(queue->list, data);
151 pthread_mutex_unlock(&queue->lock);
153 semaphore_post(queue->dequeue_sem);
157 void *fixed_queue_try_dequeue(fixed_queue_t *queue) {
158 if (queue == NULL)
161 if (!semaphore_try_wait(queue->dequeue_sem))
164 pthread_mutex_lock(&queue->lock);
165 void *ret = list_front(queue->list);
166 list_remove(queue->list, ret);
167 pthread_mutex_unlock(&queue->lock);
169 semaphore_post(queue->enqueue_sem);
174 void *fixed_queue_try_peek_first(fixed_queue_t *queue) {
175 if (queue == NULL)
178 pthread_mutex_lock(&queue->lock);
179 void *ret = list_is_empty(queue->list) ? NULL : list_front(queue->list);
180 pthread_mutex_unlock(&queue->lock);
185 void *fixed_queue_try_peek_last(fixed_queue_t *queue) {
186 if (queue == NULL)
189 pthread_mutex_lock(&queue->lock);
190 void *ret = list_is_empty(queue->list) ? NULL : list_back(queue->list);
191 pthread_mutex_unlock(&queue->lock);
196 void *fixed_queue_try_remove_from_queue(fixed_queue_t *queue, void *data) {
197 if (queue == NULL)
201 pthread_mutex_lock(&queue->lock);
202 if (list_contains(queue->list, data) &&
203 semaphore_try_wait(queue->dequeue_sem)) {
204 removed = list_remove(queue->list, data);
207 pthread_mutex_unlock(&queue->lock);
210 semaphore_post(queue->enqueue_sem);
216 list_t *fixed_queue_get_list(fixed_queue_t *queue) {
217 assert(queue != NULL);
221 return queue->list;
225 int fixed_queue_get_dequeue_fd(const fixed_queue_t *queue) {
226 assert(queue != NULL);
227 return semaphore_get_fd(queue->dequeue_sem);
230 int fixed_queue_get_enqueue_fd(const fixed_queue_t *queue) {
231 assert(queue != NULL);
232 return semaphore_get_fd(queue->enqueue_sem);
235 void fixed_queue_register_dequeue(fixed_queue_t *queue, reactor_t *reactor, fixed_queue_cb ready_cb, void *context) {
236 assert(queue != NULL);
241 fixed_queue_unregister_dequeue(queue);
243 queue->dequeue_ready = ready_cb;
244 queue->dequeue_context = context;
245 queue->dequeue_object = reactor_register(
247 fixed_queue_get_dequeue_fd(queue),
248 queue,
254 void fixed_queue_unregister_dequeue(fixed_queue_t *queue) {
255 assert(queue != NULL);
257 if (queue->dequeue_object) {
258 reactor_unregister(queue->dequeue_object);
259 queue->dequeue_object = NULL;
266 fixed_queue_t *queue = context;
267 queue->dequeue_ready(queue, queue->dequeue_context);