gbufferedoutputstream.c revision 61582bd91cba508362d0e28db4d6e3f307b27b48
1/* GIO - GLib Input, Output and Streaming Library
2 *
3 * Copyright (C) 2006-2007 Red Hat, Inc.
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General
16 * Public License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18 * Boston, MA 02111-1307, USA.
19 *
20 * Author: Christian Kellner <gicmo@gnome.org>
21 */
22
23#include <config.h>
24#include "gbufferedoutputstream.h"
25#include "goutputstream.h"
26#include "gsimpleasyncresult.h"
27#include "string.h"
28#include "glibintl.h"
29
30/**
31 * SECTION:gbufferedoutputstream
32 * @short_description: Buffered Output Stream
33 * @see_also: #GFilterOutputStream, #GOutputStream
34 *
35 * Buffered output stream implements #GFilterOutputStream and provides
36 * for buffered writes.
37 *
38 * By default, #GBufferedOutputStream's buffer size is set at 4 kilobytes.
39 *
40 * To create a buffered output stream, use g_buffered_output_stream_new(), or
41 * g_buffered_output_stream_new_sized() to specify the buffer's size at construction.
42 *
 * To get the size of a buffer within a buffered output stream, use
44 * g_buffered_output_stream_get_buffer_size(). To change the size of a
45 * buffered output stream's buffer, use g_buffered_output_stream_set_buffer_size().
46 * Note: the buffer's size cannot be reduced below the size of the data within the
47 * buffer.
48 *
49 **/
50
51
52
53#define DEFAULT_BUFFER_SIZE 4096
54
/* Instance-private state: the write buffer and how much of it is in use. */
struct _GBufferedOutputStreamPrivate {
  guint8 *buffer;     /* storage allocated by set_buffer_size(), freed in finalize */
  gsize   len;        /* allocated size of @buffer in bytes */
  goffset pos;        /* number of bytes currently stored at the start of @buffer */
  gboolean auto_grow; /* if TRUE, write() enlarges @buffer when a write does not fit */
};
61
/* GObject property identifiers; 0 is reserved, so real ids start after PROP_0. */
enum {
  PROP_0,
  PROP_BUFSIZE  /* "buffer-size" */
};
66
67static void     g_buffered_output_stream_set_property (GObject      *object,
68                                                       guint         prop_id,
69                                                       const GValue *value,
70                                                       GParamSpec   *pspec);
71
72static void     g_buffered_output_stream_get_property (GObject    *object,
73                                                       guint       prop_id,
74                                                       GValue     *value,
75                                                       GParamSpec *pspec);
76static void     g_buffered_output_stream_finalize     (GObject *object);
77
78
79static gssize   g_buffered_output_stream_write        (GOutputStream *stream,
80                                                       const void    *buffer,
81                                                       gsize          count,
82                                                       GCancellable  *cancellable,
83                                                       GError       **error);
84static gboolean g_buffered_output_stream_flush        (GOutputStream    *stream,
85                                                       GCancellable  *cancellable,
86                                                       GError          **error);
87static gboolean g_buffered_output_stream_close        (GOutputStream  *stream,
88                                                       GCancellable   *cancellable,
89                                                       GError        **error);
90
91static void     g_buffered_output_stream_write_async  (GOutputStream        *stream,
92                                                       const void           *buffer,
93                                                       gsize                 count,
94                                                       int                   io_priority,
95                                                       GCancellable         *cancellable,
96                                                       GAsyncReadyCallback   callback,
97                                                       gpointer              data);
98static gssize   g_buffered_output_stream_write_finish (GOutputStream        *stream,
99                                                       GAsyncResult         *result,
100                                                       GError              **error);
101static void     g_buffered_output_stream_flush_async  (GOutputStream        *stream,
102                                                       int                   io_priority,
103                                                       GCancellable         *cancellable,
104                                                       GAsyncReadyCallback   callback,
105                                                       gpointer              data);
106static gboolean g_buffered_output_stream_flush_finish (GOutputStream        *stream,
107                                                       GAsyncResult         *result,
108                                                       GError              **error);
109static void     g_buffered_output_stream_close_async  (GOutputStream        *stream,
110                                                       int                   io_priority,
111                                                       GCancellable         *cancellable,
112                                                       GAsyncReadyCallback   callback,
113                                                       gpointer              data);
114static gboolean g_buffered_output_stream_close_finish (GOutputStream        *stream,
115                                                       GAsyncResult         *result,
116                                                       GError              **error);
117
118G_DEFINE_TYPE (GBufferedOutputStream,
119               g_buffered_output_stream,
120               G_TYPE_FILTER_OUTPUT_STREAM)
121
122
123static void
124g_buffered_output_stream_class_init (GBufferedOutputStreamClass *klass)
125{
126  GObjectClass *object_class;
127  GOutputStreamClass *ostream_class;
128
129  g_type_class_add_private (klass, sizeof (GBufferedOutputStreamPrivate));
130
131  object_class = G_OBJECT_CLASS (klass);
132  object_class->get_property = g_buffered_output_stream_get_property;
133  object_class->set_property = g_buffered_output_stream_set_property;
134  object_class->finalize     = g_buffered_output_stream_finalize;
135
136  ostream_class = G_OUTPUT_STREAM_CLASS (klass);
137  ostream_class->write = g_buffered_output_stream_write;
138  ostream_class->flush = g_buffered_output_stream_flush;
139  ostream_class->close = g_buffered_output_stream_close;
140  ostream_class->write_async  = g_buffered_output_stream_write_async;
141  ostream_class->write_finish = g_buffered_output_stream_write_finish;
142  ostream_class->flush_async  = g_buffered_output_stream_flush_async;
143  ostream_class->flush_finish = g_buffered_output_stream_flush_finish;
144  ostream_class->close_async  = g_buffered_output_stream_close_async;
145  ostream_class->close_finish = g_buffered_output_stream_close_finish;
146
147  g_object_class_install_property (object_class,
148                                   PROP_BUFSIZE,
149                                   g_param_spec_uint ("buffer-size",
150                                                      P_("Buffer Size"),
151                                                      P_("The size of the backend buffer"),
152                                                      1,
153                                                      G_MAXUINT,
154                                                      DEFAULT_BUFFER_SIZE,
155                                                      G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY |
156                                                      G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
157
158}
159
160/**
161 * g_buffered_output_stream_get_buffer_size:
162 * @stream: a #GBufferedOutputStream.
163 *
164 * Gets the size of the buffer in the @stream.
165 *
166 * Returns: the current size of the buffer.
167 **/
168gsize
169g_buffered_output_stream_get_buffer_size (GBufferedOutputStream *stream)
170{
171  g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), -1);
172
173  return stream->priv->len;
174}
175
176/**
177 * g_buffered_output_stream_set_buffer_size:
178 * @stream: a #GBufferedOutputStream.
179 * @size: a #gsize.
180 *
181 * Sets the size of the internal buffer to @size.
182 **/
183void
184g_buffered_output_stream_set_buffer_size (GBufferedOutputStream *stream,
185					 gsize                 size)
186{
187  GBufferedOutputStreamPrivate *priv;
188  guint8 *buffer;
189
190  g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
191
192  priv = stream->priv;
193
194  if (priv->buffer)
195    {
196      size = MAX (size, priv->pos);
197
198      buffer = g_malloc (size);
199      memcpy (buffer, priv->buffer, priv->pos);
200      g_free (priv->buffer);
201      priv->buffer = buffer;
202      priv->len = size;
203      /* Keep old pos */
204    }
205  else
206    {
207      priv->buffer = g_malloc (size);
208      priv->len = size;
209      priv->pos = 0;
210    }
211}
212
213/**
214 * g_buffered_output_stream_get_auto_grow:
215 * @stream: a #GBufferedOutputStream.
216 *
217 * Checks if the buffer automatically grows as data is added.
218 *
219 * Returns: %TRUE if the @stream's buffer automatically grows,
220 * %FALSE otherwise.
221 **/
222gboolean
223g_buffered_output_stream_get_auto_grow (GBufferedOutputStream *stream)
224{
225  g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), FALSE);
226
227  return stream->priv->auto_grow;
228}
229
230/**
231 * g_buffered_output_stream_set_auto_grow:
232 * @stream: a #GBufferedOutputStream.
233 * @auto_grow: a #gboolean.
234 *
235 * Sets whether or not the @stream's buffer should automatically grow.
236 **/
237void
238g_buffered_output_stream_set_auto_grow (GBufferedOutputStream *stream,
239				       gboolean              auto_grow)
240{
241  g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
242
243  stream->priv->auto_grow = auto_grow;
244}
245
246static void
247g_buffered_output_stream_set_property (GObject         *object,
248                                       guint            prop_id,
249                                       const GValue    *value,
250                                       GParamSpec      *pspec)
251{
252  GBufferedOutputStream *buffered_stream;
253  GBufferedOutputStreamPrivate *priv;
254
255  buffered_stream = G_BUFFERED_OUTPUT_STREAM (object);
256  priv = buffered_stream->priv;
257
258  switch (prop_id)
259    {
260
261    case PROP_BUFSIZE:
262      g_buffered_output_stream_set_buffer_size (buffered_stream, g_value_get_uint (value));
263      break;
264
265    default:
266      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
267      break;
268    }
269
270}
271
272static void
273g_buffered_output_stream_get_property (GObject    *object,
274                                       guint       prop_id,
275                                       GValue     *value,
276                                       GParamSpec *pspec)
277{
278  GBufferedOutputStream *buffered_stream;
279  GBufferedOutputStreamPrivate *priv;
280
281  buffered_stream = G_BUFFERED_OUTPUT_STREAM (object);
282  priv = buffered_stream->priv;
283
284  switch (prop_id)
285    {
286
287    case PROP_BUFSIZE:
288      g_value_set_uint (value, priv->len);
289      break;
290
291    default:
292      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
293      break;
294    }
295
296}
297
298static void
299g_buffered_output_stream_finalize (GObject *object)
300{
301  GBufferedOutputStream *stream;
302  GBufferedOutputStreamPrivate *priv;
303
304  stream = G_BUFFERED_OUTPUT_STREAM (object);
305  priv = stream->priv;
306
307  g_free (priv->buffer);
308
309  if (G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize)
310    (*G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize) (object);
311}
312
313static void
314g_buffered_output_stream_init (GBufferedOutputStream *stream)
315{
316  stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
317                                              G_TYPE_BUFFERED_OUTPUT_STREAM,
318                                              GBufferedOutputStreamPrivate);
319
320}
321
322/**
323 * g_buffered_output_stream_new:
324 * @base_stream: a #GOutputStream.
325 *
326 * Creates a new buffered output stream for a base stream.
327 *
328 * Returns: a #GOutputStream for the given @base_stream.
329 **/
330GOutputStream *
331g_buffered_output_stream_new (GOutputStream *base_stream)
332{
333  GOutputStream *stream;
334
335  g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
336
337  stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
338                         "base-stream", base_stream,
339                         NULL);
340
341  return stream;
342}
343
344/**
345 * g_buffered_output_stream_new_sized:
346 * @base_stream: a #GOutputStream.
347 * @size: a #gsize.
348 *
349 * Creates a new buffered output stream with a given buffer size.
350 *
351 * Returns: a #GOutputStream with an internal buffer set to @size.
352 **/
353GOutputStream *
354g_buffered_output_stream_new_sized (GOutputStream *base_stream,
355                                    guint          size)
356{
357  GOutputStream *stream;
358
359  g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
360
361  stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
362                         "base-stream", base_stream,
363                         "buffer-size", size,
364                         NULL);
365
366  return stream;
367}
368
369static gboolean
370flush_buffer (GBufferedOutputStream  *stream,
371              GCancellable           *cancellable,
372              GError                 **error)
373{
374  GBufferedOutputStreamPrivate *priv;
375  GOutputStream                *base_stream;
376  gboolean                      res;
377  gsize                         bytes_written;
378  gsize                         count;
379
380  priv = stream->priv;
381  bytes_written = 0;
382  base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
383
384  g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), FALSE);
385
386  res = g_output_stream_write_all (base_stream,
387                                   priv->buffer,
388                                   priv->pos,
389                                   &bytes_written,
390                                   cancellable,
391                                   error);
392
393  count = priv->pos - bytes_written;
394
395  if (count > 0)
396    g_memmove (priv->buffer, priv->buffer + bytes_written, count);
397
398  priv->pos -= bytes_written;
399
400  return res;
401}
402
403static gssize
404g_buffered_output_stream_write  (GOutputStream *stream,
405                                 const void    *buffer,
406                                 gsize          count,
407                                 GCancellable  *cancellable,
408                                 GError       **error)
409{
410  GBufferedOutputStream        *bstream;
411  GBufferedOutputStreamPrivate *priv;
412  gboolean res;
413  gsize    n;
414  gsize new_size;
415
416  bstream = G_BUFFERED_OUTPUT_STREAM (stream);
417  priv = bstream->priv;
418
419  n = priv->len - priv->pos;
420
421  if (priv->auto_grow && n < count)
422    {
423      new_size = MAX (priv->len * 2, priv->len + count);
424      g_buffered_output_stream_set_buffer_size (bstream, new_size);
425    }
426  else if (n == 0)
427    {
428      res = flush_buffer (bstream, cancellable, error);
429
430      if (res == FALSE)
431	return -1;
432    }
433
434  n = priv->len - priv->pos;
435
436  count = MIN (count, n);
437  memcpy (priv->buffer + priv->pos, buffer, count);
438  priv->pos += count;
439
440  return count;
441}
442
443static gboolean
444g_buffered_output_stream_flush (GOutputStream  *stream,
445                                GCancellable   *cancellable,
446                                GError        **error)
447{
448  GBufferedOutputStream *bstream;
449  GBufferedOutputStreamPrivate *priv;
450  GOutputStream                *base_stream;
451  gboolean res;
452
453  bstream = G_BUFFERED_OUTPUT_STREAM (stream);
454  priv = bstream->priv;
455  base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
456
457  res = flush_buffer (bstream, cancellable, error);
458
459  if (res == FALSE) {
460    return FALSE;
461  }
462
463  res = g_output_stream_flush (base_stream,
464                               cancellable,
465                               error);
466  return res;
467}
468
469static gboolean
470g_buffered_output_stream_close (GOutputStream  *stream,
471                                GCancellable   *cancellable,
472                                GError        **error)
473{
474  GBufferedOutputStream        *bstream;
475  GBufferedOutputStreamPrivate *priv;
476  GOutputStream                *base_stream;
477  gboolean                      res;
478
479  bstream = G_BUFFERED_OUTPUT_STREAM (stream);
480  priv = bstream->priv;
481  base_stream = G_FILTER_OUTPUT_STREAM (bstream)->base_stream;
482
483  res = flush_buffer (bstream, cancellable, error);
484
485  /* report the first error but still close the stream */
486  if (res)
487    {
488      res = g_output_stream_close (base_stream,
489                                   cancellable,
490                                   error);
491    }
492  else
493    {
494      g_output_stream_close (base_stream,
495                             cancellable,
496                             NULL);
497    }
498
499  return res;
500}
501
502/* ************************** */
503/* Async stuff implementation */
504/* ************************** */
505
506/* TODO: This should be using the base class async ops, not threads */
507
/* Options read by flush_buffer_thread() after it has written out the
 * buffer. */
