1/******************************************************************************
2 *
3 *  Copyright (C) 2014 Google, Inc.
4 *
5 *  Licensed under the Apache License, Version 2.0 (the "License");
6 *  you may not use this file except in compliance with the License.
7 *  You may obtain a copy of the License at:
8 *
9 *  http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *  Unless required by applicable law or agreed to in writing, software
12 *  distributed under the License is distributed on an "AS IS" BASIS,
13 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  See the License for the specific language governing permissions and
15 *  limitations under the License.
16 *
17 ******************************************************************************/
18
19#include <assert.h>
20#include <pthread.h>
21#include <stdlib.h>
22
23#include "fixed_queue.h"
24#include "list.h"
25#include "osi.h"
26#include "semaphore.h"
27
28typedef struct fixed_queue_t {
29  list_t *list;
30  semaphore_t *enqueue_sem;
31  semaphore_t *dequeue_sem;
32  pthread_mutex_t lock;
33  size_t capacity;
34} fixed_queue_t;
35
36fixed_queue_t *fixed_queue_new(size_t capacity) {
37  fixed_queue_t *ret = calloc(1, sizeof(fixed_queue_t));
38  if (!ret)
39    goto error;
40
41  ret->list = list_new(NULL);
42  if (!ret->list)
43    goto error;
44
45  ret->enqueue_sem = semaphore_new(capacity);
46  if (!ret->enqueue_sem)
47    goto error;
48
49  ret->dequeue_sem = semaphore_new(0);
50  if (!ret->dequeue_sem)
51    goto error;
52
53  pthread_mutex_init(&ret->lock, NULL);
54  ret->capacity = capacity;
55
56  return ret;
57
58error:;
59  if (ret) {
60    list_free(ret->list);
61    semaphore_free(ret->enqueue_sem);
62    semaphore_free(ret->dequeue_sem);
63  }
64
65  free(ret);
66  return NULL;
67}
68
69void fixed_queue_free(fixed_queue_t *queue, fixed_queue_free_cb free_cb) {
70  if (!queue)
71    return;
72
73  if (free_cb)
74    for (const list_node_t *node = list_begin(queue->list); node != list_end(queue->list); node = list_next(node))
75      free_cb(list_node(node));
76
77  list_free(queue->list);
78  semaphore_free(queue->enqueue_sem);
79  semaphore_free(queue->dequeue_sem);
80  pthread_mutex_destroy(&queue->lock);
81  free(queue);
82}
83
84void fixed_queue_enqueue(fixed_queue_t *queue, void *data) {
85  assert(queue != NULL);
86  assert(data != NULL);
87
88  semaphore_wait(queue->enqueue_sem);
89
90  pthread_mutex_lock(&queue->lock);
91  list_append(queue->list, data);
92  pthread_mutex_unlock(&queue->lock);
93
94  semaphore_post(queue->dequeue_sem);
95}
96
97void *fixed_queue_dequeue(fixed_queue_t *queue) {
98  assert(queue != NULL);
99
100  semaphore_wait(queue->dequeue_sem);
101
102  pthread_mutex_lock(&queue->lock);
103  void *ret = list_front(queue->list);
104  list_remove(queue->list, ret);
105  pthread_mutex_unlock(&queue->lock);
106
107  semaphore_post(queue->enqueue_sem);
108
109  return ret;
110}
111
112bool fixed_queue_try_enqueue(fixed_queue_t *queue, void *data) {
113  assert(queue != NULL);
114  assert(data != NULL);
115
116  if (!semaphore_try_wait(queue->enqueue_sem))
117    return false;
118
119  pthread_mutex_lock(&queue->lock);
120  list_append(queue->list, data);
121  pthread_mutex_unlock(&queue->lock);
122
123  semaphore_post(queue->dequeue_sem);
124  return true;
125}
126
127void *fixed_queue_try_dequeue(fixed_queue_t *queue) {
128  assert(queue != NULL);
129
130  if (!semaphore_try_wait(queue->dequeue_sem))
131    return NULL;
132
133  pthread_mutex_lock(&queue->lock);
134  void *ret = list_front(queue->list);
135  list_remove(queue->list, ret);
136  pthread_mutex_unlock(&queue->lock);
137
138  semaphore_post(queue->enqueue_sem);
139
140  return ret;
141}
142
143int fixed_queue_get_dequeue_fd(const fixed_queue_t *queue) {
144  assert(queue != NULL);
145  return semaphore_get_fd(queue->dequeue_sem);
146}
147
148int fixed_queue_get_enqueue_fd(const fixed_queue_t *queue) {
149  assert(queue != NULL);
150  return semaphore_get_fd(queue->enqueue_sem);
151}
152