1/* Test program that performs producer-consumer style communication through
2 * a circular buffer. This test program is a slightly modified version of the
3 * test program made available by Miguel Ojeda
4 * -- see also http://article.gmane.org/gmane.comp.debugging.valgrind/8782.
5 */
6
7
8#include <stdio.h>
9#include <string.h>
10#include <stdlib.h>
11#include <unistd.h>
12#include <time.h>
13#include <pthread.h>
14#include <semaphore.h>
15#include <fcntl.h>
16#include "../../config.h"
17
18
19/** gcc versions 4.1.0 and later have support for atomic builtins. */
20
21#ifndef HAVE_BUILTIN_ATOMIC
22#error Sorry, but this test program can only be compiled by a compiler that\
23has built-in functions for atomic memory access.
24#endif
25
26
/* Number of slots in the circular buffer; also the initial count of the
 * 'free' semaphore. */
#define BUFFER_MAX (2)
/* Base names handed to create_semaphore() / destroy_semaphore(); they are
 * only consulted on the VGO_darwin code path. */
#define DATA_SEMAPHORE_NAME "cb-data-semaphore"
#define FREE_SEMAPHORE_NAME "cb-free-semaphore"
30
31
/* Type of the elements stored in the circular buffer. */
typedef int data_t;
33
/* Circular buffer of BUFFER_MAX elements together with the semaphores,
 * positions and mutexes that buffer_send() and buffer_recv() operate on. */
typedef struct {
  /* Counting semaphore representing the number of data items in the buffer.
   * Waited on by buffer_recv(), posted by buffer_send(). */
  sem_t* data;
  /* Counting semaphore representing the number of free elements.
   * Waited on by buffer_send(), posted by buffer_recv(). */
  sem_t* free;
  /* Position where a new element should be written. */
  int in;
  /* Position from where an element can be removed. */
  int out;
  /* Mutex that protects 'in' (only taken when use_locking is nonzero). */
  pthread_mutex_t mutex_in;
  /* Mutex that protects 'out' (only taken when use_locking is nonzero). */
  pthread_mutex_t mutex_out;
  /* Data buffer. */
  data_t buffer[BUFFER_MAX];
} buffer_t;
50
/* Nonzero suppresses the printf() output of buffer_send(), buffer_recv()
 * and consumer(); set by the -q command line option. */
static int quiet = 0;
/* Nonzero makes buffer_send() / buffer_recv() lock mutex_in / mutex_out
 * around their index update; cleared by the -n command line option. */
static int use_locking = 1;
53
/* Atomically add 'i' to '*p' via the compiler's __sync builtin and hand back
 * the value '*p' held just before the addition. */
static __inline__
int fetch_and_add(int* p, int i)
{
  const int previous = __sync_fetch_and_add(p, i);

  return previous;
}
59
/*
 * Create a counting semaphore with initial count 'value'.
 *
 * With VGO_darwin defined a named semaphore "<name>-<pid>" is opened
 * exclusively; otherwise an unnamed, process-private semaphore is allocated
 * with malloc() and initialized with sem_init() ('name' is then unused).
 *
 * Returns the semaphore, or NULL if allocation, sem_open() or sem_init()
 * failed (a diagnostic is printed with perror() for the latter two).
 * Release the result with destroy_semaphore().
 */
static sem_t* create_semaphore(const char* const name, const int value)
{
#ifdef VGO_darwin
  char name_and_pid[32];
  sem_t* p;

  /* Append the pid so that concurrently running instances do not collide. */
  snprintf(name_and_pid, sizeof(name_and_pid), "%s-%d", name, (int) getpid());
  p = sem_open(name_and_pid, O_CREAT | O_EXCL, 0600, value);
  if (p == SEM_FAILED) {
    perror("sem_open");
    return NULL;
  }
  return p;
#else
  sem_t* p = malloc(sizeof(*p));

  (void) name;
  if (p == NULL)
    return NULL;
  /* Do not hand out a semaphore that could not be initialized. */
  if (sem_init(p, 0, value) != 0) {
    perror("sem_init");
    free(p);
    return NULL;
  }
  return p;
#endif
}
78
/*
 * Release a semaphore obtained from create_semaphore().
 *
 * 'name' must be the same base name that was passed to create_semaphore();
 * with VGO_darwin defined the "<name>-<pid>" name is rebuilt from it so that
 * the named semaphore that was actually opened gets unlinked. Without
 * VGO_darwin 'name' is unused and the semaphore is destroyed and freed.
 * A NULL 'p' is ignored.
 */
static void destroy_semaphore(const char* const name, sem_t* p)
{
  if (p == NULL)
    return;
#ifdef VGO_darwin
  char name_and_pid[32];

  /* Must match the name construction in create_semaphore(). */
  snprintf(name_and_pid, sizeof(name_and_pid), "%s-%d", name, (int) getpid());
  sem_close(p);
  sem_unlink(name_and_pid);
#else
  (void) name;
  sem_destroy(p);
  free(p);
#endif
}
89
90static void buffer_init(buffer_t * b)
91{
92  b->data = create_semaphore(DATA_SEMAPHORE_NAME, 0);
93  b->free = create_semaphore(FREE_SEMAPHORE_NAME, BUFFER_MAX);
94
95  pthread_mutex_init(&b->mutex_in, NULL);
96  pthread_mutex_init(&b->mutex_out, NULL);
97
98  b->in = 0;
99  b->out = 0;
100}
101
/*
 * Remove one element from the circular buffer '*b' and store it in '*d'.
 *
 * Waits on the 'data' semaphore first, and posts the 'free' semaphore once
 * the element has been copied out. The update of 'b->out' and the read of
 * the slot happen under 'mutex_out' only when use_locking is nonzero.
 * Unless 'quiet' is set, the received value and its slot index are printed.
 */