typedef struct {

  guint flush_stream : 1; /* also call g_output_stream_flush() on the base stream */
  guint close_stream : 1; /* also call g_output_stream_close() on the base stream */

} FlushData;
514
515static void
516free_flush_data (gpointer data)
517{
518  g_slice_free (FlushData, data);
519}
520
521/* This function is used by all three (i.e.
522 * _write, _flush, _close) functions since
523 * all of them will need to flush the buffer
524 * and so closing and writing is just a special
525 * case of flushing + some addition stuff */
526static void
527flush_buffer_thread (GSimpleAsyncResult *result,
528                     GObject            *object,
529                     GCancellable       *cancellable)
530{
531  GBufferedOutputStream *stream;
532  GOutputStream *base_stream;
533  FlushData     *fdata;
534  gboolean       res;
535  GError        *error = NULL;
536
537  stream = G_BUFFERED_OUTPUT_STREAM (object);
538  fdata = g_simple_async_result_get_op_res_gpointer (result);
539  base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
540
541  res = flush_buffer (stream, cancellable, &error);
542
543  /* if flushing the buffer didn't work don't even bother
544   * to flush the stream but just report that error */
545  if (res && fdata->flush_stream)
546    {
547      res = g_output_stream_flush (base_stream,
548                                   cancellable,
549                                   &error);
550    }
551
552  if (fdata->close_stream)
553    {
554
555      /* if flushing the buffer or the stream returned
556       * an error report that first error but still try
557       * close the stream */
558      if (res == FALSE)
559        {
560          g_output_stream_close (base_stream,
561                                 cancellable,
562                                 NULL);
563        }
564      else
565        {
566          res = g_output_stream_close (base_stream,
567                                       cancellable,
568                                       &error);
569        }
570
571    }
572
573  if (res == FALSE)
574    {
575      g_simple_async_result_set_from_error (result, error);
576      g_error_free (error);
577    }
578}
579
/* State of an async write. fdata is the first member, and
 * flush_buffer_thread() reads the op-res pointer as a FlushData. */
