gbufferedoutputstream.c revision a2ca589703273fca80cb126430a8b058aba3eb52
1/* GIO - GLib Input, Output and Streaming Library
2 *
3 * Copyright (C) 2006-2007 Red Hat, Inc.
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General
16 * Public License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18 * Boston, MA 02111-1307, USA.
19 *
20 * Author: Christian Kellner <gicmo@gnome.org>
21 */
22
23#include <config.h>
24#include "gbufferedoutputstream.h"
25#include "goutputstream.h"
26#include "gsimpleasyncresult.h"
27#include "string.h"
28#include "glibintl.h"
29
30#include <gioalias.h>
31
32/**
33 * SECTION:gbufferedoutputstream
34 * @short_description: Buffered Output Stream
35 * @see_also: #GFilterOutputStream, #GOutputStream
36 *
37 * Buffered output stream implements #GFilterOutputStream and provides
38 * for buffered writes.
39 *
40 * By default, #GBufferedOutputStream's buffer size is set at 4 kilobytes.
41 *
42 * To create a buffered output stream, use g_buffered_output_stream_new(), or
43 * g_buffered_output_stream_new_sized() to specify the buffer's size at construction.
44 *
 * To get the size of a buffer within a buffered output stream, use
 * g_buffered_output_stream_get_buffer_size(). To change the size of a
 * buffered output stream's buffer, use g_buffered_output_stream_set_buffer_size().
48 * Note: the buffer's size cannot be reduced below the size of the data within the
49 * buffer.
50 *
51 **/
52
53
54
/* Default value of the "buffer-size" property, in bytes (4 KiB). */
#define DEFAULT_BUFFER_SIZE (4 * 1024)
56
57struct _GBufferedOutputStreamPrivate {
58  guint8 *buffer;
59  gsize   len;
60  goffset pos;
61  gboolean auto_grow;
62};
63
/* Property identifiers used by set_property() / get_property().
 * PROP_0 is a placeholder so that real property ids start at 1. */
