/* gbufferedinputstream.c revision 3d93bf6968884d75dd2706ef85e2014305eb92f2 */
1/* GIO - GLib Input, Output and Streaming Library 2 * 3 * Copyright (C) 2006-2007 Red Hat, Inc. 4 * Copyright (C) 2007 Jürg Billeter 5 * 6 * This library is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU Lesser General Public 8 * License as published by the Free Software Foundation; either 9 * version 2 of the License, or (at your option) any later version. 10 * 11 * This library is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 * Lesser General Public License for more details. 15 * 16 * You should have received a copy of the GNU Lesser General 17 * Public License along with this library; if not, write to the 18 * Free Software Foundation, Inc., 59 Temple Place, Suite 330, 19 * Boston, MA 02111-1307, USA. 20 * 21 * Author: Christian Kellner <gicmo@gnome.org> 22 */ 23 24#include "config.h" 25#include "gbufferedinputstream.h" 26#include "ginputstream.h" 27#include "gcancellable.h" 28#include "gasyncresult.h" 29#include "gsimpleasyncresult.h" 30#include "gioerror.h" 31#include <string.h> 32#include "glibintl.h" 33 34#include "gioalias.h" 35 36/** 37 * SECTION:gbufferedinputstream 38 * @short_description: Buffered Input Stream 39 * @include: gio/gio.h 40 * @see_also: #GFilterInputStream, #GInputStream 41 * 42 * Buffered input stream implements #GFilterInputStream and provides 43 * for buffered reads. 44 * 45 * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes. 46 * 47 * To create a buffered input stream, use g_buffered_input_stream_new(), 48 * or g_buffered_input_stream_new_sized() to specify the buffer's size at 49 * construction. 50 * 51 * To get the size of a buffer within a buffered input stream, use 52 * g_buffered_input_stream_get_buffer_size(). To change the size of a 53 * buffered input stream's buffer, use 54 * g_buffered_input_stream_set_buffer_size(). 
Note that the buffer's size 55 * cannot be reduced below the size of the data within the buffer. 56 * 57 **/ 58 59 60 61#define DEFAULT_BUFFER_SIZE 4096 62 63struct _GBufferedInputStreamPrivate { 64 guint8 *buffer; 65 gsize len; 66 gsize pos; 67 gsize end; 68 GAsyncReadyCallback outstanding_callback; 69}; 70 71enum { 72 PROP_0, 73 PROP_BUFSIZE 74}; 75 76static void g_buffered_input_stream_set_property (GObject *object, 77 guint prop_id, 78 const GValue *value, 79 GParamSpec *pspec); 80 81static void g_buffered_input_stream_get_property (GObject *object, 82 guint prop_id, 83 GValue *value, 84 GParamSpec *pspec); 85static void g_buffered_input_stream_finalize (GObject *object); 86 87 88static gssize g_buffered_input_stream_skip (GInputStream *stream, 89 gsize count, 90 GCancellable *cancellable, 91 GError **error); 92static void g_buffered_input_stream_skip_async (GInputStream *stream, 93 gsize count, 94 int io_priority, 95 GCancellable *cancellable, 96 GAsyncReadyCallback callback, 97 gpointer user_data); 98static gssize g_buffered_input_stream_skip_finish (GInputStream *stream, 99 GAsyncResult *result, 100 GError **error); 101static gssize g_buffered_input_stream_read (GInputStream *stream, 102 void *buffer, 103 gsize count, 104 GCancellable *cancellable, 105 GError **error); 106static void g_buffered_input_stream_read_async (GInputStream *stream, 107 void *buffer, 108 gsize count, 109 int io_priority, 110 GCancellable *cancellable, 111 GAsyncReadyCallback callback, 112 gpointer user_data); 113static gssize g_buffered_input_stream_read_finish (GInputStream *stream, 114 GAsyncResult *result, 115 GError **error); 116static gssize g_buffered_input_stream_real_fill (GBufferedInputStream *stream, 117 gssize count, 118 GCancellable *cancellable, 119 GError **error); 120static void g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream, 121 gssize count, 122 int io_priority, 123 GCancellable *cancellable, 124 GAsyncReadyCallback callback, 125 gpointer 
user_data); 126static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream, 127 GAsyncResult *result, 128 GError **error); 129 130static void compact_buffer (GBufferedInputStream *stream); 131 132G_DEFINE_TYPE (GBufferedInputStream, 133 g_buffered_input_stream, 134 G_TYPE_FILTER_INPUT_STREAM) 135 136 137static void 138g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass) 139{ 140 GObjectClass *object_class; 141 GInputStreamClass *istream_class; 142 GBufferedInputStreamClass *bstream_class; 143 144 g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate)); 145 146 object_class = G_OBJECT_CLASS (klass); 147 object_class->get_property = g_buffered_input_stream_get_property; 148 object_class->set_property = g_buffered_input_stream_set_property; 149 object_class->finalize = g_buffered_input_stream_finalize; 150 151 istream_class = G_INPUT_STREAM_CLASS (klass); 152 istream_class->skip = g_buffered_input_stream_skip; 153 istream_class->skip_async = g_buffered_input_stream_skip_async; 154 istream_class->skip_finish = g_buffered_input_stream_skip_finish; 155 istream_class->read_fn = g_buffered_input_stream_read; 156 istream_class->read_async = g_buffered_input_stream_read_async; 157 istream_class->read_finish = g_buffered_input_stream_read_finish; 158 159 bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass); 160 bstream_class->fill = g_buffered_input_stream_real_fill; 161 bstream_class->fill_async = g_buffered_input_stream_real_fill_async; 162 bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish; 163 164 g_object_class_install_property (object_class, 165 PROP_BUFSIZE, 166 g_param_spec_uint ("buffer-size", 167 P_("Buffer Size"), 168 P_("The size of the backend buffer"), 169 1, 170 G_MAXUINT, 171 DEFAULT_BUFFER_SIZE, 172 G_PARAM_READWRITE | G_PARAM_CONSTRUCT | 173 G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB)); 174 175 176} 177 178/** 179 * g_buffered_input_stream_get_buffer_size: 180 * 
@stream: #GBufferedInputStream. 181 * 182 * Gets the size of the input buffer. 183 * 184 * Returns: the current buffer size. 185 **/ 186gsize 187g_buffered_input_stream_get_buffer_size (GBufferedInputStream *stream) 188{ 189 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), 0); 190 191 return stream->priv->len; 192} 193 194/** 195 * g_buffered_input_stream_set_buffer_size: 196 * @stream: #GBufferedInputStream. 197 * @size: a #gsize. 198 * 199 * Sets the size of the internal buffer of @stream to @size, or to the 200 * size of the contents of the buffer. The buffer can never be resized 201 * smaller than its current contents. 202 **/ 203void 204g_buffered_input_stream_set_buffer_size (GBufferedInputStream *stream, 205 gsize size) 206{ 207 GBufferedInputStreamPrivate *priv; 208 gsize in_buffer; 209 guint8 *buffer; 210 211 g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream)); 212 213 priv = stream->priv; 214 215 if (priv->len == size) 216 return; 217 218 if (priv->buffer) 219 { 220 in_buffer = priv->end - priv->pos; 221 222 /* Never resize smaller than current buffer contents */ 223 size = MAX (size, in_buffer); 224 225 buffer = g_malloc (size); 226 memcpy (buffer, priv->buffer + priv->pos, in_buffer); 227 priv->len = size; 228 priv->pos = 0; 229 priv->end = in_buffer; 230 g_free (priv->buffer); 231 priv->buffer = buffer; 232 } 233 else 234 { 235 priv->len = size; 236 priv->pos = 0; 237 priv->end = 0; 238 priv->buffer = g_malloc (size); 239 } 240 241 g_object_notify (G_OBJECT (stream), "buffer-size"); 242} 243 244static void 245g_buffered_input_stream_set_property (GObject *object, 246 guint prop_id, 247 const GValue *value, 248 GParamSpec *pspec) 249{ 250 GBufferedInputStreamPrivate *priv; 251 GBufferedInputStream *bstream; 252 253 bstream = G_BUFFERED_INPUT_STREAM (object); 254 priv = bstream->priv; 255 256 switch (prop_id) 257 { 258 case PROP_BUFSIZE: 259 g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value)); 260 break; 261 262 
default: 263 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); 264 break; 265 } 266 267} 268 269static void 270g_buffered_input_stream_get_property (GObject *object, 271 guint prop_id, 272 GValue *value, 273 GParamSpec *pspec) 274{ 275 GBufferedInputStreamPrivate *priv; 276 GBufferedInputStream *bstream; 277 278 bstream = G_BUFFERED_INPUT_STREAM (object); 279 priv = bstream->priv; 280 281 switch (prop_id) 282 { 283 case PROP_BUFSIZE: 284 g_value_set_uint (value, priv->len); 285 break; 286 287 default: 288 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); 289 break; 290 } 291} 292 293static void 294g_buffered_input_stream_finalize (GObject *object) 295{ 296 GBufferedInputStreamPrivate *priv; 297 GBufferedInputStream *stream; 298 299 stream = G_BUFFERED_INPUT_STREAM (object); 300 priv = stream->priv; 301 302 g_free (priv->buffer); 303 304 G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize (object); 305} 306 307static void 308g_buffered_input_stream_init (GBufferedInputStream *stream) 309{ 310 stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, 311 G_TYPE_BUFFERED_INPUT_STREAM, 312 GBufferedInputStreamPrivate); 313} 314 315 316/** 317 * g_buffered_input_stream_new: 318 * @base_stream: a #GInputStream. 319 * 320 * Creates a new #GInputStream from the given @base_stream, with 321 * a buffer set to the default size (4 kilobytes). 322 * 323 * Returns: a #GInputStream for the given @base_stream. 324 **/ 325GInputStream * 326g_buffered_input_stream_new (GInputStream *base_stream) 327{ 328 GInputStream *stream; 329 330 g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL); 331 332 stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM, 333 "base-stream", base_stream, 334 NULL); 335 336 return stream; 337} 338 339/** 340 * g_buffered_input_stream_new_sized: 341 * @base_stream: a #GInputStream. 342 * @size: a #gsize. 343 * 344 * Creates a new #GBufferedInputStream from the given @base_stream, 345 * with a buffer set to @size. 
346 * 347 * Returns: a #GInputStream. 348 **/ 349GInputStream * 350g_buffered_input_stream_new_sized (GInputStream *base_stream, 351 gsize size) 352{ 353 GInputStream *stream; 354 355 g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL); 356 357 stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM, 358 "base-stream", base_stream, 359 "buffer-size", (guint)size, 360 NULL); 361 362 return stream; 363} 364 365/** 366 * g_buffered_input_stream_fill: 367 * @stream: #GBufferedInputStream. 368 * @count: the number of bytes that will be read from the stream. 369 * @cancellable: optional #GCancellable object, %NULL to ignore. 370 * @error: location to store the error occuring, or %NULL to ignore. 371 * 372 * Tries to read @count bytes from the stream into the buffer. 373 * Will block during this read. 374 * 375 * If @count is zero, returns zero and does nothing. A value of @count 376 * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error. 377 * 378 * On success, the number of bytes read into the buffer is returned. 379 * It is not an error if this is not the same as the requested size, as it 380 * can happen e.g. near the end of a file. Zero is returned on end of file 381 * (or if @count is zero), but never otherwise. 382 * 383 * If @cancellable is not %NULL, then the operation can be cancelled by 384 * triggering the cancellable object from another thread. If the operation 385 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an 386 * operation was partially finished when the operation was cancelled the 387 * partial result will be returned, without an error. 388 * 389 * On error -1 is returned and @error is set accordingly. 390 * 391 * For the asynchronous, non-blocking, version of this function, see 392 * g_buffered_input_stream_fill_async(). 393 * 394 * Returns: the number of bytes read into @stream's buffer, up to @count, 395 * or -1 on error. 
396 **/ 397gssize 398g_buffered_input_stream_fill (GBufferedInputStream *stream, 399 gssize count, 400 GCancellable *cancellable, 401 GError **error) 402{ 403 GBufferedInputStreamClass *class; 404 GInputStream *input_stream; 405 gssize res; 406 407 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1); 408 409 input_stream = G_INPUT_STREAM (stream); 410 411 if (!g_input_stream_set_pending (input_stream, error)) 412 return -1; 413 414 if (cancellable) 415 g_cancellable_push_current (cancellable); 416 417 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 418 res = class->fill (stream, count, cancellable, error); 419 420 if (cancellable) 421 g_cancellable_pop_current (cancellable); 422 423 g_input_stream_clear_pending (input_stream); 424 425 return res; 426} 427 428static void 429async_fill_callback_wrapper (GObject *source_object, 430 GAsyncResult *res, 431 gpointer user_data) 432{ 433 GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object); 434 435 g_input_stream_clear_pending (G_INPUT_STREAM (stream)); 436 (*stream->priv->outstanding_callback) (source_object, res, user_data); 437 g_object_unref (stream); 438} 439 440/** 441 * g_buffered_input_stream_fill_async: 442 * @stream: #GBufferedInputStream. 443 * @count: a #gssize. 444 * @io_priority: the <link linkend="io-priority">I/O priority</link> 445 * of the request. 446 * @cancellable: optional #GCancellable object 447 * @callback: a #GAsyncReadyCallback. 448 * @user_data: a #gpointer. 449 * 450 * Reads data into @stream's buffer asynchronously, up to @count size. 451 * @io_priority can be used to prioritize reads. For the synchronous 452 * version of this function, see g_buffered_input_stream_fill(). 
453 **/ 454void 455g_buffered_input_stream_fill_async (GBufferedInputStream *stream, 456 gssize count, 457 int io_priority, 458 GCancellable *cancellable, 459 GAsyncReadyCallback callback, 460 gpointer user_data) 461{ 462 GBufferedInputStreamClass *class; 463 GSimpleAsyncResult *simple; 464 GError *error = NULL; 465 466 g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream)); 467 468 if (count == 0) 469 { 470 simple = g_simple_async_result_new (G_OBJECT (stream), 471 callback, 472 user_data, 473 g_buffered_input_stream_fill_async); 474 g_simple_async_result_complete_in_idle (simple); 475 g_object_unref (simple); 476 return; 477 } 478 479 if (((gssize) count) < 0) 480 { 481 g_simple_async_report_error_in_idle (G_OBJECT (stream), 482 callback, 483 user_data, 484 G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, 485 _("Too large count value passed to %s"), 486 G_STRFUNC); 487 return; 488 } 489 490 if (!g_input_stream_set_pending (G_INPUT_STREAM (stream), &error)) 491 { 492 g_simple_async_report_gerror_in_idle (G_OBJECT (stream), 493 callback, 494 user_data, 495 error); 496 g_error_free (error); 497 return; 498 } 499 500 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 501 502 stream->priv->outstanding_callback = callback; 503 g_object_ref (stream); 504 class->fill_async (stream, count, io_priority, cancellable, 505 async_fill_callback_wrapper, user_data); 506} 507 508/** 509 * g_buffered_input_stream_fill_finish: 510 * @stream: a #GBufferedInputStream. 511 * @result: a #GAsyncResult. 512 * @error: a #GError. 513 * 514 * Finishes an asynchronous read. 515 * 516 * Returns: a #gssize of the read stream, or %-1 on an error. 
517 **/ 518gssize 519g_buffered_input_stream_fill_finish (GBufferedInputStream *stream, 520 GAsyncResult *result, 521 GError **error) 522{ 523 GSimpleAsyncResult *simple; 524 GBufferedInputStreamClass *class; 525 526 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1); 527 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1); 528 529 if (G_IS_SIMPLE_ASYNC_RESULT (result)) 530 { 531 simple = G_SIMPLE_ASYNC_RESULT (result); 532 if (g_simple_async_result_propagate_error (simple, error)) 533 return -1; 534 535 /* Special case read of 0 bytes */ 536 if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async) 537 return 0; 538 } 539 540 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 541 return class->fill_finish (stream, result, error); 542} 543 544/** 545 * g_buffered_input_stream_get_available: 546 * @stream: #GBufferedInputStream. 547 * 548 * Gets the size of the available data within the stream. 549 * 550 * Returns: size of the available stream. 551 **/ 552gsize 553g_buffered_input_stream_get_available (GBufferedInputStream *stream) 554{ 555 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1); 556 557 return stream->priv->end - stream->priv->pos; 558} 559 560/** 561 * g_buffered_input_stream_peek: 562 * @stream: a #GBufferedInputStream. 563 * @buffer: a pointer to an allocated chunk of memory. 564 * @offset: a #gsize. 565 * @count: a #gsize. 566 * 567 * Peeks in the buffer, copying data of size @count into @buffer, 568 * offset @offset bytes. 569 * 570 * Returns: a #gsize of the number of bytes peeked, or %-1 on error. 
571 **/ 572gsize 573g_buffered_input_stream_peek (GBufferedInputStream *stream, 574 void *buffer, 575 gsize offset, 576 gsize count) 577{ 578 gsize available; 579 gsize end; 580 581 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1); 582 g_return_val_if_fail (buffer != NULL, -1); 583 584 available = g_buffered_input_stream_get_available (stream); 585 586 if (offset > available) 587 return 0; 588 589 end = MIN (offset + count, available); 590 count = end - offset; 591 592 memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count); 593 return count; 594} 595 596/** 597 * g_buffered_input_stream_peek_buffer: 598 * @stream: a #GBufferedInputStream. 599 * @count: a #gsize to get the number of bytes available in the buffer. 600 * 601 * Returns the buffer with the currently available bytes. The returned 602 * buffer must not be modified and will become invalid when reading from 603 * the stream or filling the buffer. 604 * 605 * Returns: read-only buffer 606 **/ 607const void* 608g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream, 609 gsize *count) 610{ 611 GBufferedInputStreamPrivate *priv; 612 613 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL); 614 615 priv = stream->priv; 616 617 if (count) 618 *count = priv->end - priv->pos; 619 620 return priv->buffer + priv->pos; 621} 622 623static void 624compact_buffer (GBufferedInputStream *stream) 625{ 626 GBufferedInputStreamPrivate *priv; 627 gsize current_size; 628 629 priv = stream->priv; 630 631 current_size = priv->end - priv->pos; 632 633 g_memmove (priv->buffer, priv->buffer + priv->pos, current_size); 634 635 priv->pos = 0; 636 priv->end = current_size; 637} 638 639static gssize 640g_buffered_input_stream_real_fill (GBufferedInputStream *stream, 641 gssize count, 642 GCancellable *cancellable, 643 GError **error) 644{ 645 GBufferedInputStreamPrivate *priv; 646 GInputStream *base_stream; 647 gssize nread; 648 gsize in_buffer; 649 650 priv = stream->priv; 651 652 
if (count == -1) 653 count = priv->len; 654 655 in_buffer = priv->end - priv->pos; 656 657 /* Never fill more than can fit in the buffer */ 658 count = MIN (count, priv->len - in_buffer); 659 660 /* If requested length does not fit at end, compact */ 661 if (priv->len - priv->end < count) 662 compact_buffer (stream); 663 664 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; 665 nread = g_input_stream_read (base_stream, 666 priv->buffer + priv->end, 667 count, 668 cancellable, 669 error); 670 671 if (nread > 0) 672 priv->end += nread; 673 674 return nread; 675} 676 677static gssize 678g_buffered_input_stream_skip (GInputStream *stream, 679 gsize count, 680 GCancellable *cancellable, 681 GError **error) 682{ 683 GBufferedInputStream *bstream; 684 GBufferedInputStreamPrivate *priv; 685 GBufferedInputStreamClass *class; 686 GInputStream *base_stream; 687 gsize available, bytes_skipped; 688 gssize nread; 689 690 bstream = G_BUFFERED_INPUT_STREAM (stream); 691 priv = bstream->priv; 692 693 available = priv->end - priv->pos; 694 695 if (count <= available) 696 { 697 priv->pos += count; 698 return count; 699 } 700 701 /* Full request not available, skip all currently available and 702 * request refill for more 703 */ 704 705 priv->pos = 0; 706 priv->end = 0; 707 bytes_skipped = available; 708 count -= available; 709 710 if (bytes_skipped > 0) 711 error = NULL; /* Ignore further errors if we already read some data */ 712 713 if (count > priv->len) 714 { 715 /* Large request, shortcut buffer */ 716 717 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; 718 719 nread = g_input_stream_skip (base_stream, 720 count, 721 cancellable, 722 error); 723 724 if (nread < 0 && bytes_skipped == 0) 725 return -1; 726 727 if (nread > 0) 728 bytes_skipped += nread; 729 730 return bytes_skipped; 731 } 732 733 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 734 nread = class->fill (bstream, priv->len, cancellable, error); 735 736 if (nread < 0) 737 { 738 if (bytes_skipped 
== 0) 739 return -1; 740 else 741 return bytes_skipped; 742 } 743 744 available = priv->end - priv->pos; 745 count = MIN (count, available); 746 747 bytes_skipped += count; 748 priv->pos += count; 749 750 return bytes_skipped; 751} 752 753static gssize 754g_buffered_input_stream_read (GInputStream *stream, 755 void *buffer, 756 gsize count, 757 GCancellable *cancellable, 758 GError **error) 759{ 760 GBufferedInputStream *bstream; 761 GBufferedInputStreamPrivate *priv; 762 GBufferedInputStreamClass *class; 763 GInputStream *base_stream; 764 gsize available, bytes_read; 765 gssize nread; 766 767 bstream = G_BUFFERED_INPUT_STREAM (stream); 768 priv = bstream->priv; 769 770 available = priv->end - priv->pos; 771 772 if (count <= available) 773 { 774 memcpy (buffer, priv->buffer + priv->pos, count); 775 priv->pos += count; 776 return count; 777 } 778 779 /* Full request not available, read all currently availbile and request refill for more */ 780 781 memcpy (buffer, priv->buffer + priv->pos, available); 782 priv->pos = 0; 783 priv->end = 0; 784 bytes_read = available; 785 count -= available; 786 787 if (bytes_read > 0) 788 error = NULL; /* Ignore further errors if we already read some data */ 789 790 if (count > priv->len) 791 { 792 /* Large request, shortcut buffer */ 793 794 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; 795 796 nread = g_input_stream_read (base_stream, 797 (char *)buffer + bytes_read, 798 count, 799 cancellable, 800 error); 801 802 if (nread < 0 && bytes_read == 0) 803 return -1; 804 805 if (nread > 0) 806 bytes_read += nread; 807 808 return bytes_read; 809 } 810 811 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 812 nread = class->fill (bstream, priv->len, cancellable, error); 813 if (nread < 0) 814 { 815 if (bytes_read == 0) 816 return -1; 817 else 818 return bytes_read; 819 } 820 821 available = priv->end - priv->pos; 822 count = MIN (count, available); 823 824 memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + 
priv->pos, count); 825 bytes_read += count; 826 priv->pos += count; 827 828 return bytes_read; 829} 830 831/** 832 * g_buffered_input_stream_read_byte: 833 * @stream: #GBufferedInputStream. 834 * @cancellable: optional #GCancellable object, %NULL to ignore. 835 * @error: location to store the error occuring, or %NULL to ignore. 836 * 837 * Tries to read a single byte from the stream or the buffer. Will block 838 * during this read. 839 * 840 * On success, the byte read from the stream is returned. On end of stream 841 * -1 is returned but it's not an exceptional error and @error is not set. 842 * 843 * If @cancellable is not %NULL, then the operation can be cancelled by 844 * triggering the cancellable object from another thread. If the operation 845 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an 846 * operation was partially finished when the operation was cancelled the 847 * partial result will be returned, without an error. 848 * 849 * On error -1 is returned and @error is set accordingly. 850 * 851 * Returns: the byte read from the @stream, or -1 on end of stream or error. 
852 **/ 853int 854g_buffered_input_stream_read_byte (GBufferedInputStream *stream, 855 GCancellable *cancellable, 856 GError **error) 857{ 858 GBufferedInputStreamPrivate *priv; 859 GBufferedInputStreamClass *class; 860 GInputStream *input_stream; 861 gsize available; 862 gssize nread; 863 864 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1); 865 866 priv = stream->priv; 867 input_stream = G_INPUT_STREAM (stream); 868 869 if (g_input_stream_is_closed (input_stream)) 870 { 871 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED, 872 _("Stream is already closed")); 873 return -1; 874 } 875 876 if (!g_input_stream_set_pending (input_stream, error)) 877 return -1; 878 879 available = priv->end - priv->pos; 880 881 if (available < 1) 882 { 883 g_input_stream_clear_pending (input_stream); 884 return priv->buffer[priv->pos++]; 885 } 886 887 /* Byte not available, request refill for more */ 888 889 if (cancellable) 890 g_cancellable_push_current (cancellable); 891 892 priv->pos = 0; 893 priv->end = 0; 894 895 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 896 nread = class->fill (stream, priv->len, cancellable, error); 897 898 if (cancellable) 899 g_cancellable_pop_current (cancellable); 900 901 g_input_stream_clear_pending (input_stream); 902 903 if (nread <= 0) 904 return -1; /* error or end of stream */ 905 906 return priv->buffer[priv->pos++]; 907} 908 909/* ************************** */ 910/* Async stuff implementation */ 911/* ************************** */ 912 913static void 914fill_async_callback (GObject *source_object, 915 GAsyncResult *result, 916 gpointer user_data) 917{ 918 GError *error; 919 gssize res; 920 GSimpleAsyncResult *simple; 921 922 simple = user_data; 923 924 error = NULL; 925 res = g_input_stream_read_finish (G_INPUT_STREAM (source_object), 926 result, &error); 927 928 g_simple_async_result_set_op_res_gssize (simple, res); 929 if (res == -1) 930 { 931 g_simple_async_result_set_from_error (simple, error); 932 g_error_free 
(error); 933 } 934 935 /* Complete immediately, not in idle, since we're already in a mainloop callout */ 936 g_simple_async_result_complete (simple); 937 g_object_unref (simple); 938} 939 940static void 941g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream, 942 gssize count, 943 int io_priority, 944 GCancellable *cancellable, 945 GAsyncReadyCallback callback, 946 gpointer user_data) 947{ 948 GBufferedInputStreamPrivate *priv; 949 GInputStream *base_stream; 950 GSimpleAsyncResult *simple; 951 gsize in_buffer; 952 953 priv = stream->priv; 954 955 if (count == -1) 956 count = priv->len; 957 958 in_buffer = priv->end - priv->pos; 959 960 /* Never fill more than can fit in the buffer */ 961 count = MIN (count, priv->len - in_buffer); 962 963 /* If requested length does not fit at end, compact */ 964 if (priv->len - priv->end < count) 965 compact_buffer (stream); 966 967 simple = g_simple_async_result_new (G_OBJECT (stream), 968 callback, user_data, 969 g_buffered_input_stream_real_fill_async); 970 971 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; 972 g_input_stream_read_async (base_stream, 973 priv->buffer + priv->end, 974 count, 975 io_priority, 976 cancellable, 977 fill_async_callback, 978 simple); 979} 980 981static gssize 982g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream, 983 GAsyncResult *result, 984 GError **error) 985{ 986 GSimpleAsyncResult *simple; 987 gssize nread; 988 989 simple = G_SIMPLE_ASYNC_RESULT (result); 990 g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async); 991 992 nread = g_simple_async_result_get_op_res_gssize (simple); 993 return nread; 994} 995 996typedef struct { 997 gssize bytes_read; 998 gssize count; 999 void *buffer; 1000} ReadAsyncData; 1001 1002static void 1003free_read_async_data (gpointer _data) 1004{ 1005 ReadAsyncData *data = _data; 1006 g_slice_free (ReadAsyncData, data); 1007} 1008 1009static void 1010large_read_callback 
(GObject *source_object, 1011 GAsyncResult *result, 1012 gpointer user_data) 1013{ 1014 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data); 1015 ReadAsyncData *data; 1016 GError *error; 1017 gssize nread; 1018 1019 data = g_simple_async_result_get_op_res_gpointer (simple); 1020 1021 error = NULL; 1022 nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object), 1023 result, &error); 1024 1025 /* Only report the error if we've not already read some data */ 1026 if (nread < 0 && data->bytes_read == 0) 1027 g_simple_async_result_set_from_error (simple, error); 1028 1029 if (nread > 0) 1030 data->bytes_read += nread; 1031 1032 if (error) 1033 g_error_free (error); 1034 1035 /* Complete immediately, not in idle, since we're already in a mainloop callout */ 1036 g_simple_async_result_complete (simple); 1037 g_object_unref (simple); 1038} 1039 1040static void 1041read_fill_buffer_callback (GObject *source_object, 1042 GAsyncResult *result, 1043 gpointer user_data) 1044{ 1045 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data); 1046 GBufferedInputStream *bstream; 1047 GBufferedInputStreamPrivate *priv; 1048 ReadAsyncData *data; 1049 GError *error; 1050 gssize nread; 1051 gsize available; 1052 1053 bstream = G_BUFFERED_INPUT_STREAM (source_object); 1054 priv = bstream->priv; 1055 1056 data = g_simple_async_result_get_op_res_gpointer (simple); 1057 1058 error = NULL; 1059 nread = g_buffered_input_stream_fill_finish (bstream, 1060 result, &error); 1061 1062 if (nread < 0 && data->bytes_read == 0) 1063 g_simple_async_result_set_from_error (simple, error); 1064 1065 1066 if (nread > 0) 1067 { 1068 available = priv->end - priv->pos; 1069 data->count = MIN (data->count, available); 1070 1071 memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count); 1072 data->bytes_read += data->count; 1073 priv->pos += data->count; 1074 } 1075 1076 if (error) 1077 g_error_free (error); 1078 1079 /* Complete immediately, not in 
idle, since we're already in a mainloop callout */ 1080 g_simple_async_result_complete (simple); 1081 g_object_unref (simple); 1082} 1083 1084static void 1085g_buffered_input_stream_read_async (GInputStream *stream, 1086 void *buffer, 1087 gsize count, 1088 int io_priority, 1089 GCancellable *cancellable, 1090 GAsyncReadyCallback callback, 1091 gpointer user_data) 1092{ 1093 GBufferedInputStream *bstream; 1094 GBufferedInputStreamPrivate *priv; 1095 GBufferedInputStreamClass *class; 1096 GInputStream *base_stream; 1097 gsize available; 1098 GSimpleAsyncResult *simple; 1099 ReadAsyncData *data; 1100 1101 bstream = G_BUFFERED_INPUT_STREAM (stream); 1102 priv = bstream->priv; 1103 1104 data = g_slice_new (ReadAsyncData); 1105 data->buffer = buffer; 1106 data->bytes_read = 0; 1107 simple = g_simple_async_result_new (G_OBJECT (stream), 1108 callback, user_data, 1109 g_buffered_input_stream_read_async); 1110 g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data); 1111 1112 available = priv->end - priv->pos; 1113 1114 if (count <= available) 1115 { 1116 memcpy (buffer, priv->buffer + priv->pos, count); 1117 priv->pos += count; 1118 data->bytes_read = count; 1119 1120 g_simple_async_result_complete_in_idle (simple); 1121 g_object_unref (simple); 1122 return; 1123 } 1124 1125 1126 /* Full request not available, read all currently availbile and request refill for more */ 1127 1128 memcpy (buffer, priv->buffer + priv->pos, available); 1129 priv->pos = 0; 1130 priv->end = 0; 1131 1132 count -= available; 1133 1134 data->bytes_read = available; 1135 data->count = count; 1136 1137 if (count > priv->len) 1138 { 1139 /* Large request, shortcut buffer */ 1140 1141 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; 1142 1143 g_input_stream_read_async (base_stream, 1144 (char *)buffer + data->bytes_read, 1145 count, 1146 io_priority, cancellable, 1147 large_read_callback, 1148 simple); 1149 } 1150 else 1151 { 1152 class = 
G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 1153 class->fill_async (bstream, priv->len, io_priority, cancellable, 1154 read_fill_buffer_callback, simple); 1155 } 1156} 1157 1158static gssize 1159g_buffered_input_stream_read_finish (GInputStream *stream, 1160 GAsyncResult *result, 1161 GError **error) 1162{ 1163 GSimpleAsyncResult *simple; 1164 ReadAsyncData *data; 1165 1166 simple = G_SIMPLE_ASYNC_RESULT (result); 1167 1168 g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async); 1169 1170 data = g_simple_async_result_get_op_res_gpointer (simple); 1171 1172 return data->bytes_read; 1173} 1174 1175typedef struct { 1176 gssize bytes_skipped; 1177 gssize count; 1178} SkipAsyncData; 1179 1180static void 1181free_skip_async_data (gpointer _data) 1182{ 1183 SkipAsyncData *data = _data; 1184 g_slice_free (SkipAsyncData, data); 1185} 1186 1187static void 1188large_skip_callback (GObject *source_object, 1189 GAsyncResult *result, 1190 gpointer user_data) 1191{ 1192 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data); 1193 SkipAsyncData *data; 1194 GError *error; 1195 gssize nread; 1196 1197 data = g_simple_async_result_get_op_res_gpointer (simple); 1198 1199 error = NULL; 1200 nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object), 1201 result, &error); 1202 1203 /* Only report the error if we've not already read some data */ 1204 if (nread < 0 && data->bytes_skipped == 0) 1205 g_simple_async_result_set_from_error (simple, error); 1206 1207 if (nread > 0) 1208 data->bytes_skipped += nread; 1209 1210 if (error) 1211 g_error_free (error); 1212 1213 /* Complete immediately, not in idle, since we're already in a mainloop callout */ 1214 g_simple_async_result_complete (simple); 1215 g_object_unref (simple); 1216} 1217 1218static void 1219skip_fill_buffer_callback (GObject *source_object, 1220 GAsyncResult *result, 1221 gpointer user_data) 1222{ 1223 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data); 
1224 GBufferedInputStream *bstream; 1225 GBufferedInputStreamPrivate *priv; 1226 SkipAsyncData *data; 1227 GError *error; 1228 gssize nread; 1229 gsize available; 1230 1231 bstream = G_BUFFERED_INPUT_STREAM (source_object); 1232 priv = bstream->priv; 1233 1234 data = g_simple_async_result_get_op_res_gpointer (simple); 1235 1236 error = NULL; 1237 nread = g_buffered_input_stream_fill_finish (bstream, 1238 result, &error); 1239 1240 if (nread < 0 && data->bytes_skipped == 0) 1241 g_simple_async_result_set_from_error (simple, error); 1242 1243 1244 if (nread > 0) 1245 { 1246 available = priv->end - priv->pos; 1247 data->count = MIN (data->count, available); 1248 1249 data->bytes_skipped += data->count; 1250 priv->pos += data->count; 1251 } 1252 1253 if (error) 1254 g_error_free (error); 1255 1256 /* Complete immediately, not in idle, since we're already in a mainloop callout */ 1257 g_simple_async_result_complete (simple); 1258 g_object_unref (simple); 1259} 1260 1261static void 1262g_buffered_input_stream_skip_async (GInputStream *stream, 1263 gsize count, 1264 int io_priority, 1265 GCancellable *cancellable, 1266 GAsyncReadyCallback callback, 1267 gpointer user_data) 1268{ 1269 GBufferedInputStream *bstream; 1270 GBufferedInputStreamPrivate *priv; 1271 GBufferedInputStreamClass *class; 1272 GInputStream *base_stream; 1273 gsize available; 1274 GSimpleAsyncResult *simple; 1275 SkipAsyncData *data; 1276 1277 bstream = G_BUFFERED_INPUT_STREAM (stream); 1278 priv = bstream->priv; 1279 1280 data = g_slice_new (SkipAsyncData); 1281 data->bytes_skipped = 0; 1282 simple = g_simple_async_result_new (G_OBJECT (stream), 1283 callback, user_data, 1284 g_buffered_input_stream_skip_async); 1285 g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data); 1286 1287 available = priv->end - priv->pos; 1288 1289 if (count <= available) 1290 { 1291 priv->pos += count; 1292 data->bytes_skipped = count; 1293 1294 g_simple_async_result_complete_in_idle (simple); 1295 
g_object_unref (simple); 1296 return; 1297 } 1298 1299 1300 /* Full request not available, skip all currently availbile and request refill for more */ 1301 1302 priv->pos = 0; 1303 priv->end = 0; 1304 1305 count -= available; 1306 1307 data->bytes_skipped = available; 1308 data->count = count; 1309 1310 if (count > priv->len) 1311 { 1312 /* Large request, shortcut buffer */ 1313 1314 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; 1315 1316 g_input_stream_skip_async (base_stream, 1317 count, 1318 io_priority, cancellable, 1319 large_skip_callback, 1320 simple); 1321 } 1322 else 1323 { 1324 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 1325 class->fill_async (bstream, priv->len, io_priority, cancellable, 1326 skip_fill_buffer_callback, simple); 1327 } 1328} 1329 1330static gssize 1331g_buffered_input_stream_skip_finish (GInputStream *stream, 1332 GAsyncResult *result, 1333 GError **error) 1334{ 1335 GSimpleAsyncResult *simple; 1336 SkipAsyncData *data; 1337 1338 simple = G_SIMPLE_ASYNC_RESULT (result); 1339 1340 g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async); 1341 1342 data = g_simple_async_result_get_op_res_gpointer (simple); 1343 1344 return data->bytes_skipped; 1345} 1346 1347 1348#define __G_BUFFERED_INPUT_STREAM_C__ 1349#include "gioaliasdef.c" 1350