circular_buffer.c revision e739ac0589b4fb43561f801c4faba8c1b89f8680
1/* Test program that performs producer-consumer style communication through
2 * a circular buffer. This test program is a slightly modified version of the
3 * test program made available by Miguel Ojeda
4 * -- see also http://article.gmane.org/gmane.comp.debugging.valgrind/8782.
5 */
6
7
8#include <stdio.h>
9#include <string.h>
10#include <stdlib.h>
11#include <unistd.h>
12#include <time.h>
13#include <pthread.h>
14#include <semaphore.h>
15#include <fcntl.h>
16#include "../../config.h"
17
18
19/** gcc versions 4.1.0 and later have support for atomic builtins. */
20
21#ifndef HAVE_BUILTIN_ATOMIC
22#error Sorry, but this test program can only be compiled by a compiler that\
23has built-in functions for atomic memory access.
24#endif
25
26
/* Number of slots in the circular buffer. */
#define BUFFER_MAX (2)

/* Names handed to create_semaphore()/destroy_semaphore() for the two
 * counting semaphores of a buffer. */
#define DATA_SEMAPHORE_NAME "cb-data-semaphore"
#define FREE_SEMAPHORE_NAME "cb-free-semaphore"


/* Type of the items that travel through the buffer. */
typedef int data_t;
33
34typedef struct {
35  /* Counting semaphore representing the number of data items in the buffer. */
36  sem_t* data;
37  /* Counting semaphore representing the number of free elements. */
38  sem_t* free;
39  /* Position where a new elements should be written. */
40  int in;
41  /* Position from where an element can be removed. */
42  int out;
43  /* Mutex that protects 'in'. */
44  pthread_mutex_t mutex_in;
45  /* Mutex that protects 'out'. */
46  pthread_mutex_t mutex_out;
47  /* Data buffer. */
48  data_t buffer[BUFFER_MAX];
49} buffer_t;
50
/* Non-zero (set by the -q option) suppresses all progress output. */
static int quiet = 0;
/* Non-zero (the default) makes buffer_send()/buffer_recv() take the
 * position mutexes; cleared by the -n option. */
static int use_locking = 1;
53
/*
 * Atomically add 'i' to the integer stored at 'p'.
 *
 * Returns the value '*p' held immediately before the addition.
 */
static __inline__
int fetch_and_add(int* p, int i)
{
  const int previous = __sync_fetch_and_add(p, i);

  return previous;
}
59
/*
 * Create a counting semaphore whose initial count is 'value'.
 *
 * On __APPLE__ a named semaphore called 'name' is opened; on every other
 * platform an unnamed, process-private semaphore is allocated on the heap
 * and 'name' is not used.
 *
 * Returns the new semaphore, or NULL if it could not be created (allocation
 * failure, or the semaphore call itself failed, e.g. for an out-of-range
 * 'value').
 */
static sem_t* create_semaphore(const char* const name, const int value)
{
#ifdef __APPLE__
  sem_t* p;

  /*
   * With plain O_CREAT an already existing semaphore of that name is reused
   * and 'value' is ignored, so first remove anything a previous run may
   * have left behind. Failure (typically "no such semaphore") is harmless.
   */
  sem_unlink(name);
  p = sem_open(name, O_CREAT, 0600, (unsigned int) value);
  /* sem_open() signals failure with SEM_FAILED, not with a null pointer. */
  return p == SEM_FAILED ? NULL : p;
#else
  sem_t* p = malloc(sizeof(*p));

  (void) name;
  /* Never hand out storage that does not hold an initialized semaphore. */
  if (p && sem_init(p, 0, value) != 0)
  {
    free(p);
    p = NULL;
  }
  return p;
#endif
}
72
/*
 * Release a semaphore obtained from create_semaphore().
 *
 * Unnamed semaphores are destroyed and their heap storage is freed; on
 * __APPLE__ the named semaphore is closed and 'name' is unlinked instead.
 */