enum {
  PROP_0       = 0,
  PROP_BUFSIZE = 1
};
68
69static void     g_buffered_output_stream_set_property (GObject      *object,
70                                                       guint         prop_id,
71                                                       const GValue *value,
72                                                       GParamSpec   *pspec);
73
74static void     g_buffered_output_stream_get_property (GObject    *object,
75                                                       guint       prop_id,
76                                                       GValue     *value,
77                                                       GParamSpec *pspec);
78static void     g_buffered_output_stream_finalize     (GObject *object);
79
80
81static gssize   g_buffered_output_stream_write        (GOutputStream *stream,
82                                                       const void    *buffer,
83                                                       gsize          count,
84                                                       GCancellable  *cancellable,
85                                                       GError       **error);
86static gboolean g_buffered_output_stream_flush        (GOutputStream    *stream,
87                                                       GCancellable  *cancellable,
88                                                       GError          **error);
89static gboolean g_buffered_output_stream_close        (GOutputStream  *stream,
90                                                       GCancellable   *cancellable,
91                                                       GError        **error);
92
93static void     g_buffered_output_stream_write_async  (GOutputStream        *stream,
94                                                       const void           *buffer,
95                                                       gsize                 count,
96                                                       int                   io_priority,
97                                                       GCancellable         *cancellable,
98                                                       GAsyncReadyCallback   callback,
99                                                       gpointer              data);
100static gssize   g_buffered_output_stream_write_finish (GOutputStream        *stream,
101                                                       GAsyncResult         *result,
102                                                       GError              **error);
103static void     g_buffered_output_stream_flush_async  (GOutputStream        *stream,
104                                                       int                   io_priority,
105                                                       GCancellable         *cancellable,
106                                                       GAsyncReadyCallback   callback,
107                                                       gpointer              data);
108static gboolean g_buffered_output_stream_flush_finish (GOutputStream        *stream,
109                                                       GAsyncResult         *result,
110                                                       GError              **error);
111static void     g_buffered_output_stream_close_async  (GOutputStream        *stream,
112                                                       int                   io_priority,
113                                                       GCancellable         *cancellable,
114                                                       GAsyncReadyCallback   callback,
115                                                       gpointer              data);
116static gboolean g_buffered_output_stream_close_finish (GOutputStream        *stream,
117                                                       GAsyncResult         *result,
118                                                       GError              **error);
119
120G_DEFINE_TYPE (GBufferedOutputStream,
121               g_buffered_output_stream,
122               G_TYPE_FILTER_OUTPUT_STREAM)
123
124
125static void
126g_buffered_output_stream_class_init (GBufferedOutputStreamClass *klass)
127{
128  GObjectClass *object_class;
129  GOutputStreamClass *ostream_class;
130
131  g_type_class_add_private (klass, sizeof (GBufferedOutputStreamPrivate));
132
133  object_class = G_OBJECT_CLASS (klass);
134  object_class->get_property = g_buffered_output_stream_get_property;
135  object_class->set_property = g_buffered_output_stream_set_property;
136  object_class->finalize     = g_buffered_output_stream_finalize;
137
138  ostream_class = G_OUTPUT_STREAM_CLASS (klass);
139  ostream_class->write = g_buffered_output_stream_write;
140  ostream_class->flush = g_buffered_output_stream_flush;
141  ostream_class->close = g_buffered_output_stream_close;
142  ostream_class->write_async  = g_buffered_output_stream_write_async;
143  ostream_class->write_finish = g_buffered_output_stream_write_finish;
144  ostream_class->flush_async  = g_buffered_output_stream_flush_async;
145  ostream_class->flush_finish = g_buffered_output_stream_flush_finish;
146  ostream_class->close_async  = g_buffered_output_stream_close_async;
147  ostream_class->close_finish = g_buffered_output_stream_close_finish;
148
149  g_object_class_install_property (object_class,
150                                   PROP_BUFSIZE,
151                                   g_param_spec_uint ("buffer-size",
152                                                      P_("Buffer Size"),
153                                                      P_("The size of the backend buffer"),
154                                                      1,
155                                                      G_MAXUINT,
156                                                      DEFAULT_BUFFER_SIZE,
157                                                      G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY |
158                                                      G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
159
160}
161
162/**
163 * g_buffered_output_stream_get_buffer_size:
164 * @stream: a #GBufferedOutputStream.
165 *
166 * Gets the size of the buffer in the @stream.
167 *
168 * Returns: the current size of the buffer.
169 **/
170gsize
171g_buffered_output_stream_get_buffer_size (GBufferedOutputStream *stream)
172{
173  g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), -1);
174
175  return stream->priv->len;
176}
177
178/**
179 * g_buffered_output_stream_set_buffer_size:
180 * @stream: a #GBufferedOutputStream.
181 * @size: a #gsize.
182 *
183 * Sets the size of the internal buffer to @size.
184 **/
185void
186g_buffered_output_stream_set_buffer_size (GBufferedOutputStream *stream,
187					 gsize                 size)
188{
189  GBufferedOutputStreamPrivate *priv;
190  guint8 *buffer;
191
192  g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
193
194  priv = stream->priv;
195
196  if (priv->buffer)
197    {
198      size = MAX (size, priv->pos);
199
200      buffer = g_malloc (size);
201      memcpy (buffer, priv->buffer, priv->pos);
202      g_free (priv->buffer);
203      priv->buffer = buffer;
204      priv->len = size;
205      /* Keep old pos */
206    }
207  else
208    {
209      priv->buffer = g_malloc (size);
210      priv->len = size;
211      priv->pos = 0;
212    }
213}
214
215/**
216 * g_buffered_output_stream_get_auto_grow:
217 * @stream: a #GBufferedOutputStream.
218 *
219 * Checks if the buffer automatically grows as data is added.
220 *
221 * Returns: %TRUE if the @stream's buffer automatically grows,
222 * %FALSE otherwise.
223 **/
224gboolean
225g_buffered_output_stream_get_auto_grow (GBufferedOutputStream *stream)
226{
227  g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), FALSE);
228
229  return stream->priv->auto_grow;
230}
231
232/**
233 * g_buffered_output_stream_set_auto_grow:
234 * @stream: a #GBufferedOutputStream.
235 * @auto_grow: a #gboolean.
236 *
237 * Sets whether or not the @stream's buffer should automatically grow.
238 **/
239void
240g_buffered_output_stream_set_auto_grow (GBufferedOutputStream *stream,
241				       gboolean              auto_grow)
242{
243  g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
244
245  stream->priv->auto_grow = auto_grow;
246}
247
248static void
249g_buffered_output_stream_set_property (GObject         *object,
250                                       guint            prop_id,
251                                       const GValue    *value,
252                                       GParamSpec      *pspec)
253{
254  GBufferedOutputStream *buffered_stream;
255  GBufferedOutputStreamPrivate *priv;
256
257  buffered_stream = G_BUFFERED_OUTPUT_STREAM (object);
258  priv = buffered_stream->priv;
259
260  switch (prop_id)
261    {
262
263    case PROP_BUFSIZE:
264      g_buffered_output_stream_set_buffer_size (buffered_stream, g_value_get_uint (value));
265      break;
266
267    default:
268      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
269      break;
270    }
271
272}
273
274static void
275g_buffered_output_stream_get_property (GObject    *object,
276                                       guint       prop_id,
277                                       GValue     *value,
278                                       GParamSpec *pspec)
279{
280  GBufferedOutputStream *buffered_stream;
281  GBufferedOutputStreamPrivate *priv;
282
283  buffered_stream = G_BUFFERED_OUTPUT_STREAM (object);
284  priv = buffered_stream->priv;
285
286  switch (prop_id)
287    {
288
289    case PROP_BUFSIZE:
290      g_value_set_uint (value, priv->len);
291      break;
292
293    default:
294      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
295      break;
296    }
297
298}
299
300static void
301g_buffered_output_stream_finalize (GObject *object)
302{
303  GBufferedOutputStream *stream;
304  GBufferedOutputStreamPrivate *priv;
305
306  stream = G_BUFFERED_OUTPUT_STREAM (object);
307  priv = stream->priv;
308
309  g_free (priv->buffer);
310
311  if (G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize)
312    (*G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize) (object);
313}
314
315static void
316g_buffered_output_stream_init (GBufferedOutputStream *stream)
317{
318  stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
319                                              G_TYPE_BUFFERED_OUTPUT_STREAM,
320                                              GBufferedOutputStreamPrivate);
321
322}
323
324/**
325 * g_buffered_output_stream_new:
326 * @base_stream: a #GOutputStream.
327 *
328 * Creates a new buffered output stream for a base stream.
329 *
330 * Returns: a #GOutputStream for the given @base_stream.
331 **/
332GOutputStream *
333g_buffered_output_stream_new (GOutputStream *base_stream)
334{
335  GOutputStream *stream;
336
337  g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
338
339  stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
340                         "base-stream", base_stream,
341                         NULL);
342
343  return stream;
344}
345
346/**
347 * g_buffered_output_stream_new_sized:
348 * @base_stream: a #GOutputStream.
349 * @size: a #gsize.
350 *
351 * Creates a new buffered output stream with a given buffer size.
352 *
353 * Returns: a #GOutputStream with an internal buffer set to @size.
354 **/
355GOutputStream *
356g_buffered_output_stream_new_sized (GOutputStream *base_stream,
357                                    guint          size)
358{
359  GOutputStream *stream;
360
361  g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
362
363  stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
364                         "base-stream", base_stream,
365                         "buffer-size", size,
366                         NULL);
367
368  return stream;
369}
370
371static gboolean
372flush_buffer (GBufferedOutputStream  *stream,
373              GCancellable           *cancellable,
374              GError                 **error)
375{
376  GBufferedOutputStreamPrivate *priv;
377  GOutputStream                *base_stream;
378  gboolean                      res;
379  gsize                         bytes_written;
380  gsize                         count;
381
382  priv = stream->priv;
383  bytes_written = 0;
384  base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
385
386  g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), FALSE);
387
388  res = g_output_stream_write_all (base_stream,
389                                   priv->buffer,
390                                   priv->pos,
391                                   &bytes_written,
392                                   cancellable,
393                                   error);
394
395  count = priv->pos - bytes_written;
396
397  if (count > 0)
398    g_memmove (priv->buffer, priv->buffer + bytes_written, count);
399
400  priv->pos -= bytes_written;
401
402  return res;
403}
404
405static gssize
406g_buffered_output_stream_write  (GOutputStream *stream,
407                                 const void    *buffer,
408                                 gsize          count,
409                                 GCancellable  *cancellable,
410                                 GError       **error)
411{
412  GBufferedOutputStream        *bstream;
413  GBufferedOutputStreamPrivate *priv;
414  gboolean res;
415  gsize    n;
416  gsize new_size;
417
418  bstream = G_BUFFERED_OUTPUT_STREAM (stream);
419  priv = bstream->priv;
420
421  n = priv->len - priv->pos;
422
423  if (priv->auto_grow && n < count)
424    {
425      new_size = MAX (priv->len * 2, priv->len + count);
426      g_buffered_output_stream_set_buffer_size (bstream, new_size);
427    }
428  else if (n == 0)
429    {
430      res = flush_buffer (bstream, cancellable, error);
431
432      if (res == FALSE)
433	return -1;
434    }
435
436  n = priv->len - priv->pos;
437
438  count = MIN (count, n);
439  memcpy (priv->buffer + priv->pos, buffer, count);
440  priv->pos += count;
441
442  return count;
443}
444
445static gboolean
446g_buffered_output_stream_flush (GOutputStream  *stream,
447                                GCancellable   *cancellable,
448                                GError        **error)
449{
450  GBufferedOutputStream *bstream;
451  GBufferedOutputStreamPrivate *priv;
452  GOutputStream                *base_stream;
453  gboolean res;
454
455  bstream = G_BUFFERED_OUTPUT_STREAM (stream);
456  priv = bstream->priv;
457  base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
458
459  res = flush_buffer (bstream, cancellable, error);
460
461  if (res == FALSE)
462    return FALSE;
463
464  res = g_output_stream_flush (base_stream, cancellable, error);
465
466  return res;
467}
468
469static gboolean
470g_buffered_output_stream_close (GOutputStream  *stream,
471                                GCancellable   *cancellable,
472                                GError        **error)
473{
474  GBufferedOutputStream        *bstream;
475  GBufferedOutputStreamPrivate *priv;
476  GOutputStream                *base_stream;
477  gboolean                      res;
478
479  bstream = G_BUFFERED_OUTPUT_STREAM (stream);
480  priv = bstream->priv;
481  base_stream = G_FILTER_OUTPUT_STREAM (bstream)->base_stream;
482
483  res = flush_buffer (bstream, cancellable, error);
484
485  /* report the first error but still close the stream */
486  if (res)
487    res = g_output_stream_close (base_stream, cancellable, error);
488  else
489    g_output_stream_close (base_stream, cancellable, NULL);
490
491  return res;
492}
493
494/* ************************** */
495/* Async stuff implementation */
496/* ************************** */
497
498/* TODO: This should be using the base class async ops, not threads */
499
500typedef struct {
501
502  guint flush_stream : 1;
503  guint close_stream : 1;
504
505} FlushData;
506
507static void
508free_flush_data (gpointer data)
509{
510  g_slice_free (FlushData, data);
511}
512
513/* This function is used by all three (i.e.
514 * _write, _flush, _close) functions since
515 * all of them will need to flush the buffer
516 * and so closing and writing is just a special
517 * case of flushing + some addition stuff */
518static void
519flush_buffer_thread (GSimpleAsyncResult *result,
520                     GObject            *object,
521                     GCancellable       *cancellable)
522{
523  GBufferedOutputStream *stream;
524  GOutputStream *base_stream;
525  FlushData     *fdata;
526  gboolean       res;
527  GError        *error = NULL;
528
529  stream = G_BUFFERED_OUTPUT_STREAM (object);
530  fdata = g_simple_async_result_get_op_res_gpointer (result);
531  base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
532
533  res = flush_buffer (stream, cancellable, &error);
534
535  /* if flushing the buffer didn't work don't even bother
536   * to flush the stream but just report that error */
537  if (res && fdata->flush_stream)
538    res = g_output_stream_flush (base_stream, cancellable, &error);
539
540  if (fdata->close_stream)
541    {
542
543      /* if flushing the buffer or the stream returned
544       * an error report that first error but still try
545       * close the stream */
546      if (res == FALSE)
547        g_output_stream_close (base_stream, cancellable, NULL);
548      else
549        res = g_output_stream_close (base_stream, cancellable, &error);
550    }
551
552  if (res == FALSE)
553    {
554      g_simple_async_result_set_from_error (result, error);
555      g_error_free (error);
556    }
557}
558
559typedef struct {
560
561  FlushData fdata;
562
563  gsize  count;
564  const void  *buffer;
565
566} WriteData;
567
568static void
569free_write_data (gpointer data)
570{
571  g_slice_free (WriteData, data);
572}
573
574static void
575g_buffered_output_stream_write_async (GOutputStream        *stream,
576                                      const void           *buffer,
577                                      gsize                 count,
578                                      int                   io_priority,
579                                      GCancellable         *cancellable,
580                                      GAsyncReadyCallback   callback,
581                                      gpointer              data)
582{
583  GBufferedOutputStream *buffered_stream;
584  GBufferedOutputStreamPrivate *priv;
585  GSimpleAsyncResult *res;
586  WriteData *wdata;
587
588  buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream);
589  priv = buffered_stream->priv;
590
591  wdata = g_slice_new (WriteData);
592  wdata->count  = count;
593  wdata->buffer = buffer;
594
595  res = g_simple_async_result_new (G_OBJECT (stream),
596                                   callback,
597                                   data,
598                                   g_buffered_output_stream_write_async);
599
600  g_simple_async_result_set_op_res_gpointer (res, wdata, free_write_data);
601
602  /* if we have space left directly call the
603   * callback (from idle) otherwise schedule a buffer
604   * flush in the thread. In both cases the actual
605   * copying of the data to the buffer will be done in
606   * the write_finish () func since that should
607   * be fast enough */
608  if (priv->len - priv->pos > 0)
609    {
610      g_simple_async_result_complete_in_idle (res);
611    }
612  else
613    {
614      wdata->fdata.flush_stream = FALSE;
615      wdata->fdata.close_stream = FALSE;
616      g_simple_async_result_run_in_thread (res,
617                                           flush_buffer_thread,
618                                           io_priority,
619                                           cancellable);
620      g_object_unref (res);
621    }
622}
623
624static gssize
625g_buffered_output_stream_write_finish (GOutputStream        *stream,
626                                       GAsyncResult         *result,
627                                       GError              **error)
628{
629  GBufferedOutputStreamPrivate *priv;
630  GBufferedOutputStream        *buffered_stream;
631  GSimpleAsyncResult *simple;
632  WriteData          *wdata;
633  gssize              count;
634
635  simple = G_SIMPLE_ASYNC_RESULT (result);
636  buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream);
637  priv = buffered_stream->priv;
638
639  g_assert (g_simple_async_result_get_source_tag (simple) ==
640            g_buffered_output_stream_write_async);
641
642  wdata = g_simple_async_result_get_op_res_gpointer (simple);
643
644  /* Now do the real copying of data to the buffer */
645  count = priv->len - priv->pos;
646  count = MIN (wdata->count, count);
647
648  memcpy (priv->buffer + priv->pos, wdata->buffer, count);
649
650  priv->pos += count;
651
652  return count;
653}
654
655static void
656g_buffered_output_stream_flush_async (GOutputStream        *stream,
657                                      int                   io_priority,
658                                      GCancellable         *cancellable,
659                                      GAsyncReadyCallback   callback,
660                                      gpointer              data)
661{
662  GSimpleAsyncResult *res;
663  FlushData          *fdata;
664
665  fdata = g_slice_new (FlushData);
666  fdata->flush_stream = TRUE;
667  fdata->close_stream = FALSE;
668
669  res = g_simple_async_result_new (G_OBJECT (stream),
670                                   callback,
671                                   data,
672                                   g_buffered_output_stream_flush_async);
673
674  g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data);
675
676  g_simple_async_result_run_in_thread (res,
677                                       flush_buffer_thread,
678                                       io_priority,
679                                       cancellable);
680  g_object_unref (res);
681}
682
683static gboolean
684g_buffered_output_stream_flush_finish (GOutputStream        *stream,
685                                       GAsyncResult         *result,
686                                       GError              **error)
687{
688  GSimpleAsyncResult *simple;
689
690  simple = G_SIMPLE_ASYNC_RESULT (result);
691
692  g_assert (g_simple_async_result_get_source_tag (simple) ==
693            g_buffered_output_stream_flush_async);
694
695  return TRUE;
696}
697
698static void
699g_buffered_output_stream_close_async (GOutputStream        *stream,
700                                      int                   io_priority,
701                                      GCancellable         *cancellable,
702                                      GAsyncReadyCallback   callback,
703                                      gpointer              data)
704{
705  GSimpleAsyncResult *res;
706  FlushData          *fdata;
707
708  fdata = g_slice_new (FlushData);
709  fdata->close_stream = TRUE;
710
711  res = g_simple_async_result_new (G_OBJECT (stream),
712                                   callback,
713                                   data,
714                                   g_buffered_output_stream_close_async);
715
716  g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data);
717
718  g_simple_async_result_run_in_thread (res,
719                                       flush_buffer_thread,
720                                       io_priority,
721                                       cancellable);
722  g_object_unref (res);
723}
724
725static gboolean
726g_buffered_output_stream_close_finish (GOutputStream        *stream,
727                                       GAsyncResult         *result,
728                                       GError              **error)
729{
730  GSimpleAsyncResult *simple;
731
732  simple = G_SIMPLE_ASYNC_RESULT (result);
733
734  g_assert (g_simple_async_result_get_source_tag (simple) ==
735            g_buffered_output_stream_flush_async);
736
737  return TRUE;
738}
739
740#define __G_BUFFERED_OUTPUT_STREAM_C__
741#include "gioaliasdef.c"
742