1#undef G_DISABLE_ASSERT
2#undef G_LOG_DOMAIN
3
4#include <errno.h>
5#include <glib.h>
6#ifdef G_OS_UNIX
7#include <unistd.h>
8#endif
9#include <stdio.h>
10#include <stdlib.h>
11
12#ifdef G_OS_WIN32
13#include <fcntl.h>		/* For _O_BINARY used by pipe() macro */
14#include <io.h>			/* for _pipe() */
15#define pipe(fds) _pipe(fds, 4096, _O_BINARY)
16#endif
17
18#define ITERS 10000
19#define INCREMENT 10
20#define NTHREADS 4
21#define NCRAWLERS 4
22#define CRAWLER_TIMEOUT_RANGE 40
23#define RECURSER_TIMEOUT 50
24
25/* The partial ordering between the context array mutex and
26 * crawler array mutex is that the crawler array mutex cannot
27 * be locked while the context array mutex is locked
28 */
29GPtrArray *context_array;
30GMutex *context_array_mutex;
31GCond *context_array_cond;
32
33GMainLoop *main_loop;
34
35G_LOCK_DEFINE_STATIC (crawler_array_lock);
36GPtrArray *crawler_array;
37
38typedef struct _AddrData AddrData;
39typedef struct _TestData TestData;
40
41struct _AddrData
42{
43  GMainLoop *loop;
44  GIOChannel *dest;
45  gint count;
46};
47
48struct _TestData
49{
50  gint current_val;
51  gint iters;
52  GIOChannel *in;
53};
54
55static void cleanup_crawlers (GMainContext *context);
56
57gboolean
58read_all (GIOChannel *channel, char *buf, gsize len)
59{
60  gsize bytes_read = 0;
61  gsize count;
62  GIOError err;
63
64  while (bytes_read < len)
65    {
66      err = g_io_channel_read (channel, buf + bytes_read, len - bytes_read, &count);
67      if (err)
68	{
69	  if (err != G_IO_ERROR_AGAIN)
70	    return FALSE;
71	}
72      else if (count == 0)
73	return FALSE;
74
75      bytes_read += count;
76    }
77
78  return TRUE;
79}
80
81gboolean
82write_all (GIOChannel *channel, char *buf, gsize len)
83{
84  gsize bytes_written = 0;
85  gsize count;
86  GIOError err;
87
88  while (bytes_written < len)
89    {
90      err = g_io_channel_write (channel, buf + bytes_written, len - bytes_written, &count);
91      if (err && err != G_IO_ERROR_AGAIN)
92	return FALSE;
93
94      bytes_written += count;
95    }
96
97  return TRUE;
98}
99
100gboolean
101adder_callback (GIOChannel   *source,
102		GIOCondition  condition,
103		gpointer      data)
104{
105  char buf1[32];
106  char buf2[32];
107
108  char result[32];
109
110  AddrData *addr_data = data;
111
112  if (!read_all (source, buf1, 32) ||
113      !read_all (source, buf2, 32))
114    {
115      g_main_loop_quit (addr_data->loop);
116      return FALSE;
117    }
118
119  sprintf (result, "%d", atoi(buf1) + atoi(buf2));
120  write_all (addr_data->dest, result, 32);
121
122  return TRUE;
123}
124
125gboolean
126timeout_callback (gpointer data)
127{
128  AddrData *addr_data = data;
129
130  addr_data->count++;
131
132  return TRUE;
133}
134
135gpointer
136adder_thread (gpointer data)
137{
138  GMainContext *context;
139  GSource *adder_source;
140  GSource *timeout_source;
141
142  GIOChannel **channels = data;
143  AddrData addr_data;
144
145  context = g_main_context_new ();
146
147  g_mutex_lock (context_array_mutex);
148
149  g_ptr_array_add (context_array, context);
150
151  if (context_array->len == NTHREADS)
152    g_cond_broadcast (context_array_cond);
153
154  g_mutex_unlock (context_array_mutex);
155
156  addr_data.dest = channels[1];
157  addr_data.loop = g_main_loop_new (context, FALSE);
158  addr_data.count = 0;
159
160  adder_source = g_io_create_watch (channels[0], G_IO_IN | G_IO_HUP);
161  g_source_set_callback (adder_source, (GSourceFunc)adder_callback, &addr_data, NULL);
162  g_source_attach (adder_source, context);
163  g_source_unref (adder_source);
164
165  timeout_source = g_timeout_source_new (10);
166  g_source_set_callback (timeout_source, (GSourceFunc)timeout_callback, &addr_data, NULL);
167  g_source_set_priority (timeout_source, G_PRIORITY_HIGH);
168  g_source_attach (timeout_source, context);
169  g_source_unref (timeout_source);
170
171  g_main_loop_run (addr_data.loop);
172
173  g_io_channel_unref (channels[0]);
174  g_io_channel_unref (channels[1]);
175
176  g_free (channels);
177
178  g_main_loop_unref (addr_data.loop);
179
180#ifdef VERBOSE
181  g_print ("Timeout run %d times\n", addr_data.count);
182#endif
183
184  g_mutex_lock (context_array_mutex);
185  g_ptr_array_remove (context_array, context);
186  if (context_array->len == 0)
187    g_main_loop_quit (main_loop);
188  g_mutex_unlock (context_array_mutex);
189
190  cleanup_crawlers (context);
191
192  return NULL;
193}
194
195void
196io_pipe (GIOChannel **channels)
197{
198  gint fds[2];
199
200  if (pipe(fds) < 0)
201    {
202      g_warning ("Cannot create pipe %s\n", g_strerror (errno));
203      exit (1);
204    }
205
206  channels[0] = g_io_channel_unix_new (fds[0]);
207  channels[1] = g_io_channel_unix_new (fds[1]);
208
209  g_io_channel_set_close_on_unref (channels[0], TRUE);
210  g_io_channel_set_close_on_unref (channels[1], TRUE);
211}
212
213void
214do_add (GIOChannel *in, gint a, gint b)
215{
216  char buf1[32];
217  char buf2[32];
218
219  sprintf (buf1, "%d", a);
220  sprintf (buf2, "%d", b);
221
222  write_all (in, buf1, 32);
223  write_all (in, buf2, 32);
224}
225
226gboolean
227adder_response (GIOChannel   *source,
228		GIOCondition  condition,
229		gpointer      data)
230{
231  char result[32];
232  TestData *test_data = data;
233
234  if (!read_all (source, result, 32))
235    return FALSE;
236
237  test_data->current_val = atoi (result);
238  test_data->iters--;
239
240  if (test_data->iters == 0)
241    {
242      if (test_data->current_val != ITERS * INCREMENT)
243	{
244	  g_print ("Addition failed: %d != %d\n",
245		   test_data->current_val, ITERS * INCREMENT);
246	  exit (1);
247	}
248
249      g_io_channel_unref (source);
250      g_io_channel_unref (test_data->in);
251
252      g_free (test_data);
253
254      return FALSE;
255    }
256
257  do_add (test_data->in, test_data->current_val, INCREMENT);
258
259  return TRUE;
260}
261
262void
263create_adder_thread (void)
264{
265  GError *err = NULL;
266  TestData *test_data;
267
268  GIOChannel *in_channels[2];
269  GIOChannel *out_channels[2];
270
271  GIOChannel **sub_channels;
272
273  sub_channels = g_new (GIOChannel *, 2);
274
275  io_pipe (in_channels);
276  io_pipe (out_channels);
277
278  sub_channels[0] = in_channels[0];
279  sub_channels[1] = out_channels[1];
280
281  g_thread_create (adder_thread, sub_channels, FALSE, &err);
282
283  if (err)
284    {
285      g_warning ("Cannot create thread: %s", err->message);
286      exit (1);
287    }
288
289  test_data = g_new (TestData, 1);
290  test_data->in = in_channels[1];
291  test_data->current_val = 0;
292  test_data->iters = ITERS;
293
294  g_io_add_watch (out_channels[0], G_IO_IN | G_IO_HUP,
295		  adder_response, test_data);
296
297  do_add (test_data->in, test_data->current_val, INCREMENT);
298}
299
300static void create_crawler (void);
301
302static void
303remove_crawler (void)
304{
305  GSource *other_source;
306
307  if (crawler_array->len > 0)
308    {
309      other_source = crawler_array->pdata[g_random_int_range (0, crawler_array->len)];
310      g_source_destroy (other_source);
311      g_assert (g_ptr_array_remove_fast (crawler_array, other_source));
312    }
313}
314
315static gint
316crawler_callback (gpointer data)
317{
318  GSource *source = data;
319
320  G_LOCK (crawler_array_lock);
321
322  if (!g_ptr_array_remove_fast (crawler_array, source))
323    remove_crawler();
324
325  remove_crawler();
326  G_UNLOCK (crawler_array_lock);
327
328  create_crawler();
329  create_crawler();
330
331  return FALSE;
332}
333
334static void
335create_crawler (void)
336{
337  GSource *source = g_timeout_source_new (g_random_int_range (0, CRAWLER_TIMEOUT_RANGE));
338  g_source_set_callback (source, (GSourceFunc)crawler_callback, source, NULL);
339
340  G_LOCK (crawler_array_lock);
341  g_ptr_array_add (crawler_array, source);
342
343  g_mutex_lock (context_array_mutex);
344  g_source_attach (source, context_array->pdata[g_random_int_range (0, context_array->len)]);
345  g_source_unref (source);
346  g_mutex_unlock (context_array_mutex);
347
348  G_UNLOCK (crawler_array_lock);
349}
350
351static void
352cleanup_crawlers (GMainContext *context)
353{
354  gint i;
355
356  G_LOCK (crawler_array_lock);
357  for (i=0; i < crawler_array->len; i++)
358    {
359      if (g_source_get_context (crawler_array->pdata[i]) == context)
360	{
361	  g_source_destroy (g_ptr_array_remove_index (crawler_array, i));
362	  i--;
363	}
364    }
365  G_UNLOCK (crawler_array_lock);
366}
367
368static gboolean
369recurser_idle (gpointer data)
370{
371  GMainContext *context = data;
372  gint i;
373
374  for (i = 0; i < 10; i++)
375    g_main_context_iteration (context, FALSE);
376
377  return FALSE;
378}
379
380static gboolean
381recurser_start (gpointer data)
382{
383  GMainContext *context;
384  GSource *source;
385
386  g_mutex_lock (context_array_mutex);
387  context = context_array->pdata[g_random_int_range (0, context_array->len)];
388  source = g_idle_source_new ();
389  g_source_set_callback (source, recurser_idle, context, NULL);
390  g_source_attach (source, context);
391  g_source_unref (source);
392  g_mutex_unlock (context_array_mutex);
393
394  return TRUE;
395}
396
397int
398main (int   argc,
399      char *argv[])
400{
401  /* Only run the test, if threads are enabled and a default thread
402     implementation is available */
403#if defined(G_THREADS_ENABLED) && ! defined(G_THREADS_IMPL_NONE)
404  gint i;
405
406  g_thread_init (NULL);
407
408  context_array = g_ptr_array_new ();
409  context_array_mutex = g_mutex_new ();
410  context_array_cond = g_cond_new ();
411
412  crawler_array = g_ptr_array_new ();
413
414  main_loop = g_main_loop_new (NULL, FALSE);
415
416  for (i = 0; i < NTHREADS; i++)
417    create_adder_thread ();
418
419  /* Wait for all threads to start
420   */
421  g_mutex_lock (context_array_mutex);
422
423  if (context_array->len < NTHREADS)
424    g_cond_wait (context_array_cond, context_array_mutex);
425
426  g_mutex_unlock (context_array_mutex);
427
428  for (i = 0; i < NCRAWLERS; i++)
429    create_crawler ();
430
431  g_timeout_add (RECURSER_TIMEOUT, recurser_start, NULL);
432
433  g_main_loop_run (main_loop);
434  g_main_loop_unref (main_loop);
435
436#endif
437  return 0;
438}
439