static void destroy_semaphore(const char* const name, sem_t* p)
{
#ifndef __APPLE__
  /* Heap-allocated unnamed semaphore: tear it down, then free the memory. */
  sem_destroy(p);
  free(p);
#else
  /* Named semaphore: drop our handle, then remove the name itself. */
  sem_close(p);
  sem_unlink(name);
#endif
}
83
84static void buffer_init(buffer_t * b)
85{
86  b->data = create_semaphore(DATA_SEMAPHORE_NAME, 0);
87  b->free = create_semaphore(FREE_SEMAPHORE_NAME, BUFFER_MAX);
88
89  pthread_mutex_init(&b->mutex_in, NULL);
90  pthread_mutex_init(&b->mutex_out, NULL);
91
92  b->in = 0;
93  b->out = 0;
94}
95
96static void buffer_recv(buffer_t* b, data_t* d)
97{
98  int out;
99  sem_wait(b->data);
100  if (use_locking)
101    pthread_mutex_lock(&b->mutex_out);
102  out = fetch_and_add(&b->out, 1);
103  if (out >= BUFFER_MAX)
104  {
105    fetch_and_add(&b->out, -BUFFER_MAX);
106    out -= BUFFER_MAX;
107  }
108  *d = b->buffer[out];
109  if (use_locking)
110    pthread_mutex_unlock(&b->mutex_out);
111  if (! quiet)
112  {
113    printf("received %d from buffer[%d]\n", *d, out);
114    fflush(stdout);
115  }
116  sem_post(b->free);
117}
118
119static void buffer_send(buffer_t* b, data_t* d)
120{
121  int in;
122  sem_wait(b->free);
123  if (use_locking)
124    pthread_mutex_lock(&b->mutex_in);
125  in = fetch_and_add(&b->in, 1);
126  if (in >= BUFFER_MAX)
127  {
128    fetch_and_add(&b->in, -BUFFER_MAX);
129    in -= BUFFER_MAX;
130  }
131  b->buffer[in] = *d;
132  if (use_locking)
133    pthread_mutex_unlock(&b->mutex_in);
134  if (! quiet)
135  {
136    printf("sent %d to buffer[%d]\n", *d, in);
137    fflush(stdout);
138  }
139  sem_post(b->data);
140}
141
142static void buffer_destroy(buffer_t* b)
143{
144  destroy_semaphore(DATA_SEMAPHORE_NAME, b->data);
145  destroy_semaphore(FREE_SEMAPHORE_NAME, b->free);
146
147  pthread_mutex_destroy(&b->mutex_in);
148  pthread_mutex_destroy(&b->mutex_out);
149}
150
151static buffer_t b;
152
153static void producer(int* id)
154{
155  buffer_send(&b, id);
156  pthread_exit(NULL);
157}
158
159#define MAXSLEEP (100 * 1000)
160
161static void consumer(int* id)
162{
163  int d;
164  usleep(rand() % MAXSLEEP);
165  buffer_recv(&b, &d);
166  if (! quiet)
167  {
168    printf("%i: %i\n", *id, d);
169    fflush(stdout);
170  }
171  pthread_exit(NULL);
172}
173
174#define THREADS (10)
175
176int main(int argc, char** argv)
177{
178  pthread_t producers[THREADS];
179  pthread_t consumers[THREADS];
180  int thread_arg[THREADS];
181  int i;
182  int optchar;
183
184  while ((optchar = getopt(argc, argv, "nq")) != EOF)
185  {
186    switch (optchar)
187    {
188    case 'n':
189      use_locking = 0;
190      break;
191    case 'q':
192      quiet = 1;
193      break;
194    }
195  }
196
197  srand(time(NULL));
198
199  buffer_init(&b);
200
201  for (i = 0; i < THREADS; ++i)
202  {
203    thread_arg[i] = i;
204    pthread_create(producers + i, NULL,
205                   (void * (*)(void *)) producer, &thread_arg[i]);
206  }
207
208  for (i = 0; i < THREADS; ++i)
209    pthread_create(consumers + i, NULL,
210                   (void * (*)(void *)) consumer, &thread_arg[i]);
211
212  for (i = 0; i < THREADS; ++i)
213  {
214    pthread_join(producers[i], NULL);
215    pthread_join(consumers[i], NULL);
216  }
217
218  buffer_destroy(&b);
219
220  return 0;
221}
222