gbufferedinputstream.c revision b22aa6dde67cb0517f03b375959d379bec06f460
1/* GIO - GLib Input, Output and Streaming Library 2 * 3 * Copyright (C) 2006-2007 Red Hat, Inc. 4 * Copyright (C) 2007 Jürg Billeter 5 * 6 * This library is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU Lesser General Public 8 * License as published by the Free Software Foundation; either 9 * version 2 of the License, or (at your option) any later version. 10 * 11 * This library is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 * Lesser General Public License for more details. 15 * 16 * You should have received a copy of the GNU Lesser General 17 * Public License along with this library; if not, write to the 18 * Free Software Foundation, Inc., 59 Temple Place, Suite 330, 19 * Boston, MA 02111-1307, USA. 20 * 21 * Author: Christian Kellner <gicmo@gnome.org> 22 */ 23 24#include <config.h> 25#include "gbufferedinputstream.h" 26#include "ginputstream.h" 27#include "gsimpleasyncresult.h" 28#include <string.h> 29#include "glibintl.h" 30 31#include "gioalias.h" 32 33/** 34 * SECTION:gbufferedinputstream 35 * @short_description: Buffered Input Stream 36 * @see_also: #GFilterInputStream, #GInputStream 37 * 38 * Buffered input stream implements #GFilterInputStream and provides 39 * for buffered reads. 40 * 41 * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes. 42 * 43 * To create a buffered input stream, use g_buffered_input_stream_new(), 44 * or g_buffered_input_stream_new_sized() to specify the buffer's size at 45 * construction. 46 * 47 * To get the size of a buffer within a buffered input stream, use 48 * g_buffered_input_stream_get_buffer_size(). To change the size of a 49 * buffered input stream's buffer, use 50 * g_buffered_input_stream_set_buffer_size(). Note that the buffer's size 51 * cannot be reduced below the size of the data within the buffer. 52 * 53 **/ 54 55 56 57#define DEFAULT_BUFFER_SIZE 4096 58 59struct _GBufferedInputStreamPrivate { 60 guint8 *buffer; 61 gsize len; 62 gsize pos; 63 gsize end; 64 GAsyncReadyCallback outstanding_callback; 65}; 66 67enum { 68 PROP_0, 69 PROP_BUFSIZE 70}; 71 72static void g_buffered_input_stream_set_property (GObject *object, 73 guint prop_id, 74 const GValue *value, 75 GParamSpec *pspec); 76 77static void g_buffered_input_stream_get_property (GObject *object, 78 guint prop_id, 79 GValue *value, 80 GParamSpec *pspec); 81static void g_buffered_input_stream_finalize (GObject *object); 82 83 84static gssize g_buffered_input_stream_skip (GInputStream *stream, 85 gsize count, 86 GCancellable *cancellable, 87 GError **error); 88static void g_buffered_input_stream_skip_async (GInputStream *stream, 89 gsize count, 90 int io_priority, 91 GCancellable *cancellable, 92 GAsyncReadyCallback callback, 93 gpointer user_data); 94static gssize g_buffered_input_stream_skip_finish (GInputStream *stream, 95 GAsyncResult *result, 96 GError **error); 97static gssize g_buffered_input_stream_read (GInputStream *stream, 98 void *buffer, 99 gsize count, 100 GCancellable *cancellable, 101 GError **error); 102static void g_buffered_input_stream_read_async (GInputStream *stream, 103 void *buffer, 104 gsize count, 105 int io_priority, 106 GCancellable *cancellable, 107 GAsyncReadyCallback callback, 108 gpointer user_data); 109static gssize g_buffered_input_stream_read_finish (GInputStream *stream, 110 GAsyncResult *result, 111 GError **error); 112static gssize g_buffered_input_stream_real_fill (GBufferedInputStream *stream, 113 gssize count, 114 GCancellable *cancellable, 115 GError **error); 116static void g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream, 117 gssize count, 118 int io_priority, 119 GCancellable *cancellable, 120 GAsyncReadyCallback callback, 121 gpointer user_data); 122static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream, 123 GAsyncResult *result, 124 GError **error); 125 126static void compact_buffer (GBufferedInputStream *stream); 127 128G_DEFINE_TYPE (GBufferedInputStream, 129 g_buffered_input_stream, 130 G_TYPE_FILTER_INPUT_STREAM) 131 132 133static void 134g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass) 135{ 136 GObjectClass *object_class; 137 GInputStreamClass *istream_class; 138 GBufferedInputStreamClass *bstream_class; 139 140 g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate)); 141 142 object_class = G_OBJECT_CLASS (klass); 143 object_class->get_property = g_buffered_input_stream_get_property; 144 object_class->set_property = g_buffered_input_stream_set_property; 145 object_class->finalize = g_buffered_input_stream_finalize; 146 147 istream_class = G_INPUT_STREAM_CLASS (klass); 148 istream_class->skip = g_buffered_input_stream_skip; 149 istream_class->skip_async = g_buffered_input_stream_skip_async; 150 istream_class->skip_finish = g_buffered_input_stream_skip_finish; 151 istream_class->read = g_buffered_input_stream_read; 152 istream_class->read_async = g_buffered_input_stream_read_async; 153 istream_class->read_finish = g_buffered_input_stream_read_finish; 154 155 bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass); 156 bstream_class->fill = g_buffered_input_stream_real_fill; 157 bstream_class->fill_async = g_buffered_input_stream_real_fill_async; 158 bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish; 159 160 g_object_class_install_property (object_class, 161 PROP_BUFSIZE, 162 g_param_spec_uint ("buffer-size", 163 P_("Buffer Size"), 164 P_("The size of the backend buffer"), 165 1, 166 G_MAXUINT, 167 DEFAULT_BUFFER_SIZE, 168 G_PARAM_READWRITE | G_PARAM_CONSTRUCT | 169 G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB)); 170 171 172} 173 174/** 175 * g_buffered_input_stream_get_buffer_size: 176 * @stream: #GBufferedInputStream. 177 * 178 * Gets the size of the input buffer. 179 * 180 * Returns: the current buffer size, or %-1 on error. 181 **/ 182gsize 183g_buffered_input_stream_get_buffer_size (GBufferedInputStream *stream) 184{ 185 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1); 186 187 return stream->priv->len; 188} 189 190/** 191 * g_buffered_input_stream_set_buffer_size: 192 * @stream: #GBufferedInputStream. 193 * @size: a #gsize. 194 * 195 * Sets the size of the internal buffer of @stream to @size, or to the 196 * size of the contents of the buffer. The buffer can never be resized 197 * smaller than its current contents. 198 **/ 199void 200g_buffered_input_stream_set_buffer_size (GBufferedInputStream *stream, 201 gsize size) 202{ 203 GBufferedInputStreamPrivate *priv; 204 gsize in_buffer; 205 guint8 *buffer; 206 207 g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream)); 208 209 priv = stream->priv; 210 211 if (priv->len == size) 212 return; 213 214 if (priv->buffer) 215 { 216 in_buffer = priv->end - priv->pos; 217 218 /* Never resize smaller than current buffer contents */ 219 size = MAX (size, in_buffer); 220 221 buffer = g_malloc (size); 222 memcpy (buffer, priv->buffer + priv->pos, in_buffer); 223 priv->len = size; 224 priv->pos = 0; 225 priv->end = in_buffer; 226 g_free (priv->buffer); 227 priv->buffer = buffer; 228 } 229 else 230 { 231 priv->len = size; 232 priv->pos = 0; 233 priv->end = 0; 234 priv->buffer = g_malloc (size); 235 } 236 237 g_object_notify (G_OBJECT (stream), "buffer-size"); 238} 239 240static void 241g_buffered_input_stream_set_property (GObject *object, 242 guint prop_id, 243 const GValue *value, 244 GParamSpec *pspec) 245{ 246 GBufferedInputStreamPrivate *priv; 247 GBufferedInputStream *bstream; 248 249 bstream = G_BUFFERED_INPUT_STREAM (object); 250 priv = bstream->priv; 251 252 switch (prop_id) 253 { 254 case PROP_BUFSIZE: 255 g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value)); 256 break; 257 258 default: 259 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); 260 break; 261 } 262 263} 264 265static void 266g_buffered_input_stream_get_property (GObject *object, 267 guint prop_id, 268 GValue *value, 269 GParamSpec *pspec) 270{ 271 GBufferedInputStreamPrivate *priv; 272 GBufferedInputStream *bstream; 273 274 bstream = G_BUFFERED_INPUT_STREAM (object); 275 priv = bstream->priv; 276 277 switch (prop_id) 278 { 279 case PROP_BUFSIZE: 280 g_value_set_uint (value, priv->len); 281 break; 282 283 default: 284 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); 285 break; 286 } 287} 288 289static void 290g_buffered_input_stream_finalize (GObject *object) 291{ 292 GBufferedInputStreamPrivate *priv; 293 GBufferedInputStream *stream; 294 295 stream = G_BUFFERED_INPUT_STREAM (object); 296 priv = stream->priv; 297 298 g_free (priv->buffer); 299 300 if (G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize) 301 (*G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize) (object); 302} 303 304static void 305g_buffered_input_stream_init (GBufferedInputStream *stream) 306{ 307 stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, 308 G_TYPE_BUFFERED_INPUT_STREAM, 309 GBufferedInputStreamPrivate); 310} 311 312 313/** 314 * g_buffered_input_stream_new: 315 * @base_stream: a #GInputStream. 316 * 317 * Creates a new #GInputStream from the given @base_stream, with 318 * a buffer set to the default size (4 kilobytes). 319 * 320 * Returns: a #GInputStream for the given @base_stream. 321 **/ 322GInputStream * 323g_buffered_input_stream_new (GInputStream *base_stream) 324{ 325 GInputStream *stream; 326 327 g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL); 328 329 stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM, 330 "base-stream", base_stream, 331 NULL); 332 333 return stream; 334} 335 336/** 337 * g_buffered_input_stream_new_sized: 338 * @base_stream: a #GOutputStream. 339 * @size: a #gsize. 340 * 341 * Creates a new #GBufferedInputStream from the given @base_stream, 342 * with a buffer set to @size. 343 * 344 * Returns: a #GInputStream. 345 **/ 346GInputStream * 347g_buffered_input_stream_new_sized (GInputStream *base_stream, 348 gsize size) 349{ 350 GInputStream *stream; 351 352 g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL); 353 354 stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM, 355 "base-stream", base_stream, 356 "buffer-size", (guint)size, 357 NULL); 358 359 return stream; 360} 361 362/** 363 * g_buffered_input_stream_fill: 364 * @stream: #GBufferedInputStream. 365 * @count: the number of bytes that will be read from the stream. 366 * @cancellable: optional #GCancellable object, %NULL to ignore. 367 * @error: location to store the error occuring, or %NULL to ignore. 368 * 369 * Tries to read @count bytes from the stream into the buffer. 370 * Will block during this read. 371 * 372 * If @count is zero, returns zero and does nothing. A value of @count 373 * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error. 374 * 375 * On success, the number of bytes read into the buffer is returned. 376 * It is not an error if this is not the same as the requested size, as it 377 * can happen e.g. near the end of a file. Zero is returned on end of file 378 * (or if @count is zero), but never otherwise. 379 * 380 * If @cancellable is not %NULL, then the operation can be cancelled by 381 * triggering the cancellable object from another thread. If the operation 382 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an 383 * operation was partially finished when the operation was cancelled the 384 * partial result will be returned, without an error. 385 * 386 * On error -1 is returned and @error is set accordingly. 387 * 388 * For the asynchronous, non-blocking, version of this function, see 389 * g_buffered_input_stream_fill_async(). 390 * 391 * Returns: the number of bytes read into @stream's buffer, up to @count, 392 * or -1 on error. 393 **/ 394gssize 395g_buffered_input_stream_fill (GBufferedInputStream *stream, 396 gssize count, 397 GCancellable *cancellable, 398 GError **error) 399{ 400 GBufferedInputStreamClass *class; 401 GInputStream *input_stream; 402 gssize res; 403 404 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1); 405 406 input_stream = G_INPUT_STREAM (stream); 407 408 if (g_input_stream_is_closed (input_stream)) 409 { 410 g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED, 411 _("Stream is already closed")); 412 return -1; 413 } 414 415 if (g_input_stream_has_pending (input_stream)) 416 { 417 g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING, 418 _("Stream has outstanding operation")); 419 return -1; 420 } 421 422 g_input_stream_set_pending (input_stream, TRUE); 423 424 if (cancellable) 425 g_push_current_cancellable (cancellable); 426 427 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 428 res = class->fill (stream, count, cancellable, error); 429 430 if (cancellable) 431 g_pop_current_cancellable (cancellable); 432 433 g_input_stream_set_pending (input_stream, FALSE); 434 435 return res; 436} 437 438static void 439async_fill_callback_wrapper (GObject *source_object, 440 GAsyncResult *res, 441 gpointer user_data) 442{ 443 GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object); 444 445 g_input_stream_set_pending (G_INPUT_STREAM (stream), FALSE); 446 (*stream->priv->outstanding_callback) (source_object, res, user_data); 447 g_object_unref (stream); 448} 449 450/** 451 * g_buffered_input_stream_fill_async: 452 * @stream: #GBufferedInputStream. 453 * @count: a #gssize. 454 * @io_priority: the <link linkend="io-priority">I/O priority</link> 455 * of the request. 456 * @cancellable: optional #GCancellable object 457 * @callback: a #GAsyncReadyCallback. 458 * @user_data: a #gpointer. 459 * 460 * Reads data into @stream's buffer asynchronously, up to @count size. 461 * @io_priority can be used to prioritize reads. For the synchronous 462 * version of this function, see g_buffered_input_stream_fill(). 463 **/ 464void 465g_buffered_input_stream_fill_async (GBufferedInputStream *stream, 466 gssize count, 467 int io_priority, 468 GCancellable *cancellable, 469 GAsyncReadyCallback callback, 470 gpointer user_data) 471{ 472 GBufferedInputStreamClass *class; 473 GSimpleAsyncResult *simple; 474 475 g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream)); 476 477 if (count == 0) 478 { 479 simple = g_simple_async_result_new (G_OBJECT (stream), 480 callback, 481 user_data, 482 g_buffered_input_stream_fill_async); 483 g_simple_async_result_complete_in_idle (simple); 484 g_object_unref (simple); 485 return; 486 } 487 488 if (((gssize) count) < 0) 489 { 490 g_simple_async_report_error_in_idle (G_OBJECT (stream), 491 callback, 492 user_data, 493 G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, 494 _("Too large count value passed to g_input_stream_read_async")); 495 return; 496 } 497 498 if (g_input_stream_is_closed (G_INPUT_STREAM (stream))) 499 { 500 g_simple_async_report_error_in_idle (G_OBJECT (stream), 501 callback, 502 user_data, 503 G_IO_ERROR, G_IO_ERROR_CLOSED, 504 _("Stream is already closed")); 505 return; 506 } 507 508 if (g_input_stream_has_pending (G_INPUT_STREAM (stream))) 509 { 510 g_simple_async_report_error_in_idle (G_OBJECT (stream), 511 callback, 512 user_data, 513 G_IO_ERROR, G_IO_ERROR_PENDING, 514 _("Stream has outstanding operation")); 515 return; 516 } 517 518 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 519 520 g_input_stream_set_pending (G_INPUT_STREAM (stream), TRUE); 521 stream->priv->outstanding_callback = callback; 522 g_object_ref (stream); 523 class->fill_async (stream, count, io_priority, cancellable, 524 async_fill_callback_wrapper, user_data); 525} 526 527/** 528 * g_buffered_input_stream_fill_finish: 529 * @stream: a #GBufferedInputStream. 530 * @result: a #GAsyncResult. 531 * @error: a #GError. 532 * 533 * Finishes an asynchronous read. 534 * 535 * Returns: a #gssize of the read stream, or %-1 on an error. 536 **/ 537gssize 538g_buffered_input_stream_fill_finish (GBufferedInputStream *stream, 539 GAsyncResult *result, 540 GError **error) 541{ 542 GSimpleAsyncResult *simple; 543 GBufferedInputStreamClass *class; 544 545 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1); 546 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1); 547 548 if (G_IS_SIMPLE_ASYNC_RESULT (result)) 549 { 550 simple = G_SIMPLE_ASYNC_RESULT (result); 551 if (g_simple_async_result_propagate_error (simple, error)) 552 return -1; 553 554 /* Special case read of 0 bytes */ 555 if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async) 556 return 0; 557 } 558 559 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 560 return class->fill_finish (stream, result, error); 561} 562 563/** 564 * g_buffered_input_stream_get_available: 565 * @stream: #GBufferedInputStream. 566 * 567 * Gets the size of the available data within the stream. 568 * 569 * Returns: size of the available stream. 570 **/ 571gsize 572g_buffered_input_stream_get_available (GBufferedInputStream *stream) 573{ 574 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1); 575 576 return stream->priv->end - stream->priv->pos; 577} 578 579/** 580 * g_buffered_input_stream_peek: 581 * @stream: a #GBufferedInputStream. 582 * @buffer: a pointer to an allocated chunk of memory. 583 * @offset: a #gsize. 584 * @count: a #gsize. 585 * 586 * Peeks in the buffer, copying data of size @count into @buffer, 587 * offset @offset bytes. 588 * 589 * Returns: a #gsize of the number of bytes peeked, or %-1 on error. 590 **/ 591gsize 592g_buffered_input_stream_peek (GBufferedInputStream *stream, 593 void *buffer, 594 gsize offset, 595 gsize count) 596{ 597 gsize available; 598 gsize end; 599 600 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1); 601 g_return_val_if_fail (buffer != NULL, -1); 602 603 available = g_buffered_input_stream_get_available (stream); 604 605 if (offset > available) 606 return 0; 607 608 end = MIN (offset + count, available); 609 count = end - offset; 610 611 memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count); 612 return count; 613} 614 615/** 616 * g_buffered_input_stream_peek_buffer: 617 * @stream: a #GBufferedInputStream. 618 * @count: a #gsize to get the number of bytes available in the buffer. 619 * 620 * Returns the buffer with the currently available bytes. The returned 621 * buffer must not be modified and will become invalid when reading from 622 * the stream or filling the buffer. 623 * 624 * Returns: read-only buffer 625 **/ 626const void* 627g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream, 628 gsize *count) 629{ 630 GBufferedInputStreamPrivate *priv; 631 632 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL); 633 634 priv = stream->priv; 635 636 if (count) 637 *count = priv->end - priv->pos; 638 639 return priv->buffer + priv->pos; 640} 641 642static void 643compact_buffer (GBufferedInputStream *stream) 644{ 645 GBufferedInputStreamPrivate *priv; 646 gsize current_size; 647 648 priv = stream->priv; 649 650 current_size = priv->end - priv->pos; 651 652 g_memmove (priv->buffer, priv->buffer + priv->pos, current_size); 653 654 priv->pos = 0; 655 priv->end = current_size; 656} 657 658static gssize 659g_buffered_input_stream_real_fill (GBufferedInputStream *stream, 660 gssize count, 661 GCancellable *cancellable, 662 GError **error) 663{ 664 GBufferedInputStreamPrivate *priv; 665 GInputStream *base_stream; 666 gssize nread; 667 gsize in_buffer; 668 669 priv = stream->priv; 670 671 if (count == -1) 672 count = priv->len; 673 674 in_buffer = priv->end - priv->pos; 675 676 /* Never fill more than can fit in the buffer */ 677 count = MIN (count, priv->len - in_buffer); 678 679 /* If requested length does not fit at end, compact */ 680 if (priv->len - priv->end < count) 681 compact_buffer (stream); 682 683 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; 684 nread = g_input_stream_read (base_stream, 685 priv->buffer + priv->end, 686 count, 687 cancellable, 688 error); 689 690 if (nread > 0) 691 priv->end += nread; 692 693 return nread; 694} 695 696static gssize 697g_buffered_input_stream_skip (GInputStream *stream, 698 gsize count, 699 GCancellable *cancellable, 700 GError **error) 701{ 702 GBufferedInputStream *bstream; 703 GBufferedInputStreamPrivate *priv; 704 GBufferedInputStreamClass *class; 705 GInputStream *base_stream; 706 gsize available, bytes_skipped; 707 gssize nread; 708 709 bstream = G_BUFFERED_INPUT_STREAM (stream); 710 priv = bstream->priv; 711 712 available = priv->end - priv->pos; 713 714 if (count <= available) 715 { 716 priv->pos += count; 717 return count; 718 } 719 720 /* Full request not available, skip all currently available and 721 * request refill for more 722 */ 723 724 priv->pos = 0; 725 priv->end = 0; 726 bytes_skipped = available; 727 count -= available; 728 729 if (bytes_skipped > 0) 730 error = NULL; /* Ignore further errors if we already read some data */ 731 732 if (count > priv->len) 733 { 734 /* Large request, shortcut buffer */ 735 736 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; 737 738 nread = g_input_stream_skip (base_stream, 739 count, 740 cancellable, 741 error); 742 743 if (nread < 0 && bytes_skipped == 0) 744 return -1; 745 746 if (nread > 0) 747 bytes_skipped += nread; 748 749 return bytes_skipped; 750 } 751 752 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 753 nread = class->fill (bstream, priv->len, cancellable, error); 754 755 if (nread < 0) 756 { 757 if (bytes_skipped == 0) 758 return -1; 759 else 760 return bytes_skipped; 761 } 762 763 available = priv->end - priv->pos; 764 count = MIN (count, available); 765 766 bytes_skipped += count; 767 priv->pos += count; 768 769 return bytes_skipped; 770} 771 772static gssize 773g_buffered_input_stream_read (GInputStream *stream, 774 void *buffer, 775 gsize count, 776 GCancellable *cancellable, 777 GError **error) 778{ 779 GBufferedInputStream *bstream; 780 GBufferedInputStreamPrivate *priv; 781 GBufferedInputStreamClass *class; 782 GInputStream *base_stream; 783 gsize available, bytes_read; 784 gssize nread; 785 786 bstream = G_BUFFERED_INPUT_STREAM (stream); 787 priv = bstream->priv; 788 789 available = priv->end - priv->pos; 790 791 if (count <= available) 792 { 793 memcpy (buffer, priv->buffer + priv->pos, count); 794 priv->pos += count; 795 return count; 796 } 797 798 /* Full request not available, read all currently availbile and request refill for more */ 799 800 memcpy (buffer, priv->buffer + priv->pos, available); 801 priv->pos = 0; 802 priv->end = 0; 803 bytes_read = available; 804 count -= available; 805 806 if (bytes_read > 0) 807 error = NULL; /* Ignore further errors if we already read some data */ 808 809 if (count > priv->len) 810 { 811 /* Large request, shortcut buffer */ 812 813 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; 814 815 nread = g_input_stream_read (base_stream, 816 (char *)buffer + bytes_read, 817 count, 818 cancellable, 819 error); 820 821 if (nread < 0 && bytes_read == 0) 822 return -1; 823 824 if (nread > 0) 825 bytes_read += nread; 826 827 return bytes_read; 828 } 829 830 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 831 nread = class->fill (bstream, priv->len, cancellable, error); 832 if (nread < 0) 833 { 834 if (bytes_read == 0) 835 return -1; 836 else 837 return bytes_read; 838 } 839 840 available = priv->end - priv->pos; 841 count = MIN (count, available); 842 843 memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count); 844 bytes_read += count; 845 priv->pos += count; 846 847 return bytes_read; 848} 849 850/** 851 * g_buffered_input_stream_read_byte: 852 * @stream: #GBufferedInputStream. 853 * @cancellable: optional #GCancellable object, %NULL to ignore. 854 * @error: location to store the error occuring, or %NULL to ignore. 855 * 856 * Tries to read a single byte from the stream or the buffer. Will block 857 * during this read. 858 * 859 * On success, the byte read from the stream is returned. On end of stream 860 * -1 is returned but it's not an exceptional error and @error is not set. 861 * 862 * If @cancellable is not %NULL, then the operation can be cancelled by 863 * triggering the cancellable object from another thread. If the operation 864 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an 865 * operation was partially finished when the operation was cancelled the 866 * partial result will be returned, without an error. 867 * 868 * On error -1 is returned and @error is set accordingly. 869 * 870 * Returns: the byte read from the @stream, or -1 on end of stream or error. 871 **/ 872int 873g_buffered_input_stream_read_byte (GBufferedInputStream *stream, 874 GCancellable *cancellable, 875 GError **error) 876{ 877 GBufferedInputStreamPrivate *priv; 878 GBufferedInputStreamClass *class; 879 GInputStream *input_stream; 880 gsize available; 881 gssize nread; 882 883 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1); 884 885 priv = stream->priv; 886 input_stream = G_INPUT_STREAM (stream); 887 888 if (g_input_stream_is_closed (input_stream)) 889 { 890 g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED, 891 _("Stream is already closed")); 892 return -1; 893 } 894 895 if (g_input_stream_has_pending (input_stream)) 896 { 897 g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING, 898 _("Stream has outstanding operation")); 899 return -1; 900 } 901 902 available = priv->end - priv->pos; 903 904 if (available < 1) 905 return priv->buffer[priv->pos++]; 906 907 /* Byte not available, request refill for more */ 908 909 g_input_stream_set_pending (input_stream, TRUE); 910 911 if (cancellable) 912 g_push_current_cancellable (cancellable); 913 914 priv->pos = 0; 915 priv->end = 0; 916 917 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 918 nread = class->fill (stream, priv->len, cancellable, error); 919 920 if (cancellable) 921 g_pop_current_cancellable (cancellable); 922 923 g_input_stream_set_pending (input_stream, FALSE); 924 925 if (nread <= 0) 926 return -1; /* error or end of stream */ 927 928 return priv->buffer[priv->pos++]; 929} 930 931/* ************************** */ 932/* Async stuff implementation */ 933/* ************************** */ 934 935static void 936fill_async_callback (GObject *source_object, 937 GAsyncResult *result, 938 gpointer user_data) 939{ 940 GError *error; 941 gssize res; 942 GSimpleAsyncResult *simple; 943 944 simple = user_data; 945 946 error = NULL; 947 res = g_input_stream_read_finish (G_INPUT_STREAM (source_object), 948 result, &error); 949 950 g_simple_async_result_set_op_res_gssize (simple, res); 951 if (res == -1) 952 { 953 g_simple_async_result_set_from_error (simple, error); 954 g_error_free (error); 955 } 956 957 /* Complete immediately, not in idle, since we're already in a mainloop callout */ 958 g_simple_async_result_complete (simple); 959 g_object_unref (simple); 960} 961 962static void 963g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream, 964 gssize count, 965 int io_priority, 966 GCancellable *cancellable, 967 GAsyncReadyCallback callback, 968 gpointer user_data) 969{ 970 GBufferedInputStreamPrivate *priv; 971 GInputStream *base_stream; 972 GSimpleAsyncResult *simple; 973 gsize in_buffer; 974 975 priv = stream->priv; 976 977 if (count == -1) 978 count = priv->len; 979 980 in_buffer = priv->end - priv->pos; 981 982 /* Never fill more than can fit in the buffer */ 983 count = MIN (count, priv->len - in_buffer); 984 985 /* If requested length does not fit at end, compact */ 986 if (priv->len - priv->end < count) 987 compact_buffer (stream); 988 989 simple = g_simple_async_result_new (G_OBJECT (stream), 990 callback, user_data, 991 g_buffered_input_stream_real_fill_async); 992 993 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; 994 g_input_stream_read_async (base_stream, 995 priv->buffer + priv->end, 996 count, 997 io_priority, 998 cancellable, 999 fill_async_callback, 1000 simple); 1001} 1002 1003static gssize 1004g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream, 1005 GAsyncResult *result, 1006 GError **error) 1007{ 1008 GSimpleAsyncResult *simple; 1009 gssize nread; 1010 1011 simple = G_SIMPLE_ASYNC_RESULT (result); 1012 g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async); 1013 1014 nread = g_simple_async_result_get_op_res_gssize (simple); 1015 return nread; 1016} 1017 1018typedef struct { 1019 gssize bytes_read; 1020 gssize count; 1021 void *buffer; 1022} ReadAsyncData; 1023 1024static void 1025free_read_async_data (gpointer _data) 1026{ 1027 ReadAsyncData *data = _data; 1028 g_slice_free (ReadAsyncData, data); 1029} 1030 1031static void 1032large_read_callback (GObject *source_object, 1033 GAsyncResult *result, 1034 gpointer user_data) 1035{ 1036 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data); 1037 ReadAsyncData *data; 1038 GError *error; 1039 gssize nread; 1040 1041 data = g_simple_async_result_get_op_res_gpointer (simple); 1042 1043 error = NULL; 1044 nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object), 1045 result, &error); 1046 1047 /* Only report the error if we've not already read some data */ 1048 if (nread < 0 && data->bytes_read == 0) 1049 g_simple_async_result_set_from_error (simple, error); 1050 1051 if (nread > 0) 1052 data->bytes_read += nread; 1053 1054 if (error) 1055 g_error_free (error); 1056 1057 /* Complete immediately, not in idle, since we're already in a mainloop callout */ 1058 g_simple_async_result_complete (simple); 1059 g_object_unref (simple); 1060} 1061 1062static void 1063read_fill_buffer_callback (GObject *source_object, 1064 GAsyncResult *result, 1065 gpointer user_data) 1066{ 1067 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data); 1068 GBufferedInputStream *bstream; 1069 GBufferedInputStreamPrivate *priv; 1070 ReadAsyncData *data; 1071 GError *error; 1072 gssize nread; 1073 gsize available; 1074 1075 bstream = G_BUFFERED_INPUT_STREAM (source_object); 1076 priv = bstream->priv; 1077 1078 data = g_simple_async_result_get_op_res_gpointer (simple); 1079 1080 error = NULL; 1081 nread = g_buffered_input_stream_fill_finish (bstream, 1082 result, &error); 1083 1084 if (nread < 0 && data->bytes_read == 0) 1085 g_simple_async_result_set_from_error (simple, error); 1086 1087 1088 if (nread > 0) 1089 { 1090 available = priv->end - priv->pos; 1091 data->count = MIN (data->count, available); 1092 1093 memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count); 1094 data->bytes_read += data->count; 1095 priv->pos += data->count; 1096 } 1097 1098 if (error) 1099 g_error_free (error); 1100 1101 /* Complete immediately, not in idle, since we're already in a mainloop callout */ 1102 g_simple_async_result_complete (simple); 1103 g_object_unref (simple); 1104} 1105 1106static void 1107g_buffered_input_stream_read_async (GInputStream *stream, 1108 void *buffer, 1109 gsize count, 1110 int io_priority, 1111 GCancellable *cancellable, 1112 GAsyncReadyCallback callback, 1113 gpointer user_data) 1114{ 1115 GBufferedInputStream *bstream; 1116 GBufferedInputStreamPrivate *priv; 1117 GBufferedInputStreamClass *class; 1118 GInputStream *base_stream; 1119 gsize available; 1120 GSimpleAsyncResult *simple; 1121 ReadAsyncData *data; 1122 1123 bstream = G_BUFFERED_INPUT_STREAM (stream); 1124 priv = bstream->priv; 1125 1126 data = g_slice_new (ReadAsyncData); 1127 data->buffer = buffer; 1128 data->bytes_read = 0; 1129 simple = g_simple_async_result_new (G_OBJECT (stream), 1130 callback, user_data, 1131 g_buffered_input_stream_read_async); 1132 g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data); 1133 1134 available = priv->end - priv->pos; 1135 1136 if (count <= available) 1137 { 1138 memcpy (buffer, priv->buffer + priv->pos, count); 1139 priv->pos += count; 1140 data->bytes_read = count; 1141 1142 g_simple_async_result_complete_in_idle (simple); 1143 g_object_unref (simple); 1144 return; 1145 } 1146 1147 1148 /* Full request not available, read all currently availbile and request refill for more */ 1149 1150 memcpy (buffer, priv->buffer + priv->pos, available); 1151 priv->pos = 0; 1152 priv->end = 0; 1153 1154 count -= available; 1155 1156 data->bytes_read = available; 1157 data->count = count; 1158 1159 if (count > priv->len) 1160 { 1161 /* Large request, shortcut buffer */ 1162 1163 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; 1164 1165 g_input_stream_read_async (base_stream, 1166 (char *)buffer + data->bytes_read, 1167 count, 1168 io_priority, cancellable, 1169 large_read_callback, 1170 simple); 1171 } 1172 else 1173 { 1174 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 1175 class->fill_async (bstream, priv->len, io_priority, cancellable, 1176 read_fill_buffer_callback, simple); 1177 } 1178} 1179 1180static gssize 1181g_buffered_input_stream_read_finish (GInputStream *stream, 1182 GAsyncResult *result, 1183 GError **error) 1184{ 1185 GSimpleAsyncResult *simple; 1186 ReadAsyncData *data; 1187 1188 simple = G_SIMPLE_ASYNC_RESULT (result); 1189 1190 g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async); 1191 1192 data = g_simple_async_result_get_op_res_gpointer (simple); 1193 1194 return data->bytes_read; 1195} 1196 1197typedef struct { 1198 gssize bytes_skipped; 1199 gssize count; 1200} SkipAsyncData; 1201 1202static void 1203free_skip_async_data (gpointer _data) 1204{ 1205 SkipAsyncData *data = _data; 1206 g_slice_free (SkipAsyncData, data); 1207} 1208 1209static void 1210large_skip_callback (GObject *source_object, 1211 GAsyncResult *result, 1212 gpointer user_data) 1213{ 1214 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data); 1215 SkipAsyncData *data; 1216 GError *error; 1217 gssize nread; 1218 1219 data = g_simple_async_result_get_op_res_gpointer (simple); 1220 1221 error = NULL; 1222 nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object), 1223 result, &error); 1224 1225 /* Only report the error if we've not already read some data */ 1226 if (nread < 0 && data->bytes_skipped == 0) 1227 g_simple_async_result_set_from_error (simple, error); 1228 1229 if (nread > 0) 1230 data->bytes_skipped += nread; 1231 1232 if (error) 1233 g_error_free (error); 1234 1235 /* Complete immediately, not in idle, since we're already in a mainloop callout */ 1236 g_simple_async_result_complete (simple); 1237 g_object_unref (simple); 1238} 1239 1240static void 1241skip_fill_buffer_callback (GObject *source_object, 1242 GAsyncResult *result, 1243 gpointer user_data) 1244{ 1245 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data); 1246 GBufferedInputStream *bstream; 1247 GBufferedInputStreamPrivate *priv; 1248 SkipAsyncData *data; 1249 GError *error; 1250 gssize nread; 1251 gsize available; 1252 1253 bstream = G_BUFFERED_INPUT_STREAM (source_object); 1254 priv = bstream->priv; 1255 1256 data = g_simple_async_result_get_op_res_gpointer (simple); 1257 1258 error = NULL; 1259 nread = g_buffered_input_stream_fill_finish (bstream, 1260 result, &error); 1261 1262 if (nread < 0 && data->bytes_skipped == 0) 1263 g_simple_async_result_set_from_error (simple, error); 1264 1265 1266 if (nread > 0) 1267 { 1268 available = priv->end - priv->pos; 1269 data->count = MIN (data->count, available); 1270 1271 data->bytes_skipped += data->count; 1272 priv->pos += data->count; 1273 } 1274 1275 if (error) 1276 g_error_free (error); 1277 1278 /* Complete immediately, not in idle, since we're already in a mainloop callout */ 1279 g_simple_async_result_complete (simple); 1280 g_object_unref (simple); 1281} 1282 1283static void 1284g_buffered_input_stream_skip_async (GInputStream *stream, 1285 gsize count, 1286 int io_priority, 1287 GCancellable *cancellable, 1288 GAsyncReadyCallback callback, 1289 gpointer user_data) 1290{ 1291 GBufferedInputStream *bstream; 1292 GBufferedInputStreamPrivate *priv; 1293 GBufferedInputStreamClass *class; 1294 GInputStream *base_stream; 1295 gsize available; 1296 GSimpleAsyncResult *simple; 1297 SkipAsyncData *data; 1298 1299 bstream = G_BUFFERED_INPUT_STREAM (stream); 1300 priv = bstream->priv; 1301 1302 data = g_slice_new (SkipAsyncData); 1303 data->bytes_skipped = 0; 1304 simple = g_simple_async_result_new (G_OBJECT (stream), 1305 callback, user_data, 1306 g_buffered_input_stream_skip_async); 1307 g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data); 1308 1309 available = priv->end - priv->pos; 1310 1311 if (count <= available) 1312 { 1313 priv->pos += count; 1314 data->bytes_skipped = count; 1315 1316 g_simple_async_result_complete_in_idle (simple); 1317 g_object_unref (simple); 1318 return; 1319 } 1320 1321 1322 /* Full request not available, skip all currently availbile and request refill for more */ 1323 1324 priv->pos = 0; 1325 priv->end = 0; 1326 1327 count -= available; 1328 1329 data->bytes_skipped = available; 1330 data->count = count; 1331 1332 if (count > priv->len) 1333 { 1334 /* Large request, shortcut buffer */ 1335 1336 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; 1337 1338 g_input_stream_skip_async (base_stream, 1339 count, 1340 io_priority, cancellable, 1341 large_skip_callback, 1342 simple); 1343 } 1344 else 1345 { 1346 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 1347 class->fill_async (bstream, priv->len, io_priority, cancellable, 1348 skip_fill_buffer_callback, simple); 1349 } 1350} 1351 1352static gssize 1353g_buffered_input_stream_skip_finish (GInputStream *stream, 1354 GAsyncResult *result, 1355 GError **error) 1356{ 1357 GSimpleAsyncResult *simple; 1358 SkipAsyncData *data; 1359 1360 simple = G_SIMPLE_ASYNC_RESULT (result); 1361 1362 g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async); 1363 1364 data = g_simple_async_result_get_op_res_gpointer (simple); 1365 1366 return data->bytes_skipped; 1367} 1368 1369 1370#define __G_BUFFERED_INPUT_STREAM_C__ 1371#include "gioaliasdef.c" 1372