gbufferedinputstream.c revision b22aa6dde67cb0517f03b375959d379bec06f460
1/* GIO - GLib Input, Output and Streaming Library
2 *
3 * Copyright (C) 2006-2007 Red Hat, Inc.
4 * Copyright (C) 2007 Jürg Billeter
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General
17 * Public License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19 * Boston, MA 02111-1307, USA.
20 *
21 * Author: Christian Kellner <gicmo@gnome.org>
22 */
23
24#include <config.h>
25#include "gbufferedinputstream.h"
26#include "ginputstream.h"
27#include "gsimpleasyncresult.h"
28#include <string.h>
29#include "glibintl.h"
30
31#include "gioalias.h"
32
33/**
34 * SECTION:gbufferedinputstream
35 * @short_description: Buffered Input Stream
36 * @see_also: #GFilterInputStream, #GInputStream
37 *
38 * Buffered input stream implements #GFilterInputStream and provides
39 * for buffered reads.
40 *
41 * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes.
42 *
43 * To create a buffered input stream, use g_buffered_input_stream_new(),
44 * or g_buffered_input_stream_new_sized() to specify the buffer's size at
45 * construction.
46 *
47 * To get the size of a buffer within a buffered input stream, use
48 * g_buffered_input_stream_get_buffer_size(). To change the size of a
49 * buffered input stream's buffer, use
50 * g_buffered_input_stream_set_buffer_size(). Note that the buffer's size
51 * cannot be reduced below the size of the data within the buffer.
52 *
53 **/
54
55
56
57#define DEFAULT_BUFFER_SIZE 4096
58
59struct _GBufferedInputStreamPrivate {
60  guint8 *buffer;
61  gsize   len;
62  gsize   pos;
63  gsize   end;
64  GAsyncReadyCallback outstanding_callback;
65};
66
67enum {
68  PROP_0,
69  PROP_BUFSIZE
70};
71
72static void g_buffered_input_stream_set_property  (GObject      *object,
73                                                   guint         prop_id,
74                                                   const GValue *value,
75                                                   GParamSpec   *pspec);
76
77static void g_buffered_input_stream_get_property  (GObject      *object,
78                                                   guint         prop_id,
79                                                   GValue       *value,
80                                                   GParamSpec   *pspec);
81static void g_buffered_input_stream_finalize      (GObject *object);
82
83
84static gssize g_buffered_input_stream_skip             (GInputStream          *stream,
85							gsize                  count,
86							GCancellable          *cancellable,
87							GError               **error);
88static void   g_buffered_input_stream_skip_async       (GInputStream          *stream,
89							gsize                  count,
90							int                    io_priority,
91							GCancellable          *cancellable,
92							GAsyncReadyCallback    callback,
93							gpointer               user_data);
94static gssize g_buffered_input_stream_skip_finish      (GInputStream          *stream,
95							GAsyncResult          *result,
96							GError               **error);
97static gssize g_buffered_input_stream_read             (GInputStream          *stream,
98							void                  *buffer,
99							gsize                  count,
100							GCancellable          *cancellable,
101							GError               **error);
102static void   g_buffered_input_stream_read_async       (GInputStream          *stream,
103							void                  *buffer,
104							gsize                  count,
105							int                    io_priority,
106							GCancellable          *cancellable,
107							GAsyncReadyCallback    callback,
108							gpointer               user_data);
109static gssize g_buffered_input_stream_read_finish      (GInputStream          *stream,
110							GAsyncResult          *result,
111							GError               **error);
112static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
113							gssize                 count,
114							GCancellable          *cancellable,
115							GError               **error);
116static void   g_buffered_input_stream_real_fill_async  (GBufferedInputStream  *stream,
117							gssize                 count,
118							int                    io_priority,
119							GCancellable          *cancellable,
120							GAsyncReadyCallback    callback,
121							gpointer               user_data);
122static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream  *stream,
123							GAsyncResult          *result,
124							GError               **error);
125
126static void compact_buffer (GBufferedInputStream *stream);
127
128G_DEFINE_TYPE (GBufferedInputStream,
129               g_buffered_input_stream,
130               G_TYPE_FILTER_INPUT_STREAM)
131
132
133static void
134g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
135{
136  GObjectClass *object_class;
137  GInputStreamClass *istream_class;
138  GBufferedInputStreamClass *bstream_class;
139
140  g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
141
142  object_class = G_OBJECT_CLASS (klass);
143  object_class->get_property = g_buffered_input_stream_get_property;
144  object_class->set_property = g_buffered_input_stream_set_property;
145  object_class->finalize     = g_buffered_input_stream_finalize;
146
147  istream_class = G_INPUT_STREAM_CLASS (klass);
148  istream_class->skip = g_buffered_input_stream_skip;
149  istream_class->skip_async  = g_buffered_input_stream_skip_async;
150  istream_class->skip_finish = g_buffered_input_stream_skip_finish;
151  istream_class->read = g_buffered_input_stream_read;
152  istream_class->read_async  = g_buffered_input_stream_read_async;
153  istream_class->read_finish = g_buffered_input_stream_read_finish;
154
155  bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
156  bstream_class->fill = g_buffered_input_stream_real_fill;
157  bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
158  bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
159
160  g_object_class_install_property (object_class,
161                                   PROP_BUFSIZE,
162                                   g_param_spec_uint ("buffer-size",
163                                                      P_("Buffer Size"),
164                                                      P_("The size of the backend buffer"),
165                                                      1,
166                                                      G_MAXUINT,
167                                                      DEFAULT_BUFFER_SIZE,
168                                                      G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
169                                                      G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
170
171
172}
173
174/**
175 * g_buffered_input_stream_get_buffer_size:
176 * @stream: #GBufferedInputStream.
177 *
178 * Gets the size of the input buffer.
179 *
180 * Returns: the current buffer size, or %-1 on error.
181 **/
182gsize
183g_buffered_input_stream_get_buffer_size (GBufferedInputStream  *stream)
184{
185  g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
186
187  return stream->priv->len;
188}
189
190/**
191 * g_buffered_input_stream_set_buffer_size:
192 * @stream: #GBufferedInputStream.
193 * @size: a #gsize.
194 *
195 * Sets the size of the internal buffer of @stream to @size, or to the
196 * size of the contents of the buffer. The buffer can never be resized
197 * smaller than its current contents.
198 **/
199void
200g_buffered_input_stream_set_buffer_size (GBufferedInputStream  *stream,
201                                         gsize                  size)
202{
203  GBufferedInputStreamPrivate *priv;
204  gsize in_buffer;
205  guint8 *buffer;
206
207  g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
208
209  priv = stream->priv;
210
211  if (priv->len == size)
212    return;
213
214  if (priv->buffer)
215    {
216      in_buffer = priv->end - priv->pos;
217
218      /* Never resize smaller than current buffer contents */
219      size = MAX (size, in_buffer);
220
221      buffer = g_malloc (size);
222      memcpy (buffer, priv->buffer + priv->pos, in_buffer);
223      priv->len = size;
224      priv->pos = 0;
225      priv->end = in_buffer;
226      g_free (priv->buffer);
227      priv->buffer = buffer;
228    }
229  else
230    {
231      priv->len = size;
232      priv->pos = 0;
233      priv->end = 0;
234      priv->buffer = g_malloc (size);
235    }
236
237  g_object_notify (G_OBJECT (stream), "buffer-size");
238}
239
240static void
241g_buffered_input_stream_set_property (GObject      *object,
242                                      guint         prop_id,
243                                      const GValue *value,
244                                      GParamSpec   *pspec)
245{
246  GBufferedInputStreamPrivate *priv;
247  GBufferedInputStream        *bstream;
248
249  bstream = G_BUFFERED_INPUT_STREAM (object);
250  priv = bstream->priv;
251
252  switch (prop_id)
253    {
254    case PROP_BUFSIZE:
255      g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
256      break;
257
258    default:
259      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
260      break;
261    }
262
263}
264
265static void
266g_buffered_input_stream_get_property (GObject    *object,
267                                      guint       prop_id,
268                                      GValue     *value,
269                                      GParamSpec *pspec)
270{
271  GBufferedInputStreamPrivate *priv;
272  GBufferedInputStream        *bstream;
273
274  bstream = G_BUFFERED_INPUT_STREAM (object);
275  priv = bstream->priv;
276
277  switch (prop_id)
278    {
279    case PROP_BUFSIZE:
280      g_value_set_uint (value, priv->len);
281      break;
282
283    default:
284      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
285      break;
286    }
287}
288
289static void
290g_buffered_input_stream_finalize (GObject *object)
291{
292  GBufferedInputStreamPrivate *priv;
293  GBufferedInputStream        *stream;
294
295  stream = G_BUFFERED_INPUT_STREAM (object);
296  priv = stream->priv;
297
298  g_free (priv->buffer);
299
300  if (G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize)
301    (*G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize) (object);
302}
303
304static void
305g_buffered_input_stream_init (GBufferedInputStream *stream)
306{
307  stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
308                                              G_TYPE_BUFFERED_INPUT_STREAM,
309                                              GBufferedInputStreamPrivate);
310}
311
312
313/**
314 * g_buffered_input_stream_new:
315 * @base_stream: a #GInputStream.
316 *
317 * Creates a new #GInputStream from the given @base_stream, with
318 * a buffer set to the default size (4 kilobytes).
319 *
320 * Returns: a #GInputStream for the given @base_stream.
321 **/
322GInputStream *
323g_buffered_input_stream_new (GInputStream *base_stream)
324{
325  GInputStream *stream;
326
327  g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
328
329  stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
330                         "base-stream", base_stream,
331                         NULL);
332
333  return stream;
334}
335
336/**
337 * g_buffered_input_stream_new_sized:
338 * @base_stream: a #GOutputStream.
339 * @size: a #gsize.
340 *
341 * Creates a new #GBufferedInputStream from the given @base_stream,
342 * with a buffer set to @size.
343 *
344 * Returns: a #GInputStream.
345 **/
346GInputStream *
347g_buffered_input_stream_new_sized (GInputStream *base_stream,
348                                   gsize         size)
349{
350  GInputStream *stream;
351
352  g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
353
354  stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
355                         "base-stream", base_stream,
356                         "buffer-size", (guint)size,
357                         NULL);
358
359  return stream;
360}
361
362/**
363 * g_buffered_input_stream_fill:
364 * @stream: #GBufferedInputStream.
365 * @count: the number of bytes that will be read from the stream.
366 * @cancellable: optional #GCancellable object, %NULL to ignore.
367 * @error: location to store the error occuring, or %NULL to ignore.
368 *
369 * Tries to read @count bytes from the stream into the buffer.
370 * Will block during this read.
371 *
372 * If @count is zero, returns zero and does nothing. A value of @count
373 * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
374 *
375 * On success, the number of bytes read into the buffer is returned.
376 * It is not an error if this is not the same as the requested size, as it
377 * can happen e.g. near the end of a file. Zero is returned on end of file
378 * (or if @count is zero),  but never otherwise.
379 *
380 * If @cancellable is not %NULL, then the operation can be cancelled by
381 * triggering the cancellable object from another thread. If the operation
382 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
383 * operation was partially finished when the operation was cancelled the
384 * partial result will be returned, without an error.
385 *
386 * On error -1 is returned and @error is set accordingly.
387 *
388 * For the asynchronous, non-blocking, version of this function, see
389 * g_buffered_input_stream_fill_async().
390 *
391 * Returns: the number of bytes read into @stream's buffer, up to @count,
392 *     or -1 on error.
393 **/
394gssize
395g_buffered_input_stream_fill (GBufferedInputStream  *stream,
396                              gssize                 count,
397                              GCancellable          *cancellable,
398                              GError               **error)
399{
400  GBufferedInputStreamClass *class;
401  GInputStream *input_stream;
402  gssize res;
403
404  g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
405
406  input_stream = G_INPUT_STREAM (stream);
407
408  if (g_input_stream_is_closed (input_stream))
409    {
410      g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
411		   _("Stream is already closed"));
412      return -1;
413    }
414
415  if (g_input_stream_has_pending (input_stream))
416    {
417      g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
418		   _("Stream has outstanding operation"));
419      return -1;
420    }
421
422  g_input_stream_set_pending (input_stream, TRUE);
423
424  if (cancellable)
425    g_push_current_cancellable (cancellable);
426
427  class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
428  res = class->fill (stream, count, cancellable, error);
429
430  if (cancellable)
431    g_pop_current_cancellable (cancellable);
432
433  g_input_stream_set_pending (input_stream, FALSE);
434
435  return res;
436}
437
438static void
439async_fill_callback_wrapper (GObject      *source_object,
440                             GAsyncResult *res,
441                             gpointer      user_data)
442{
443  GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
444
445  g_input_stream_set_pending (G_INPUT_STREAM (stream), FALSE);
446  (*stream->priv->outstanding_callback) (source_object, res, user_data);
447  g_object_unref (stream);
448}
449
450/**
451 * g_buffered_input_stream_fill_async:
452 * @stream: #GBufferedInputStream.
453 * @count: a #gssize.
454 * @io_priority: the <link linkend="io-priority">I/O priority</link>
455 *     of the request.
456 * @cancellable: optional #GCancellable object
457 * @callback: a #GAsyncReadyCallback.
458 * @user_data: a #gpointer.
459 *
460 * Reads data into @stream's buffer asynchronously, up to @count size.
461 * @io_priority can be used to prioritize reads. For the synchronous
462 * version of this function, see g_buffered_input_stream_fill().
463 **/
464void
465g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
466                                    gssize                count,
467                                    int                   io_priority,
468                                    GCancellable         *cancellable,
469                                    GAsyncReadyCallback   callback,
470                                    gpointer              user_data)
471{
472  GBufferedInputStreamClass *class;
473  GSimpleAsyncResult *simple;
474
475  g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
476
477  if (count == 0)
478    {
479      simple = g_simple_async_result_new (G_OBJECT (stream),
480					  callback,
481					  user_data,
482					  g_buffered_input_stream_fill_async);
483      g_simple_async_result_complete_in_idle (simple);
484      g_object_unref (simple);
485      return;
486    }
487
488  if (((gssize) count) < 0)
489    {
490      g_simple_async_report_error_in_idle (G_OBJECT (stream),
491					   callback,
492					   user_data,
493					   G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
494					   _("Too large count value passed to g_input_stream_read_async"));
495      return;
496    }
497
498  if (g_input_stream_is_closed (G_INPUT_STREAM (stream)))
499    {
500      g_simple_async_report_error_in_idle (G_OBJECT (stream),
501					   callback,
502					   user_data,
503					   G_IO_ERROR, G_IO_ERROR_CLOSED,
504					   _("Stream is already closed"));
505      return;
506    }
507
508  if (g_input_stream_has_pending (G_INPUT_STREAM (stream)))
509    {
510      g_simple_async_report_error_in_idle (G_OBJECT (stream),
511					   callback,
512					   user_data,
513					   G_IO_ERROR, G_IO_ERROR_PENDING,
514					   _("Stream has outstanding operation"));
515      return;
516    }
517
518  class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
519
520  g_input_stream_set_pending (G_INPUT_STREAM (stream), TRUE);
521  stream->priv->outstanding_callback = callback;
522  g_object_ref (stream);
523  class->fill_async (stream, count, io_priority, cancellable,
524                     async_fill_callback_wrapper, user_data);
525}
526
527/**
528 * g_buffered_input_stream_fill_finish:
529 * @stream: a #GBufferedInputStream.
530 * @result: a #GAsyncResult.
531 * @error: a #GError.
532 *
533 * Finishes an asynchronous read.
534 *
535 * Returns: a #gssize of the read stream, or %-1 on an error.
536 **/
537gssize
538g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
539                                     GAsyncResult          *result,
540                                     GError               **error)
541{
542  GSimpleAsyncResult *simple;
543  GBufferedInputStreamClass *class;
544
545  g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
546  g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
547
548  if (G_IS_SIMPLE_ASYNC_RESULT (result))
549    {
550      simple = G_SIMPLE_ASYNC_RESULT (result);
551      if (g_simple_async_result_propagate_error (simple, error))
552        return -1;
553
554      /* Special case read of 0 bytes */
555      if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
556        return 0;
557    }
558
559  class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
560  return class->fill_finish (stream, result, error);
561}
562
563/**
564 * g_buffered_input_stream_get_available:
565 * @stream: #GBufferedInputStream.
566 *
567 * Gets the size of the available data within the stream.
568 *
569 * Returns: size of the available stream.
570 **/
571gsize
572g_buffered_input_stream_get_available (GBufferedInputStream *stream)
573{
574  g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
575
576  return stream->priv->end - stream->priv->pos;
577}
578
579/**
580 * g_buffered_input_stream_peek:
581 * @stream: a #GBufferedInputStream.
582 * @buffer: a pointer to an allocated chunk of memory.
583 * @offset: a #gsize.
584 * @count: a #gsize.
585 *
586 * Peeks in the buffer, copying data of size @count into @buffer,
587 * offset @offset bytes.
588 *
589 * Returns: a #gsize of the number of bytes peeked, or %-1 on error.
590 **/
591gsize
592g_buffered_input_stream_peek (GBufferedInputStream *stream,
593                              void                 *buffer,
594                              gsize                 offset,
595                              gsize                 count)
596{
597  gsize available;
598  gsize end;
599
600  g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
601  g_return_val_if_fail (buffer != NULL, -1);
602
603  available = g_buffered_input_stream_get_available (stream);
604
605  if (offset > available)
606    return 0;
607
608  end = MIN (offset + count, available);
609  count = end - offset;
610
611  memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
612  return count;
613}
614
615/**
616 * g_buffered_input_stream_peek_buffer:
617 * @stream: a #GBufferedInputStream.
618 * @count: a #gsize to get the number of bytes available in the buffer.
619 *
620 * Returns the buffer with the currently available bytes. The returned
621 * buffer must not be modified and will become invalid when reading from
622 * the stream or filling the buffer.
623 *
624 * Returns: read-only buffer
625 **/
626const void*
627g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream,
628                                     gsize                *count)
629{
630  GBufferedInputStreamPrivate *priv;
631
632  g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
633
634  priv = stream->priv;
635
636  if (count)
637    *count = priv->end - priv->pos;
638
639  return priv->buffer + priv->pos;
640}
641
642static void
643compact_buffer (GBufferedInputStream *stream)
644{
645  GBufferedInputStreamPrivate *priv;
646  gsize current_size;
647
648  priv = stream->priv;
649
650  current_size = priv->end - priv->pos;
651
652  g_memmove (priv->buffer, priv->buffer + priv->pos, current_size);
653
654  priv->pos = 0;
655  priv->end = current_size;
656}
657
658static gssize
659g_buffered_input_stream_real_fill (GBufferedInputStream  *stream,
660                                   gssize                 count,
661                                   GCancellable          *cancellable,
662                                   GError               **error)
663{
664  GBufferedInputStreamPrivate *priv;
665  GInputStream *base_stream;
666  gssize nread;
667  gsize in_buffer;
668
669  priv = stream->priv;
670
671  if (count == -1)
672    count = priv->len;
673
674  in_buffer = priv->end - priv->pos;
675
676  /* Never fill more than can fit in the buffer */
677  count = MIN (count, priv->len - in_buffer);
678
679  /* If requested length does not fit at end, compact */
680  if (priv->len - priv->end < count)
681    compact_buffer (stream);
682
683  base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
684  nread = g_input_stream_read (base_stream,
685                               priv->buffer + priv->end,
686                               count,
687                               cancellable,
688                               error);
689
690  if (nread > 0)
691    priv->end += nread;
692
693  return nread;
694}
695
696static gssize
697g_buffered_input_stream_skip (GInputStream  *stream,
698                              gsize          count,
699                              GCancellable  *cancellable,
700                              GError       **error)
701{
702  GBufferedInputStream        *bstream;
703  GBufferedInputStreamPrivate *priv;
704  GBufferedInputStreamClass *class;
705  GInputStream *base_stream;
706  gsize available, bytes_skipped;
707  gssize nread;
708
709  bstream = G_BUFFERED_INPUT_STREAM (stream);
710  priv = bstream->priv;
711
712  available = priv->end - priv->pos;
713
714  if (count <= available)
715    {
716      priv->pos += count;
717      return count;
718    }
719
720  /* Full request not available, skip all currently available and
721   * request refill for more
722   */
723
724  priv->pos = 0;
725  priv->end = 0;
726  bytes_skipped = available;
727  count -= available;
728
729  if (bytes_skipped > 0)
730    error = NULL; /* Ignore further errors if we already read some data */
731
732  if (count > priv->len)
733    {
734      /* Large request, shortcut buffer */
735
736      base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
737
738      nread = g_input_stream_skip (base_stream,
739                                   count,
740                                   cancellable,
741                                   error);
742
743      if (nread < 0 && bytes_skipped == 0)
744        return -1;
745
746      if (nread > 0)
747        bytes_skipped += nread;
748
749      return bytes_skipped;
750    }
751
752  class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
753  nread = class->fill (bstream, priv->len, cancellable, error);
754
755  if (nread < 0)
756    {
757      if (bytes_skipped == 0)
758        return -1;
759      else
760        return bytes_skipped;
761    }
762
763  available = priv->end - priv->pos;
764  count = MIN (count, available);
765
766  bytes_skipped += count;
767  priv->pos += count;
768
769  return bytes_skipped;
770}
771
772static gssize
773g_buffered_input_stream_read (GInputStream *stream,
774                              void         *buffer,
775                              gsize         count,
776                              GCancellable *cancellable,
777                              GError      **error)
778{
779  GBufferedInputStream        *bstream;
780  GBufferedInputStreamPrivate *priv;
781  GBufferedInputStreamClass *class;
782  GInputStream *base_stream;
783  gsize available, bytes_read;
784  gssize nread;
785
786  bstream = G_BUFFERED_INPUT_STREAM (stream);
787  priv = bstream->priv;
788
789  available = priv->end - priv->pos;
790
791  if (count <= available)
792    {
793      memcpy (buffer, priv->buffer + priv->pos, count);
794      priv->pos += count;
795      return count;
796    }
797
798  /* Full request not available, read all currently availbile and request refill for more */
799
800  memcpy (buffer, priv->buffer + priv->pos, available);
801  priv->pos = 0;
802  priv->end = 0;
803  bytes_read = available;
804  count -= available;
805
806  if (bytes_read > 0)
807    error = NULL; /* Ignore further errors if we already read some data */
808
809  if (count > priv->len)
810    {
811      /* Large request, shortcut buffer */
812
813      base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
814
815      nread = g_input_stream_read (base_stream,
816				   (char *)buffer + bytes_read,
817				   count,
818				   cancellable,
819				   error);
820
821      if (nread < 0 && bytes_read == 0)
822        return -1;
823
824      if (nread > 0)
825        bytes_read += nread;
826
827      return bytes_read;
828    }
829
830  class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
831  nread = class->fill (bstream, priv->len, cancellable, error);
832  if (nread < 0)
833    {
834      if (bytes_read == 0)
835        return -1;
836      else
837        return bytes_read;
838    }
839
840  available = priv->end - priv->pos;
841  count = MIN (count, available);
842
843  memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
844  bytes_read += count;
845  priv->pos += count;
846
847  return bytes_read;
848}
849
850/**
851 * g_buffered_input_stream_read_byte:
852 * @stream: #GBufferedInputStream.
853 * @cancellable: optional #GCancellable object, %NULL to ignore.
854 * @error: location to store the error occuring, or %NULL to ignore.
855 *
856 * Tries to read a single byte from the stream or the buffer. Will block
857 * during this read.
858 *
859 * On success, the byte read from the stream is returned. On end of stream
860 * -1 is returned but it's not an exceptional error and @error is not set.
861 *
862 * If @cancellable is not %NULL, then the operation can be cancelled by
863 * triggering the cancellable object from another thread. If the operation
864 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
865 * operation was partially finished when the operation was cancelled the
866 * partial result will be returned, without an error.
867 *
868 * On error -1 is returned and @error is set accordingly.
869 *
870 * Returns: the byte read from the @stream, or -1 on end of stream or error.
871 **/
872int
873g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
874                                   GCancellable          *cancellable,
875                                   GError               **error)
876{
877  GBufferedInputStreamPrivate *priv;
878  GBufferedInputStreamClass *class;
879  GInputStream *input_stream;
880  gsize available;
881  gssize nread;
882
883  g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
884
885  priv = stream->priv;
886  input_stream = G_INPUT_STREAM (stream);
887
888  if (g_input_stream_is_closed (input_stream))
889    {
890      g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
891       _("Stream is already closed"));
892      return -1;
893    }
894
895  if (g_input_stream_has_pending (input_stream))
896    {
897      g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
898       _("Stream has outstanding operation"));
899      return -1;
900    }
901
902  available = priv->end - priv->pos;
903
904  if (available < 1)
905    return priv->buffer[priv->pos++];
906
907  /* Byte not available, request refill for more */
908
909  g_input_stream_set_pending (input_stream, TRUE);
910
911  if (cancellable)
912    g_push_current_cancellable (cancellable);
913
914  priv->pos = 0;
915  priv->end = 0;
916
917  class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
918  nread = class->fill (stream, priv->len, cancellable, error);
919
920  if (cancellable)
921    g_pop_current_cancellable (cancellable);
922
923  g_input_stream_set_pending (input_stream, FALSE);
924
925  if (nread <= 0)
926    return -1; /* error or end of stream */
927
928  return priv->buffer[priv->pos++];
929}
930
931/* ************************** */
932/* Async stuff implementation */
933/* ************************** */
934
935static void
936fill_async_callback (GObject      *source_object,
937                     GAsyncResult *result,
938                     gpointer      user_data)
939{
940  GError *error;
941  gssize res;
942  GSimpleAsyncResult *simple;
943
944  simple = user_data;
945
946  error = NULL;
947  res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
948				    result, &error);
949
950  g_simple_async_result_set_op_res_gssize (simple, res);
951  if (res == -1)
952    {
953      g_simple_async_result_set_from_error (simple, error);
954      g_error_free (error);
955    }
956
957  /* Complete immediately, not in idle, since we're already in a mainloop callout */
958  g_simple_async_result_complete (simple);
959  g_object_unref (simple);
960}
961
962static void
963g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
964                                         gssize                count,
965                                         int                   io_priority,
966                                         GCancellable         *cancellable,
967                                         GAsyncReadyCallback   callback,
968                                         gpointer              user_data)
969{
970  GBufferedInputStreamPrivate *priv;
971  GInputStream *base_stream;
972  GSimpleAsyncResult *simple;
973  gsize in_buffer;
974
975  priv = stream->priv;
976
977  if (count == -1)
978    count = priv->len;
979
980  in_buffer = priv->end - priv->pos;
981
982  /* Never fill more than can fit in the buffer */
983  count = MIN (count, priv->len - in_buffer);
984
985  /* If requested length does not fit at end, compact */
986  if (priv->len - priv->end < count)
987    compact_buffer (stream);
988
989  simple = g_simple_async_result_new (G_OBJECT (stream),
990				      callback, user_data,
991				      g_buffered_input_stream_real_fill_async);
992
993  base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
994  g_input_stream_read_async (base_stream,
995			     priv->buffer + priv->end,
996			     count,
997			     io_priority,
998			     cancellable,
999			     fill_async_callback,
1000			     simple);
1001}
1002
1003static gssize
1004g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
1005					  GAsyncResult         *result,
1006					  GError              **error)
1007{
1008  GSimpleAsyncResult *simple;
1009  gssize nread;
1010
1011  simple = G_SIMPLE_ASYNC_RESULT (result);
1012  g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
1013
1014  nread = g_simple_async_result_get_op_res_gssize (simple);
1015  return nread;
1016}
1017
1018typedef struct {
1019  gssize bytes_read;
1020  gssize count;
1021  void *buffer;
1022} ReadAsyncData;
1023
1024static void
1025free_read_async_data (gpointer _data)
1026{
1027  ReadAsyncData *data = _data;
1028  g_slice_free (ReadAsyncData, data);
1029}
1030
1031static void
1032large_read_callback (GObject *source_object,
1033		     GAsyncResult *result,
1034		     gpointer user_data)
1035{
1036  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1037  ReadAsyncData *data;
1038  GError *error;
1039  gssize nread;
1040
1041  data = g_simple_async_result_get_op_res_gpointer (simple);
1042
1043  error = NULL;
1044  nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1045				      result, &error);
1046
1047  /* Only report the error if we've not already read some data */
1048  if (nread < 0 && data->bytes_read == 0)
1049    g_simple_async_result_set_from_error (simple, error);
1050
1051  if (nread > 0)
1052    data->bytes_read += nread;
1053
1054  if (error)
1055    g_error_free (error);
1056
1057  /* Complete immediately, not in idle, since we're already in a mainloop callout */
1058  g_simple_async_result_complete (simple);
1059  g_object_unref (simple);
1060}
1061
1062static void
1063read_fill_buffer_callback (GObject *source_object,
1064			   GAsyncResult *result,
1065			   gpointer user_data)
1066{
1067  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1068  GBufferedInputStream *bstream;
1069  GBufferedInputStreamPrivate *priv;
1070  ReadAsyncData *data;
1071  GError *error;
1072  gssize nread;
1073  gsize available;
1074
1075  bstream = G_BUFFERED_INPUT_STREAM (source_object);
1076  priv = bstream->priv;
1077
1078  data = g_simple_async_result_get_op_res_gpointer (simple);
1079
1080  error = NULL;
1081  nread = g_buffered_input_stream_fill_finish (bstream,
1082					       result, &error);
1083
1084  if (nread < 0 && data->bytes_read == 0)
1085    g_simple_async_result_set_from_error (simple, error);
1086
1087
1088  if (nread > 0)
1089    {
1090      available = priv->end - priv->pos;
1091      data->count = MIN (data->count, available);
1092
1093      memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
1094      data->bytes_read += data->count;
1095      priv->pos += data->count;
1096    }
1097
1098  if (error)
1099    g_error_free (error);
1100
1101  /* Complete immediately, not in idle, since we're already in a mainloop callout */
1102  g_simple_async_result_complete (simple);
1103  g_object_unref (simple);
1104}
1105
1106static void
1107g_buffered_input_stream_read_async (GInputStream              *stream,
1108                                    void                      *buffer,
1109                                    gsize                      count,
1110                                    int                        io_priority,
1111                                    GCancellable              *cancellable,
1112                                    GAsyncReadyCallback        callback,
1113                                    gpointer                   user_data)
1114{
1115  GBufferedInputStream *bstream;
1116  GBufferedInputStreamPrivate *priv;
1117  GBufferedInputStreamClass *class;
1118  GInputStream *base_stream;
1119  gsize available;
1120  GSimpleAsyncResult *simple;
1121  ReadAsyncData *data;
1122
1123  bstream = G_BUFFERED_INPUT_STREAM (stream);
1124  priv = bstream->priv;
1125
1126  data = g_slice_new (ReadAsyncData);
1127  data->buffer = buffer;
1128  data->bytes_read = 0;
1129  simple = g_simple_async_result_new (G_OBJECT (stream),
1130				      callback, user_data,
1131				      g_buffered_input_stream_read_async);
1132  g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
1133
1134  available = priv->end - priv->pos;
1135
1136  if (count <= available)
1137    {
1138      memcpy (buffer, priv->buffer + priv->pos, count);
1139      priv->pos += count;
1140      data->bytes_read = count;
1141
1142      g_simple_async_result_complete_in_idle (simple);
1143      g_object_unref (simple);
1144      return;
1145    }
1146
1147
1148  /* Full request not available, read all currently availbile and request refill for more */
1149
1150  memcpy (buffer, priv->buffer + priv->pos, available);
1151  priv->pos = 0;
1152  priv->end = 0;
1153
1154  count -= available;
1155
1156  data->bytes_read = available;
1157  data->count = count;
1158
1159  if (count > priv->len)
1160    {
1161      /* Large request, shortcut buffer */
1162
1163      base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1164
1165      g_input_stream_read_async (base_stream,
1166				 (char *)buffer + data->bytes_read,
1167				 count,
1168				 io_priority, cancellable,
1169				 large_read_callback,
1170				 simple);
1171    }
1172  else
1173    {
1174      class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1175      class->fill_async (bstream, priv->len, io_priority, cancellable,
1176			 read_fill_buffer_callback, simple);
1177    }
1178}
1179
1180static gssize
1181g_buffered_input_stream_read_finish (GInputStream   *stream,
1182                                     GAsyncResult   *result,
1183                                     GError        **error)
1184{
1185  GSimpleAsyncResult *simple;
1186  ReadAsyncData *data;
1187
1188  simple = G_SIMPLE_ASYNC_RESULT (result);
1189
1190  g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
1191
1192  data = g_simple_async_result_get_op_res_gpointer (simple);
1193
1194  return data->bytes_read;
1195}
1196
1197typedef struct {
1198  gssize bytes_skipped;
1199  gssize count;
1200} SkipAsyncData;
1201
1202static void
1203free_skip_async_data (gpointer _data)
1204{
1205  SkipAsyncData *data = _data;
1206  g_slice_free (SkipAsyncData, data);
1207}
1208
1209static void
1210large_skip_callback (GObject *source_object,
1211		     GAsyncResult *result,
1212		     gpointer user_data)
1213{
1214  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1215  SkipAsyncData *data;
1216  GError *error;
1217  gssize nread;
1218
1219  data = g_simple_async_result_get_op_res_gpointer (simple);
1220
1221  error = NULL;
1222  nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1223				      result, &error);
1224
1225  /* Only report the error if we've not already read some data */
1226  if (nread < 0 && data->bytes_skipped == 0)
1227    g_simple_async_result_set_from_error (simple, error);
1228
1229  if (nread > 0)
1230    data->bytes_skipped += nread;
1231
1232  if (error)
1233    g_error_free (error);
1234
1235  /* Complete immediately, not in idle, since we're already in a mainloop callout */
1236  g_simple_async_result_complete (simple);
1237  g_object_unref (simple);
1238}
1239
1240static void
1241skip_fill_buffer_callback (GObject *source_object,
1242			   GAsyncResult *result,
1243			   gpointer user_data)
1244{
1245  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1246  GBufferedInputStream *bstream;
1247  GBufferedInputStreamPrivate *priv;
1248  SkipAsyncData *data;
1249  GError *error;
1250  gssize nread;
1251  gsize available;
1252
1253  bstream = G_BUFFERED_INPUT_STREAM (source_object);
1254  priv = bstream->priv;
1255
1256  data = g_simple_async_result_get_op_res_gpointer (simple);
1257
1258  error = NULL;
1259  nread = g_buffered_input_stream_fill_finish (bstream,
1260					       result, &error);
1261
1262  if (nread < 0 && data->bytes_skipped == 0)
1263    g_simple_async_result_set_from_error (simple, error);
1264
1265
1266  if (nread > 0)
1267    {
1268      available = priv->end - priv->pos;
1269      data->count = MIN (data->count, available);
1270
1271      data->bytes_skipped += data->count;
1272      priv->pos += data->count;
1273    }
1274
1275  if (error)
1276    g_error_free (error);
1277
1278  /* Complete immediately, not in idle, since we're already in a mainloop callout */
1279  g_simple_async_result_complete (simple);
1280  g_object_unref (simple);
1281}
1282
1283static void
1284g_buffered_input_stream_skip_async (GInputStream              *stream,
1285                                    gsize                      count,
1286                                    int                        io_priority,
1287                                    GCancellable              *cancellable,
1288                                    GAsyncReadyCallback        callback,
1289                                    gpointer                   user_data)
1290{
1291  GBufferedInputStream *bstream;
1292  GBufferedInputStreamPrivate *priv;
1293  GBufferedInputStreamClass *class;
1294  GInputStream *base_stream;
1295  gsize available;
1296  GSimpleAsyncResult *simple;
1297  SkipAsyncData *data;
1298
1299  bstream = G_BUFFERED_INPUT_STREAM (stream);
1300  priv = bstream->priv;
1301
1302  data = g_slice_new (SkipAsyncData);
1303  data->bytes_skipped = 0;
1304  simple = g_simple_async_result_new (G_OBJECT (stream),
1305				      callback, user_data,
1306				      g_buffered_input_stream_skip_async);
1307  g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1308
1309  available = priv->end - priv->pos;
1310
1311  if (count <= available)
1312    {
1313      priv->pos += count;
1314      data->bytes_skipped = count;
1315
1316      g_simple_async_result_complete_in_idle (simple);
1317      g_object_unref (simple);
1318      return;
1319    }
1320
1321
1322  /* Full request not available, skip all currently availbile and request refill for more */
1323
1324  priv->pos = 0;
1325  priv->end = 0;
1326
1327  count -= available;
1328
1329  data->bytes_skipped = available;
1330  data->count = count;
1331
1332  if (count > priv->len)
1333    {
1334      /* Large request, shortcut buffer */
1335
1336      base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1337
1338      g_input_stream_skip_async (base_stream,
1339				 count,
1340				 io_priority, cancellable,
1341				 large_skip_callback,
1342				 simple);
1343    }
1344  else
1345    {
1346      class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1347      class->fill_async (bstream, priv->len, io_priority, cancellable,
1348			 skip_fill_buffer_callback, simple);
1349    }
1350}
1351
1352static gssize
1353g_buffered_input_stream_skip_finish (GInputStream   *stream,
1354                                     GAsyncResult   *result,
1355                                     GError        **error)
1356{
1357  GSimpleAsyncResult *simple;
1358  SkipAsyncData *data;
1359
1360  simple = G_SIMPLE_ASYNC_RESULT (result);
1361
1362  g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1363
1364  data = g_simple_async_result_get_op_res_gpointer (simple);
1365
1366  return data->bytes_skipped;
1367}
1368
1369
1370#define __G_BUFFERED_INPUT_STREAM_C__
1371#include "gioaliasdef.c"
1372