typedef struct {

  FlushData fdata;

  gsize  count;        /* number of bytes the caller asked to write */
  const void  *buffer; /* caller's data; only the pointer is stored, not a copy */

} WriteData;
588
589static void
590free_write_data (gpointer data)
591{
592  g_slice_free (WriteData, data);
593}
594
595static void
596g_buffered_output_stream_write_async (GOutputStream        *stream,
597                                      const void           *buffer,
598                                      gsize                 count,
599                                      int                   io_priority,
600                                      GCancellable         *cancellable,
601                                      GAsyncReadyCallback   callback,
602                                      gpointer              data)
603{
604  GBufferedOutputStream *buffered_stream;
605  GBufferedOutputStreamPrivate *priv;
606  GSimpleAsyncResult *res;
607  WriteData *wdata;
608
609  buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream);
610  priv = buffered_stream->priv;
611
612  wdata = g_slice_new (WriteData);
613  wdata->count  = count;
614  wdata->buffer = buffer;
615
616  res = g_simple_async_result_new (G_OBJECT (stream),
617                                   callback,
618                                   data,
619                                   g_buffered_output_stream_write_async);
620
621  g_simple_async_result_set_op_res_gpointer (res, wdata, free_write_data);
622
623  /* if we have space left directly call the
624   * callback (from idle) otherwise schedule a buffer
625   * flush in the thread. In both cases the actual
626   * copying of the data to the buffer will be done in
627   * the write_finish () func since that should
628   * be fast enough */
629  if (priv->len - priv->pos > 0)
630    {
631      g_simple_async_result_complete_in_idle (res);
632    }
633  else
634    {
635      wdata->fdata.flush_stream = FALSE;
636      wdata->fdata.close_stream = FALSE;
637      g_simple_async_result_run_in_thread (res,
638                                           flush_buffer_thread,
639                                           io_priority,
640                                           cancellable);
641      g_object_unref (res);
642    }
643}
644
645static gssize
646g_buffered_output_stream_write_finish (GOutputStream        *stream,
647                                       GAsyncResult         *result,
648                                       GError              **error)
649{
650  GBufferedOutputStreamPrivate *priv;
651  GBufferedOutputStream        *buffered_stream;
652  GSimpleAsyncResult *simple;
653  WriteData          *wdata;
654  gssize              count;
655
656  simple = G_SIMPLE_ASYNC_RESULT (result);
657  buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream);
658  priv = buffered_stream->priv;
659
660  g_assert (g_simple_async_result_get_source_tag (simple) ==
661            g_buffered_output_stream_write_async);
662
663  wdata = g_simple_async_result_get_op_res_gpointer (simple);
664
665  /* Now do the real copying of data to the buffer */
666  count = priv->len - priv->pos;
667  count = MIN (wdata->count, count);
668
669  memcpy (priv->buffer + priv->pos, wdata->buffer, count);
670
671  priv->pos += count;
672
673  return count;
674}
675
676static void
677g_buffered_output_stream_flush_async (GOutputStream        *stream,
678                                      int                   io_priority,
679                                      GCancellable         *cancellable,
680                                      GAsyncReadyCallback   callback,
681                                      gpointer              data)
682{
683  GSimpleAsyncResult *res;
684  FlushData          *fdata;
685
686  fdata = g_slice_new (FlushData);
687  fdata->flush_stream = TRUE;
688  fdata->close_stream = FALSE;
689
690  res = g_simple_async_result_new (G_OBJECT (stream),
691                                   callback,
692                                   data,
693                                   g_buffered_output_stream_flush_async);
694
695  g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data);
696
697  g_simple_async_result_run_in_thread (res,
698                                       flush_buffer_thread,
699                                       io_priority,
700                                       cancellable);
701  g_object_unref (res);
702}
703
704static gboolean
705g_buffered_output_stream_flush_finish (GOutputStream        *stream,
706                                       GAsyncResult         *result,
707                                       GError              **error)
708{
709  GSimpleAsyncResult *simple;
710
711  simple = G_SIMPLE_ASYNC_RESULT (result);
712
713  g_assert (g_simple_async_result_get_source_tag (simple) ==
714            g_buffered_output_stream_flush_async);
715
716  return TRUE;
717}
718
719static void
720g_buffered_output_stream_close_async (GOutputStream        *stream,
721                                      int                   io_priority,
722                                      GCancellable         *cancellable,
723                                      GAsyncReadyCallback   callback,
724                                      gpointer              data)
725{
726  GSimpleAsyncResult *res;
727  FlushData          *fdata;
728
729  fdata = g_slice_new (FlushData);
730  fdata->close_stream = TRUE;
731
732  res = g_simple_async_result_new (G_OBJECT (stream),
733                                   callback,
734                                   data,
735                                   g_buffered_output_stream_close_async);
736
737  g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data);
738
739  g_simple_async_result_run_in_thread (res,
740                                       flush_buffer_thread,
741                                       io_priority,
742                                       cancellable);
743  g_object_unref (res);
744}
745
746static gboolean
747g_buffered_output_stream_close_finish (GOutputStream        *stream,
748                                       GAsyncResult         *result,
749                                       GError              **error)
750{
751  GSimpleAsyncResult *simple;
752
753  simple = G_SIMPLE_ASYNC_RESULT (result);
754
755  g_assert (g_simple_async_result_get_source_tag (simple) ==
756            g_buffered_output_stream_flush_async);
757
758  return TRUE;
759}
760
761/* vim: ts=2 sw=2 et */
762