1/*
2 *  Licensed to the Apache Software Foundation (ASF) under one or more
3 *  contributor license agreements.  See the NOTICE file distributed with
4 *  this work for additional information regarding copyright ownership.
5 *  The ASF licenses this file to You under the Apache License, Version 2.0
6 *  (the "License"); you may not use this file except in compliance with
7 *  the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *  Unless required by applicable law or agreed to in writing, software
12 *  distributed under the License is distributed on an "AS IS" BASIS,
13 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  See the License for the specific language governing permissions and
15 *  limitations under the License.
16 */
17
18package java.io;
19
20import java.util.Arrays;
21
22/**
23 * Receives information from a communications pipe. When two threads want to
24 * pass data back and forth, one creates a piped output stream and the other one
25 * creates a piped input stream.
26 *
27 * @see PipedOutputStream
28 */
29public class PipedInputStream extends InputStream {
30
31    private Thread lastReader;
32
33    private Thread lastWriter;
34
35    private boolean isClosed;
36
37    /**
38     * The circular buffer through which data is passed. Data is read from the
39     * range {@code [out, in)} and written to the range {@code [in, out)}.
40     * Data in the buffer is either sequential: <pre>
41     *     { - - - X X X X X X X - - - - - }
42     *             ^             ^
43     *             |             |
44     *            out           in</pre>
45     * ...or wrapped around the buffer's end: <pre>
46     *     { X X X X - - - - - - - - X X X }
47     *               ^               ^
48     *               |               |
49     *              in              out</pre>
50     * When the buffer is empty, {@code in == -1}. Reading when the buffer is
51     * empty will block until data is available. When the buffer is full,
52     * {@code in == out}. Writing when the buffer is full will block until free
53     * space is available.
54     */
55    protected byte[] buffer;
56
57    /**
58     * The index in {@code buffer} where the next byte will be written.
59     */
60    protected int in = -1;
61
62    /**
63     * The index in {@code buffer} where the next byte will be read.
64     */
65    protected int out;
66
67    /**
68     * The size of the default pipe in bytes.
69     */
70    protected static final int PIPE_SIZE = 1024;
71
72    /**
73     * Indicates if this pipe is connected.
74     */
75    boolean isConnected;
76
77    /**
78     * Constructs a new unconnected {@code PipedInputStream}. The resulting
79     * stream must be connected to a {@link PipedOutputStream} before data may
80     * be read from it.
81     */
82    public PipedInputStream() {}
83
84    /**
85     * Constructs a new {@code PipedInputStream} connected to the
86     * {@link PipedOutputStream} {@code out}. Any data written to the output
87     * stream can be read from the this input stream.
88     *
89     * @param out
90     *            the piped output stream to connect to.
91     * @throws IOException
92     *             if this stream or {@code out} are already connected.
93     */
94    public PipedInputStream(PipedOutputStream out) throws IOException {
95        connect(out);
96    }
97
98    /**
99     * Constructs a new unconnected {@code PipedInputStream} with the given
100     * buffer size. The resulting stream must be connected to a
101     * {@code PipedOutputStream} before data may be read from it.
102     *
103     * @param pipeSize the size of the buffer in bytes.
104     * @throws IllegalArgumentException if pipeSize is less than or equal to zero.
105     * @since 1.6
106     */
107    public PipedInputStream(int pipeSize) {
108        if (pipeSize <= 0) {
109            throw new IllegalArgumentException("pipe size " + pipeSize + " too small");
110        }
111        buffer = new byte[pipeSize];
112    }
113
114    /**
115     * Constructs a new {@code PipedInputStream} connected to the given {@code PipedOutputStream},
116     * with the given buffer size. Any data written to the output stream can be read from this
117     * input stream.
118     *
119     * @param out the {@code PipedOutputStream} to connect to.
120     * @param pipeSize the size of the buffer in bytes.
121     * @throws IOException if an I/O error occurs.
122     * @throws IllegalArgumentException if pipeSize is less than or equal to zero.
123     * @since 1.6
124     */
125    public PipedInputStream(PipedOutputStream out, int pipeSize) throws IOException {
126        this(pipeSize);
127        connect(out);
128    }
129
130    /**
131     * {@inheritDoc}
132     *
133     * <p>Unlike most streams, {@code PipedInputStream} returns 0 rather than throwing
134     * {@code IOException} if the stream has been closed. Unconnected and broken pipes also
135     * return 0.
136     *
137     * @throws IOException if an I/O error occurs
138     */
139    @Override
140    public synchronized int available() throws IOException {
141        if (buffer == null || in == -1) {
142            return 0;
143        }
144        return in <= out ? buffer.length - out + in : in - out;
145    }
146
147    /**
148     * Closes this stream. This implementation releases the buffer used for the
149     * pipe and notifies all threads waiting to read or write.
150     *
151     * @throws IOException
152     *             if an error occurs while closing this stream.
153     */
154    @Override
155    public synchronized void close() throws IOException {
156        buffer = null;
157        notifyAll();
158    }
159
160    /**
161     * Connects this {@code PipedInputStream} to a {@link PipedOutputStream}.
162     * Any data written to the output stream becomes readable in this input
163     * stream.
164     *
165     * @param src
166     *            the source output stream.
167     * @throws IOException
168     *             if either stream is already connected.
169     */
170    public void connect(PipedOutputStream src) throws IOException {
171        src.connect(this);
172    }
173
174    /**
175     * Establishes the connection to the PipedOutputStream.
176     *
177     * @throws IOException
178     *             If this Reader is already connected.
179     */
180    synchronized void establishConnection() throws IOException {
181        if (isConnected) {
182            throw new IOException("Pipe already connected");
183        }
184        if (buffer == null) { // We may already have allocated the buffer.
185            buffer = new byte[PipedInputStream.PIPE_SIZE];
186        }
187        isConnected = true;
188    }
189
190    /**
191     * Reads a single byte from this stream and returns it as an integer in the
192     * range from 0 to 255. Returns -1 if the end of this stream has been
193     * reached. If there is no data in the pipe, this method blocks until data
194     * is available, the end of the stream is detected or an exception is
195     * thrown.
196     * <p>
197     * Separate threads should be used to read from a {@code PipedInputStream}
198     * and to write to the connected {@link PipedOutputStream}. If the same
199     * thread is used, a deadlock may occur.
200     *
201     * @return the byte read or -1 if the end of the source stream has been
202     *         reached.
203     * @throws IOException
204     *             if this stream is closed or not connected to an output
205     *             stream, or if the thread writing to the connected output
206     *             stream is no longer alive.
207     */
208    @Override
209    public synchronized int read() throws IOException {
210        if (!isConnected) {
211            throw new IOException("Not connected");
212        }
213        if (buffer == null) {
214            throw new IOException("InputStream is closed");
215        }
216
217        /**
218         * Set the last thread to be reading on this PipedInputStream. If
219         * lastReader dies while someone is waiting to write an IOException of
220         * "Pipe broken" will be thrown in receive()
221         */
222        lastReader = Thread.currentThread();
223        try {
224            int attempts = 3;
225            while (in == -1) {
226                // Are we at end of stream?
227                if (isClosed) {
228                    return -1;
229                }
230                if ((attempts-- <= 0) && lastWriter != null && !lastWriter.isAlive()) {
231                    throw new IOException("Pipe broken");
232                }
233                // Notify callers of receive()
234                notifyAll();
235                wait(1000);
236            }
237        } catch (InterruptedException e) {
238            throw new InterruptedIOException();
239        }
240
241        int result = buffer[out++] & 0xff;
242        if (out == buffer.length) {
243            out = 0;
244        }
245        if (out == in) {
246            // empty buffer
247            in = -1;
248            out = 0;
249        }
250
251        // let blocked writers write to the newly available buffer space
252        notifyAll();
253
254        return result;
255    }
256
257    /**
258     * Reads at most {@code byteCount} bytes from this stream and stores them in the
259     * byte array {@code bytes} starting at {@code offset}. Blocks until at
260     * least one byte has been read, the end of the stream is detected or an
261     * exception is thrown.
262     * <p>
263     * Separate threads should be used to read from a {@code PipedInputStream}
264     * and to write to the connected {@link PipedOutputStream}. If the same
265     * thread is used, a deadlock may occur.
266     *
267     * @return the number of bytes actually read or -1 if the end of the stream
268     *         has been reached.
269     * @throws IndexOutOfBoundsException
270     *             if {@code offset < 0} or {@code byteCount < 0}, or if {@code
271     *             offset + byteCount} is greater than the size of {@code bytes}.
272     * @throws InterruptedIOException
273     *             if the thread reading from this stream is interrupted.
274     * @throws IOException
275     *             if this stream is closed or not connected to an output
276     *             stream, or if the thread writing to the connected output
277     *             stream is no longer alive.
278     * @throws NullPointerException
279     *             if {@code bytes} is {@code null}.
280     */
281    @Override
282    public synchronized int read(byte[] bytes, int offset, int byteCount) throws IOException {
283        Arrays.checkOffsetAndCount(bytes.length, offset, byteCount);
284        if (byteCount == 0) {
285            return 0;
286        }
287
288        if (!isConnected) {
289            throw new IOException("Not connected");
290        }
291
292        if (buffer == null) {
293            throw new IOException("InputStream is closed");
294        }
295
296        /*
297         * Set the last thread to be reading on this PipedInputStream. If
298         * lastReader dies while someone is waiting to write an IOException of
299         * "Pipe broken" will be thrown in receive()
300         */
301        lastReader = Thread.currentThread();
302        try {
303            int attempts = 3;
304            while (in == -1) {
305                // Are we at end of stream?
306                if (isClosed) {
307                    return -1;
308                }
309                if ((attempts-- <= 0) && lastWriter != null && !lastWriter.isAlive()) {
310                    throw new IOException("Pipe broken");
311                }
312                // Notify callers of receive()
313                notifyAll();
314                wait(1000);
315            }
316        } catch (InterruptedException e) {
317            throw new InterruptedIOException();
318        }
319
320        int totalCopied = 0;
321
322        // copy bytes from out thru the end of buffer
323        if (out >= in) {
324            int leftInBuffer = buffer.length - out;
325            int length = leftInBuffer < byteCount ? leftInBuffer : byteCount;
326            System.arraycopy(buffer, out, bytes, offset, length);
327            out += length;
328            if (out == buffer.length) {
329                out = 0;
330            }
331            if (out == in) {
332                // empty buffer
333                in = -1;
334                out = 0;
335            }
336            totalCopied += length;
337        }
338
339        // copy bytes from out thru in
340        if (totalCopied < byteCount && in != -1) {
341            int leftInBuffer = in - out;
342            int leftToCopy = byteCount - totalCopied;
343            int length = leftToCopy < leftInBuffer ? leftToCopy : leftInBuffer;
344            System.arraycopy(buffer, out, bytes, offset + totalCopied, length);
345            out += length;
346            if (out == in) {
347                // empty buffer
348                in = -1;
349                out = 0;
350            }
351            totalCopied += length;
352        }
353
354        // let blocked writers write to the newly available buffer space
355        notifyAll();
356
357        return totalCopied;
358    }
359
360    /**
361     * Receives a byte and stores it in this stream's {@code buffer}. This
362     * method is called by {@link PipedOutputStream#write(int)}. The least
363     * significant byte of the integer {@code oneByte} is stored at index
364     * {@code in} in the {@code buffer}.
365     * <p>
366     * This method blocks as long as {@code buffer} is full.
367     *
368     * @param oneByte
369     *            the byte to store in this pipe.
370     * @throws InterruptedIOException
371     *             if the {@code buffer} is full and the thread that has called
372     *             this method is interrupted.
373     * @throws IOException
374     *             if this stream is closed or the thread that has last read
375     *             from this stream is no longer alive.
376     */
377    protected synchronized void receive(int oneByte) throws IOException {
378        if (buffer == null || isClosed) {
379            throw new IOException("Pipe is closed");
380        }
381
382        /*
383         * Set the last thread to be writing on this PipedInputStream. If
384         * lastWriter dies while someone is waiting to read an IOException of
385         * "Pipe broken" will be thrown in read()
386         */
387        lastWriter = Thread.currentThread();
388        try {
389            while (buffer != null && out == in) {
390                if (lastReader != null && !lastReader.isAlive()) {
391                    throw new IOException("Pipe broken");
392                }
393                notifyAll();
394                wait(1000);
395            }
396        } catch (InterruptedException e) {
397            throw new InterruptedIOException();
398        }
399        if (buffer == null) {
400            throw new IOException("Pipe is closed");
401        }
402        if (in == -1) {
403            in = 0;
404        }
405        buffer[in++] = (byte) oneByte;
406        if (in == buffer.length) {
407            in = 0;
408        }
409
410        // let blocked readers read the newly available data
411        notifyAll();
412    }
413
414    synchronized void done() {
415        isClosed = true;
416        notifyAll();
417    }
418}
419