circular_buffer.c revision e739ac0589b4fb43561f801c4faba8c1b89f8680
1/* Test program that performs producer-consumer style communication through 2 * a circular buffer. This test program is a slightly modified version of the 3 * test program made available by Miguel Ojeda 4 * -- see also http://article.gmane.org/gmane.comp.debugging.valgrind/8782. 5 */ 6 7 8#include <stdio.h> 9#include <string.h> 10#include <stdlib.h> 11#include <unistd.h> 12#include <time.h> 13#include <pthread.h> 14#include <semaphore.h> 15#include <fcntl.h> 16#include "../../config.h" 17 18 19/** gcc versions 4.1.0 and later have support for atomic builtins. */ 20 21#ifndef HAVE_BUILTIN_ATOMIC 22#error Sorry, but this test program can only be compiled by a compiler that\ 23has built-in functions for atomic memory access. 24#endif 25 26 27#define BUFFER_MAX (2) 28#define DATA_SEMAPHORE_NAME "cb-data-semaphore" 29#define FREE_SEMAPHORE_NAME "cb-free-semaphore" 30 31 32typedef int data_t; 33 34typedef struct { 35 /* Counting semaphore representing the number of data items in the buffer. */ 36 sem_t* data; 37 /* Counting semaphore representing the number of free elements. */ 38 sem_t* free; 39 /* Position where a new elements should be written. */ 40 int in; 41 /* Position from where an element can be removed. */ 42 int out; 43 /* Mutex that protects 'in'. */ 44 pthread_mutex_t mutex_in; 45 /* Mutex that protects 'out'. */ 46 pthread_mutex_t mutex_out; 47 /* Data buffer. */ 48 data_t buffer[BUFFER_MAX]; 49} buffer_t; 50 51static int quiet = 0; 52static int use_locking = 1; 53 54static __inline__ 55int fetch_and_add(int* p, int i) 56{ 57 return __sync_fetch_and_add(p, i); 58} 59 60static sem_t* create_semaphore(const char* const name, const int value) 61{ 62#ifdef __APPLE__ 63 sem_t* p = sem_open(name, O_CREAT, 0600, value); 64 return p; 65#else 66 sem_t* p = malloc(sizeof(*p)); 67 if (p) 68 sem_init(p, 0, value); 69 return p; 70#endif 71} 72 73static void destroy_semaphore(const char* const name, sem_t* p) 74{ 75#ifdef __APPLE__ 76 sem_close(p); 77 sem_unlink(name); 78#else 79 sem_destroy(p); 80 free(p); 81#endif 82} 83 84static void buffer_init(buffer_t * b) 85{ 86 b->data = create_semaphore(DATA_SEMAPHORE_NAME, 0); 87 b->free = create_semaphore(FREE_SEMAPHORE_NAME, BUFFER_MAX); 88 89 pthread_mutex_init(&b->mutex_in, NULL); 90 pthread_mutex_init(&b->mutex_out, NULL); 91 92 b->in = 0; 93 b->out = 0; 94} 95 96static void buffer_recv(buffer_t* b, data_t* d) 97{ 98 int out; 99 sem_wait(b->data); 100 if (use_locking) 101 pthread_mutex_lock(&b->mutex_out); 102 out = fetch_and_add(&b->out, 1); 103 if (out >= BUFFER_MAX) 104 { 105 fetch_and_add(&b->out, -BUFFER_MAX); 106 out -= BUFFER_MAX; 107 } 108 *d = b->buffer[out]; 109 if (use_locking) 110 pthread_mutex_unlock(&b->mutex_out); 111 if (! quiet) 112 { 113 printf("received %d from buffer[%d]\n", *d, out); 114 fflush(stdout); 115 } 116 sem_post(b->free); 117} 118 119static void buffer_send(buffer_t* b, data_t* d) 120{ 121 int in; 122 sem_wait(b->free); 123 if (use_locking) 124 pthread_mutex_lock(&b->mutex_in); 125 in = fetch_and_add(&b->in, 1); 126 if (in >= BUFFER_MAX) 127 { 128 fetch_and_add(&b->in, -BUFFER_MAX); 129 in -= BUFFER_MAX; 130 } 131 b->buffer[in] = *d; 132 if (use_locking) 133 pthread_mutex_unlock(&b->mutex_in); 134 if (! quiet) 135 { 136 printf("sent %d to buffer[%d]\n", *d, in); 137 fflush(stdout); 138 } 139 sem_post(b->data); 140} 141 142static void buffer_destroy(buffer_t* b) 143{ 144 destroy_semaphore(DATA_SEMAPHORE_NAME, b->data); 145 destroy_semaphore(FREE_SEMAPHORE_NAME, b->free); 146 147 pthread_mutex_destroy(&b->mutex_in); 148 pthread_mutex_destroy(&b->mutex_out); 149} 150 151static buffer_t b; 152 153static void producer(int* id) 154{ 155 buffer_send(&b, id); 156 pthread_exit(NULL); 157} 158 159#define MAXSLEEP (100 * 1000) 160 161static void consumer(int* id) 162{ 163 int d; 164 usleep(rand() % MAXSLEEP); 165 buffer_recv(&b, &d); 166 if (! quiet) 167 { 168 printf("%i: %i\n", *id, d); 169 fflush(stdout); 170 } 171 pthread_exit(NULL); 172} 173 174#define THREADS (10) 175 176int main(int argc, char** argv) 177{ 178 pthread_t producers[THREADS]; 179 pthread_t consumers[THREADS]; 180 int thread_arg[THREADS]; 181 int i; 182 int optchar; 183 184 while ((optchar = getopt(argc, argv, "nq")) != EOF) 185 { 186 switch (optchar) 187 { 188 case 'n': 189 use_locking = 0; 190 break; 191 case 'q': 192 quiet = 1; 193 break; 194 } 195 } 196 197 srand(time(NULL)); 198 199 buffer_init(&b); 200 201 for (i = 0; i < THREADS; ++i) 202 { 203 thread_arg[i] = i; 204 pthread_create(producers + i, NULL, 205 (void * (*)(void *)) producer, &thread_arg[i]); 206 } 207 208 for (i = 0; i < THREADS; ++i) 209 pthread_create(consumers + i, NULL, 210 (void * (*)(void *)) consumer, &thread_arg[i]); 211 212 for (i = 0; i < THREADS; ++i) 213 { 214 pthread_join(producers[i], NULL); 215 pthread_join(consumers[i], NULL); 216 } 217 218 buffer_destroy(&b); 219 220 return 0; 221} 222