1/* GIO - GLib Input, Output and Streaming Library
2 *
3 * Copyright (C) 2006-2007 Red Hat, Inc.
4 * Copyright (C) 2007 Jürg Billeter
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General
17 * Public License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19 * Boston, MA 02111-1307, USA.
20 *
21 * Author: Christian Kellner <gicmo@gnome.org>
22 */
23
24#include "config.h"
25#include "gbufferedinputstream.h"
26#include "ginputstream.h"
27#include "gcancellable.h"
28#include "gasyncresult.h"
29#include "gsimpleasyncresult.h"
30#include "gioerror.h"
31#include <string.h>
32#include "glibintl.h"
33
34#include "gioalias.h"
35
36/**
37 * SECTION:gbufferedinputstream
38 * @short_description: Buffered Input Stream
39 * @include: gio/gio.h
40 * @see_also: #GFilterInputStream, #GInputStream
41 *
42 * Buffered input stream implements #GFilterInputStream and provides
43 * for buffered reads.
44 *
45 * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes.
46 *
47 * To create a buffered input stream, use g_buffered_input_stream_new(),
48 * or g_buffered_input_stream_new_sized() to specify the buffer's size at
49 * construction.
50 *
51 * To get the size of a buffer within a buffered input stream, use
52 * g_buffered_input_stream_get_buffer_size(). To change the size of a
53 * buffered input stream's buffer, use
54 * g_buffered_input_stream_set_buffer_size(). Note that the buffer's size
55 * cannot be reduced below the size of the data within the buffer.
56 *
57 **/
58
59
60
/* Default size, in bytes, of the internal buffer (4 kilobytes). */
#define DEFAULT_BUFFER_SIZE (4 * 1024)
62
63struct _GBufferedInputStreamPrivate {
64  guint8 *buffer;
65  gsize   len;
66  gsize   pos;
67  gsize   end;
68  GAsyncReadyCallback outstanding_callback;
69};
70
/* Property identifiers; PROP_0 occupies slot zero so real ids start at 1. */
enum
{
  PROP_0 = 0,
  PROP_BUFSIZE = 1  /* the "buffer-size" property */
};
75
76static void g_buffered_input_stream_set_property  (GObject      *object,
77                                                   guint         prop_id,
78                                                   const GValue *value,
79                                                   GParamSpec   *pspec);
80
81static void g_buffered_input_stream_get_property  (GObject      *object,
82                                                   guint         prop_id,
83                                                   GValue       *value,
84                                                   GParamSpec   *pspec);
85static void g_buffered_input_stream_finalize      (GObject *object);
86
87
88static gssize g_buffered_input_stream_skip             (GInputStream          *stream,
89							gsize                  count,
90							GCancellable          *cancellable,
91							GError               **error);
92static void   g_buffered_input_stream_skip_async       (GInputStream          *stream,
93							gsize                  count,
94							int                    io_priority,
95							GCancellable          *cancellable,
96							GAsyncReadyCallback    callback,
97							gpointer               user_data);
98static gssize g_buffered_input_stream_skip_finish      (GInputStream          *stream,
99							GAsyncResult          *result,
100							GError               **error);
101static gssize g_buffered_input_stream_read             (GInputStream          *stream,
102							void                  *buffer,
103							gsize                  count,
104							GCancellable          *cancellable,
105							GError               **error);
106static void   g_buffered_input_stream_read_async       (GInputStream          *stream,
107							void                  *buffer,
108							gsize                  count,
109							int                    io_priority,
110							GCancellable          *cancellable,
111							GAsyncReadyCallback    callback,
112							gpointer               user_data);
113static gssize g_buffered_input_stream_read_finish      (GInputStream          *stream,
114							GAsyncResult          *result,
115							GError               **error);
116static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
117							gssize                 count,
118							GCancellable          *cancellable,
119							GError               **error);
120static void   g_buffered_input_stream_real_fill_async  (GBufferedInputStream  *stream,
121							gssize                 count,
122							int                    io_priority,
123							GCancellable          *cancellable,
124							GAsyncReadyCallback    callback,
125							gpointer               user_data);
126static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream  *stream,
127							GAsyncResult          *result,
128							GError               **error);
129
130static void compact_buffer (GBufferedInputStream *stream);
131
132G_DEFINE_TYPE (GBufferedInputStream,
133               g_buffered_input_stream,
134               G_TYPE_FILTER_INPUT_STREAM)
135
136
137static void
138g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
139{
140  GObjectClass *object_class;
141  GInputStreamClass *istream_class;
142  GBufferedInputStreamClass *bstream_class;
143
144  g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
145
146  object_class = G_OBJECT_CLASS (klass);
147  object_class->get_property = g_buffered_input_stream_get_property;
148  object_class->set_property = g_buffered_input_stream_set_property;
149  object_class->finalize     = g_buffered_input_stream_finalize;
150
151  istream_class = G_INPUT_STREAM_CLASS (klass);
152  istream_class->skip = g_buffered_input_stream_skip;
153  istream_class->skip_async  = g_buffered_input_stream_skip_async;
154  istream_class->skip_finish = g_buffered_input_stream_skip_finish;
155  istream_class->read_fn = g_buffered_input_stream_read;
156  istream_class->read_async  = g_buffered_input_stream_read_async;
157  istream_class->read_finish = g_buffered_input_stream_read_finish;
158
159  bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
160  bstream_class->fill = g_buffered_input_stream_real_fill;
161  bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
162  bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
163
164  g_object_class_install_property (object_class,
165                                   PROP_BUFSIZE,
166                                   g_param_spec_uint ("buffer-size",
167                                                      P_("Buffer Size"),
168                                                      P_("The size of the backend buffer"),
169                                                      1,
170                                                      G_MAXUINT,
171                                                      DEFAULT_BUFFER_SIZE,
172                                                      G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
173                                                      G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
174
175
176}
177
178/**
179 * g_buffered_input_stream_get_buffer_size:
180 * @stream: #GBufferedInputStream.
181 *
182 * Gets the size of the input buffer.
183 *
184 * Returns: the current buffer size.
185 **/
186gsize
187g_buffered_input_stream_get_buffer_size (GBufferedInputStream  *stream)
188{
189  g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), 0);
190
191  return stream->priv->len;
192}
193
194/**
195 * g_buffered_input_stream_set_buffer_size:
196 * @stream: #GBufferedInputStream.
197 * @size: a #gsize.
198 *
199 * Sets the size of the internal buffer of @stream to @size, or to the
200 * size of the contents of the buffer. The buffer can never be resized
201 * smaller than its current contents.
202 **/
203void
204g_buffered_input_stream_set_buffer_size (GBufferedInputStream  *stream,
205                                         gsize                  size)
206{
207  GBufferedInputStreamPrivate *priv;
208  gsize in_buffer;
209  guint8 *buffer;
210
211  g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
212
213  priv = stream->priv;
214
215  if (priv->len == size)
216    return;
217
218  if (priv->buffer)
219    {
220      in_buffer = priv->end - priv->pos;
221
222      /* Never resize smaller than current buffer contents */
223      size = MAX (size, in_buffer);
224
225      buffer = g_malloc (size);
226      memcpy (buffer, priv->buffer + priv->pos, in_buffer);
227      priv->len = size;
228      priv->pos = 0;
229      priv->end = in_buffer;
230      g_free (priv->buffer);
231      priv->buffer = buffer;
232    }
233  else
234    {
235      priv->len = size;
236      priv->pos = 0;
237      priv->end = 0;
238      priv->buffer = g_malloc (size);
239    }
240
241  g_object_notify (G_OBJECT (stream), "buffer-size");
242}
243
244static void
245g_buffered_input_stream_set_property (GObject      *object,
246                                      guint         prop_id,
247                                      const GValue *value,
248                                      GParamSpec   *pspec)
249{
250  GBufferedInputStreamPrivate *priv;
251  GBufferedInputStream        *bstream;
252
253  bstream = G_BUFFERED_INPUT_STREAM (object);
254  priv = bstream->priv;
255
256  switch (prop_id)
257    {
258    case PROP_BUFSIZE:
259      g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
260      break;
261
262    default:
263      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
264      break;
265    }
266
267}
268
269static void
270g_buffered_input_stream_get_property (GObject    *object,
271                                      guint       prop_id,
272                                      GValue     *value,
273                                      GParamSpec *pspec)
274{
275  GBufferedInputStreamPrivate *priv;
276  GBufferedInputStream        *bstream;
277
278  bstream = G_BUFFERED_INPUT_STREAM (object);
279  priv = bstream->priv;
280
281  switch (prop_id)
282    {
283    case PROP_BUFSIZE:
284      g_value_set_uint (value, priv->len);
285      break;
286
287    default:
288      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
289      break;
290    }
291}
292
293static void
294g_buffered_input_stream_finalize (GObject *object)
295{
296  GBufferedInputStreamPrivate *priv;
297  GBufferedInputStream        *stream;
298
299  stream = G_BUFFERED_INPUT_STREAM (object);
300  priv = stream->priv;
301
302  g_free (priv->buffer);
303
304  G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize (object);
305}
306
307static void
308g_buffered_input_stream_init (GBufferedInputStream *stream)
309{
310  stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
311                                              G_TYPE_BUFFERED_INPUT_STREAM,
312                                              GBufferedInputStreamPrivate);
313}
314
315
316/**
317 * g_buffered_input_stream_new:
318 * @base_stream: a #GInputStream.
319 *
320 * Creates a new #GInputStream from the given @base_stream, with
321 * a buffer set to the default size (4 kilobytes).
322 *
323 * Returns: a #GInputStream for the given @base_stream.
324 **/
325GInputStream *
326g_buffered_input_stream_new (GInputStream *base_stream)
327{
328  GInputStream *stream;
329
330  g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
331
332  stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
333                         "base-stream", base_stream,
334                         NULL);
335
336  return stream;
337}
338
339/**
340 * g_buffered_input_stream_new_sized:
341 * @base_stream: a #GInputStream.
342 * @size: a #gsize.
343 *
344 * Creates a new #GBufferedInputStream from the given @base_stream,
345 * with a buffer set to @size.
346 *
347 * Returns: a #GInputStream.
348 **/
349GInputStream *
350g_buffered_input_stream_new_sized (GInputStream *base_stream,
351                                   gsize         size)
352{
353  GInputStream *stream;
354
355  g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
356
357  stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
358                         "base-stream", base_stream,
359                         "buffer-size", (guint)size,
360                         NULL);
361
362  return stream;
363}
364
365/**
366 * g_buffered_input_stream_fill:
367 * @stream: #GBufferedInputStream.
368 * @count: the number of bytes that will be read from the stream.
369 * @cancellable: optional #GCancellable object, %NULL to ignore.
370 * @error: location to store the error occuring, or %NULL to ignore.
371 *
372 * Tries to read @count bytes from the stream into the buffer.
373 * Will block during this read.
374 *
375 * If @count is zero, returns zero and does nothing. A value of @count
376 * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
377 *
378 * On success, the number of bytes read into the buffer is returned.
379 * It is not an error if this is not the same as the requested size, as it
380 * can happen e.g. near the end of a file. Zero is returned on end of file
381 * (or if @count is zero),  but never otherwise.
382 *
383 * If @count is -1 then the attempted read size is equal to the number of
384 * bytes that are required to fill the buffer.
385 *
386 * If @cancellable is not %NULL, then the operation can be cancelled by
387 * triggering the cancellable object from another thread. If the operation
388 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
389 * operation was partially finished when the operation was cancelled the
390 * partial result will be returned, without an error.
391 *
392 * On error -1 is returned and @error is set accordingly.
393 *
394 * For the asynchronous, non-blocking, version of this function, see
395 * g_buffered_input_stream_fill_async().
396 *
397 * Returns: the number of bytes read into @stream's buffer, up to @count,
398 *     or -1 on error.
399 **/
400gssize
401g_buffered_input_stream_fill (GBufferedInputStream  *stream,
402                              gssize                 count,
403                              GCancellable          *cancellable,
404                              GError               **error)
405{
406  GBufferedInputStreamClass *class;
407  GInputStream *input_stream;
408  gssize res;
409
410  g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
411
412  input_stream = G_INPUT_STREAM (stream);
413
414  if (count < -1)
415    {
416      g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
417                   _("Too large count value passed to %s"), G_STRFUNC);
418      return -1;
419    }
420
421  if (!g_input_stream_set_pending (input_stream, error))
422    return -1;
423
424  if (cancellable)
425    g_cancellable_push_current (cancellable);
426
427  class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
428  res = class->fill (stream, count, cancellable, error);
429
430  if (cancellable)
431    g_cancellable_pop_current (cancellable);
432
433  g_input_stream_clear_pending (input_stream);
434
435  return res;
436}
437
438static void
439async_fill_callback_wrapper (GObject      *source_object,
440                             GAsyncResult *res,
441                             gpointer      user_data)
442{
443  GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
444
445  g_input_stream_clear_pending (G_INPUT_STREAM (stream));
446  (*stream->priv->outstanding_callback) (source_object, res, user_data);
447  g_object_unref (stream);
448}
449
450/**
451 * g_buffered_input_stream_fill_async:
452 * @stream: #GBufferedInputStream.
453 * @count: the number of bytes that will be read from the stream.
454 * @io_priority: the <link linkend="io-priority">I/O priority</link>
455 *     of the request.
456 * @cancellable: optional #GCancellable object
457 * @callback: a #GAsyncReadyCallback.
458 * @user_data: a #gpointer.
459 *
460 * Reads data into @stream's buffer asynchronously, up to @count size.
461 * @io_priority can be used to prioritize reads. For the synchronous
462 * version of this function, see g_buffered_input_stream_fill().
463 *
464 * If @count is -1 then the attempted read size is equal to the number
465 * of bytes that are required to fill the buffer.
466 **/
467void
468g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
469                                    gssize                count,
470                                    int                   io_priority,
471                                    GCancellable         *cancellable,
472                                    GAsyncReadyCallback   callback,
473                                    gpointer              user_data)
474{
475  GBufferedInputStreamClass *class;
476  GSimpleAsyncResult *simple;
477  GError *error = NULL;
478
479  g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
480
481  if (count == 0)
482    {
483      simple = g_simple_async_result_new (G_OBJECT (stream),
484					  callback,
485					  user_data,
486					  g_buffered_input_stream_fill_async);
487      g_simple_async_result_complete_in_idle (simple);
488      g_object_unref (simple);
489      return;
490    }
491
492  if (count < -1)
493    {
494      g_simple_async_report_error_in_idle (G_OBJECT (stream),
495					   callback,
496					   user_data,
497					   G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
498					   _("Too large count value passed to %s"),
499					   G_STRFUNC);
500      return;
501    }
502
503  if (!g_input_stream_set_pending (G_INPUT_STREAM (stream), &error))
504    {
505      g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
506					    callback,
507					    user_data,
508					    error);
509      g_error_free (error);
510      return;
511    }
512
513  class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
514
515  stream->priv->outstanding_callback = callback;
516  g_object_ref (stream);
517  class->fill_async (stream, count, io_priority, cancellable,
518                     async_fill_callback_wrapper, user_data);
519}
520
521/**
522 * g_buffered_input_stream_fill_finish:
523 * @stream: a #GBufferedInputStream.
524 * @result: a #GAsyncResult.
525 * @error: a #GError.
526 *
527 * Finishes an asynchronous read.
528 *
529 * Returns: a #gssize of the read stream, or %-1 on an error.
530 **/
531gssize
532g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
533                                     GAsyncResult          *result,
534                                     GError               **error)
535{
536  GSimpleAsyncResult *simple;
537  GBufferedInputStreamClass *class;
538
539  g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
540  g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
541
542  if (G_IS_SIMPLE_ASYNC_RESULT (result))
543    {
544      simple = G_SIMPLE_ASYNC_RESULT (result);
545      if (g_simple_async_result_propagate_error (simple, error))
546        return -1;
547
548      /* Special case read of 0 bytes */
549      if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
550        return 0;
551    }
552
553  class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
554  return class->fill_finish (stream, result, error);
555}
556
557/**
558 * g_buffered_input_stream_get_available:
559 * @stream: #GBufferedInputStream.
560 *
561 * Gets the size of the available data within the stream.
562 *
563 * Returns: size of the available stream.
564 **/
565gsize
566g_buffered_input_stream_get_available (GBufferedInputStream *stream)
567{
568  g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
569
570  return stream->priv->end - stream->priv->pos;
571}
572
573/**
574 * g_buffered_input_stream_peek:
575 * @stream: a #GBufferedInputStream.
576 * @buffer: a pointer to an allocated chunk of memory.
577 * @offset: a #gsize.
578 * @count: a #gsize.
579 *
580 * Peeks in the buffer, copying data of size @count into @buffer,
581 * offset @offset bytes.
582 *
583 * Returns: a #gsize of the number of bytes peeked, or %-1 on error.
584 **/
585gsize
586g_buffered_input_stream_peek (GBufferedInputStream *stream,
587                              void                 *buffer,
588                              gsize                 offset,
589                              gsize                 count)
590{
591  gsize available;
592  gsize end;
593
594  g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
595  g_return_val_if_fail (buffer != NULL, -1);
596
597  available = g_buffered_input_stream_get_available (stream);
598
599  if (offset > available)
600    return 0;
601
602  end = MIN (offset + count, available);
603  count = end - offset;
604
605  memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
606  return count;
607}
608
609/**
610 * g_buffered_input_stream_peek_buffer:
611 * @stream: a #GBufferedInputStream.
612 * @count: a #gsize to get the number of bytes available in the buffer.
613 *
614 * Returns the buffer with the currently available bytes. The returned
615 * buffer must not be modified and will become invalid when reading from
616 * the stream or filling the buffer.
617 *
618 * Returns: read-only buffer
619 **/
620const void*
621g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream,
622                                     gsize                *count)
623{
624  GBufferedInputStreamPrivate *priv;
625
626  g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
627
628  priv = stream->priv;
629
630  if (count)
631    *count = priv->end - priv->pos;
632
633  return priv->buffer + priv->pos;
634}
635
636static void
637compact_buffer (GBufferedInputStream *stream)
638{
639  GBufferedInputStreamPrivate *priv;
640  gsize current_size;
641
642  priv = stream->priv;
643
644  current_size = priv->end - priv->pos;
645
646  g_memmove (priv->buffer, priv->buffer + priv->pos, current_size);
647
648  priv->pos = 0;
649  priv->end = current_size;
650}
651
652static gssize
653g_buffered_input_stream_real_fill (GBufferedInputStream  *stream,
654                                   gssize                 count,
655                                   GCancellable          *cancellable,
656                                   GError               **error)
657{
658  GBufferedInputStreamPrivate *priv;
659  GInputStream *base_stream;
660  gssize nread;
661  gsize in_buffer;
662
663  priv = stream->priv;
664
665  if (count == -1)
666    count = priv->len;
667
668  in_buffer = priv->end - priv->pos;
669
670  /* Never fill more than can fit in the buffer */
671  count = MIN (count, priv->len - in_buffer);
672
673  /* If requested length does not fit at end, compact */
674  if (priv->len - priv->end < count)
675    compact_buffer (stream);
676
677  base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
678  nread = g_input_stream_read (base_stream,
679                               priv->buffer + priv->end,
680                               count,
681                               cancellable,
682                               error);
683
684  if (nread > 0)
685    priv->end += nread;
686
687  return nread;
688}
689
690static gssize
691g_buffered_input_stream_skip (GInputStream  *stream,
692                              gsize          count,
693                              GCancellable  *cancellable,
694                              GError       **error)
695{
696  GBufferedInputStream        *bstream;
697  GBufferedInputStreamPrivate *priv;
698  GBufferedInputStreamClass *class;
699  GInputStream *base_stream;
700  gsize available, bytes_skipped;
701  gssize nread;
702
703  bstream = G_BUFFERED_INPUT_STREAM (stream);
704  priv = bstream->priv;
705
706  available = priv->end - priv->pos;
707
708  if (count <= available)
709    {
710      priv->pos += count;
711      return count;
712    }
713
714  /* Full request not available, skip all currently available and
715   * request refill for more
716   */
717
718  priv->pos = 0;
719  priv->end = 0;
720  bytes_skipped = available;
721  count -= available;
722
723  if (bytes_skipped > 0)
724    error = NULL; /* Ignore further errors if we already read some data */
725
726  if (count > priv->len)
727    {
728      /* Large request, shortcut buffer */
729
730      base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
731
732      nread = g_input_stream_skip (base_stream,
733                                   count,
734                                   cancellable,
735                                   error);
736
737      if (nread < 0 && bytes_skipped == 0)
738        return -1;
739
740      if (nread > 0)
741        bytes_skipped += nread;
742
743      return bytes_skipped;
744    }
745
746  class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
747  nread = class->fill (bstream, priv->len, cancellable, error);
748
749  if (nread < 0)
750    {
751      if (bytes_skipped == 0)
752        return -1;
753      else
754        return bytes_skipped;
755    }
756
757  available = priv->end - priv->pos;
758  count = MIN (count, available);
759
760  bytes_skipped += count;
761  priv->pos += count;
762
763  return bytes_skipped;
764}
765
766static gssize
767g_buffered_input_stream_read (GInputStream *stream,
768                              void         *buffer,
769                              gsize         count,
770                              GCancellable *cancellable,
771                              GError      **error)
772{
773  GBufferedInputStream        *bstream;
774  GBufferedInputStreamPrivate *priv;
775  GBufferedInputStreamClass *class;
776  GInputStream *base_stream;
777  gsize available, bytes_read;
778  gssize nread;
779
780  bstream = G_BUFFERED_INPUT_STREAM (stream);
781  priv = bstream->priv;
782
783  available = priv->end - priv->pos;
784
785  if (count <= available)
786    {
787      memcpy (buffer, priv->buffer + priv->pos, count);
788      priv->pos += count;
789      return count;
790    }
791
792  /* Full request not available, read all currently availbile and request refill for more */
793
794  memcpy (buffer, priv->buffer + priv->pos, available);
795  priv->pos = 0;
796  priv->end = 0;
797  bytes_read = available;
798  count -= available;
799
800  if (bytes_read > 0)
801    error = NULL; /* Ignore further errors if we already read some data */
802
803  if (count > priv->len)
804    {
805      /* Large request, shortcut buffer */
806
807      base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
808
809      nread = g_input_stream_read (base_stream,
810				   (char *)buffer + bytes_read,
811				   count,
812				   cancellable,
813				   error);
814
815      if (nread < 0 && bytes_read == 0)
816        return -1;
817
818      if (nread > 0)
819        bytes_read += nread;
820
821      return bytes_read;
822    }
823
824  class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
825  nread = class->fill (bstream, priv->len, cancellable, error);
826  if (nread < 0)
827    {
828      if (bytes_read == 0)
829        return -1;
830      else
831        return bytes_read;
832    }
833
834  available = priv->end - priv->pos;
835  count = MIN (count, available);
836
837  memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
838  bytes_read += count;
839  priv->pos += count;
840
841  return bytes_read;
842}
843
844/**
845 * g_buffered_input_stream_read_byte:
846 * @stream: #GBufferedInputStream.
847 * @cancellable: optional #GCancellable object, %NULL to ignore.
848 * @error: location to store the error occuring, or %NULL to ignore.
849 *
850 * Tries to read a single byte from the stream or the buffer. Will block
851 * during this read.
852 *
853 * On success, the byte read from the stream is returned. On end of stream
854 * -1 is returned but it's not an exceptional error and @error is not set.
855 *
856 * If @cancellable is not %NULL, then the operation can be cancelled by
857 * triggering the cancellable object from another thread. If the operation
858 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
859 * operation was partially finished when the operation was cancelled the
860 * partial result will be returned, without an error.
861 *
862 * On error -1 is returned and @error is set accordingly.
863 *
864 * Returns: the byte read from the @stream, or -1 on end of stream or error.
865 **/
866int
867g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
868                                   GCancellable          *cancellable,
869                                   GError               **error)
870{
871  GBufferedInputStreamPrivate *priv;
872  GBufferedInputStreamClass *class;
873  GInputStream *input_stream;
874  gsize available;
875  gssize nread;
876
877  g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
878
879  priv = stream->priv;
880  input_stream = G_INPUT_STREAM (stream);
881
882  if (g_input_stream_is_closed (input_stream))
883    {
884      g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
885                           _("Stream is already closed"));
886      return -1;
887    }
888
889  if (!g_input_stream_set_pending (input_stream, error))
890    return -1;
891
892  available = priv->end - priv->pos;
893
894  if (available != 0)
895    {
896      g_input_stream_clear_pending (input_stream);
897      return priv->buffer[priv->pos++];
898    }
899
900  /* Byte not available, request refill for more */
901
902  if (cancellable)
903    g_cancellable_push_current (cancellable);
904
905  priv->pos = 0;
906  priv->end = 0;
907
908  class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
909  nread = class->fill (stream, priv->len, cancellable, error);
910
911  if (cancellable)
912    g_cancellable_pop_current (cancellable);
913
914  g_input_stream_clear_pending (input_stream);
915
916  if (nread <= 0)
917    return -1; /* error or end of stream */
918
919  return priv->buffer[priv->pos++];
920}
921
922/* ************************** */
923/* Async stuff implementation */
924/* ************************** */
925
926static void
927fill_async_callback (GObject      *source_object,
928                     GAsyncResult *result,
929                     gpointer      user_data)
930{
931  GError *error;
932  gssize res;
933  GSimpleAsyncResult *simple;
934
935  simple = user_data;
936
937  error = NULL;
938  res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
939				    result, &error);
940
941  g_simple_async_result_set_op_res_gssize (simple, res);
942  if (res == -1)
943    {
944      g_simple_async_result_set_from_error (simple, error);
945      g_error_free (error);
946    }
947  else
948    {
949      GBufferedInputStreamPrivate *priv;
950      GObject *object;
951
952      object = g_async_result_get_source_object (G_ASYNC_RESULT (simple));
953      priv = G_BUFFERED_INPUT_STREAM (object)->priv;
954
955      g_assert_cmpint (priv->end + res, <=, priv->len);
956      priv->end += res;
957
958      g_object_unref (object);
959    }
960
961  /* Complete immediately, not in idle, since we're already in a mainloop callout */
962  g_simple_async_result_complete (simple);
963  g_object_unref (simple);
964}
965
966static void
967g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
968                                         gssize                count,
969                                         int                   io_priority,
970                                         GCancellable         *cancellable,
971                                         GAsyncReadyCallback   callback,
972                                         gpointer              user_data)
973{
974  GBufferedInputStreamPrivate *priv;
975  GInputStream *base_stream;
976  GSimpleAsyncResult *simple;
977  gsize in_buffer;
978
979  priv = stream->priv;
980
981  if (count == -1)
982    count = priv->len;
983
984  in_buffer = priv->end - priv->pos;
985
986  /* Never fill more than can fit in the buffer */
987  count = MIN (count, priv->len - in_buffer);
988
989  /* If requested length does not fit at end, compact */
990  if (priv->len - priv->end < count)
991    compact_buffer (stream);
992
993  simple = g_simple_async_result_new (G_OBJECT (stream),
994				      callback, user_data,
995				      g_buffered_input_stream_real_fill_async);
996
997  base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
998  g_input_stream_read_async (base_stream,
999			     priv->buffer + priv->end,
1000			     count,
1001			     io_priority,
1002			     cancellable,
1003			     fill_async_callback,
1004			     simple);
1005}
1006
1007static gssize
1008g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
1009					  GAsyncResult         *result,
1010					  GError              **error)
1011{
1012  GSimpleAsyncResult *simple;
1013  gssize nread;
1014
1015  simple = G_SIMPLE_ASYNC_RESULT (result);
1016  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
1017
1018  nread = g_simple_async_result_get_op_res_gssize (simple);
1019  return nread;
1020}
1021
/* Per-operation state of an asynchronous read.  It is attached to the
 * operation's GSimpleAsyncResult as its pointer result and released
 * through free_read_async_data().
 */
