/*
 * Simple templated message queue implementation that relies on only mutexes
 * for synchronization (which reduces portability issues).  Given the
 * following setup:
 *
 *   typedef struct mq_msg_s mq_msg_t;
 *   struct mq_msg_s {
 *           mq_msg(mq_msg_t) link;
 *           [message data]
 *   };
 *   mq_gen(, mq_, mq_t, mq_msg_t, link)
 *
 * The API is as follows:
 *
 *   bool mq_init(mq_t *mq);
 *   void mq_fini(mq_t *mq);
 *   unsigned mq_count(mq_t *mq);
 *   mq_msg_t *mq_tryget(mq_t *mq);
 *   mq_msg_t *mq_get(mq_t *mq);
 *   void mq_put(mq_t *mq, mq_msg_t *msg);
 *
 * What each generated function does:
 *
 *   init:   initializes the lock via mtx_init(), then sets up an empty
 *           message list and a zero count.  Returns true when mtx_init()
 *           returns a true (non-zero) value, false otherwise.
 *   fini:   finalizes the lock via mtx_fini(); nothing else is touched.
 *   count:  returns the message count, read while holding the lock.
 *   tryget: takes the lock, and if ql_first() yields a message, unlinks it
 *           with ql_head_remove(), decrements the count, and returns it.
 *           Returns NULL without waiting when ql_first() yields NULL.
 *   get:    calls tryget until it yields a message, sleeping with
 *           nanosleep() between attempts.  The sleep starts at 1 ns and is
 *           doubled after every further miss until it would reach one
 *           second, at which point it stays at exactly 1 s.
 *   put:    takes the lock, links msg in with ql_tail_insert(), and
 *           increments the count.
 *
 * NOTE(review): assuming the ql_* helpers behave as their names suggest,
 * messages are handed out in FIFO order -- confirm against ql.h.
 *
 * The message queue linkage embedded in each message is to be treated as
 * externally opaque (no need to initialize or clean up externally).
 * mq_fini() does not perform any cleanup of messages, since it knows nothing
 * of their payloads.
 */
#define mq_msg(a_mq_msg_type) ql_elm(a_mq_msg_type)

#define mq_gen(a_attr, a_prefix, a_mq_type, a_mq_msg_type, a_field) \
/* Queue state: a lock, the list of pending messages, and their number. */ \
typedef struct { \
    mtx_t lock; \
    ql_head(a_mq_msg_type) msgs; \
    unsigned count; \
} a_mq_type; \
\
/* Set up an empty queue; reports true when mtx_init() returns true. */ \
a_attr bool \
a_prefix##init(a_mq_type *mq) \
{ \
    if (mtx_init(&mq->lock)) { \
        return true; \
    } \
    mq->count = 0; \
    ql_new(&mq->msgs); \
    return false; \
} \
\
/* Finalize the lock.  Messages still linked in are left untouched. */ \
a_attr void \
a_prefix##fini(a_mq_type *mq) \
{ \
    mtx_fini(&mq->lock); \
} \
\
/* Return the message count as read under the lock. */ \
a_attr unsigned \
a_prefix##count(a_mq_type *mq) \
{ \
    unsigned nmsgs; \
\
    mtx_lock(&mq->lock); \
    nmsgs = mq->count; \
    mtx_unlock(&mq->lock); \
    return nmsgs; \
} \
\
/* Non-blocking receive: unlink and return the first message, or NULL. */ \
a_attr a_mq_msg_type * \
a_prefix##tryget(a_mq_type *mq) \
{ \
    a_mq_msg_type *front; \
\
    mtx_lock(&mq->lock); \
    front = ql_first(&mq->msgs); \
    if (front == NULL) { \
        /* Nothing queued; release the lock and report the miss. */ \
        mtx_unlock(&mq->lock); \
        return NULL; \
    } \
    ql_head_remove(&mq->msgs, a_mq_msg_type, a_field); \
    mq->count--; \
    mtx_unlock(&mq->lock); \
    return front; \
} \
\
/* Blocking receive: poll tryget, sleeping between attempts. */ \
a_attr a_mq_msg_type * \
a_prefix##get(a_mq_type *mq) \
{ \
    struct timespec delay; \
    a_mq_msg_type *got; \
\
    delay.tv_sec = 0; \
    delay.tv_nsec = 1; \
    got = a_prefix##tryget(mq); \
    while (got == NULL) { \
        nanosleep(&delay, NULL); \
        /* Double the sleep time until it is pinned at 1 second. */ \
        if (delay.tv_sec == 0) { \
            delay.tv_nsec *= 2; \
            if (delay.tv_nsec >= 1000*1000*1000) { \
                delay.tv_sec = 1; \
                delay.tv_nsec = 0; \
            } \
        } \
        got = a_prefix##tryget(mq); \
    } \
    return got; \
} \
\
/* Send: link msg in via ql_tail_insert() and bump the count, under lock. */ \
a_attr void \
a_prefix##put(a_mq_type *mq, a_mq_msg_type *msg) \
{ \
    mtx_lock(&mq->lock); \
    mq->count += 1; \
    /* The embedded linkage is (re)initialized here, not by the caller. */ \
    ql_elm_new(msg, a_field); \
    ql_tail_insert(&mq->msgs, msg, a_field); \
    mtx_unlock(&mq->lock); \
}