static void buffer_recv(buffer_t* b, data_t* d)
{
  int out;
  /* Wait until the 'data' semaphore can be decremented. */
  sem_wait(b->data);
  if (use_locking)
    pthread_mutex_lock(&b->mutex_out);
  /* Claim a slot: 'out' receives the value of b->out before the increment. */
  out = fetch_and_add(&b->out, 1);
  if (out >= BUFFER_MAX)
  {
    /* Ran past the end of the array: pull both the shared position and the
     * local index back by BUFFER_MAX. */
    fetch_and_add(&b->out, -BUFFER_MAX);
    out -= BUFFER_MAX;
  }
  *d = b->buffer[out];
  if (use_locking)
    pthread_mutex_unlock(&b->mutex_out);
  if (! quiet)
  {
    printf("received %d from buffer[%d]\n", *d, out);
    fflush(stdout);
  }
  /* Signal that one more slot is free. */
  sem_post(b->free);
}
124
/*
 * Append the element '*d' to the circular buffer '*b'.
 *
 * Waits on the 'free' semaphore first, and posts the 'data' semaphore once
 * the element has been stored. The update of 'b->in' and the write to the
 * slot happen under 'mutex_in' only when use_locking is nonzero.
 * Unless 'quiet' is set, the sent value and its slot index are printed.
 */
static void buffer_send(buffer_t* b, data_t* d)
{
  int in;
  /* Wait until the 'free' semaphore can be decremented. */
  sem_wait(b->free);
  if (use_locking)
    pthread_mutex_lock(&b->mutex_in);
  /* Claim a slot: 'in' receives the value of b->in before the increment. */
  in = fetch_and_add(&b->in, 1);
  if (in >= BUFFER_MAX)
  {
    /* Ran past the end of the array: pull both the shared position and the
     * local index back by BUFFER_MAX. */
    fetch_and_add(&b->in, -BUFFER_MAX);
    in -= BUFFER_MAX;
  }
  b->buffer[in] = *d;
  if (use_locking)
    pthread_mutex_unlock(&b->mutex_in);
  if (! quiet)
  {
    printf("sent %d to buffer[%d]\n", *d, in);
    fflush(stdout);
  }
  /* Signal that one more data item is available. */
  sem_post(b->data);
}
147
148static void buffer_destroy(buffer_t* b)
149{
150  destroy_semaphore(DATA_SEMAPHORE_NAME, b->data);
151  destroy_semaphore(FREE_SEMAPHORE_NAME, b->free);
152
153  pthread_mutex_destroy(&b->mutex_in);
154  pthread_mutex_destroy(&b->mutex_out);
155}
156
/* The single circular buffer shared by all producer and consumer threads. */
static buffer_t b;
158
159static void producer(int* id)
160{
161  buffer_send(&b, id);
162  pthread_exit(NULL);
163}
164
/* Exclusive upper bound, in microseconds, of the random delay passed to
 * usleep() by consumer(). */
#define MAXSLEEP (100 * 1000)
166
167static void consumer(int* id)
168{
169  int d;
170  usleep(rand() % MAXSLEEP);
171  buffer_recv(&b, &d);
172  if (! quiet)
173  {
174    printf("%i: %i\n", *id, d);
175    fflush(stdout);
176  }
177  pthread_exit(NULL);
178}
179
/* Number of producer threads, and likewise of consumer threads, that main()
 * starts. */
#define THREADS (10)
181
182int main(int argc, char** argv)
183{
184  pthread_t producers[THREADS];
185  pthread_t consumers[THREADS];
186  int thread_arg[THREADS];
187  int i;
188  int optchar;
189
190  while ((optchar = getopt(argc, argv, "nq")) != EOF)
191  {
192    switch (optchar)
193    {
194    case 'n':
195      use_locking = 0;
196      break;
197    case 'q':
198      quiet = 1;
199      break;
200    }
201  }
202
203  srand(time(NULL));
204
205  buffer_init(&b);
206
207  for (i = 0; i < THREADS; ++i)
208  {
209    thread_arg[i] = i;
210    pthread_create(producers + i, NULL,
211                   (void * (*)(void *)) producer, &thread_arg[i]);
212  }
213
214  for (i = 0; i < THREADS; ++i)
215    pthread_create(consumers + i, NULL,
216                   (void * (*)(void *)) consumer, &thread_arg[i]);
217
218  for (i = 0; i < THREADS; ++i)
219  {
220    pthread_join(producers[i], NULL);
221    pthread_join(consumers[i], NULL);
222  }
223
224  buffer_destroy(&b);
225
226  return 0;
227}
228