typedef struct {
  /* Bytes placed in the caller's buffer so far; this is the value
   * g_buffered_input_stream_read_finish() returns. */
  gssize bytes_read;
  /* Bytes still requested after the buffered data has been handed over */
  gssize count;
  /* Destination buffer passed to g_buffered_input_stream_read_async() */
  void *buffer;
} ReadAsyncData;
1027
1028static void
1029free_read_async_data (gpointer _data)
1030{
1031  ReadAsyncData *data = _data;
1032  g_slice_free (ReadAsyncData, data);
1033}
1034
1035static void
1036large_read_callback (GObject *source_object,
1037		     GAsyncResult *result,
1038		     gpointer user_data)
1039{
1040  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1041  ReadAsyncData *data;
1042  GError *error;
1043  gssize nread;
1044
1045  data = g_simple_async_result_get_op_res_gpointer (simple);
1046
1047  error = NULL;
1048  nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1049				      result, &error);
1050
1051  /* Only report the error if we've not already read some data */
1052  if (nread < 0 && data->bytes_read == 0)
1053    g_simple_async_result_set_from_error (simple, error);
1054
1055  if (nread > 0)
1056    data->bytes_read += nread;
1057
1058  if (error)
1059    g_error_free (error);
1060
1061  /* Complete immediately, not in idle, since we're already in a mainloop callout */
1062  g_simple_async_result_complete (simple);
1063  g_object_unref (simple);
1064}
1065
1066static void
1067read_fill_buffer_callback (GObject *source_object,
1068			   GAsyncResult *result,
1069			   gpointer user_data)
1070{
1071  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1072  GBufferedInputStream *bstream;
1073  GBufferedInputStreamPrivate *priv;
1074  ReadAsyncData *data;
1075  GError *error;
1076  gssize nread;
1077  gsize available;
1078
1079  bstream = G_BUFFERED_INPUT_STREAM (source_object);
1080  priv = bstream->priv;
1081
1082  data = g_simple_async_result_get_op_res_gpointer (simple);
1083
1084  error = NULL;
1085  nread = g_buffered_input_stream_fill_finish (bstream,
1086					       result, &error);
1087
1088  if (nread < 0 && data->bytes_read == 0)
1089    g_simple_async_result_set_from_error (simple, error);
1090
1091
1092  if (nread > 0)
1093    {
1094      available = priv->end - priv->pos;
1095      data->count = MIN (data->count, available);
1096
1097      memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
1098      data->bytes_read += data->count;
1099      priv->pos += data->count;
1100    }
1101
1102  if (error)
1103    g_error_free (error);
1104
1105  /* Complete immediately, not in idle, since we're already in a mainloop callout */
1106  g_simple_async_result_complete (simple);
1107  g_object_unref (simple);
1108}
1109
1110static void
1111g_buffered_input_stream_read_async (GInputStream              *stream,
1112                                    void                      *buffer,
1113                                    gsize                      count,
1114                                    int                        io_priority,
1115                                    GCancellable              *cancellable,
1116                                    GAsyncReadyCallback        callback,
1117                                    gpointer                   user_data)
1118{
1119  GBufferedInputStream *bstream;
1120  GBufferedInputStreamPrivate *priv;
1121  GBufferedInputStreamClass *class;
1122  GInputStream *base_stream;
1123  gsize available;
1124  GSimpleAsyncResult *simple;
1125  ReadAsyncData *data;
1126
1127  bstream = G_BUFFERED_INPUT_STREAM (stream);
1128  priv = bstream->priv;
1129
1130  data = g_slice_new (ReadAsyncData);
1131  data->buffer = buffer;
1132  data->bytes_read = 0;
1133  simple = g_simple_async_result_new (G_OBJECT (stream),
1134				      callback, user_data,
1135				      g_buffered_input_stream_read_async);
1136  g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
1137
1138  available = priv->end - priv->pos;
1139
1140  if (count <= available)
1141    {
1142      memcpy (buffer, priv->buffer + priv->pos, count);
1143      priv->pos += count;
1144      data->bytes_read = count;
1145
1146      g_simple_async_result_complete_in_idle (simple);
1147      g_object_unref (simple);
1148      return;
1149    }
1150
1151
1152  /* Full request not available, read all currently availbile and request refill for more */
1153
1154  memcpy (buffer, priv->buffer + priv->pos, available);
1155  priv->pos = 0;
1156  priv->end = 0;
1157
1158  count -= available;
1159
1160  data->bytes_read = available;
1161  data->count = count;
1162
1163  if (count > priv->len)
1164    {
1165      /* Large request, shortcut buffer */
1166
1167      base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1168
1169      g_input_stream_read_async (base_stream,
1170				 (char *)buffer + data->bytes_read,
1171				 count,
1172				 io_priority, cancellable,
1173				 large_read_callback,
1174				 simple);
1175    }
1176  else
1177    {
1178      class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1179      class->fill_async (bstream, priv->len, io_priority, cancellable,
1180			 read_fill_buffer_callback, simple);
1181    }
1182}
1183
1184static gssize
1185g_buffered_input_stream_read_finish (GInputStream   *stream,
1186                                     GAsyncResult   *result,
1187                                     GError        **error)
1188{
1189  GSimpleAsyncResult *simple;
1190  ReadAsyncData *data;
1191
1192  simple = G_SIMPLE_ASYNC_RESULT (result);
1193
1194  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
1195
1196  data = g_simple_async_result_get_op_res_gpointer (simple);
1197
1198  return data->bytes_read;
1199}
1200
/* Per-operation state of an asynchronous skip.  It is attached to the
 * operation's GSimpleAsyncResult as its pointer result and released
 * through free_skip_async_data().
 */
