/*
 *  Licensed to the Apache Software Foundation (ASF) under one or more
 *  contributor license agreements.  See the NOTICE file distributed with
 *  this work for additional information regarding copyright ownership.
 *  The ASF licenses this file to You under the Apache License, Version 2.0
 *  (the "License"); you may not use this file except in compliance with
 *  the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
17
18package java.io;
19
20import java.util.Arrays;
21
/**
 * Wraps an existing {@link InputStream} and <em>buffers</em> the input.
 * Expensive interaction with the underlying input stream is minimized, since
 * most (smaller) requests can be satisfied by accessing the buffer alone. The
 * drawback is that some extra space is required to hold the buffer and that
 * copying takes place when filling that buffer, but this is usually outweighed
 * by the performance benefits.
 *
 * <p>A typical application pattern for the class looks like this:
 *
 * <pre>
 * BufferedInputStream buf = new BufferedInputStream(new FileInputStream(&quot;file.java&quot;));
 * </pre>
 *
 * @see BufferedOutputStream
 */
38public class BufferedInputStream extends FilterInputStream {
39    /**
40     * The buffer containing the current bytes read from the target InputStream.
41     */
42    protected volatile byte[] buf;
43
44    /**
45     * The total number of bytes inside the byte array {@code buf}.
46     */
47    protected int count;
48
49    /**
50     * The current limit, which when passed, invalidates the current mark.
51     */
52    protected int marklimit;
53
54    /**
55     * The currently marked position. -1 indicates no mark has been set or the
56     * mark has been invalidated.
57     */
58    protected int markpos = -1;
59
60    /**
61     * The current position within the byte array {@code buf}.
62     */
63    protected int pos;
64
65    /**
66     * Constructs a new {@code BufferedInputStream}, providing {@code in} with a buffer
67     * of 8192 bytes.
68     *
69     * <p><strong>Warning:</strong> passing a null source creates a closed
70     * {@code BufferedInputStream}. All read operations on such a stream will
71     * fail with an IOException.
72     *
73     * @param in the {@code InputStream} the buffer reads from.
74     */
75    public BufferedInputStream(InputStream in) {
76        this(in, 8192);
77    }
78
79    /**
80     * Constructs a new {@code BufferedInputStream}, providing {@code in} with {@code size} bytes
81     * of buffer.
82     *
83     * <p><strong>Warning:</strong> passing a null source creates a closed
84     * {@code BufferedInputStream}. All read operations on such a stream will
85     * fail with an IOException.
86     *
87     * @param in the {@code InputStream} the buffer reads from.
88     * @param size the size of buffer in bytes.
89     * @throws IllegalArgumentException if {@code size <= 0}.
90     */
91    public BufferedInputStream(InputStream in, int size) {
92        super(in);
93        if (size <= 0) {
94            throw new IllegalArgumentException("size <= 0");
95        }
96        buf = new byte[size];
97    }
98
99    /**
100     * Returns an estimated number of bytes that can be read or skipped without blocking for more
101     * input. This method returns the number of bytes available in the buffer
102     * plus those available in the source stream, but see {@link InputStream#available} for
103     * important caveats.
104     *
105     * @return the estimated number of bytes available
106     * @throws IOException if this stream is closed or an error occurs
107     */
108    @Override
109    public synchronized int available() throws IOException {
110        InputStream localIn = in; // 'in' could be invalidated by close()
111        if (buf == null || localIn == null) {
112            throw streamClosed();
113        }
114        return count - pos + localIn.available();
115    }
116
117    private IOException streamClosed() throws IOException {
118        throw new IOException("BufferedInputStream is closed");
119    }
120
121    /**
122     * Closes this stream. The source stream is closed and any resources
123     * associated with it are released.
124     *
125     * @throws IOException
126     *             if an error occurs while closing this stream.
127     */
128    @Override
129    public void close() throws IOException {
130        buf = null;
131        InputStream localIn = in;
132        in = null;
133        if (localIn != null) {
134            localIn.close();
135        }
136    }
137
138    private int fillbuf(InputStream localIn, byte[] localBuf)
139            throws IOException {
140        if (markpos == -1 || (pos - markpos >= marklimit)) {
141            /* Mark position not set or exceeded readlimit */
142            int result = localIn.read(localBuf);
143            if (result > 0) {
144                markpos = -1;
145                pos = 0;
146                count = result == -1 ? 0 : result;
147            }
148            return result;
149        }
150        if (markpos == 0 && marklimit > localBuf.length) {
151            /* Increase buffer size to accommodate the readlimit */
152            int newLength = localBuf.length * 2;
153            if (newLength > marklimit) {
154                newLength = marklimit;
155            }
156            byte[] newbuf = new byte[newLength];
157            System.arraycopy(localBuf, 0, newbuf, 0, localBuf.length);
158            // Reassign buf, which will invalidate any local references
159            // FIXME: what if buf was null?
160            localBuf = buf = newbuf;
161        } else if (markpos > 0) {
162            System.arraycopy(localBuf, markpos, localBuf, 0, localBuf.length
163                    - markpos);
164        }
165        /* Set the new position and mark position */
166        pos -= markpos;
167        count = markpos = 0;
168        int bytesread = localIn.read(localBuf, pos, localBuf.length - pos);
169        count = bytesread <= 0 ? pos : pos + bytesread;
170        return bytesread;
171    }
172
173    /**
174     * Sets a mark position in this stream. The parameter {@code readlimit}
175     * indicates how many bytes can be read before a mark is invalidated.
176     * Calling {@code reset()} will reposition the stream back to the marked
177     * position if {@code readlimit} has not been surpassed. The underlying
178     * buffer may be increased in size to allow {@code readlimit} number of
179     * bytes to be supported.
180     *
181     * @param readlimit
182     *            the number of bytes that can be read before the mark is
183     *            invalidated.
184     * @see #reset()
185     */
186    @Override
187    public synchronized void mark(int readlimit) {
188        marklimit = readlimit;
189        markpos = pos;
190    }
191
192    /**
193     * Indicates whether {@code BufferedInputStream} supports the {@code mark()}
194     * and {@code reset()} methods.
195     *
196     * @return {@code true} for BufferedInputStreams.
197     * @see #mark(int)
198     * @see #reset()
199     */
200    @Override
201    public boolean markSupported() {
202        return true;
203    }
204
205    /**
206     * Reads a single byte from this stream and returns it as an integer in the
207     * range from 0 to 255. Returns -1 if the end of the source string has been
208     * reached. If the internal buffer does not contain any available bytes then
209     * it is filled from the source stream and the first byte is returned.
210     *
211     * @return the byte read or -1 if the end of the source stream has been
212     *         reached.
213     * @throws IOException
214     *             if this stream is closed or another IOException occurs.
215     */
216    @Override
217    public synchronized int read() throws IOException {
218        // Use local refs since buf and in may be invalidated by an
219        // unsynchronized close()
220        byte[] localBuf = buf;
221        InputStream localIn = in;
222        if (localBuf == null || localIn == null) {
223            throw streamClosed();
224        }
225
226        /* Are there buffered bytes available? */
227        if (pos >= count && fillbuf(localIn, localBuf) == -1) {
228            return -1; /* no, fill buffer */
229        }
230        // localBuf may have been invalidated by fillbuf
231        if (localBuf != buf) {
232            localBuf = buf;
233            if (localBuf == null) {
234                throw streamClosed();
235            }
236        }
237
238        /* Did filling the buffer fail with -1 (EOF)? */
239        if (count - pos > 0) {
240            return localBuf[pos++] & 0xFF;
241        }
242        return -1;
243    }
244
245    /**
246     * Reads at most {@code byteCount} bytes from this stream and stores them in
247     * byte array {@code buffer} starting at offset {@code offset}. Returns the
248     * number of bytes actually read or -1 if no bytes were read and the end of
249     * the stream was encountered. If all the buffered bytes have been used, a
250     * mark has not been set and the requested number of bytes is larger than
251     * the receiver's buffer size, this implementation bypasses the buffer and
252     * simply places the results directly into {@code buffer}.
253     *
254     * @param buffer
255     *            the byte array in which to store the bytes read.
256     * @return the number of bytes actually read or -1 if end of stream.
257     * @throws IndexOutOfBoundsException
258     *             if {@code offset < 0} or {@code byteCount < 0}, or if
259     *             {@code offset + byteCount} is greater than the size of
260     *             {@code buffer}.
261     * @throws IOException
262     *             if the stream is already closed or another IOException
263     *             occurs.
264     */
265    @Override
266    public synchronized int read(byte[] buffer, int offset, int byteCount) throws IOException {
267        // Use local ref since buf may be invalidated by an unsynchronized
268        // close()
269        byte[] localBuf = buf;
270        if (localBuf == null) {
271            throw streamClosed();
272        }
273        Arrays.checkOffsetAndCount(buffer.length, offset, byteCount);
274        if (byteCount == 0) {
275            return 0;
276        }
277        InputStream localIn = in;
278        if (localIn == null) {
279            throw streamClosed();
280        }
281
282        int required;
283        if (pos < count) {
284            /* There are bytes available in the buffer. */
285            int copylength = count - pos >= byteCount ? byteCount : count - pos;
286            System.arraycopy(localBuf, pos, buffer, offset, copylength);
287            pos += copylength;
288            if (copylength == byteCount || localIn.available() == 0) {
289                return copylength;
290            }
291            offset += copylength;
292            required = byteCount - copylength;
293        } else {
294            required = byteCount;
295        }
296
297        while (true) {
298            int read;
299            /*
300             * If we're not marked and the required size is greater than the
301             * buffer, simply read the bytes directly bypassing the buffer.
302             */
303            if (markpos == -1 && required >= localBuf.length) {
304                read = localIn.read(buffer, offset, required);
305                if (read == -1) {
306                    return required == byteCount ? -1 : byteCount - required;
307                }
308            } else {
309                if (fillbuf(localIn, localBuf) == -1) {
310                    return required == byteCount ? -1 : byteCount - required;
311                }
312                // localBuf may have been invalidated by fillbuf
313                if (localBuf != buf) {
314                    localBuf = buf;
315                    if (localBuf == null) {
316                        throw streamClosed();
317                    }
318                }
319
320                read = count - pos >= required ? required : count - pos;
321                System.arraycopy(localBuf, pos, buffer, offset, read);
322                pos += read;
323            }
324            required -= read;
325            if (required == 0) {
326                return byteCount;
327            }
328            if (localIn.available() == 0) {
329                return byteCount - required;
330            }
331            offset += read;
332        }
333    }
334
335    /**
336     * Resets this stream to the last marked location.
337     *
338     * @throws IOException
339     *             if this stream is closed, no mark has been set or the mark is
340     *             no longer valid because more than {@code readlimit} bytes
341     *             have been read since setting the mark.
342     * @see #mark(int)
343     */
344    @Override
345    public synchronized void reset() throws IOException {
346        if (buf == null) {
347            throw new IOException("Stream is closed");
348        }
349        if (-1 == markpos) {
350            throw new IOException("Mark has been invalidated.");
351        }
352        pos = markpos;
353    }
354
355    /**
356     * Skips {@code byteCount} bytes in this stream. Subsequent calls to
357     * {@code read} will not return these bytes unless {@code reset} is
358     * used.
359     *
360     * @param byteCount
361     *            the number of bytes to skip. {@code skip} does nothing and
362     *            returns 0 if {@code byteCount} is less than zero.
363     * @return the number of bytes actually skipped.
364     * @throws IOException
365     *             if this stream is closed or another IOException occurs.
366     */
367    @Override
368    public synchronized long skip(long byteCount) throws IOException {
369        // Use local refs since buf and in may be invalidated by an
370        // unsynchronized close()
371        byte[] localBuf = buf;
372        InputStream localIn = in;
373        if (localBuf == null) {
374            throw streamClosed();
375        }
376        if (byteCount < 1) {
377            return 0;
378        }
379        if (localIn == null) {
380            throw streamClosed();
381        }
382
383        if (count - pos >= byteCount) {
384            pos += byteCount;
385            return byteCount;
386        }
387        long read = count - pos;
388        pos = count;
389
390        if (markpos != -1) {
391            if (byteCount <= marklimit) {
392                if (fillbuf(localIn, localBuf) == -1) {
393                    return read;
394                }
395                if (count - pos >= byteCount - read) {
396                    pos += byteCount - read;
397                    return byteCount;
398                }
399                // Couldn't get all the bytes, skip what we read
400                read += (count - pos);
401                pos = count;
402                return read;
403            }
404        }
405        return read + localIn.skip(byteCount - read);
406    }
407}
408