gbufferedoutputstream.c revision e919f0198b8e2106313eab55af29872c83fa6dc9
1/* GIO - GLib Input, Output and Streaming Library 2 * 3 * Copyright (C) 2006-2007 Red Hat, Inc. 4 * 5 * This library is free software; you can redistribute it and/or 6 * modify it under the terms of the GNU Lesser General Public 7 * License as published by the Free Software Foundation; either 8 * version 2 of the License, or (at your option) any later version. 9 * 10 * This library is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * Lesser General Public License for more details. 14 * 15 * You should have received a copy of the GNU Lesser General 16 * Public License along with this library; if not, write to the 17 * Free Software Foundation, Inc., 59 Temple Place, Suite 330, 18 * Boston, MA 02111-1307, USA. 19 * 20 * Author: Christian Kellner <gicmo@gnome.org> 21 */ 22 23#include <config.h> 24#include "gbufferedoutputstream.h" 25#include "goutputstream.h" 26#include "gsimpleasyncresult.h" 27#include "string.h" 28#include "glibintl.h" 29 30#include <gioalias.h> 31 32/** 33 * SECTION:gbufferedoutputstream 34 * @short_description: Buffered Output Stream 35 * @see_also: #GFilterOutputStream, #GOutputStream 36 * 37 * Buffered output stream implements #GFilterOutputStream and provides 38 * for buffered writes. 39 * 40 * By default, #GBufferedOutputStream's buffer size is set at 4 kilobytes. 41 * 42 * To create a buffered output stream, use g_buffered_output_stream_new(), 43 * or g_buffered_output_stream_new_sized() to specify the buffer's size 44 * at construction. 45 * 46 * To get the size of a buffer within a buffered input stream, use 47 * g_buffered_output_stream_get_buffer_size(). To change the size of a 48 * buffered output stream's buffer, use 49 * g_buffered_output_stream_set_buffer_size(). Note that the buffer's 50 * size cannot be reduced below the size of the data within the buffer. 51 **/ 52 53#define DEFAULT_BUFFER_SIZE 4096 54 55struct _GBufferedOutputStreamPrivate { 56 guint8 *buffer; 57 gsize len; 58 goffset pos; 59 gboolean auto_grow; 60}; 61 62enum { 63 PROP_0, 64 PROP_BUFSIZE, 65 PROP_AUTO_GROW 66}; 67 68static void g_buffered_output_stream_set_property (GObject *object, 69 guint prop_id, 70 const GValue *value, 71 GParamSpec *pspec); 72 73static void g_buffered_output_stream_get_property (GObject *object, 74 guint prop_id, 75 GValue *value, 76 GParamSpec *pspec); 77static void g_buffered_output_stream_finalize (GObject *object); 78 79 80static gssize g_buffered_output_stream_write (GOutputStream *stream, 81 const void *buffer, 82 gsize count, 83 GCancellable *cancellable, 84 GError **error); 85static gboolean g_buffered_output_stream_flush (GOutputStream *stream, 86 GCancellable *cancellable, 87 GError **error); 88static gboolean g_buffered_output_stream_close (GOutputStream *stream, 89 GCancellable *cancellable, 90 GError **error); 91 92static void g_buffered_output_stream_write_async (GOutputStream *stream, 93 const void *buffer, 94 gsize count, 95 int io_priority, 96 GCancellable *cancellable, 97 GAsyncReadyCallback callback, 98 gpointer data); 99static gssize g_buffered_output_stream_write_finish (GOutputStream *stream, 100 GAsyncResult *result, 101 GError **error); 102static void g_buffered_output_stream_flush_async (GOutputStream *stream, 103 int io_priority, 104 GCancellable *cancellable, 105 GAsyncReadyCallback callback, 106 gpointer data); 107static gboolean g_buffered_output_stream_flush_finish (GOutputStream *stream, 108 GAsyncResult *result, 109 GError **error); 110static void g_buffered_output_stream_close_async (GOutputStream *stream, 111 int io_priority, 112 GCancellable *cancellable, 113 GAsyncReadyCallback callback, 114 gpointer data); 115static gboolean g_buffered_output_stream_close_finish (GOutputStream *stream, 116 GAsyncResult *result, 117 GError **error); 118 119G_DEFINE_TYPE (GBufferedOutputStream, 120 g_buffered_output_stream, 121 G_TYPE_FILTER_OUTPUT_STREAM) 122 123 124static void 125g_buffered_output_stream_class_init (GBufferedOutputStreamClass *klass) 126{ 127 GObjectClass *object_class; 128 GOutputStreamClass *ostream_class; 129 130 g_type_class_add_private (klass, sizeof (GBufferedOutputStreamPrivate)); 131 132 object_class = G_OBJECT_CLASS (klass); 133 object_class->get_property = g_buffered_output_stream_get_property; 134 object_class->set_property = g_buffered_output_stream_set_property; 135 object_class->finalize = g_buffered_output_stream_finalize; 136 137 ostream_class = G_OUTPUT_STREAM_CLASS (klass); 138 ostream_class->write = g_buffered_output_stream_write; 139 ostream_class->flush = g_buffered_output_stream_flush; 140 ostream_class->close = g_buffered_output_stream_close; 141 ostream_class->write_async = g_buffered_output_stream_write_async; 142 ostream_class->write_finish = g_buffered_output_stream_write_finish; 143 ostream_class->flush_async = g_buffered_output_stream_flush_async; 144 ostream_class->flush_finish = g_buffered_output_stream_flush_finish; 145 ostream_class->close_async = g_buffered_output_stream_close_async; 146 ostream_class->close_finish = g_buffered_output_stream_close_finish; 147 148 g_object_class_install_property (object_class, 149 PROP_BUFSIZE, 150 g_param_spec_uint ("buffer-size", 151 P_("Buffer Size"), 152 P_("The size of the backend buffer"), 153 1, 154 G_MAXUINT, 155 DEFAULT_BUFFER_SIZE, 156 G_PARAM_READWRITE| 157 G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB)); 158 159 g_object_class_install_property (object_class, 160 PROP_AUTO_GROW, 161 g_param_spec_boolean ("auto-grow", 162 P_("Auto-grow"), 163 P_("Whether the buffer should automatically grow"), 164 FALSE, 165 G_PARAM_READWRITE| 166 G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB)); 167 168} 169 170/** 171 * g_buffered_output_stream_get_buffer_size: 172 * @stream: a #GBufferedOutputStream. 173 * 174 * Gets the size of the buffer in the @stream. 175 * 176 * Returns: the current size of the buffer. 177 **/ 178gsize 179g_buffered_output_stream_get_buffer_size (GBufferedOutputStream *stream) 180{ 181 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), -1); 182 183 return stream->priv->len; 184} 185 186/** 187 * g_buffered_output_stream_set_buffer_size: 188 * @stream: a #GBufferedOutputStream. 189 * @size: a #gsize. 190 * 191 * Sets the size of the internal buffer to @size. 192 **/ 193void 194g_buffered_output_stream_set_buffer_size (GBufferedOutputStream *stream, 195 gsize size) 196{ 197 GBufferedOutputStreamPrivate *priv; 198 guint8 *buffer; 199 200 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream)); 201 202 priv = stream->priv; 203 204 if (size == priv->len) 205 return; 206 207 if (priv->buffer) 208 { 209 size = MAX (size, priv->pos); 210 211 buffer = g_malloc (size); 212 memcpy (buffer, priv->buffer, priv->pos); 213 g_free (priv->buffer); 214 priv->buffer = buffer; 215 priv->len = size; 216 /* Keep old pos */ 217 } 218 else 219 { 220 priv->buffer = g_malloc (size); 221 priv->len = size; 222 priv->pos = 0; 223 } 224 225 g_object_notify (G_OBJECT (stream), "buffer-size"); 226} 227 228/** 229 * g_buffered_output_stream_get_auto_grow: 230 * @stream: a #GBufferedOutputStream. 231 * 232 * Checks if the buffer automatically grows as data is added. 233 * 234 * Returns: %TRUE if the @stream's buffer automatically grows, 235 * %FALSE otherwise. 236 **/ 237gboolean 238g_buffered_output_stream_get_auto_grow (GBufferedOutputStream *stream) 239{ 240 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), FALSE); 241 242 return stream->priv->auto_grow; 243} 244 245/** 246 * g_buffered_output_stream_set_auto_grow: 247 * @stream: a #GBufferedOutputStream. 248 * @auto_grow: a #gboolean. 249 * 250 * Sets whether or not the @stream's buffer should automatically grow. 251 **/ 252void 253g_buffered_output_stream_set_auto_grow (GBufferedOutputStream *stream, 254 gboolean auto_grow) 255{ 256 GBufferedOutputStreamPrivate *priv; 257 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream)); 258 priv = stream->priv; 259 auto_grow = auto_grow != FALSE; 260 if (priv->auto_grow != auto_grow) 261 { 262 priv->auto_grow = auto_grow; 263 g_object_notify (G_OBJECT (stream), "auto-grow"); 264 } 265} 266 267static void 268g_buffered_output_stream_set_property (GObject *object, 269 guint prop_id, 270 const GValue *value, 271 GParamSpec *pspec) 272{ 273 GBufferedOutputStream *stream; 274 275 stream = G_BUFFERED_OUTPUT_STREAM (object); 276 277 switch (prop_id) 278 { 279 case PROP_BUFSIZE: 280 g_buffered_output_stream_set_buffer_size (stream, g_value_get_uint (value)); 281 break; 282 283 case PROP_AUTO_GROW: 284 g_buffered_output_stream_set_auto_grow (stream, g_value_get_boolean (value)); 285 break; 286 287 default: 288 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); 289 break; 290 } 291 292} 293 294static void 295g_buffered_output_stream_get_property (GObject *object, 296 guint prop_id, 297 GValue *value, 298 GParamSpec *pspec) 299{ 300 GBufferedOutputStream *buffered_stream; 301 GBufferedOutputStreamPrivate *priv; 302 303 buffered_stream = G_BUFFERED_OUTPUT_STREAM (object); 304 priv = buffered_stream->priv; 305 306 switch (prop_id) 307 { 308 case PROP_BUFSIZE: 309 g_value_set_uint (value, priv->len); 310 break; 311 312 case PROP_AUTO_GROW: 313 g_value_set_boolean (value, priv->auto_grow); 314 break; 315 316 default: 317 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); 318 break; 319 } 320 321} 322 323static void 324g_buffered_output_stream_finalize (GObject *object) 325{ 326 GBufferedOutputStream *stream; 327 GBufferedOutputStreamPrivate *priv; 328 329 stream = G_BUFFERED_OUTPUT_STREAM (object); 330 priv = stream->priv; 331 332 g_free (priv->buffer); 333 334 if (G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize) 335 (*G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize) (object); 336} 337 338static void 339g_buffered_output_stream_init (GBufferedOutputStream *stream) 340{ 341 stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, 342 G_TYPE_BUFFERED_OUTPUT_STREAM, 343 GBufferedOutputStreamPrivate); 344 345} 346 347/** 348 * g_buffered_output_stream_new: 349 * @base_stream: a #GOutputStream. 350 * 351 * Creates a new buffered output stream for a base stream. 352 * 353 * Returns: a #GOutputStream for the given @base_stream. 354 **/ 355GOutputStream * 356g_buffered_output_stream_new (GOutputStream *base_stream) 357{ 358 GOutputStream *stream; 359 360 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL); 361 362 stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM, 363 "base-stream", base_stream, 364 NULL); 365 366 return stream; 367} 368 369/** 370 * g_buffered_output_stream_new_sized: 371 * @base_stream: a #GOutputStream. 372 * @size: a #gsize. 373 * 374 * Creates a new buffered output stream with a given buffer size. 375 * 376 * Returns: a #GOutputStream with an internal buffer set to @size. 377 **/ 378GOutputStream * 379g_buffered_output_stream_new_sized (GOutputStream *base_stream, 380 guint size) 381{ 382 GOutputStream *stream; 383 384 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL); 385 386 stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM, 387 "base-stream", base_stream, 388 "buffer-size", size, 389 NULL); 390 391 return stream; 392} 393 394static gboolean 395flush_buffer (GBufferedOutputStream *stream, 396 GCancellable *cancellable, 397 GError **error) 398{ 399 GBufferedOutputStreamPrivate *priv; 400 GOutputStream *base_stream; 401 gboolean res; 402 gsize bytes_written; 403 gsize count; 404 405 priv = stream->priv; 406 bytes_written = 0; 407 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream; 408 409 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), FALSE); 410 411 res = g_output_stream_write_all (base_stream, 412 priv->buffer, 413 priv->pos, 414 &bytes_written, 415 cancellable, 416 error); 417 418 count = priv->pos - bytes_written; 419 420 if (count > 0) 421 g_memmove (priv->buffer, priv->buffer + bytes_written, count); 422 423 priv->pos -= bytes_written; 424 425 return res; 426} 427 428static gssize 429g_buffered_output_stream_write (GOutputStream *stream, 430 const void *buffer, 431 gsize count, 432 GCancellable *cancellable, 433 GError **error) 434{ 435 GBufferedOutputStream *bstream; 436 GBufferedOutputStreamPrivate *priv; 437 gboolean res; 438 gsize n; 439 gsize new_size; 440 441 bstream = G_BUFFERED_OUTPUT_STREAM (stream); 442 priv = bstream->priv; 443 444 n = priv->len - priv->pos; 445 446 if (priv->auto_grow && n < count) 447 { 448 new_size = MAX (priv->len * 2, priv->len + count); 449 g_buffered_output_stream_set_buffer_size (bstream, new_size); 450 } 451 else if (n == 0) 452 { 453 res = flush_buffer (bstream, cancellable, error); 454 455 if (res == FALSE) 456 return -1; 457 } 458 459 n = priv->len - priv->pos; 460 461 count = MIN (count, n); 462 memcpy (priv->buffer + priv->pos, buffer, count); 463 priv->pos += count; 464 465 return count; 466} 467 468static gboolean 469g_buffered_output_stream_flush (GOutputStream *stream, 470 GCancellable *cancellable, 471 GError **error) 472{ 473 GBufferedOutputStream *bstream; 474 GBufferedOutputStreamPrivate *priv; 475 GOutputStream *base_stream; 476 gboolean res; 477 478 bstream = G_BUFFERED_OUTPUT_STREAM (stream); 479 priv = bstream->priv; 480 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream; 481 482 res = flush_buffer (bstream, cancellable, error); 483 484 if (res == FALSE) 485 return FALSE; 486 487 res = g_output_stream_flush (base_stream, cancellable, error); 488 489 return res; 490} 491 492static gboolean 493g_buffered_output_stream_close (GOutputStream *stream, 494 GCancellable *cancellable, 495 GError **error) 496{ 497 GBufferedOutputStream *bstream; 498 GBufferedOutputStreamPrivate *priv; 499 GOutputStream *base_stream; 500 gboolean res; 501 502 bstream = G_BUFFERED_OUTPUT_STREAM (stream); 503 priv = bstream->priv; 504 base_stream = G_FILTER_OUTPUT_STREAM (bstream)->base_stream; 505 506 res = flush_buffer (bstream, cancellable, error); 507 508 /* report the first error but still close the stream */ 509 if (res) 510 res = g_output_stream_close (base_stream, cancellable, error); 511 else 512 g_output_stream_close (base_stream, cancellable, NULL); 513 514 return res; 515} 516 517/* ************************** */ 518/* Async stuff implementation */ 519/* ************************** */ 520 521/* TODO: This should be using the base class async ops, not threads */ 522 523typedef struct { 524 525 guint flush_stream : 1; 526 guint close_stream : 1; 527 528} FlushData; 529 530static void 531free_flush_data (gpointer data) 532{ 533 g_slice_free (FlushData, data); 534} 535 536/* This function is used by all three (i.e. 537 * _write, _flush, _close) functions since 538 * all of them will need to flush the buffer 539 * and so closing and writing is just a special 540 * case of flushing + some addition stuff */ 541static void 542flush_buffer_thread (GSimpleAsyncResult *result, 543 GObject *object, 544 GCancellable *cancellable) 545{ 546 GBufferedOutputStream *stream; 547 GOutputStream *base_stream; 548 FlushData *fdata; 549 gboolean res; 550 GError *error = NULL; 551 552 stream = G_BUFFERED_OUTPUT_STREAM (object); 553 fdata = g_simple_async_result_get_op_res_gpointer (result); 554 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream; 555 556 res = flush_buffer (stream, cancellable, &error); 557 558 /* if flushing the buffer didn't work don't even bother 559 * to flush the stream but just report that error */ 560 if (res && fdata->flush_stream) 561 res = g_output_stream_flush (base_stream, cancellable, &error); 562 563 if (fdata->close_stream) 564 { 565 566 /* if flushing the buffer or the stream returned 567 * an error report that first error but still try 568 * close the stream */ 569 if (res == FALSE) 570 g_output_stream_close (base_stream, cancellable, NULL); 571 else 572 res = g_output_stream_close (base_stream, cancellable, &error); 573 } 574 575 if (res == FALSE) 576 { 577 g_simple_async_result_set_from_error (result, error); 578 g_error_free (error); 579 } 580} 581 582typedef struct { 583 584 FlushData fdata; 585 586 gsize count; 587 const void *buffer; 588 589} WriteData; 590 591static void 592free_write_data (gpointer data) 593{ 594 g_slice_free (WriteData, data); 595} 596 597static void 598g_buffered_output_stream_write_async (GOutputStream *stream, 599 const void *buffer, 600 gsize count, 601 int io_priority, 602 GCancellable *cancellable, 603 GAsyncReadyCallback callback, 604 gpointer data) 605{ 606 GBufferedOutputStream *buffered_stream; 607 GBufferedOutputStreamPrivate *priv; 608 GSimpleAsyncResult *res; 609 WriteData *wdata; 610 611 buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream); 612 priv = buffered_stream->priv; 613 614 wdata = g_slice_new (WriteData); 615 wdata->count = count; 616 wdata->buffer = buffer; 617 618 res = g_simple_async_result_new (G_OBJECT (stream), 619 callback, 620 data, 621 g_buffered_output_stream_write_async); 622 623 g_simple_async_result_set_op_res_gpointer (res, wdata, free_write_data); 624 625 /* if we have space left directly call the 626 * callback (from idle) otherwise schedule a buffer 627 * flush in the thread. In both cases the actual 628 * copying of the data to the buffer will be done in 629 * the write_finish () func since that should 630 * be fast enough */ 631 if (priv->len - priv->pos > 0) 632 { 633 g_simple_async_result_complete_in_idle (res); 634 } 635 else 636 { 637 wdata->fdata.flush_stream = FALSE; 638 wdata->fdata.close_stream = FALSE; 639 g_simple_async_result_run_in_thread (res, 640 flush_buffer_thread, 641 io_priority, 642 cancellable); 643 g_object_unref (res); 644 } 645} 646 647static gssize 648g_buffered_output_stream_write_finish (GOutputStream *stream, 649 GAsyncResult *result, 650 GError **error) 651{ 652 GBufferedOutputStreamPrivate *priv; 653 GBufferedOutputStream *buffered_stream; 654 GSimpleAsyncResult *simple; 655 WriteData *wdata; 656 gssize count; 657 658 simple = G_SIMPLE_ASYNC_RESULT (result); 659 buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream); 660 priv = buffered_stream->priv; 661 662 g_assert (g_simple_async_result_get_source_tag (simple) == 663 g_buffered_output_stream_write_async); 664 665 wdata = g_simple_async_result_get_op_res_gpointer (simple); 666 667 /* Now do the real copying of data to the buffer */ 668 count = priv->len - priv->pos; 669 count = MIN (wdata->count, count); 670 671 memcpy (priv->buffer + priv->pos, wdata->buffer, count); 672 673 priv->pos += count; 674 675 return count; 676} 677 678static void 679g_buffered_output_stream_flush_async (GOutputStream *stream, 680 int io_priority, 681 GCancellable *cancellable, 682 GAsyncReadyCallback callback, 683 gpointer data) 684{ 685 GSimpleAsyncResult *res; 686 FlushData *fdata; 687 688 fdata = g_slice_new (FlushData); 689 fdata->flush_stream = TRUE; 690 fdata->close_stream = FALSE; 691 692 res = g_simple_async_result_new (G_OBJECT (stream), 693 callback, 694 data, 695 g_buffered_output_stream_flush_async); 696 697 g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data); 698 699 g_simple_async_result_run_in_thread (res, 700 flush_buffer_thread, 701 io_priority, 702 cancellable); 703 g_object_unref (res); 704} 705 706static gboolean 707g_buffered_output_stream_flush_finish (GOutputStream *stream, 708 GAsyncResult *result, 709 GError **error) 710{ 711 GSimpleAsyncResult *simple; 712 713 simple = G_SIMPLE_ASYNC_RESULT (result); 714 715 g_assert (g_simple_async_result_get_source_tag (simple) == 716 g_buffered_output_stream_flush_async); 717 718 return TRUE; 719} 720 721static void 722g_buffered_output_stream_close_async (GOutputStream *stream, 723 int io_priority, 724 GCancellable *cancellable, 725 GAsyncReadyCallback callback, 726 gpointer data) 727{ 728 GSimpleAsyncResult *res; 729 FlushData *fdata; 730 731 fdata = g_slice_new (FlushData); 732 fdata->close_stream = TRUE; 733 734 res = g_simple_async_result_new (G_OBJECT (stream), 735 callback, 736 data, 737 g_buffered_output_stream_close_async); 738 739 g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data); 740 741 g_simple_async_result_run_in_thread (res, 742 flush_buffer_thread, 743 io_priority, 744 cancellable); 745 g_object_unref (res); 746} 747 748static gboolean 749g_buffered_output_stream_close_finish (GOutputStream *stream, 750 GAsyncResult *result, 751 GError **error) 752{ 753 GSimpleAsyncResult *simple; 754 755 simple = G_SIMPLE_ASYNC_RESULT (result); 756 757 g_assert (g_simple_async_result_get_source_tag (simple) == 758 g_buffered_output_stream_flush_async); 759 760 return TRUE; 761} 762 763#define __G_BUFFERED_OUTPUT_STREAM_C__ 764#include "gioaliasdef.c" 765