1/*
2 *  Licensed to the Apache Software Foundation (ASF) under one or more
3 *  contributor license agreements.  See the NOTICE file distributed with
4 *  this work for additional information regarding copyright ownership.
5 *  The ASF licenses this file to You under the Apache License, Version 2.0
6 *  (the "License"); you may not use this file except in compliance with
7 *  the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *  Unless required by applicable law or agreed to in writing, software
12 *  distributed under the License is distributed on an "AS IS" BASIS,
13 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  See the License for the specific language governing permissions and
15 *  limitations under the License.
16 */
17
18package java.io;
19
20import java.util.Arrays;
21
/**
 * Wraps an existing {@link InputStream} and <em>buffers</em> the input.
 * Expensive interaction with the underlying input stream is minimized, since
 * most (smaller) requests can be satisfied by accessing the buffer alone. The
 * drawback is that some extra space is required to hold the buffer and that
 * copying takes place when filling that buffer, but this is usually outweighed
 * by the performance benefits.
 *
 * <p>A typical application pattern for the class looks like this:
 *
 * <pre>
 * BufferedInputStream buf = new BufferedInputStream(new FileInputStream(&quot;file.java&quot;));
 * </pre>
 *
 * <p>The reading, skipping, marking and resetting methods are
 * {@code synchronized}; {@link #close()} is not, so it may run while another
 * thread is inside one of them. For that reason those methods work on local
 * copies of {@code buf} and {@code in} and re-check them after refilling.
 *
 * @see BufferedOutputStream
 */
public class BufferedInputStream extends FilterInputStream {
    /** Buffer size used by {@link #BufferedInputStream(InputStream)}. */
    private static final int DEFAULT_BUFFER_SIZE = 8192;

    /**
     * The buffer containing the current bytes read from the target InputStream.
     * Set to {@code null} by {@link #close()}.
     */
    protected volatile byte[] buf;

    /**
     * The total number of valid bytes inside the byte array {@code buf}.
     */
    protected int count;

    /**
     * The current limit, which when passed, invalidates the current mark.
     */
    protected int marklimit;

    /**
     * The currently marked position. -1 indicates no mark has been set or the
     * mark has been invalidated.
     */
    protected int markpos = -1;

    /**
     * The current position within the byte array {@code buf}.
     */
    protected int pos;

    /**
     * Constructs a new {@code BufferedInputStream}, providing {@code in} with a buffer
     * of 8192 bytes.
     *
     * <p><strong>Warning:</strong> passing a null source creates a closed
     * {@code BufferedInputStream}. All read operations on such a stream will
     * fail with an IOException.
     *
     * @param in the {@code InputStream} the buffer reads from.
     */
    public BufferedInputStream(InputStream in) {
        this(in, DEFAULT_BUFFER_SIZE);
    }

    /**
     * Constructs a new {@code BufferedInputStream}, providing {@code in} with {@code size} bytes
     * of buffer.
     *
     * <p><strong>Warning:</strong> passing a null source creates a closed
     * {@code BufferedInputStream}. All read operations on such a stream will
     * fail with an IOException.
     *
     * @param in the {@code InputStream} the buffer reads from.
     * @param size the size of buffer in bytes.
     * @throws IllegalArgumentException if {@code size <= 0}.
     */
    public BufferedInputStream(InputStream in, int size) {
        super(in);
        if (size <= 0) {
            throw new IllegalArgumentException("size <= 0");
        }
        buf = new byte[size];
    }

