1/* GLib testing framework examples and tests 2 * Copyright (C) 2008 Red Hat, Inc 3 * 4 * This work is provided "as is"; redistribution and modification 5 * in whole or in part, in any medium, physical or electronic is 6 * permitted without restriction. 7 * 8 * This work is distributed in the hope that it will be useful, 9 * but WITHOUT ANY WARRANTY; without even the implied warranty of 10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 11 * 12 * In no event shall the authors or contributors be liable for any 13 * direct, indirect, incidental, special, exemplary, or consequential 14 * damages (including, but not limited to, procurement of substitute 15 * goods or services; loss of use, data, or profits; or business 16 * interruption) however caused and on any theory of liability, whether 17 * in contract, strict liability, or tort (including negligence or 18 * otherwise) arising in any way out of the use of this software, even 19 * if advised of the possibility of such damage. 20 */ 21 22#include <glib/glib.h> 23#include <gio/gio.h> 24#include <gio/gunixinputstream.h> 25#include <gio/gunixoutputstream.h> 26#include <signal.h> 27#include <stdlib.h> 28#include <string.h> 29#include <unistd.h> 30 31#define DATA "abcdefghijklmnopqrstuvwxyz" 32 33int writer_pipe[2], reader_pipe[2]; 34GCancellable *writer_cancel, *reader_cancel, *main_cancel; 35GMainLoop *loop; 36 37static gpointer 38writer_thread (gpointer user_data) 39{ 40 GOutputStream *out; 41 gssize nwrote, offset; 42 GError *err = NULL; 43 44 out = g_unix_output_stream_new (writer_pipe[1], TRUE); 45 46 do 47 { 48 g_usleep (10); 49 50 offset = 0; 51 while (offset < sizeof (DATA)) 52 { 53 nwrote = g_output_stream_write (out, DATA + offset, 54 sizeof (DATA) - offset, 55 writer_cancel, &err); 56 if (nwrote <= 0 || err != NULL) 57 break; 58 offset += nwrote; 59 } 60 61 g_assert (nwrote > 0 || err != NULL); 62 } 63 while (err == NULL); 64 65 if (g_cancellable_is_cancelled (writer_cancel)) 66 { 67 g_cancellable_cancel (main_cancel); 68 g_object_unref (out); 69 return NULL; 70 } 71 72 g_warning ("writer: %s", err->message); 73 g_assert_not_reached (); 74} 75 76static gpointer 77reader_thread (gpointer user_data) 78{ 79 GInputStream *in; 80 gssize nread, total; 81 GError *err = NULL; 82 char buf[sizeof (DATA)]; 83 84 in = g_unix_input_stream_new (reader_pipe[0], TRUE); 85 86 do 87 { 88 total = 0; 89 while (total < sizeof (DATA)) 90 { 91 nread = g_input_stream_read (in, buf + total, sizeof (buf) - total, 92 reader_cancel, &err); 93 if (nread <= 0 || err != NULL) 94 break; 95 total += nread; 96 } 97 98 if (err) 99 break; 100 101 if (nread == 0) 102 { 103 g_assert (err == NULL); 104 /* pipe closed */ 105 g_object_unref (in); 106 return NULL; 107 } 108 109 g_assert_cmpstr (buf, ==, DATA); 110 g_assert (!g_cancellable_is_cancelled (reader_cancel)); 111 } 112 while (err == NULL); 113 114 g_warning ("reader: %s", err->message); 115 g_assert_not_reached (); 116} 117 118char main_buf[sizeof (DATA)]; 119gssize main_len, main_offset; 120 121static void readable (GObject *source, GAsyncResult *res, gpointer user_data); 122static void writable (GObject *source, GAsyncResult *res, gpointer user_data); 123 124static void 125do_main_cancel (GOutputStream *out) 126{ 127 g_output_stream_close (out, NULL, NULL); 128 g_main_loop_quit (loop); 129} 130 131static void 132readable (GObject *source, GAsyncResult *res, gpointer user_data) 133{ 134 GInputStream *in = G_INPUT_STREAM (source); 135 GOutputStream *out = user_data; 136 GError *err = NULL; 137 138 main_len = g_input_stream_read_finish (in, res, &err); 139 140 if (g_cancellable_is_cancelled (main_cancel)) 141 { 142 do_main_cancel (out); 143 return; 144 } 145 146 g_assert (err == NULL); 147 148 main_offset = 0; 149 g_output_stream_write_async (out, main_buf, main_len, 150 G_PRIORITY_DEFAULT, main_cancel, 151 writable, in); 152} 153 154static void 155writable (GObject *source, GAsyncResult *res, gpointer user_data) 156{ 157 GOutputStream *out = G_OUTPUT_STREAM (source); 158 GInputStream *in = user_data; 159 GError *err = NULL; 160 gssize nwrote; 161 162 nwrote = g_output_stream_write_finish (out, res, &err); 163 164 if (g_cancellable_is_cancelled (main_cancel)) 165 { 166 do_main_cancel (out); 167 return; 168 } 169 170 g_assert (err == NULL); 171 g_assert_cmpint (nwrote, <=, main_len - main_offset); 172 173 main_offset += nwrote; 174 if (main_offset == main_len) 175 { 176 g_input_stream_read_async (in, main_buf, sizeof (main_buf), 177 G_PRIORITY_DEFAULT, main_cancel, 178 readable, out); 179 } 180 else 181 { 182 g_output_stream_write_async (out, main_buf + main_offset, 183 main_len - main_offset, 184 G_PRIORITY_DEFAULT, main_cancel, 185 writable, in); 186 } 187} 188 189static gboolean 190timeout (gpointer cancellable) 191{ 192 g_cancellable_cancel (cancellable); 193 return FALSE; 194} 195 196static void 197test_pipe_io (void) 198{ 199 GThread *writer, *reader; 200 GInputStream *in; 201 GOutputStream *out; 202 203 /* Split off two (additional) threads, a reader and a writer. From 204 * the writer thread, write data synchronously in small chunks, 205 * which gets read asynchronously by the main thread and then 206 * written asynchronously to the reader thread, which reads it 207 * synchronously. Eventually a timeout in the main thread will cause 208 * it to cancel the writer thread, which will in turn cancel the 209 * read op in the main thread, which will then close the pipe to 210 * the reader thread, causing the read op to fail. 211 */ 212 213 g_assert (pipe (writer_pipe) == 0 && pipe (reader_pipe) == 0); 214 215 writer_cancel = g_cancellable_new (); 216 reader_cancel = g_cancellable_new (); 217 main_cancel = g_cancellable_new (); 218 219 writer = g_thread_create (writer_thread, NULL, TRUE, NULL); 220 reader = g_thread_create (reader_thread, NULL, TRUE, NULL); 221 222 in = g_unix_input_stream_new (writer_pipe[0], TRUE); 223 out = g_unix_output_stream_new (reader_pipe[1], TRUE); 224 225 g_input_stream_read_async (in, main_buf, sizeof (main_buf), 226 G_PRIORITY_DEFAULT, main_cancel, 227 readable, out); 228 229 g_timeout_add (500, timeout, writer_cancel); 230 231 loop = g_main_loop_new (NULL, TRUE); 232 g_main_loop_run (loop); 233 g_main_loop_unref (loop); 234 235 g_thread_join (reader); 236 g_thread_join (writer); 237 238 g_object_unref (main_cancel); 239 g_object_unref (reader_cancel); 240 g_object_unref (writer_cancel); 241 g_object_unref (in); 242 g_object_unref (out); 243} 244 245int 246main (int argc, 247 char *argv[]) 248{ 249 g_thread_init (NULL); 250 g_type_init (); 251 g_test_init (&argc, &argv, NULL); 252 253 g_test_add_func ("/unix-streams/pipe-io-test", test_pipe_io); 254 255 return g_test_run(); 256} 257