typedef struct {
  /* Bytes skipped so far; this is the value
   * g_buffered_input_stream_skip_finish() returns. */
  gssize bytes_skipped;
  /* Bytes still to skip after the buffered data has been consumed */
  gssize count;
} SkipAsyncData;
1205
1206static void
1207free_skip_async_data (gpointer _data)
1208{
1209  SkipAsyncData *data = _data;
1210  g_slice_free (SkipAsyncData, data);
1211}
1212
1213static void
1214large_skip_callback (GObject *source_object,
1215		     GAsyncResult *result,
1216		     gpointer user_data)
1217{
1218  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1219  SkipAsyncData *data;
1220  GError *error;
1221  gssize nread;
1222
1223  data = g_simple_async_result_get_op_res_gpointer (simple);
1224
1225  error = NULL;
1226  nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1227				      result, &error);
1228
1229  /* Only report the error if we've not already read some data */
1230  if (nread < 0 && data->bytes_skipped == 0)
1231    g_simple_async_result_set_from_error (simple, error);
1232
1233  if (nread > 0)
1234    data->bytes_skipped += nread;
1235
1236  if (error)
1237    g_error_free (error);
1238
1239  /* Complete immediately, not in idle, since we're already in a mainloop callout */
1240  g_simple_async_result_complete (simple);
1241  g_object_unref (simple);
1242}
1243
1244static void
1245skip_fill_buffer_callback (GObject *source_object,
1246			   GAsyncResult *result,
1247			   gpointer user_data)
1248{
1249  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1250  GBufferedInputStream *bstream;
1251  GBufferedInputStreamPrivate *priv;
1252  SkipAsyncData *data;
1253  GError *error;
1254  gssize nread;
1255  gsize available;
1256
1257  bstream = G_BUFFERED_INPUT_STREAM (source_object);
1258  priv = bstream->priv;
1259
1260  data = g_simple_async_result_get_op_res_gpointer (simple);
1261
1262  error = NULL;
1263  nread = g_buffered_input_stream_fill_finish (bstream,
1264					       result, &error);
1265
1266  if (nread < 0 && data->bytes_skipped == 0)
1267    g_simple_async_result_set_from_error (simple, error);
1268
1269
1270  if (nread > 0)
1271    {
1272      available = priv->end - priv->pos;
1273      data->count = MIN (data->count, available);
1274
1275      data->bytes_skipped += data->count;
1276      priv->pos += data->count;
1277    }
1278
1279  if (error)
1280    g_error_free (error);
1281
1282  /* Complete immediately, not in idle, since we're already in a mainloop callout */
1283  g_simple_async_result_complete (simple);
1284  g_object_unref (simple);
1285}
1286
1287static void
1288g_buffered_input_stream_skip_async (GInputStream              *stream,
1289                                    gsize                      count,
1290                                    int                        io_priority,
1291                                    GCancellable              *cancellable,
1292                                    GAsyncReadyCallback        callback,
1293                                    gpointer                   user_data)
1294{
1295  GBufferedInputStream *bstream;
1296  GBufferedInputStreamPrivate *priv;
1297  GBufferedInputStreamClass *class;
1298  GInputStream *base_stream;
1299  gsize available;
1300  GSimpleAsyncResult *simple;
1301  SkipAsyncData *data;
1302
1303  bstream = G_BUFFERED_INPUT_STREAM (stream);
1304  priv = bstream->priv;
1305
1306  data = g_slice_new (SkipAsyncData);
1307  data->bytes_skipped = 0;
1308  simple = g_simple_async_result_new (G_OBJECT (stream),
1309				      callback, user_data,
1310				      g_buffered_input_stream_skip_async);
1311  g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1312
1313  available = priv->end - priv->pos;
1314
1315  if (count <= available)
1316    {
1317      priv->pos += count;
1318      data->bytes_skipped = count;
1319
1320      g_simple_async_result_complete_in_idle (simple);
1321      g_object_unref (simple);
1322      return;
1323    }
1324
1325
1326  /* Full request not available, skip all currently availbile and request refill for more */
1327
1328  priv->pos = 0;
1329  priv->end = 0;
1330
1331  count -= available;
1332
1333  data->bytes_skipped = available;
1334  data->count = count;
1335
1336  if (count > priv->len)
1337    {
1338      /* Large request, shortcut buffer */
1339
1340      base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1341
1342      g_input_stream_skip_async (base_stream,
1343				 count,
1344				 io_priority, cancellable,
1345				 large_skip_callback,
1346				 simple);
1347    }
1348  else
1349    {
1350      class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1351      class->fill_async (bstream, priv->len, io_priority, cancellable,
1352			 skip_fill_buffer_callback, simple);
1353    }
1354}
1355
1356static gssize
1357g_buffered_input_stream_skip_finish (GInputStream   *stream,
1358                                     GAsyncResult   *result,
1359                                     GError        **error)
1360{
1361  GSimpleAsyncResult *simple;
1362  SkipAsyncData *data;
1363
1364  simple = G_SIMPLE_ASYNC_RESULT (result);
1365
1366  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1367
1368  data = g_simple_async_result_get_op_res_gpointer (simple);
1369
1370  return data->bytes_skipped;
1371}
1372
1373
1374#define __G_BUFFERED_INPUT_STREAM_C__
1375#include "gioaliasdef.c"
1376