gbufferedinputstream.c revision 3d93bf6968884d75dd2706ef85e2014305eb92f2
1/* GIO - GLib Input, Output and Streaming Library
2 *
3 * Copyright (C) 2006-2007 Red Hat, Inc.
4 * Copyright (C) 2007 Jürg Billeter
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General
17 * Public License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19 * Boston, MA 02111-1307, USA.
20 *
21 * Author: Christian Kellner <gicmo@gnome.org>
22 */
23
24#include "config.h"
25#include "gbufferedinputstream.h"
26#include "ginputstream.h"
27#include "gcancellable.h"
28#include "gasyncresult.h"
29#include "gsimpleasyncresult.h"
30#include "gioerror.h"
31#include <string.h>
32#include "glibintl.h"
33
34#include "gioalias.h"
35
36/**
37 * SECTION:gbufferedinputstream
38 * @short_description: Buffered Input Stream
39 * @include: gio/gio.h
40 * @see_also: #GFilterInputStream, #GInputStream
41 *
42 * Buffered input stream implements #GFilterInputStream and provides
43 * for buffered reads.
44 *
45 * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes.
46 *
47 * To create a buffered input stream, use g_buffered_input_stream_new(),
48 * or g_buffered_input_stream_new_sized() to specify the buffer's size at
49 * construction.
50 *
51 * To get the size of a buffer within a buffered input stream, use
52 * g_buffered_input_stream_get_buffer_size(). To change the size of a
53 * buffered input stream's buffer, use
54 * g_buffered_input_stream_set_buffer_size(). Note that the buffer's size
55 * cannot be reduced below the size of the data within the buffer.
56 *
57 **/
58
59
60
61#define DEFAULT_BUFFER_SIZE 4096
62
63struct _GBufferedInputStreamPrivate {
64  guint8 *buffer;
65  gsize   len;
66  gsize   pos;
67  gsize   end;
68  GAsyncReadyCallback outstanding_callback;
69};
70
/* GObject property ids; only PROP_BUFSIZE is installed on the class. */
enum
{
  PROP_0,
  PROP_BUFSIZE
};
75
76static void g_buffered_input_stream_set_property  (GObject      *object,
77                                                   guint         prop_id,
78                                                   const GValue *value,
79                                                   GParamSpec   *pspec);
80
81static void g_buffered_input_stream_get_property  (GObject      *object,
82                                                   guint         prop_id,
83                                                   GValue       *value,
84                                                   GParamSpec   *pspec);
85static void g_buffered_input_stream_finalize      (GObject *object);
86
87
88static gssize g_buffered_input_stream_skip             (GInputStream          *stream,
89							gsize                  count,
90							GCancellable          *cancellable,
91							GError               **error);
92static void   g_buffered_input_stream_skip_async       (GInputStream          *stream,
93							gsize                  count,
94							int                    io_priority,
95							GCancellable          *cancellable,
96							GAsyncReadyCallback    callback,
97							gpointer               user_data);
98static gssize g_buffered_input_stream_skip_finish      (GInputStream          *stream,
99							GAsyncResult          *result,
100							GError               **error);
101static gssize g_buffered_input_stream_read             (GInputStream          *stream,
102							void                  *buffer,
103							gsize                  count,
104							GCancellable          *cancellable,
105							GError               **error);
106static void   g_buffered_input_stream_read_async       (GInputStream          *stream,
107							void                  *buffer,
108							gsize                  count,
109							int                    io_priority,
110							GCancellable          *cancellable,
111							GAsyncReadyCallback    callback,
112							gpointer               user_data);
113static gssize g_buffered_input_stream_read_finish      (GInputStream          *stream,
114							GAsyncResult          *result,
115							GError               **error);
116static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
117							gssize                 count,
118							GCancellable          *cancellable,
119							GError               **error);
120static void   g_buffered_input_stream_real_fill_async  (GBufferedInputStream  *stream,
121							gssize                 count,
122							int                    io_priority,
123							GCancellable          *cancellable,
124							GAsyncReadyCallback    callback,
125							gpointer               user_data);
126static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream  *stream,
127							GAsyncResult          *result,
128							GError               **error);
129
130static void compact_buffer (GBufferedInputStream *stream);
131
132G_DEFINE_TYPE (GBufferedInputStream,
133               g_buffered_input_stream,
134               G_TYPE_FILTER_INPUT_STREAM)
135
136
137static void
138g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
139{
140  GObjectClass *object_class;
141  GInputStreamClass *istream_class;
142  GBufferedInputStreamClass *bstream_class;
143
144  g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
145
146  object_class = G_OBJECT_CLASS (klass);
147  object_class->get_property = g_buffered_input_stream_get_property;
148  object_class->set_property = g_buffered_input_stream_set_property;
149  object_class->finalize     = g_buffered_input_stream_finalize;
150
151  istream_class = G_INPUT_STREAM_CLASS (klass);
152  istream_class->skip = g_buffered_input_stream_skip;
153  istream_class->skip_async  = g_buffered_input_stream_skip_async;
154  istream_class->skip_finish = g_buffered_input_stream_skip_finish;
155  istream_class->read_fn = g_buffered_input_stream_read;
156  istream_class->read_async  = g_buffered_input_stream_read_async;
157  istream_class->read_finish = g_buffered_input_stream_read_finish;
158
159  bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
160  bstream_class->fill = g_buffered_input_stream_real_fill;
161  bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
162  bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
163
164  g_object_class_install_property (object_class,
165                                   PROP_BUFSIZE,
166                                   g_param_spec_uint ("buffer-size",
167                                                      P_("Buffer Size"),
168                                                      P_("The size of the backend buffer"),
169                                                      1,
170                                                      G_MAXUINT,
171                                                      DEFAULT_BUFFER_SIZE,
172                                                      G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
173                                                      G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
174
175
176}
177
178/**
179 * g_buffered_input_stream_get_buffer_size:
180 * @stream: #GBufferedInputStream.
181 *
182 * Gets the size of the input buffer.
183 *
184 * Returns: the current buffer size.
185 **/
186gsize
187g_buffered_input_stream_get_buffer_size (GBufferedInputStream  *stream)
188{
189  g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), 0);
190
191  return stream->priv->len;
192}
193
194/**
195 * g_buffered_input_stream_set_buffer_size:
196 * @stream: #GBufferedInputStream.
197 * @size: a #gsize.
198 *
199 * Sets the size of the internal buffer of @stream to @size, or to the
200 * size of the contents of the buffer. The buffer can never be resized
201 * smaller than its current contents.
202 **/
203void
204g_buffered_input_stream_set_buffer_size (GBufferedInputStream  *stream,
205                                         gsize                  size)
206{
207  GBufferedInputStreamPrivate *priv;
208  gsize in_buffer;
209  guint8 *buffer;
210
211  g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
212
213  priv = stream->priv;
214
215  if (priv->len == size)
216    return;
217
218  if (priv->buffer)
219    {
220      in_buffer = priv->end - priv->pos;
221
222      /* Never resize smaller than current buffer contents */
223      size = MAX (size, in_buffer);
224
225      buffer = g_malloc (size);
226      memcpy (buffer, priv->buffer + priv->pos, in_buffer);
227      priv->len = size;
228      priv->pos = 0;
229      priv->end = in_buffer;
230      g_free (priv->buffer);
231      priv->buffer = buffer;
232    }
233  else
234    {
235      priv->len = size;
236      priv->pos = 0;
237      priv->end = 0;
238      priv->buffer = g_malloc (size);
239    }
240
241  g_object_notify (G_OBJECT (stream), "buffer-size");
242}
243
244static void
245g_buffered_input_stream_set_property (GObject      *object,
246                                      guint         prop_id,
247                                      const GValue *value,
248                                      GParamSpec   *pspec)
249{
250  GBufferedInputStreamPrivate *priv;
251  GBufferedInputStream        *bstream;
252
253  bstream = G_BUFFERED_INPUT_STREAM (object);
254  priv = bstream->priv;
255
256  switch (prop_id)
257    {
258    case PROP_BUFSIZE:
259      g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
260      break;
261
262    default:
263      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
264      break;
265    }
266
267}
268
269static void
270g_buffered_input_stream_get_property (GObject    *object,
271                                      guint       prop_id,
272                                      GValue     *value,
273                                      GParamSpec *pspec)
274{
275  GBufferedInputStreamPrivate *priv;
276  GBufferedInputStream        *bstream;
277
278  bstream = G_BUFFERED_INPUT_STREAM (object);
279  priv = bstream->priv;
280
281  switch (prop_id)
282    {
283    case PROP_BUFSIZE:
284      g_value_set_uint (value, priv->len);
285      break;
286
287    default:
288      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
289      break;
290    }
291}
292
293static void
294g_buffered_input_stream_finalize (GObject *object)
295{
296  GBufferedInputStreamPrivate *priv;
297  GBufferedInputStream        *stream;
298
299  stream = G_BUFFERED_INPUT_STREAM (object);
300  priv = stream->priv;
301
302  g_free (priv->buffer);
303
304  G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize (object);
305}
306
307static void
308g_buffered_input_stream_init (GBufferedInputStream *stream)
309{
310  stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
311                                              G_TYPE_BUFFERED_INPUT_STREAM,
312                                              GBufferedInputStreamPrivate);
313}
314
315
316/**
317 * g_buffered_input_stream_new:
318 * @base_stream: a #GInputStream.
319 *
320 * Creates a new #GInputStream from the given @base_stream, with
321 * a buffer set to the default size (4 kilobytes).
322 *
323 * Returns: a #GInputStream for the given @base_stream.
324 **/
325GInputStream *
326g_buffered_input_stream_new (GInputStream *base_stream)
327{
328  GInputStream *stream;
329
330  g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
331
332  stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
333                         "base-stream", base_stream,
334                         NULL);
335
336  return stream;
337}
338
339/**
340 * g_buffered_input_stream_new_sized:
341 * @base_stream: a #GInputStream.
342 * @size: a #gsize.
343 *
344 * Creates a new #GBufferedInputStream from the given @base_stream,
345 * with a buffer set to @size.
346 *
347 * Returns: a #GInputStream.
348 **/
349GInputStream *
350g_buffered_input_stream_new_sized (GInputStream *base_stream,
351                                   gsize         size)
352{
353  GInputStream *stream;
354
355  g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
356
357  stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
358                         "base-stream", base_stream,
359                         "buffer-size", (guint)size,
360                         NULL);
361
362  return stream;
363}
364
365/**
366 * g_buffered_input_stream_fill:
367 * @stream: #GBufferedInputStream.
368 * @count: the number of bytes that will be read from the stream.
369 * @cancellable: optional #GCancellable object, %NULL to ignore.
370 * @error: location to store the error occuring, or %NULL to ignore.
371 *
372 * Tries to read @count bytes from the stream into the buffer.
373 * Will block during this read.
374 *
375 * If @count is zero, returns zero and does nothing. A value of @count
376 * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
377 *
378 * On success, the number of bytes read into the buffer is returned.
379 * It is not an error if this is not the same as the requested size, as it
380 * can happen e.g. near the end of a file. Zero is returned on end of file
381 * (or if @count is zero),  but never otherwise.
382 *
383 * If @cancellable is not %NULL, then the operation can be cancelled by
384 * triggering the cancellable object from another thread. If the operation
385 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
386 * operation was partially finished when the operation was cancelled the
387 * partial result will be returned, without an error.
388 *
389 * On error -1 is returned and @error is set accordingly.
390 *
391 * For the asynchronous, non-blocking, version of this function, see
392 * g_buffered_input_stream_fill_async().
393 *
394 * Returns: the number of bytes read into @stream's buffer, up to @count,
395 *     or -1 on error.
396 **/
397gssize
398g_buffered_input_stream_fill (GBufferedInputStream  *stream,
399                              gssize                 count,
400                              GCancellable          *cancellable,
401                              GError               **error)
402{
403  GBufferedInputStreamClass *class;
404  GInputStream *input_stream;
405  gssize res;
406
407  g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
408
409  input_stream = G_INPUT_STREAM (stream);
410
411  if (!g_input_stream_set_pending (input_stream, error))
412    return -1;
413
414  if (cancellable)
415    g_cancellable_push_current (cancellable);
416
417  class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
418  res = class->fill (stream, count, cancellable, error);
419
420  if (cancellable)
421    g_cancellable_pop_current (cancellable);
422
423  g_input_stream_clear_pending (input_stream);
424
425  return res;
426}
427
428static void
429async_fill_callback_wrapper (GObject      *source_object,
430                             GAsyncResult *res,
431                             gpointer      user_data)
432{
433  GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
434
435  g_input_stream_clear_pending (G_INPUT_STREAM (stream));
436  (*stream->priv->outstanding_callback) (source_object, res, user_data);
437  g_object_unref (stream);
438}
439
440/**
441 * g_buffered_input_stream_fill_async:
442 * @stream: #GBufferedInputStream.
443 * @count: a #gssize.
444 * @io_priority: the <link linkend="io-priority">I/O priority</link>
445 *     of the request.
446 * @cancellable: optional #GCancellable object
447 * @callback: a #GAsyncReadyCallback.
448 * @user_data: a #gpointer.
449 *
450 * Reads data into @stream's buffer asynchronously, up to @count size.
451 * @io_priority can be used to prioritize reads. For the synchronous
452 * version of this function, see g_buffered_input_stream_fill().
453 **/
454void
455g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
456                                    gssize                count,
457                                    int                   io_priority,
458                                    GCancellable         *cancellable,
459                                    GAsyncReadyCallback   callback,
460                                    gpointer              user_data)
461{
462  GBufferedInputStreamClass *class;
463  GSimpleAsyncResult *simple;
464  GError *error = NULL;
465
466  g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
467
468  if (count == 0)
469    {
470      simple = g_simple_async_result_new (G_OBJECT (stream),
471					  callback,
472					  user_data,
473					  g_buffered_input_stream_fill_async);
474      g_simple_async_result_complete_in_idle (simple);
475      g_object_unref (simple);
476      return;
477    }
478
479  if (((gssize) count) < 0)
480    {
481      g_simple_async_report_error_in_idle (G_OBJECT (stream),
482					   callback,
483					   user_data,
484					   G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
485					   _("Too large count value passed to %s"),
486					   G_STRFUNC);
487      return;
488    }
489
490  if (!g_input_stream_set_pending (G_INPUT_STREAM (stream), &error))
491    {
492      g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
493					    callback,
494					    user_data,
495					    error);
496      g_error_free (error);
497      return;
498    }
499
500  class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
501
502  stream->priv->outstanding_callback = callback;
503  g_object_ref (stream);
504  class->fill_async (stream, count, io_priority, cancellable,
505                     async_fill_callback_wrapper, user_data);
506}
507
508/**
509 * g_buffered_input_stream_fill_finish:
510 * @stream: a #GBufferedInputStream.
511 * @result: a #GAsyncResult.
512 * @error: a #GError.
513 *
514 * Finishes an asynchronous read.
515 *
516 * Returns: a #gssize of the read stream, or %-1 on an error.
517 **/
518gssize
519g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
520                                     GAsyncResult          *result,
521                                     GError               **error)
522{
523  GSimpleAsyncResult *simple;
524  GBufferedInputStreamClass *class;
525
526  g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
527  g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
528
529  if (G_IS_SIMPLE_ASYNC_RESULT (result))
530    {
531      simple = G_SIMPLE_ASYNC_RESULT (result);
532      if (g_simple_async_result_propagate_error (simple, error))
533        return -1;
534
535      /* Special case read of 0 bytes */
536      if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
537        return 0;
538    }
539
540  class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
541  return class->fill_finish (stream, result, error);
542}
543
544/**
545 * g_buffered_input_stream_get_available:
546 * @stream: #GBufferedInputStream.
547 *
548 * Gets the size of the available data within the stream.
549 *
550 * Returns: size of the available stream.
551 **/
552gsize
553g_buffered_input_stream_get_available (GBufferedInputStream *stream)
554{
555  g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
556
557  return stream->priv->end - stream->priv->pos;
558}
559
560/**
561 * g_buffered_input_stream_peek:
562 * @stream: a #GBufferedInputStream.
563 * @buffer: a pointer to an allocated chunk of memory.
564 * @offset: a #gsize.
565 * @count: a #gsize.
566 *
567 * Peeks in the buffer, copying data of size @count into @buffer,
568 * offset @offset bytes.
569 *
570 * Returns: a #gsize of the number of bytes peeked, or %-1 on error.
571 **/
572gsize
573g_buffered_input_stream_peek (GBufferedInputStream *stream,
574                              void                 *buffer,
575                              gsize                 offset,
576                              gsize                 count)
577{
578  gsize available;
579  gsize end;
580
581  g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
582  g_return_val_if_fail (buffer != NULL, -1);
583
584  available = g_buffered_input_stream_get_available (stream);
585
586  if (offset > available)
587    return 0;
588
589  end = MIN (offset + count, available);
590  count = end - offset;
591
592  memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
593  return count;
594}
595
596/**
597 * g_buffered_input_stream_peek_buffer:
598 * @stream: a #GBufferedInputStream.
599 * @count: a #gsize to get the number of bytes available in the buffer.
600 *
601 * Returns the buffer with the currently available bytes. The returned
602 * buffer must not be modified and will become invalid when reading from
603 * the stream or filling the buffer.
604 *
605 * Returns: read-only buffer
606 **/
607const void*
608g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream,
609                                     gsize                *count)
610{
611  GBufferedInputStreamPrivate *priv;
612
613  g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
614
615  priv = stream->priv;
616
617  if (count)
618    *count = priv->end - priv->pos;
619
620  return priv->buffer + priv->pos;
621}
622
623static void
624compact_buffer (GBufferedInputStream *stream)
625{
626  GBufferedInputStreamPrivate *priv;
627  gsize current_size;
628
629  priv = stream->priv;
630
631  current_size = priv->end - priv->pos;
632
633  g_memmove (priv->buffer, priv->buffer + priv->pos, current_size);
634
635  priv->pos = 0;
636  priv->end = current_size;
637}
638
639static gssize
640g_buffered_input_stream_real_fill (GBufferedInputStream  *stream,
641                                   gssize                 count,
642                                   GCancellable          *cancellable,
643                                   GError               **error)
644{
645  GBufferedInputStreamPrivate *priv;
646  GInputStream *base_stream;
647  gssize nread;
648  gsize in_buffer;
649
650  priv = stream->priv;
651
652  if (count == -1)
653    count = priv->len;
654
655  in_buffer = priv->end - priv->pos;
656
657  /* Never fill more than can fit in the buffer */
658  count = MIN (count, priv->len - in_buffer);
659
660  /* If requested length does not fit at end, compact */
661  if (priv->len - priv->end < count)
662    compact_buffer (stream);
663
664  base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
665  nread = g_input_stream_read (base_stream,
666                               priv->buffer + priv->end,
667                               count,
668                               cancellable,
669                               error);
670
671  if (nread > 0)
672    priv->end += nread;
673
674  return nread;
675}
676
677static gssize
678g_buffered_input_stream_skip (GInputStream  *stream,
679                              gsize          count,
680                              GCancellable  *cancellable,
681                              GError       **error)
682{
683  GBufferedInputStream        *bstream;
684  GBufferedInputStreamPrivate *priv;
685  GBufferedInputStreamClass *class;
686  GInputStream *base_stream;
687  gsize available, bytes_skipped;
688  gssize nread;
689
690  bstream = G_BUFFERED_INPUT_STREAM (stream);
691  priv = bstream->priv;
692
693  available = priv->end - priv->pos;
694
695  if (count <= available)
696    {
697      priv->pos += count;
698      return count;
699    }
700
701  /* Full request not available, skip all currently available and
702   * request refill for more
703   */
704
705  priv->pos = 0;
706  priv->end = 0;
707  bytes_skipped = available;
708  count -= available;
709
710  if (bytes_skipped > 0)
711    error = NULL; /* Ignore further errors if we already read some data */
712
713  if (count > priv->len)
714    {
715      /* Large request, shortcut buffer */
716
717      base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
718
719      nread = g_input_stream_skip (base_stream,
720                                   count,
721                                   cancellable,
722                                   error);
723
724      if (nread < 0 && bytes_skipped == 0)
725        return -1;
726
727      if (nread > 0)
728        bytes_skipped += nread;
729
730      return bytes_skipped;
731    }
732
733  class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
734  nread = class->fill (bstream, priv->len, cancellable, error);
735
736  if (nread < 0)
737    {
738      if (bytes_skipped == 0)
739        return -1;
740      else
741        return bytes_skipped;
742    }
743
744  available = priv->end - priv->pos;
745  count = MIN (count, available);
746
747  bytes_skipped += count;
748  priv->pos += count;
749
750  return bytes_skipped;
751}
752
753static gssize
754g_buffered_input_stream_read (GInputStream *stream,
755                              void         *buffer,
756                              gsize         count,
757                              GCancellable *cancellable,
758                              GError      **error)
759{
760  GBufferedInputStream        *bstream;
761  GBufferedInputStreamPrivate *priv;
762  GBufferedInputStreamClass *class;
763  GInputStream *base_stream;
764  gsize available, bytes_read;
765  gssize nread;
766
767  bstream = G_BUFFERED_INPUT_STREAM (stream);
768  priv = bstream->priv;
769
770  available = priv->end - priv->pos;
771
772  if (count <= available)
773    {
774      memcpy (buffer, priv->buffer + priv->pos, count);
775      priv->pos += count;
776      return count;
777    }
778
779  /* Full request not available, read all currently availbile and request refill for more */
780
781  memcpy (buffer, priv->buffer + priv->pos, available);
782  priv->pos = 0;
783  priv->end = 0;
784  bytes_read = available;
785  count -= available;
786
787  if (bytes_read > 0)
788    error = NULL; /* Ignore further errors if we already read some data */
789
790  if (count > priv->len)
791    {
792      /* Large request, shortcut buffer */
793
794      base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
795
796      nread = g_input_stream_read (base_stream,
797				   (char *)buffer + bytes_read,
798				   count,
799				   cancellable,
800				   error);
801
802      if (nread < 0 && bytes_read == 0)
803        return -1;
804
805      if (nread > 0)
806        bytes_read += nread;
807
808      return bytes_read;
809    }
810
811  class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
812  nread = class->fill (bstream, priv->len, cancellable, error);
813  if (nread < 0)
814    {
815      if (bytes_read == 0)
816        return -1;
817      else
818        return bytes_read;
819    }
820
821  available = priv->end - priv->pos;
822  count = MIN (count, available);
823
824  memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
825  bytes_read += count;
826  priv->pos += count;
827
828  return bytes_read;
829}
830
831/**
832 * g_buffered_input_stream_read_byte:
833 * @stream: #GBufferedInputStream.
834 * @cancellable: optional #GCancellable object, %NULL to ignore.
835 * @error: location to store the error occuring, or %NULL to ignore.
836 *
837 * Tries to read a single byte from the stream or the buffer. Will block
838 * during this read.
839 *
840 * On success, the byte read from the stream is returned. On end of stream
841 * -1 is returned but it's not an exceptional error and @error is not set.
842 *
843 * If @cancellable is not %NULL, then the operation can be cancelled by
844 * triggering the cancellable object from another thread. If the operation
845 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
846 * operation was partially finished when the operation was cancelled the
847 * partial result will be returned, without an error.
848 *
849 * On error -1 is returned and @error is set accordingly.
850 *
851 * Returns: the byte read from the @stream, or -1 on end of stream or error.
852 **/
853int
854g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
855                                   GCancellable          *cancellable,
856                                   GError               **error)
857{
858  GBufferedInputStreamPrivate *priv;
859  GBufferedInputStreamClass *class;
860  GInputStream *input_stream;
861  gsize available;
862  gssize nread;
863
864  g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
865
866  priv = stream->priv;
867  input_stream = G_INPUT_STREAM (stream);
868
869  if (g_input_stream_is_closed (input_stream))
870    {
871      g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
872                           _("Stream is already closed"));
873      return -1;
874    }
875
876  if (!g_input_stream_set_pending (input_stream, error))
877    return -1;
878
879  available = priv->end - priv->pos;
880
881  if (available < 1)
882    {
883      g_input_stream_clear_pending (input_stream);
884      return priv->buffer[priv->pos++];
885    }
886
887  /* Byte not available, request refill for more */
888
889  if (cancellable)
890    g_cancellable_push_current (cancellable);
891
892  priv->pos = 0;
893  priv->end = 0;
894
895  class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
896  nread = class->fill (stream, priv->len, cancellable, error);
897
898  if (cancellable)
899    g_cancellable_pop_current (cancellable);
900
901  g_input_stream_clear_pending (input_stream);
902
903  if (nread <= 0)
904    return -1; /* error or end of stream */
905
906  return priv->buffer[priv->pos++];
907}
908
909/* ************************** */
910/* Async stuff implementation */
911/* ************************** */
912
913static void
914fill_async_callback (GObject      *source_object,
915                     GAsyncResult *result,
916                     gpointer      user_data)
917{
918  GError *error;
919  gssize res;
920  GSimpleAsyncResult *simple;
921
922  simple = user_data;
923
924  error = NULL;
925  res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
926				    result, &error);
927
928  g_simple_async_result_set_op_res_gssize (simple, res);
929  if (res == -1)
930    {
931      g_simple_async_result_set_from_error (simple, error);
932      g_error_free (error);
933    }
934
935  /* Complete immediately, not in idle, since we're already in a mainloop callout */
936  g_simple_async_result_complete (simple);
937  g_object_unref (simple);
938}
939
940static void
941g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
942                                         gssize                count,
943                                         int                   io_priority,
944                                         GCancellable         *cancellable,
945                                         GAsyncReadyCallback   callback,
946                                         gpointer              user_data)
947{
948  GBufferedInputStreamPrivate *priv;
949  GInputStream *base_stream;
950  GSimpleAsyncResult *simple;
951  gsize in_buffer;
952
953  priv = stream->priv;
954
955  if (count == -1)
956    count = priv->len;
957
958  in_buffer = priv->end - priv->pos;
959
960  /* Never fill more than can fit in the buffer */
961  count = MIN (count, priv->len - in_buffer);
962
963  /* If requested length does not fit at end, compact */
964  if (priv->len - priv->end < count)
965    compact_buffer (stream);
966
967  simple = g_simple_async_result_new (G_OBJECT (stream),
968				      callback, user_data,
969				      g_buffered_input_stream_real_fill_async);
970
971  base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
972  g_input_stream_read_async (base_stream,
973			     priv->buffer + priv->end,
974			     count,
975			     io_priority,
976			     cancellable,
977			     fill_async_callback,
978			     simple);
979}
980
981static gssize
982g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
983					  GAsyncResult         *result,
984					  GError              **error)
985{
986  GSimpleAsyncResult *simple;
987  gssize nread;
988
989  simple = G_SIMPLE_ASYNC_RESULT (result);
990  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
991
992  nread = g_simple_async_result_get_op_res_gssize (simple);
993  return nread;
994}
995
996typedef struct {
997  gssize bytes_read;
998  gssize count;
999  void *buffer;
1000} ReadAsyncData;
1001
1002static void
1003free_read_async_data (gpointer _data)
1004{
1005  ReadAsyncData *data = _data;
1006  g_slice_free (ReadAsyncData, data);
1007}
1008
1009static void
1010large_read_callback (GObject *source_object,
1011		     GAsyncResult *result,
1012		     gpointer user_data)
1013{
1014  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1015  ReadAsyncData *data;
1016  GError *error;
1017  gssize nread;
1018
1019  data = g_simple_async_result_get_op_res_gpointer (simple);
1020
1021  error = NULL;
1022  nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1023				      result, &error);
1024
1025  /* Only report the error if we've not already read some data */
1026  if (nread < 0 && data->bytes_read == 0)
1027    g_simple_async_result_set_from_error (simple, error);
1028
1029  if (nread > 0)
1030    data->bytes_read += nread;
1031
1032  if (error)
1033    g_error_free (error);
1034
1035  /* Complete immediately, not in idle, since we're already in a mainloop callout */
1036  g_simple_async_result_complete (simple);
1037  g_object_unref (simple);
1038}
1039
1040static void
1041read_fill_buffer_callback (GObject *source_object,
1042			   GAsyncResult *result,
1043			   gpointer user_data)
1044{
1045  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1046  GBufferedInputStream *bstream;
1047  GBufferedInputStreamPrivate *priv;
1048  ReadAsyncData *data;
1049  GError *error;
1050  gssize nread;
1051  gsize available;
1052
1053  bstream = G_BUFFERED_INPUT_STREAM (source_object);
1054  priv = bstream->priv;
1055
1056  data = g_simple_async_result_get_op_res_gpointer (simple);
1057
1058  error = NULL;
1059  nread = g_buffered_input_stream_fill_finish (bstream,
1060					       result, &error);
1061
1062  if (nread < 0 && data->bytes_read == 0)
1063    g_simple_async_result_set_from_error (simple, error);
1064
1065
1066  if (nread > 0)
1067    {
1068      available = priv->end - priv->pos;
1069      data->count = MIN (data->count, available);
1070
1071      memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
1072      data->bytes_read += data->count;
1073      priv->pos += data->count;
1074    }
1075
1076  if (error)
1077    g_error_free (error);
1078
1079  /* Complete immediately, not in idle, since we're already in a mainloop callout */
1080  g_simple_async_result_complete (simple);
1081  g_object_unref (simple);
1082}
1083
1084static void
1085g_buffered_input_stream_read_async (GInputStream              *stream,
1086                                    void                      *buffer,
1087                                    gsize                      count,
1088                                    int                        io_priority,
1089                                    GCancellable              *cancellable,
1090                                    GAsyncReadyCallback        callback,
1091                                    gpointer                   user_data)
1092{
1093  GBufferedInputStream *bstream;
1094  GBufferedInputStreamPrivate *priv;
1095  GBufferedInputStreamClass *class;
1096  GInputStream *base_stream;
1097  gsize available;
1098  GSimpleAsyncResult *simple;
1099  ReadAsyncData *data;
1100
1101  bstream = G_BUFFERED_INPUT_STREAM (stream);
1102  priv = bstream->priv;
1103
1104  data = g_slice_new (ReadAsyncData);
1105  data->buffer = buffer;
1106  data->bytes_read = 0;
1107  simple = g_simple_async_result_new (G_OBJECT (stream),
1108				      callback, user_data,
1109				      g_buffered_input_stream_read_async);
1110  g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
1111
1112  available = priv->end - priv->pos;
1113
1114  if (count <= available)
1115    {
1116      memcpy (buffer, priv->buffer + priv->pos, count);
1117      priv->pos += count;
1118      data->bytes_read = count;
1119
1120      g_simple_async_result_complete_in_idle (simple);
1121      g_object_unref (simple);
1122      return;
1123    }
1124
1125
1126  /* Full request not available, read all currently availbile and request refill for more */
1127
1128  memcpy (buffer, priv->buffer + priv->pos, available);
1129  priv->pos = 0;
1130  priv->end = 0;
1131
1132  count -= available;
1133
1134  data->bytes_read = available;
1135  data->count = count;
1136
1137  if (count > priv->len)
1138    {
1139      /* Large request, shortcut buffer */
1140
1141      base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1142
1143      g_input_stream_read_async (base_stream,
1144				 (char *)buffer + data->bytes_read,
1145				 count,
1146				 io_priority, cancellable,
1147				 large_read_callback,
1148				 simple);
1149    }
1150  else
1151    {
1152      class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1153      class->fill_async (bstream, priv->len, io_priority, cancellable,
1154			 read_fill_buffer_callback, simple);
1155    }
1156}
1157
1158static gssize
1159g_buffered_input_stream_read_finish (GInputStream   *stream,
1160                                     GAsyncResult   *result,
1161                                     GError        **error)
1162{
1163  GSimpleAsyncResult *simple;
1164  ReadAsyncData *data;
1165
1166  simple = G_SIMPLE_ASYNC_RESULT (result);
1167
1168  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
1169
1170  data = g_simple_async_result_get_op_res_gpointer (simple);
1171
1172  return data->bytes_read;
1173}
1174
1175typedef struct {
1176  gssize bytes_skipped;
1177  gssize count;
1178} SkipAsyncData;
1179
1180static void
1181free_skip_async_data (gpointer _data)
1182{
1183  SkipAsyncData *data = _data;
1184  g_slice_free (SkipAsyncData, data);
1185}
1186
1187static void
1188large_skip_callback (GObject *source_object,
1189		     GAsyncResult *result,
1190		     gpointer user_data)
1191{
1192  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1193  SkipAsyncData *data;
1194  GError *error;
1195  gssize nread;
1196
1197  data = g_simple_async_result_get_op_res_gpointer (simple);
1198
1199  error = NULL;
1200  nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1201				      result, &error);
1202
1203  /* Only report the error if we've not already read some data */
1204  if (nread < 0 && data->bytes_skipped == 0)
1205    g_simple_async_result_set_from_error (simple, error);
1206
1207  if (nread > 0)
1208    data->bytes_skipped += nread;
1209
1210  if (error)
1211    g_error_free (error);
1212
1213  /* Complete immediately, not in idle, since we're already in a mainloop callout */
1214  g_simple_async_result_complete (simple);
1215  g_object_unref (simple);
1216}
1217
1218static void
1219skip_fill_buffer_callback (GObject *source_object,
1220			   GAsyncResult *result,
1221			   gpointer user_data)
1222{
1223  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1224  GBufferedInputStream *bstream;
1225  GBufferedInputStreamPrivate *priv;
1226  SkipAsyncData *data;
1227  GError *error;
1228  gssize nread;
1229  gsize available;
1230
1231  bstream = G_BUFFERED_INPUT_STREAM (source_object);
1232  priv = bstream->priv;
1233
1234  data = g_simple_async_result_get_op_res_gpointer (simple);
1235
1236  error = NULL;
1237  nread = g_buffered_input_stream_fill_finish (bstream,
1238					       result, &error);
1239
1240  if (nread < 0 && data->bytes_skipped == 0)
1241    g_simple_async_result_set_from_error (simple, error);
1242
1243
1244  if (nread > 0)
1245    {
1246      available = priv->end - priv->pos;
1247      data->count = MIN (data->count, available);
1248
1249      data->bytes_skipped += data->count;
1250      priv->pos += data->count;
1251    }
1252
1253  if (error)
1254    g_error_free (error);
1255
1256  /* Complete immediately, not in idle, since we're already in a mainloop callout */
1257  g_simple_async_result_complete (simple);
1258  g_object_unref (simple);
1259}
1260
1261static void
1262g_buffered_input_stream_skip_async (GInputStream              *stream,
1263                                    gsize                      count,
1264                                    int                        io_priority,
1265                                    GCancellable              *cancellable,
1266                                    GAsyncReadyCallback        callback,
1267                                    gpointer                   user_data)
1268{
1269  GBufferedInputStream *bstream;
1270  GBufferedInputStreamPrivate *priv;
1271  GBufferedInputStreamClass *class;
1272  GInputStream *base_stream;
1273  gsize available;
1274  GSimpleAsyncResult *simple;
1275  SkipAsyncData *data;
1276
1277  bstream = G_BUFFERED_INPUT_STREAM (stream);
1278  priv = bstream->priv;
1279
1280  data = g_slice_new (SkipAsyncData);
1281  data->bytes_skipped = 0;
1282  simple = g_simple_async_result_new (G_OBJECT (stream),
1283				      callback, user_data,
1284				      g_buffered_input_stream_skip_async);
1285  g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1286
1287  available = priv->end - priv->pos;
1288
1289  if (count <= available)
1290    {
1291      priv->pos += count;
1292      data->bytes_skipped = count;
1293
1294      g_simple_async_result_complete_in_idle (simple);
1295      g_object_unref (simple);
1296      return;
1297    }
1298
1299
1300  /* Full request not available, skip all currently availbile and request refill for more */
1301
1302  priv->pos = 0;
1303  priv->end = 0;
1304
1305  count -= available;
1306
1307  data->bytes_skipped = available;
1308  data->count = count;
1309
1310  if (count > priv->len)
1311    {
1312      /* Large request, shortcut buffer */
1313
1314      base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1315
1316      g_input_stream_skip_async (base_stream,
1317				 count,
1318				 io_priority, cancellable,
1319				 large_skip_callback,
1320				 simple);
1321    }
1322  else
1323    {
1324      class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1325      class->fill_async (bstream, priv->len, io_priority, cancellable,
1326			 skip_fill_buffer_callback, simple);
1327    }
1328}
1329
1330static gssize
1331g_buffered_input_stream_skip_finish (GInputStream   *stream,
1332                                     GAsyncResult   *result,
1333                                     GError        **error)
1334{
1335  GSimpleAsyncResult *simple;
1336  SkipAsyncData *data;
1337
1338  simple = G_SIMPLE_ASYNC_RESULT (result);
1339
1340  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1341
1342  data = g_simple_async_result_get_op_res_gpointer (simple);
1343
1344  return data->bytes_skipped;
1345}
1346
1347
1348#define __G_BUFFERED_INPUT_STREAM_C__
1349#include "gioaliasdef.c"
1350