gbufferedoutputstream.c revision a2ca589703273fca80cb126430a8b058aba3eb52
1/* GIO - GLib Input, Output and Streaming Library 2 * 3 * Copyright (C) 2006-2007 Red Hat, Inc. 4 * 5 * This library is free software; you can redistribute it and/or 6 * modify it under the terms of the GNU Lesser General Public 7 * License as published by the Free Software Foundation; either 8 * version 2 of the License, or (at your option) any later version. 9 * 10 * This library is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * Lesser General Public License for more details. 14 * 15 * You should have received a copy of the GNU Lesser General 16 * Public License along with this library; if not, write to the 17 * Free Software Foundation, Inc., 59 Temple Place, Suite 330, 18 * Boston, MA 02111-1307, USA. 19 * 20 * Author: Christian Kellner <gicmo@gnome.org> 21 */ 22 23#include <config.h> 24#include "gbufferedoutputstream.h" 25#include "goutputstream.h" 26#include "gsimpleasyncresult.h" 27#include "string.h" 28#include "glibintl.h" 29 30#include <gioalias.h> 31 32/** 33 * SECTION:gbufferedoutputstream 34 * @short_description: Buffered Output Stream 35 * @see_also: #GFilterOutputStream, #GOutputStream 36 * 37 * Buffered output stream implements #GFilterOutputStream and provides 38 * for buffered writes. 39 * 40 * By default, #GBufferedOutputStream's buffer size is set at 4 kilobytes. 41 * 42 * To create a buffered output stream, use g_buffered_output_stream_new(), or 43 * g_buffered_output_stream_new_sized() to specify the buffer's size at construction. 44 * 45 * To get the size of a buffer within a buffered input stream, use 46 * g_buffered_output_stream_get_buffer_size(). To change the size of a 47 * buffered output stream's buffer, use g_buffered_output_stream_set_buffer_size(). 48 * Note: the buffer's size cannot be reduced below the size of the data within the 49 * buffer. 50 * 51 **/ 52 53 54 55#define DEFAULT_BUFFER_SIZE 4096 56 57struct _GBufferedOutputStreamPrivate { 58 guint8 *buffer; 59 gsize len; 60 goffset pos; 61 gboolean auto_grow; 62}; 63 64enum { 65 PROP_0, 66 PROP_BUFSIZE 67}; 68 69static void g_buffered_output_stream_set_property (GObject *object, 70 guint prop_id, 71 const GValue *value, 72 GParamSpec *pspec); 73 74static void g_buffered_output_stream_get_property (GObject *object, 75 guint prop_id, 76 GValue *value, 77 GParamSpec *pspec); 78static void g_buffered_output_stream_finalize (GObject *object); 79 80 81static gssize g_buffered_output_stream_write (GOutputStream *stream, 82 const void *buffer, 83 gsize count, 84 GCancellable *cancellable, 85 GError **error); 86static gboolean g_buffered_output_stream_flush (GOutputStream *stream, 87 GCancellable *cancellable, 88 GError **error); 89static gboolean g_buffered_output_stream_close (GOutputStream *stream, 90 GCancellable *cancellable, 91 GError **error); 92 93static void g_buffered_output_stream_write_async (GOutputStream *stream, 94 const void *buffer, 95 gsize count, 96 int io_priority, 97 GCancellable *cancellable, 98 GAsyncReadyCallback callback, 99 gpointer data); 100static gssize g_buffered_output_stream_write_finish (GOutputStream *stream, 101 GAsyncResult *result, 102 GError **error); 103static void g_buffered_output_stream_flush_async (GOutputStream *stream, 104 int io_priority, 105 GCancellable *cancellable, 106 GAsyncReadyCallback callback, 107 gpointer data); 108static gboolean g_buffered_output_stream_flush_finish (GOutputStream *stream, 109 GAsyncResult *result, 110 GError **error); 111static void g_buffered_output_stream_close_async (GOutputStream *stream, 112 int io_priority, 113 GCancellable *cancellable, 114 GAsyncReadyCallback callback, 115 gpointer data); 116static gboolean g_buffered_output_stream_close_finish (GOutputStream *stream, 117 GAsyncResult *result, 118 GError **error); 119 120G_DEFINE_TYPE (GBufferedOutputStream, 121 g_buffered_output_stream, 122 G_TYPE_FILTER_OUTPUT_STREAM) 123 124 125static void 126g_buffered_output_stream_class_init (GBufferedOutputStreamClass *klass) 127{ 128 GObjectClass *object_class; 129 GOutputStreamClass *ostream_class; 130 131 g_type_class_add_private (klass, sizeof (GBufferedOutputStreamPrivate)); 132 133 object_class = G_OBJECT_CLASS (klass); 134 object_class->get_property = g_buffered_output_stream_get_property; 135 object_class->set_property = g_buffered_output_stream_set_property; 136 object_class->finalize = g_buffered_output_stream_finalize; 137 138 ostream_class = G_OUTPUT_STREAM_CLASS (klass); 139 ostream_class->write = g_buffered_output_stream_write; 140 ostream_class->flush = g_buffered_output_stream_flush; 141 ostream_class->close = g_buffered_output_stream_close; 142 ostream_class->write_async = g_buffered_output_stream_write_async; 143 ostream_class->write_finish = g_buffered_output_stream_write_finish; 144 ostream_class->flush_async = g_buffered_output_stream_flush_async; 145 ostream_class->flush_finish = g_buffered_output_stream_flush_finish; 146 ostream_class->close_async = g_buffered_output_stream_close_async; 147 ostream_class->close_finish = g_buffered_output_stream_close_finish; 148 149 g_object_class_install_property (object_class, 150 PROP_BUFSIZE, 151 g_param_spec_uint ("buffer-size", 152 P_("Buffer Size"), 153 P_("The size of the backend buffer"), 154 1, 155 G_MAXUINT, 156 DEFAULT_BUFFER_SIZE, 157 G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | 158 G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB)); 159 160} 161 162/** 163 * g_buffered_output_stream_get_buffer_size: 164 * @stream: a #GBufferedOutputStream. 165 * 166 * Gets the size of the buffer in the @stream. 167 * 168 * Returns: the current size of the buffer. 169 **/ 170gsize 171g_buffered_output_stream_get_buffer_size (GBufferedOutputStream *stream) 172{ 173 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), -1); 174 175 return stream->priv->len; 176} 177 178/** 179 * g_buffered_output_stream_set_buffer_size: 180 * @stream: a #GBufferedOutputStream. 181 * @size: a #gsize. 182 * 183 * Sets the size of the internal buffer to @size. 184 **/ 185void 186g_buffered_output_stream_set_buffer_size (GBufferedOutputStream *stream, 187 gsize size) 188{ 189 GBufferedOutputStreamPrivate *priv; 190 guint8 *buffer; 191 192 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream)); 193 194 priv = stream->priv; 195 196 if (priv->buffer) 197 { 198 size = MAX (size, priv->pos); 199 200 buffer = g_malloc (size); 201 memcpy (buffer, priv->buffer, priv->pos); 202 g_free (priv->buffer); 203 priv->buffer = buffer; 204 priv->len = size; 205 /* Keep old pos */ 206 } 207 else 208 { 209 priv->buffer = g_malloc (size); 210 priv->len = size; 211 priv->pos = 0; 212 } 213} 214 215/** 216 * g_buffered_output_stream_get_auto_grow: 217 * @stream: a #GBufferedOutputStream. 218 * 219 * Checks if the buffer automatically grows as data is added. 220 * 221 * Returns: %TRUE if the @stream's buffer automatically grows, 222 * %FALSE otherwise. 223 **/ 224gboolean 225g_buffered_output_stream_get_auto_grow (GBufferedOutputStream *stream) 226{ 227 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), FALSE); 228 229 return stream->priv->auto_grow; 230} 231 232/** 233 * g_buffered_output_stream_set_auto_grow: 234 * @stream: a #GBufferedOutputStream. 235 * @auto_grow: a #gboolean. 236 * 237 * Sets whether or not the @stream's buffer should automatically grow. 238 **/ 239void 240g_buffered_output_stream_set_auto_grow (GBufferedOutputStream *stream, 241 gboolean auto_grow) 242{ 243 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream)); 244 245 stream->priv->auto_grow = auto_grow; 246} 247 248static void 249g_buffered_output_stream_set_property (GObject *object, 250 guint prop_id, 251 const GValue *value, 252 GParamSpec *pspec) 253{ 254 GBufferedOutputStream *buffered_stream; 255 GBufferedOutputStreamPrivate *priv; 256 257 buffered_stream = G_BUFFERED_OUTPUT_STREAM (object); 258 priv = buffered_stream->priv; 259 260 switch (prop_id) 261 { 262 263 case PROP_BUFSIZE: 264 g_buffered_output_stream_set_buffer_size (buffered_stream, g_value_get_uint (value)); 265 break; 266 267 default: 268 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); 269 break; 270 } 271 272} 273 274static void 275g_buffered_output_stream_get_property (GObject *object, 276 guint prop_id, 277 GValue *value, 278 GParamSpec *pspec) 279{ 280 GBufferedOutputStream *buffered_stream; 281 GBufferedOutputStreamPrivate *priv; 282 283 buffered_stream = G_BUFFERED_OUTPUT_STREAM (object); 284 priv = buffered_stream->priv; 285 286 switch (prop_id) 287 { 288 289 case PROP_BUFSIZE: 290 g_value_set_uint (value, priv->len); 291 break; 292 293 default: 294 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); 295 break; 296 } 297 298} 299 300static void 301g_buffered_output_stream_finalize (GObject *object) 302{ 303 GBufferedOutputStream *stream; 304 GBufferedOutputStreamPrivate *priv; 305 306 stream = G_BUFFERED_OUTPUT_STREAM (object); 307 priv = stream->priv; 308 309 g_free (priv->buffer); 310 311 if (G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize) 312 (*G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize) (object); 313} 314 315static void 316g_buffered_output_stream_init (GBufferedOutputStream *stream) 317{ 318 stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, 319 G_TYPE_BUFFERED_OUTPUT_STREAM, 320 GBufferedOutputStreamPrivate); 321 322} 323 324/** 325 * g_buffered_output_stream_new: 326 * @base_stream: a #GOutputStream. 327 * 328 * Creates a new buffered output stream for a base stream. 329 * 330 * Returns: a #GOutputStream for the given @base_stream. 331 **/ 332GOutputStream * 333g_buffered_output_stream_new (GOutputStream *base_stream) 334{ 335 GOutputStream *stream; 336 337 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL); 338 339 stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM, 340 "base-stream", base_stream, 341 NULL); 342 343 return stream; 344} 345 346/** 347 * g_buffered_output_stream_new_sized: 348 * @base_stream: a #GOutputStream. 349 * @size: a #gsize. 350 * 351 * Creates a new buffered output stream with a given buffer size. 352 * 353 * Returns: a #GOutputStream with an internal buffer set to @size. 354 **/ 355GOutputStream * 356g_buffered_output_stream_new_sized (GOutputStream *base_stream, 357 guint size) 358{ 359 GOutputStream *stream; 360 361 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL); 362 363 stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM, 364 "base-stream", base_stream, 365 "buffer-size", size, 366 NULL); 367 368 return stream; 369} 370 371static gboolean 372flush_buffer (GBufferedOutputStream *stream, 373 GCancellable *cancellable, 374 GError **error) 375{ 376 GBufferedOutputStreamPrivate *priv; 377 GOutputStream *base_stream; 378 gboolean res; 379 gsize bytes_written; 380 gsize count; 381 382 priv = stream->priv; 383 bytes_written = 0; 384 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream; 385 386 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), FALSE); 387 388 res = g_output_stream_write_all (base_stream, 389 priv->buffer, 390 priv->pos, 391 &bytes_written, 392 cancellable, 393 error); 394 395 count = priv->pos - bytes_written; 396 397 if (count > 0) 398 g_memmove (priv->buffer, priv->buffer + bytes_written, count); 399 400 priv->pos -= bytes_written; 401 402 return res; 403} 404 405static gssize 406g_buffered_output_stream_write (GOutputStream *stream, 407 const void *buffer, 408 gsize count, 409 GCancellable *cancellable, 410 GError **error) 411{ 412 GBufferedOutputStream *bstream; 413 GBufferedOutputStreamPrivate *priv; 414 gboolean res; 415 gsize n; 416 gsize new_size; 417 418 bstream = G_BUFFERED_OUTPUT_STREAM (stream); 419 priv = bstream->priv; 420 421 n = priv->len - priv->pos; 422 423 if (priv->auto_grow && n < count) 424 { 425 new_size = MAX (priv->len * 2, priv->len + count); 426 g_buffered_output_stream_set_buffer_size (bstream, new_size); 427 } 428 else if (n == 0) 429 { 430 res = flush_buffer (bstream, cancellable, error); 431 432 if (res == FALSE) 433 return -1; 434 } 435 436 n = priv->len - priv->pos; 437 438 count = MIN (count, n); 439 memcpy (priv->buffer + priv->pos, buffer, count); 440 priv->pos += count; 441 442 return count; 443} 444 445static gboolean 446g_buffered_output_stream_flush (GOutputStream *stream, 447 GCancellable *cancellable, 448 GError **error) 449{ 450 GBufferedOutputStream *bstream; 451 GBufferedOutputStreamPrivate *priv; 452 GOutputStream *base_stream; 453 gboolean res; 454 455 bstream = G_BUFFERED_OUTPUT_STREAM (stream); 456 priv = bstream->priv; 457 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream; 458 459 res = flush_buffer (bstream, cancellable, error); 460 461 if (res == FALSE) 462 return FALSE; 463 464 res = g_output_stream_flush (base_stream, cancellable, error); 465 466 return res; 467} 468 469static gboolean 470g_buffered_output_stream_close (GOutputStream *stream, 471 GCancellable *cancellable, 472 GError **error) 473{ 474 GBufferedOutputStream *bstream; 475 GBufferedOutputStreamPrivate *priv; 476 GOutputStream *base_stream; 477 gboolean res; 478 479 bstream = G_BUFFERED_OUTPUT_STREAM (stream); 480 priv = bstream->priv; 481 base_stream = G_FILTER_OUTPUT_STREAM (bstream)->base_stream; 482 483 res = flush_buffer (bstream, cancellable, error); 484 485 /* report the first error but still close the stream */ 486 if (res) 487 res = g_output_stream_close (base_stream, cancellable, error); 488 else 489 g_output_stream_close (base_stream, cancellable, NULL); 490 491 return res; 492} 493 494/* ************************** */ 495/* Async stuff implementation */ 496/* ************************** */ 497 498/* TODO: This should be using the base class async ops, not threads */ 499 500typedef struct { 501 502 guint flush_stream : 1; 503 guint close_stream : 1; 504 505} FlushData; 506 507static void 508free_flush_data (gpointer data) 509{ 510 g_slice_free (FlushData, data); 511} 512 513/* This function is used by all three (i.e. 514 * _write, _flush, _close) functions since 515 * all of them will need to flush the buffer 516 * and so closing and writing is just a special 517 * case of flushing + some addition stuff */ 518static void 519flush_buffer_thread (GSimpleAsyncResult *result, 520 GObject *object, 521 GCancellable *cancellable) 522{ 523 GBufferedOutputStream *stream; 524 GOutputStream *base_stream; 525 FlushData *fdata; 526 gboolean res; 527 GError *error = NULL; 528 529 stream = G_BUFFERED_OUTPUT_STREAM (object); 530 fdata = g_simple_async_result_get_op_res_gpointer (result); 531 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream; 532 533 res = flush_buffer (stream, cancellable, &error); 534 535 /* if flushing the buffer didn't work don't even bother 536 * to flush the stream but just report that error */ 537 if (res && fdata->flush_stream) 538 res = g_output_stream_flush (base_stream, cancellable, &error); 539 540 if (fdata->close_stream) 541 { 542 543 /* if flushing the buffer or the stream returned 544 * an error report that first error but still try 545 * close the stream */ 546 if (res == FALSE) 547 g_output_stream_close (base_stream, cancellable, NULL); 548 else 549 res = g_output_stream_close (base_stream, cancellable, &error); 550 } 551 552 if (res == FALSE) 553 { 554 g_simple_async_result_set_from_error (result, error); 555 g_error_free (error); 556 } 557} 558 559typedef struct { 560 561 FlushData fdata; 562 563 gsize count; 564 const void *buffer; 565 566} WriteData; 567 568static void 569free_write_data (gpointer data) 570{ 571 g_slice_free (WriteData, data); 572} 573 574static void 575g_buffered_output_stream_write_async (GOutputStream *stream, 576 const void *buffer, 577 gsize count, 578 int io_priority, 579 GCancellable *cancellable, 580 GAsyncReadyCallback callback, 581 gpointer data) 582{ 583 GBufferedOutputStream *buffered_stream; 584 GBufferedOutputStreamPrivate *priv; 585 GSimpleAsyncResult *res; 586 WriteData *wdata; 587 588 buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream); 589 priv = buffered_stream->priv; 590 591 wdata = g_slice_new (WriteData); 592 wdata->count = count; 593 wdata->buffer = buffer; 594 595 res = g_simple_async_result_new (G_OBJECT (stream), 596 callback, 597 data, 598 g_buffered_output_stream_write_async); 599 600 g_simple_async_result_set_op_res_gpointer (res, wdata, free_write_data); 601 602 /* if we have space left directly call the 603 * callback (from idle) otherwise schedule a buffer 604 * flush in the thread. In both cases the actual 605 * copying of the data to the buffer will be done in 606 * the write_finish () func since that should 607 * be fast enough */ 608 if (priv->len - priv->pos > 0) 609 { 610 g_simple_async_result_complete_in_idle (res); 611 } 612 else 613 { 614 wdata->fdata.flush_stream = FALSE; 615 wdata->fdata.close_stream = FALSE; 616 g_simple_async_result_run_in_thread (res, 617 flush_buffer_thread, 618 io_priority, 619 cancellable); 620 g_object_unref (res); 621 } 622} 623 624static gssize 625g_buffered_output_stream_write_finish (GOutputStream *stream, 626 GAsyncResult *result, 627 GError **error) 628{ 629 GBufferedOutputStreamPrivate *priv; 630 GBufferedOutputStream *buffered_stream; 631 GSimpleAsyncResult *simple; 632 WriteData *wdata; 633 gssize count; 634 635 simple = G_SIMPLE_ASYNC_RESULT (result); 636 buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream); 637 priv = buffered_stream->priv; 638 639 g_assert (g_simple_async_result_get_source_tag (simple) == 640 g_buffered_output_stream_write_async); 641 642 wdata = g_simple_async_result_get_op_res_gpointer (simple); 643 644 /* Now do the real copying of data to the buffer */ 645 count = priv->len - priv->pos; 646 count = MIN (wdata->count, count); 647 648 memcpy (priv->buffer + priv->pos, wdata->buffer, count); 649 650 priv->pos += count; 651 652 return count; 653} 654 655static void 656g_buffered_output_stream_flush_async (GOutputStream *stream, 657 int io_priority, 658 GCancellable *cancellable, 659 GAsyncReadyCallback callback, 660 gpointer data) 661{ 662 GSimpleAsyncResult *res; 663 FlushData *fdata; 664 665 fdata = g_slice_new (FlushData); 666 fdata->flush_stream = TRUE; 667 fdata->close_stream = FALSE; 668 669 res = g_simple_async_result_new (G_OBJECT (stream), 670 callback, 671 data, 672 g_buffered_output_stream_flush_async); 673 674 g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data); 675 676 g_simple_async_result_run_in_thread (res, 677 flush_buffer_thread, 678 io_priority, 679 cancellable); 680 g_object_unref (res); 681} 682 683static gboolean 684g_buffered_output_stream_flush_finish (GOutputStream *stream, 685 GAsyncResult *result, 686 GError **error) 687{ 688 GSimpleAsyncResult *simple; 689 690 simple = G_SIMPLE_ASYNC_RESULT (result); 691 692 g_assert (g_simple_async_result_get_source_tag (simple) == 693 g_buffered_output_stream_flush_async); 694 695 return TRUE; 696} 697 698static void 699g_buffered_output_stream_close_async (GOutputStream *stream, 700 int io_priority, 701 GCancellable *cancellable, 702 GAsyncReadyCallback callback, 703 gpointer data) 704{ 705 GSimpleAsyncResult *res; 706 FlushData *fdata; 707 708 fdata = g_slice_new (FlushData); 709 fdata->close_stream = TRUE; 710 711 res = g_simple_async_result_new (G_OBJECT (stream), 712 callback, 713 data, 714 g_buffered_output_stream_close_async); 715 716 g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data); 717 718 g_simple_async_result_run_in_thread (res, 719 flush_buffer_thread, 720 io_priority, 721 cancellable); 722 g_object_unref (res); 723} 724 725static gboolean 726g_buffered_output_stream_close_finish (GOutputStream *stream, 727 GAsyncResult *result, 728 GError **error) 729{ 730 GSimpleAsyncResult *simple; 731 732 simple = G_SIMPLE_ASYNC_RESULT (result); 733 734 g_assert (g_simple_async_result_get_source_tag (simple) == 735 g_buffered_output_stream_flush_async); 736 737 return TRUE; 738} 739 740#define __G_BUFFERED_OUTPUT_STREAM_C__ 741#include "gioaliasdef.c" 742