gbufferedoutputstream.c revision 61582bd91cba508362d0e28db4d6e3f307b27b48
1/* GIO - GLib Input, Output and Streaming Library 2 * 3 * Copyright (C) 2006-2007 Red Hat, Inc. 4 * 5 * This library is free software; you can redistribute it and/or 6 * modify it under the terms of the GNU Lesser General Public 7 * License as published by the Free Software Foundation; either 8 * version 2 of the License, or (at your option) any later version. 9 * 10 * This library is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * Lesser General Public License for more details. 14 * 15 * You should have received a copy of the GNU Lesser General 16 * Public License along with this library; if not, write to the 17 * Free Software Foundation, Inc., 59 Temple Place, Suite 330, 18 * Boston, MA 02111-1307, USA. 19 * 20 * Author: Christian Kellner <gicmo@gnome.org> 21 */ 22 23#include <config.h> 24#include "gbufferedoutputstream.h" 25#include "goutputstream.h" 26#include "gsimpleasyncresult.h" 27#include "string.h" 28#include "glibintl.h" 29 30/** 31 * SECTION:gbufferedoutputstream 32 * @short_description: Buffered Output Stream 33 * @see_also: #GFilterOutputStream, #GOutputStream 34 * 35 * Buffered output stream implements #GFilterOutputStream and provides 36 * for buffered writes. 37 * 38 * By default, #GBufferedOutputStream's buffer size is set at 4 kilobytes. 39 * 40 * To create a buffered output stream, use g_buffered_output_stream_new(), or 41 * g_buffered_output_stream_new_sized() to specify the buffer's size at construction. 42 * 43 * To get the size of a buffer within a buffered input stream, use 44 * g_buffered_output_stream_get_buffer_size(). To change the size of a 45 * buffered output stream's buffer, use g_buffered_output_stream_set_buffer_size(). 46 * Note: the buffer's size cannot be reduced below the size of the data within the 47 * buffer. 48 * 49 **/ 50 51 52 53#define DEFAULT_BUFFER_SIZE 4096 54 55struct _GBufferedOutputStreamPrivate { 56 guint8 *buffer; 57 gsize len; 58 goffset pos; 59 gboolean auto_grow; 60}; 61 62enum { 63 PROP_0, 64 PROP_BUFSIZE 65}; 66 67static void g_buffered_output_stream_set_property (GObject *object, 68 guint prop_id, 69 const GValue *value, 70 GParamSpec *pspec); 71 72static void g_buffered_output_stream_get_property (GObject *object, 73 guint prop_id, 74 GValue *value, 75 GParamSpec *pspec); 76static void g_buffered_output_stream_finalize (GObject *object); 77 78 79static gssize g_buffered_output_stream_write (GOutputStream *stream, 80 const void *buffer, 81 gsize count, 82 GCancellable *cancellable, 83 GError **error); 84static gboolean g_buffered_output_stream_flush (GOutputStream *stream, 85 GCancellable *cancellable, 86 GError **error); 87static gboolean g_buffered_output_stream_close (GOutputStream *stream, 88 GCancellable *cancellable, 89 GError **error); 90 91static void g_buffered_output_stream_write_async (GOutputStream *stream, 92 const void *buffer, 93 gsize count, 94 int io_priority, 95 GCancellable *cancellable, 96 GAsyncReadyCallback callback, 97 gpointer data); 98static gssize g_buffered_output_stream_write_finish (GOutputStream *stream, 99 GAsyncResult *result, 100 GError **error); 101static void g_buffered_output_stream_flush_async (GOutputStream *stream, 102 int io_priority, 103 GCancellable *cancellable, 104 GAsyncReadyCallback callback, 105 gpointer data); 106static gboolean g_buffered_output_stream_flush_finish (GOutputStream *stream, 107 GAsyncResult *result, 108 GError **error); 109static void g_buffered_output_stream_close_async (GOutputStream *stream, 110 int io_priority, 111 GCancellable *cancellable, 112 GAsyncReadyCallback callback, 113 gpointer data); 114static gboolean g_buffered_output_stream_close_finish (GOutputStream *stream, 115 GAsyncResult *result, 116 GError **error); 117 118G_DEFINE_TYPE (GBufferedOutputStream, 119 g_buffered_output_stream, 120 G_TYPE_FILTER_OUTPUT_STREAM) 121 122 123static void 124g_buffered_output_stream_class_init (GBufferedOutputStreamClass *klass) 125{ 126 GObjectClass *object_class; 127 GOutputStreamClass *ostream_class; 128 129 g_type_class_add_private (klass, sizeof (GBufferedOutputStreamPrivate)); 130 131 object_class = G_OBJECT_CLASS (klass); 132 object_class->get_property = g_buffered_output_stream_get_property; 133 object_class->set_property = g_buffered_output_stream_set_property; 134 object_class->finalize = g_buffered_output_stream_finalize; 135 136 ostream_class = G_OUTPUT_STREAM_CLASS (klass); 137 ostream_class->write = g_buffered_output_stream_write; 138 ostream_class->flush = g_buffered_output_stream_flush; 139 ostream_class->close = g_buffered_output_stream_close; 140 ostream_class->write_async = g_buffered_output_stream_write_async; 141 ostream_class->write_finish = g_buffered_output_stream_write_finish; 142 ostream_class->flush_async = g_buffered_output_stream_flush_async; 143 ostream_class->flush_finish = g_buffered_output_stream_flush_finish; 144 ostream_class->close_async = g_buffered_output_stream_close_async; 145 ostream_class->close_finish = g_buffered_output_stream_close_finish; 146 147 g_object_class_install_property (object_class, 148 PROP_BUFSIZE, 149 g_param_spec_uint ("buffer-size", 150 P_("Buffer Size"), 151 P_("The size of the backend buffer"), 152 1, 153 G_MAXUINT, 154 DEFAULT_BUFFER_SIZE, 155 G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | 156 G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB)); 157 158} 159 160/** 161 * g_buffered_output_stream_get_buffer_size: 162 * @stream: a #GBufferedOutputStream. 163 * 164 * Gets the size of the buffer in the @stream. 165 * 166 * Returns: the current size of the buffer. 167 **/ 168gsize 169g_buffered_output_stream_get_buffer_size (GBufferedOutputStream *stream) 170{ 171 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), -1); 172 173 return stream->priv->len; 174} 175 176/** 177 * g_buffered_output_stream_set_buffer_size: 178 * @stream: a #GBufferedOutputStream. 179 * @size: a #gsize. 180 * 181 * Sets the size of the internal buffer to @size. 182 **/ 183void 184g_buffered_output_stream_set_buffer_size (GBufferedOutputStream *stream, 185 gsize size) 186{ 187 GBufferedOutputStreamPrivate *priv; 188 guint8 *buffer; 189 190 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream)); 191 192 priv = stream->priv; 193 194 if (priv->buffer) 195 { 196 size = MAX (size, priv->pos); 197 198 buffer = g_malloc (size); 199 memcpy (buffer, priv->buffer, priv->pos); 200 g_free (priv->buffer); 201 priv->buffer = buffer; 202 priv->len = size; 203 /* Keep old pos */ 204 } 205 else 206 { 207 priv->buffer = g_malloc (size); 208 priv->len = size; 209 priv->pos = 0; 210 } 211} 212 213/** 214 * g_buffered_output_stream_get_auto_grow: 215 * @stream: a #GBufferedOutputStream. 216 * 217 * Checks if the buffer automatically grows as data is added. 218 * 219 * Returns: %TRUE if the @stream's buffer automatically grows, 220 * %FALSE otherwise. 221 **/ 222gboolean 223g_buffered_output_stream_get_auto_grow (GBufferedOutputStream *stream) 224{ 225 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), FALSE); 226 227 return stream->priv->auto_grow; 228} 229 230/** 231 * g_buffered_output_stream_set_auto_grow: 232 * @stream: a #GBufferedOutputStream. 233 * @auto_grow: a #gboolean. 234 * 235 * Sets whether or not the @stream's buffer should automatically grow. 236 **/ 237void 238g_buffered_output_stream_set_auto_grow (GBufferedOutputStream *stream, 239 gboolean auto_grow) 240{ 241 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream)); 242 243 stream->priv->auto_grow = auto_grow; 244} 245 246static void 247g_buffered_output_stream_set_property (GObject *object, 248 guint prop_id, 249 const GValue *value, 250 GParamSpec *pspec) 251{ 252 GBufferedOutputStream *buffered_stream; 253 GBufferedOutputStreamPrivate *priv; 254 255 buffered_stream = G_BUFFERED_OUTPUT_STREAM (object); 256 priv = buffered_stream->priv; 257 258 switch (prop_id) 259 { 260 261 case PROP_BUFSIZE: 262 g_buffered_output_stream_set_buffer_size (buffered_stream, g_value_get_uint (value)); 263 break; 264 265 default: 266 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); 267 break; 268 } 269 270} 271 272static void 273g_buffered_output_stream_get_property (GObject *object, 274 guint prop_id, 275 GValue *value, 276 GParamSpec *pspec) 277{ 278 GBufferedOutputStream *buffered_stream; 279 GBufferedOutputStreamPrivate *priv; 280 281 buffered_stream = G_BUFFERED_OUTPUT_STREAM (object); 282 priv = buffered_stream->priv; 283 284 switch (prop_id) 285 { 286 287 case PROP_BUFSIZE: 288 g_value_set_uint (value, priv->len); 289 break; 290 291 default: 292 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); 293 break; 294 } 295 296} 297 298static void 299g_buffered_output_stream_finalize (GObject *object) 300{ 301 GBufferedOutputStream *stream; 302 GBufferedOutputStreamPrivate *priv; 303 304 stream = G_BUFFERED_OUTPUT_STREAM (object); 305 priv = stream->priv; 306 307 g_free (priv->buffer); 308 309 if (G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize) 310 (*G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize) (object); 311} 312 313static void 314g_buffered_output_stream_init (GBufferedOutputStream *stream) 315{ 316 stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, 317 G_TYPE_BUFFERED_OUTPUT_STREAM, 318 GBufferedOutputStreamPrivate); 319 320} 321 322/** 323 * g_buffered_output_stream_new: 324 * @base_stream: a #GOutputStream. 325 * 326 * Creates a new buffered output stream for a base stream. 327 * 328 * Returns: a #GOutputStream for the given @base_stream. 329 **/ 330GOutputStream * 331g_buffered_output_stream_new (GOutputStream *base_stream) 332{ 333 GOutputStream *stream; 334 335 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL); 336 337 stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM, 338 "base-stream", base_stream, 339 NULL); 340 341 return stream; 342} 343 344/** 345 * g_buffered_output_stream_new_sized: 346 * @base_stream: a #GOutputStream. 347 * @size: a #gsize. 348 * 349 * Creates a new buffered output stream with a given buffer size. 350 * 351 * Returns: a #GOutputStream with an internal buffer set to @size. 352 **/ 353GOutputStream * 354g_buffered_output_stream_new_sized (GOutputStream *base_stream, 355 guint size) 356{ 357 GOutputStream *stream; 358 359 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL); 360 361 stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM, 362 "base-stream", base_stream, 363 "buffer-size", size, 364 NULL); 365 366 return stream; 367} 368 369static gboolean 370flush_buffer (GBufferedOutputStream *stream, 371 GCancellable *cancellable, 372 GError **error) 373{ 374 GBufferedOutputStreamPrivate *priv; 375 GOutputStream *base_stream; 376 gboolean res; 377 gsize bytes_written; 378 gsize count; 379 380 priv = stream->priv; 381 bytes_written = 0; 382 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream; 383 384 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), FALSE); 385 386 res = g_output_stream_write_all (base_stream, 387 priv->buffer, 388 priv->pos, 389 &bytes_written, 390 cancellable, 391 error); 392 393 count = priv->pos - bytes_written; 394 395 if (count > 0) 396 g_memmove (priv->buffer, priv->buffer + bytes_written, count); 397 398 priv->pos -= bytes_written; 399 400 return res; 401} 402 403static gssize 404g_buffered_output_stream_write (GOutputStream *stream, 405 const void *buffer, 406 gsize count, 407 GCancellable *cancellable, 408 GError **error) 409{ 410 GBufferedOutputStream *bstream; 411 GBufferedOutputStreamPrivate *priv; 412 gboolean res; 413 gsize n; 414 gsize new_size; 415 416 bstream = G_BUFFERED_OUTPUT_STREAM (stream); 417 priv = bstream->priv; 418 419 n = priv->len - priv->pos; 420 421 if (priv->auto_grow && n < count) 422 { 423 new_size = MAX (priv->len * 2, priv->len + count); 424 g_buffered_output_stream_set_buffer_size (bstream, new_size); 425 } 426 else if (n == 0) 427 { 428 res = flush_buffer (bstream, cancellable, error); 429 430 if (res == FALSE) 431 return -1; 432 } 433 434 n = priv->len - priv->pos; 435 436 count = MIN (count, n); 437 memcpy (priv->buffer + priv->pos, buffer, count); 438 priv->pos += count; 439 440 return count; 441} 442 443static gboolean 444g_buffered_output_stream_flush (GOutputStream *stream, 445 GCancellable *cancellable, 446 GError **error) 447{ 448 GBufferedOutputStream *bstream; 449 GBufferedOutputStreamPrivate *priv; 450 GOutputStream *base_stream; 451 gboolean res; 452 453 bstream = G_BUFFERED_OUTPUT_STREAM (stream); 454 priv = bstream->priv; 455 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream; 456 457 res = flush_buffer (bstream, cancellable, error); 458 459 if (res == FALSE) { 460 return FALSE; 461 } 462 463 res = g_output_stream_flush (base_stream, 464 cancellable, 465 error); 466 return res; 467} 468 469static gboolean 470g_buffered_output_stream_close (GOutputStream *stream, 471 GCancellable *cancellable, 472 GError **error) 473{ 474 GBufferedOutputStream *bstream; 475 GBufferedOutputStreamPrivate *priv; 476 GOutputStream *base_stream; 477 gboolean res; 478 479 bstream = G_BUFFERED_OUTPUT_STREAM (stream); 480 priv = bstream->priv; 481 base_stream = G_FILTER_OUTPUT_STREAM (bstream)->base_stream; 482 483 res = flush_buffer (bstream, cancellable, error); 484 485 /* report the first error but still close the stream */ 486 if (res) 487 { 488 res = g_output_stream_close (base_stream, 489 cancellable, 490 error); 491 } 492 else 493 { 494 g_output_stream_close (base_stream, 495 cancellable, 496 NULL); 497 } 498 499 return res; 500} 501 502/* ************************** */ 503/* Async stuff implementation */ 504/* ************************** */ 505 506/* TODO: This should be using the base class async ops, not threads */ 507 508typedef struct { 509 510 guint flush_stream : 1; 511 guint close_stream : 1; 512 513} FlushData; 514 515static void 516free_flush_data (gpointer data) 517{ 518 g_slice_free (FlushData, data); 519} 520 521/* This function is used by all three (i.e. 522 * _write, _flush, _close) functions since 523 * all of them will need to flush the buffer 524 * and so closing and writing is just a special 525 * case of flushing + some addition stuff */ 526static void 527flush_buffer_thread (GSimpleAsyncResult *result, 528 GObject *object, 529 GCancellable *cancellable) 530{ 531 GBufferedOutputStream *stream; 532 GOutputStream *base_stream; 533 FlushData *fdata; 534 gboolean res; 535 GError *error = NULL; 536 537 stream = G_BUFFERED_OUTPUT_STREAM (object); 538 fdata = g_simple_async_result_get_op_res_gpointer (result); 539 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream; 540 541 res = flush_buffer (stream, cancellable, &error); 542 543 /* if flushing the buffer didn't work don't even bother 544 * to flush the stream but just report that error */ 545 if (res && fdata->flush_stream) 546 { 547 res = g_output_stream_flush (base_stream, 548 cancellable, 549 &error); 550 } 551 552 if (fdata->close_stream) 553 { 554 555 /* if flushing the buffer or the stream returned 556 * an error report that first error but still try 557 * close the stream */ 558 if (res == FALSE) 559 { 560 g_output_stream_close (base_stream, 561 cancellable, 562 NULL); 563 } 564 else 565 { 566 res = g_output_stream_close (base_stream, 567 cancellable, 568 &error); 569 } 570 571 } 572 573 if (res == FALSE) 574 { 575 g_simple_async_result_set_from_error (result, error); 576 g_error_free (error); 577 } 578} 579 580typedef struct { 581 582 FlushData fdata; 583 584 gsize count; 585 const void *buffer; 586 587} WriteData; 588 589static void 590free_write_data (gpointer data) 591{ 592 g_slice_free (WriteData, data); 593} 594 595static void 596g_buffered_output_stream_write_async (GOutputStream *stream, 597 const void *buffer, 598 gsize count, 599 int io_priority, 600 GCancellable *cancellable, 601 GAsyncReadyCallback callback, 602 gpointer data) 603{ 604 GBufferedOutputStream *buffered_stream; 605 GBufferedOutputStreamPrivate *priv; 606 GSimpleAsyncResult *res; 607 WriteData *wdata; 608 609 buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream); 610 priv = buffered_stream->priv; 611 612 wdata = g_slice_new (WriteData); 613 wdata->count = count; 614 wdata->buffer = buffer; 615 616 res = g_simple_async_result_new (G_OBJECT (stream), 617 callback, 618 data, 619 g_buffered_output_stream_write_async); 620 621 g_simple_async_result_set_op_res_gpointer (res, wdata, free_write_data); 622 623 /* if we have space left directly call the 624 * callback (from idle) otherwise schedule a buffer 625 * flush in the thread. In both cases the actual 626 * copying of the data to the buffer will be done in 627 * the write_finish () func since that should 628 * be fast enough */ 629 if (priv->len - priv->pos > 0) 630 { 631 g_simple_async_result_complete_in_idle (res); 632 } 633 else 634 { 635 wdata->fdata.flush_stream = FALSE; 636 wdata->fdata.close_stream = FALSE; 637 g_simple_async_result_run_in_thread (res, 638 flush_buffer_thread, 639 io_priority, 640 cancellable); 641 g_object_unref (res); 642 } 643} 644 645static gssize 646g_buffered_output_stream_write_finish (GOutputStream *stream, 647 GAsyncResult *result, 648 GError **error) 649{ 650 GBufferedOutputStreamPrivate *priv; 651 GBufferedOutputStream *buffered_stream; 652 GSimpleAsyncResult *simple; 653 WriteData *wdata; 654 gssize count; 655 656 simple = G_SIMPLE_ASYNC_RESULT (result); 657 buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream); 658 priv = buffered_stream->priv; 659 660 g_assert (g_simple_async_result_get_source_tag (simple) == 661 g_buffered_output_stream_write_async); 662 663 wdata = g_simple_async_result_get_op_res_gpointer (simple); 664 665 /* Now do the real copying of data to the buffer */ 666 count = priv->len - priv->pos; 667 count = MIN (wdata->count, count); 668 669 memcpy (priv->buffer + priv->pos, wdata->buffer, count); 670 671 priv->pos += count; 672 673 return count; 674} 675 676static void 677g_buffered_output_stream_flush_async (GOutputStream *stream, 678 int io_priority, 679 GCancellable *cancellable, 680 GAsyncReadyCallback callback, 681 gpointer data) 682{ 683 GSimpleAsyncResult *res; 684 FlushData *fdata; 685 686 fdata = g_slice_new (FlushData); 687 fdata->flush_stream = TRUE; 688 fdata->close_stream = FALSE; 689 690 res = g_simple_async_result_new (G_OBJECT (stream), 691 callback, 692 data, 693 g_buffered_output_stream_flush_async); 694 695 g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data); 696 697 g_simple_async_result_run_in_thread (res, 698 flush_buffer_thread, 699 io_priority, 700 cancellable); 701 g_object_unref (res); 702} 703 704static gboolean 705g_buffered_output_stream_flush_finish (GOutputStream *stream, 706 GAsyncResult *result, 707 GError **error) 708{ 709 GSimpleAsyncResult *simple; 710 711 simple = G_SIMPLE_ASYNC_RESULT (result); 712 713 g_assert (g_simple_async_result_get_source_tag (simple) == 714 g_buffered_output_stream_flush_async); 715 716 return TRUE; 717} 718 719static void 720g_buffered_output_stream_close_async (GOutputStream *stream, 721 int io_priority, 722 GCancellable *cancellable, 723 GAsyncReadyCallback callback, 724 gpointer data) 725{ 726 GSimpleAsyncResult *res; 727 FlushData *fdata; 728 729 fdata = g_slice_new (FlushData); 730 fdata->close_stream = TRUE; 731 732 res = g_simple_async_result_new (G_OBJECT (stream), 733 callback, 734 data, 735 g_buffered_output_stream_close_async); 736 737 g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data); 738 739 g_simple_async_result_run_in_thread (res, 740 flush_buffer_thread, 741 io_priority, 742 cancellable); 743 g_object_unref (res); 744} 745 746static gboolean 747g_buffered_output_stream_close_finish (GOutputStream *stream, 748 GAsyncResult *result, 749 GError **error) 750{ 751 GSimpleAsyncResult *simple; 752 753 simple = G_SIMPLE_ASYNC_RESULT (result); 754 755 g_assert (g_simple_async_result_get_source_tag (simple) == 756 g_buffered_output_stream_flush_async); 757 758 return TRUE; 759} 760 761/* vim: ts=2 sw=2 et */ 762