1/*
2 * Simple templated message queue implementation that relies on only mutexes for
3 * synchronization (which reduces portability issues).  Given the following
4 * setup:
5 *
6 *   typedef struct mq_msg_s mq_msg_t;
7 *   struct mq_msg_s {
8 *           mq_msg(mq_msg_t) link;
9 *           [message data]
10 *   };
11 *   mq_gen(, mq_, mq_t, mq_msg_t, link)
12 *
13 * The API is as follows:
14 *
15 *   bool mq_init(mq_t *mq);
16 *   void mq_fini(mq_t *mq);
17 *   unsigned mq_count(mq_t *mq);
18 *   mq_msg_t *mq_tryget(mq_t *mq);
19 *   mq_msg_t *mq_get(mq_t *mq);
20 *   void mq_put(mq_t *mq, mq_msg_t *msg);
21 *
22 * The message queue linkage embedded in each message is to be treated as
23 * externally opaque (no need to initialize or clean up externally).  mq_fini()
24 * does not perform any cleanup of messages, since it knows nothing of their
25 * payloads.
26 */
27#define	mq_msg(a_mq_msg_type)	ql_elm(a_mq_msg_type)
28
29#define	mq_gen(a_attr, a_prefix, a_mq_type, a_mq_msg_type, a_field)	\
30typedef struct {							\
31	mtx_t			lock;					\
32	ql_head(a_mq_msg_type)	msgs;					\
33	unsigned		count;					\
34} a_mq_type;								\
35a_attr bool								\
36a_prefix##init(a_mq_type *mq) {						\
37									\
38	if (mtx_init(&mq->lock))					\
39		return (true);						\
40	ql_new(&mq->msgs);						\
41	mq->count = 0;							\
42	return (false);							\
43}									\
44a_attr void								\
45a_prefix##fini(a_mq_type *mq)						\
46{									\
47									\
48	mtx_fini(&mq->lock);						\
49}									\
50a_attr unsigned								\
51a_prefix##count(a_mq_type *mq)						\
52{									\
53	unsigned count;							\
54									\
55	mtx_lock(&mq->lock);						\
56	count = mq->count;						\
57	mtx_unlock(&mq->lock);						\
58	return (count);							\
59}									\
60a_attr a_mq_msg_type *							\
61a_prefix##tryget(a_mq_type *mq)						\
62{									\
63	a_mq_msg_type *msg;						\
64									\
65	mtx_lock(&mq->lock);						\
66	msg = ql_first(&mq->msgs);					\
67	if (msg != NULL) {						\
68		ql_head_remove(&mq->msgs, a_mq_msg_type, a_field);	\
69		mq->count--;						\
70	}								\
71	mtx_unlock(&mq->lock);						\
72	return (msg);							\
73}									\
74a_attr a_mq_msg_type *							\
75a_prefix##get(a_mq_type *mq)						\
76{									\
77	a_mq_msg_type *msg;						\
78	struct timespec timeout;					\
79									\
80	msg = a_prefix##tryget(mq);					\
81	if (msg != NULL)						\
82		return (msg);						\
83									\
84	timeout.tv_sec = 0;						\
85	timeout.tv_nsec = 1;						\
86	while (true) {							\
87		nanosleep(&timeout, NULL);				\
88		msg = a_prefix##tryget(mq);				\
89		if (msg != NULL)					\
90			return (msg);					\
91		if (timeout.tv_sec == 0) {				\
92			/* Double sleep time, up to max 1 second. */	\
93			timeout.tv_nsec <<= 1;				\
94			if (timeout.tv_nsec >= 1000*1000*1000) {	\
95				timeout.tv_sec = 1;			\
96				timeout.tv_nsec = 0;			\
97			}						\
98		}							\
99	}								\
100}									\
101a_attr void								\
102a_prefix##put(a_mq_type *mq, a_mq_msg_type *msg)			\
103{									\
104									\
105	mtx_lock(&mq->lock);						\
106	ql_elm_new(msg, a_field);					\
107	ql_tail_insert(&mq->msgs, msg, a_field);			\
108	mq->count++;							\
109	mtx_unlock(&mq->lock);						\
110}
111