    /**
     * Returns an estimated number of bytes that can be read or skipped without blocking for more
     * input. This method returns the number of bytes available in the buffer
     * plus those available in the source stream, but see {@link InputStream#available} for
     * important caveats. If that sum does not fit in an {@code int}, the result
     * is capped at {@link Integer#MAX_VALUE} instead of wrapping around.
     *
     * @return the estimated number of bytes available
     * @throws IOException if this stream is closed or an error occurs
     */
    @Override
    public synchronized int available() throws IOException {
        InputStream localIn = in; // 'in' could be invalidated by close()
        if (buf == null || localIn == null) {
            throw streamClosed();
        }
        // Sum in long arithmetic: a source reporting a very large count must
        // not make the int addition overflow into a negative estimate.
        long total = (long) (count - pos) + localIn.available();
        return total > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) total;
    }

    /**
     * Creates the exception reported whenever an operation is attempted on a
     * closed stream. The caller is responsible for throwing it.
     *
     * @return a new {@code IOException} describing the closed stream.
     */
    private IOException streamClosed() {
        return new IOException("BufferedInputStream is closed");
    }

    /**
     * Verifies that the region {@code [offset, offset + count)} lies inside an
     * array of length {@code arrayLength}.
     *
     * @param arrayLength length of the array being accessed.
     * @param offset first index of the region.
     * @param count number of elements in the region.
     * @throws ArrayIndexOutOfBoundsException if {@code offset} or {@code count}
     *             is negative, or the region extends past the end of the array.
     */
    private static void checkOffsetAndCount(int arrayLength, int offset, int count) {
        // (offset | count) < 0 is true exactly when either value is negative;
        // the subtraction form avoids overflow in offset + count.
        if ((offset | count) < 0 || offset > arrayLength || arrayLength - offset < count) {
            throw new ArrayIndexOutOfBoundsException("length=" + arrayLength
                    + "; regionStart=" + offset + "; regionLength=" + count);
        }
    }

    /**
     * Closes this stream. The source stream is closed and any resources
     * associated with it are released. This method is not synchronized, so it
     * can run while another thread is blocked inside a read.
     *
     * @throws IOException
     *             if an error occurs while closing this stream.
     */
    @Override
    public void close() throws IOException {
        buf = null;
        InputStream localIn = in;
        in = null;
        if (localIn != null) {
            localIn.close();
        }
    }

    /**
     * Refills the buffer from {@code localIn}, keeping the bytes between the
     * mark and the current position when a valid mark exists. May replace
     * {@code buf} with a larger array, so callers must re-read {@code buf}
     * afterwards.
     *
     * @param localIn the source stream captured by the caller.
     * @param localBuf the buffer captured by the caller.
     * @return the value returned by the underlying read: the number of bytes
     *         added to the buffer, or -1 at end of stream.
     * @throws IOException if the underlying read fails.
     */
    private int fillbuf(InputStream localIn, byte[] localBuf)
            throws IOException {
        if (markpos == -1 || (pos - markpos >= marklimit)) {
            /* Mark position not set or exceeded readlimit: overwrite the whole buffer. */
            int result = localIn.read(localBuf);
            if (result > 0) {
                // Only drop the mark once new data has actually replaced the old bytes.
                markpos = -1;
                pos = 0;
                count = result;
            }
            return result;
        }
        if (markpos == 0 && marklimit > localBuf.length) {
            /* Increase buffer size to accommodate the readlimit */
            int newLength = localBuf.length * 2;
            // newLength < 0 means the doubling overflowed; marklimit is known
            // to be larger than the current length here, so fall back to it.
            if (newLength < 0 || newLength > marklimit) {
                newLength = marklimit;
            }
            byte[] newbuf = new byte[newLength];
            System.arraycopy(localBuf, 0, newbuf, 0, localBuf.length);
            // Reassign buf, which will invalidate any local references
            // FIXME: what if buf was null (stream closed concurrently)?
            localBuf = buf = newbuf;
        } else if (markpos > 0) {
            /* Slide the marked region to the front to make room at the end. */
            System.arraycopy(localBuf, markpos, localBuf, 0, localBuf.length
                    - markpos);
        }
        /* Set the new position and mark position */
        pos -= markpos;
        count = markpos = 0;
        int bytesread = localIn.read(localBuf, pos, localBuf.length - pos);
        count = bytesread <= 0 ? pos : pos + bytesread;
        return bytesread;
    }

    /**
     * Sets a mark position in this stream. The parameter {@code readlimit}
     * indicates how many bytes can be read before a mark is invalidated.
     * Calling {@code reset()} will reposition the stream back to the marked
     * position if {@code readlimit} has not been surpassed. The underlying
     * buffer may be increased in size to allow {@code readlimit} number of
     * bytes to be supported.
     *
     * @param readlimit
     *            the number of bytes that can be read before the mark is
     *            invalidated.
     * @see #reset()
     */
    @Override
    public synchronized void mark(int readlimit) {
        marklimit = readlimit;
        markpos = pos;
    }

    /**
     * Indicates whether {@code BufferedInputStream} supports the {@code mark()}
     * and {@code reset()} methods.
     *
     * @return {@code true} for BufferedInputStreams.
     * @see #mark(int)
     * @see #reset()
     */
    @Override
    public boolean markSupported() {
        return true;
    }

    /**
     * Reads a single byte from this stream and returns it as an integer in the
     * range from 0 to 255. Returns -1 if the end of the source stream has been
     * reached. If the internal buffer does not contain any available bytes then
     * it is filled from the source stream and the first byte is returned.
     *
     * @return the byte read or -1 if the end of the source stream has been
     *         reached.
     * @throws IOException
     *             if this stream is closed or another IOException occurs.
     */
    @Override
    public synchronized int read() throws IOException {
        // Use local refs since buf and in may be invalidated by an
        // unsynchronized close()
        byte[] localBuf = buf;
        InputStream localIn = in;
        if (localBuf == null || localIn == null) {
            throw streamClosed();
        }

        /* No buffered bytes left: refill, and report EOF if that fails. */
        if (pos >= count && fillbuf(localIn, localBuf) == -1) {
            return -1;
        }
        // localBuf may have been invalidated by fillbuf
        if (localBuf != buf) {
            localBuf = buf;
            if (localBuf == null) {
                throw streamClosed();
            }
        }

        /* The refill may have produced nothing; only then is there no byte to return. */
        if (count - pos > 0) {
            return localBuf[pos++] & 0xFF;
        }
        return -1;
    }

    /**
     * Reads up to {@code byteCount} bytes from this stream into
     * {@code buffer}, starting at index {@code byteOffset}. Buffered bytes are
     * delivered first; the source is then consulted repeatedly until the
     * request is satisfied, the source reports end of stream, or the source's
     * {@code available()} returns 0. When no mark is set and the outstanding
     * request is at least as large as the internal buffer, bytes are read from
     * the source directly into {@code buffer}.
     *
     * @param buffer the destination array.
     * @param byteOffset the index in {@code buffer} of the first byte stored.
     * @param byteCount the maximum number of bytes to read.
     * @return the number of bytes read, 0 if {@code byteCount} is 0, or -1 if
     *         no byte could be read because the end of the source stream has
     *         been reached.
     * @throws ArrayIndexOutOfBoundsException
     *             if {@code byteOffset} or {@code byteCount} is negative, or
     *             the region does not fit inside {@code buffer}.
     * @throws NullPointerException if {@code buffer} is {@code null}.
     * @throws IOException
     *             if this stream is closed or another IOException occurs.
     */
    @Override
    public synchronized int read(byte[] buffer, int byteOffset, int byteCount)
            throws IOException {
        // Use local ref since buf may be invalidated by an unsynchronized
        // close()
        byte[] localBuf = buf;
        if (localBuf == null) {
            throw streamClosed();
        }
        checkOffsetAndCount(buffer.length, byteOffset, byteCount);
        if (byteCount == 0) {
            return 0;
        }
        InputStream localIn = in;
        if (localIn == null) {
            throw streamClosed();
        }

        int required;
        if (pos < count) {
            /* There are bytes available in the buffer. */
            int copylength = count - pos >= byteCount ? byteCount : count - pos;
            System.arraycopy(localBuf, pos, buffer, byteOffset, copylength);
            pos += copylength;
            // Stop here rather than risk blocking on the source for the rest.
            if (copylength == byteCount || localIn.available() == 0) {
                return copylength;
            }
            byteOffset += copylength;
            required = byteCount - copylength;
        } else {
            required = byteCount;
        }

        while (true) {
            int read;
            /*
             * If we're not marked and the required size is greater than the
             * buffer, simply read the bytes directly bypassing the buffer.
             */
            if (markpos == -1 && required >= localBuf.length) {
                read = localIn.read(buffer, byteOffset, required);
                if (read == -1) {
                    return required == byteCount ? -1 : byteCount - required;
                }
            } else {
                if (fillbuf(localIn, localBuf) == -1) {
                    return required == byteCount ? -1 : byteCount - required;
                }
                // localBuf may have been invalidated by fillbuf
                if (localBuf != buf) {
                    localBuf = buf;
                    if (localBuf == null) {
                        throw streamClosed();
                    }
                }

                read = count - pos >= required ? required : count - pos;
                System.arraycopy(localBuf, pos, buffer, byteOffset, read);
                pos += read;
            }
            required -= read;
            if (required == 0) {
                return byteCount;
            }
            if (localIn.available() == 0) {
                return byteCount - required;
            }
            byteOffset += read;
        }
    }

    /**
     * Resets this stream to the last marked location.
     *
     * @throws IOException
     *             if this stream is closed, no mark has been set or the mark is
     *             no longer valid because more than {@code readlimit} bytes
     *             have been read since setting the mark.
     * @see #mark(int)
     */
    @Override
    public synchronized void reset() throws IOException {
        if (buf == null) {
            throw new IOException("Stream is closed");
        }
        if (-1 == markpos) {
            throw new IOException("Mark has been invalidated.");
        }
        pos = markpos;
    }

    /**
     * Skips {@code byteCount} bytes in this stream. Subsequent calls to
     * {@code read} will not return these bytes unless {@code reset} is
     * used.
     *
     * @param byteCount
     *            the number of bytes to skip. {@code skip} does nothing and
     *            returns 0 if {@code byteCount} is less than one.
     * @return the number of bytes actually skipped.
     * @throws IOException
     *             if this stream is closed or another IOException occurs.
     */
    @Override
    public synchronized long skip(long byteCount) throws IOException {
        // Use local refs since buf and in may be invalidated by an
        // unsynchronized close()
        byte[] localBuf = buf;
        InputStream localIn = in;
        if (localBuf == null) {
            throw streamClosed();
        }
        if (byteCount < 1) {
            return 0;
        }
        if (localIn == null) {
            throw streamClosed();
        }

        /* The whole request can be served from bytes already buffered. */
        if (count - pos >= byteCount) {
            pos += byteCount;
            return byteCount;
        }
        /* Consume everything that is buffered, then look to the source. */
        long read = count - pos;
        pos = count;

        if (markpos != -1) {
            if (byteCount <= marklimit) {
                // A mark is set and the skip fits inside its limit: go through
                // the buffer so the skipped bytes stay available to reset().
                if (fillbuf(localIn, localBuf) == -1) {
                    return read;
                }
                if (count - pos >= byteCount - read) {
                    pos += byteCount - read;
                    return byteCount;
                }
                // Couldn't get all the bytes, skip what we read
                read += (count - pos);
                pos = count;
                return read;
            }
        }
        return read + localIn.skip(byteCount - read);
    }
}
387