1/*
2 *  Licensed to the Apache Software Foundation (ASF) under one or more
3 *  contributor license agreements.  See the NOTICE file distributed with
4 *  this work for additional information regarding copyright ownership.
5 *  The ASF licenses this file to You under the Apache License, Version 2.0
6 *  (the "License"); you may not use this file except in compliance with
7 *  the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *  Unless required by applicable law or agreed to in writing, software
12 *  distributed under the License is distributed on an "AS IS" BASIS,
13 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  See the License for the specific language governing permissions and
15 *  limitations under the License.
16 */
17
18package java.io;
19
20import java.util.Arrays;
21import libcore.io.IoUtils;
22
23/**
24 * Receives information from a communications pipe. When two threads want to
25 * pass data back and forth, one creates a piped output stream and the other one
26 * creates a piped input stream.
27 *
28 * @see PipedOutputStream
29 */
30public class PipedInputStream extends InputStream {
31
32    private Thread lastReader;
33
34    private Thread lastWriter;
35
36    private boolean isClosed;
37
38    /**
39     * The circular buffer through which data is passed. Data is read from the
40     * range {@code [out, in)} and written to the range {@code [in, out)}.
41     * Data in the buffer is either sequential: <pre>
42     *     { - - - X X X X X X X - - - - - }
43     *             ^             ^
44     *             |             |
45     *            out           in</pre>
46     * ...or wrapped around the buffer's end: <pre>
47     *     { X X X X - - - - - - - - X X X }
48     *               ^               ^
49     *               |               |
50     *              in              out</pre>
51     * When the buffer is empty, {@code in == -1}. Reading when the buffer is
52     * empty will block until data is available. When the buffer is full,
53     * {@code in == out}. Writing when the buffer is full will block until free
54     * space is available.
55     */
56    protected byte[] buffer;
57
58    /**
59     * The index in {@code buffer} where the next byte will be written.
60     */
61    protected int in = -1;
62
63    /**
64     * The index in {@code buffer} where the next byte will be read.
65     */
66    protected int out;
67
68    /**
69     * The size of the default pipe in bytes.
70     */
71    protected static final int PIPE_SIZE = 1024;
72
73    /**
74     * Indicates if this pipe is connected.
75     */
76    boolean isConnected;
77
78    /**
79     * Constructs a new unconnected {@code PipedInputStream}. The resulting
80     * stream must be connected to a {@link PipedOutputStream} before data may
81     * be read from it.
82     */
83    public PipedInputStream() {}
84
85    /**
86     * Constructs a new {@code PipedInputStream} connected to the
87     * {@link PipedOutputStream} {@code out}. Any data written to the output
88     * stream can be read from the this input stream.
89     *
90     * @param out
91     *            the piped output stream to connect to.
92     * @throws IOException
93     *             if this stream or {@code out} are already connected.
94     */
95    public PipedInputStream(PipedOutputStream out) throws IOException {
96        connect(out);
97    }
98
99    /**
100     * Constructs a new unconnected {@code PipedInputStream} with the given
101     * buffer size. The resulting stream must be connected to a
102     * {@code PipedOutputStream} before data may be read from it.
103     *
104     * @param pipeSize the size of the buffer in bytes.
105     * @throws IllegalArgumentException if pipeSize is less than or equal to zero.
106     * @since 1.6
107     */
108    public PipedInputStream(int pipeSize) {
109        if (pipeSize <= 0) {
110            throw new IllegalArgumentException("pipe size " + pipeSize + " too small");
111        }
112        buffer = new byte[pipeSize];
113    }
114
115    /**
116     * Constructs a new {@code PipedInputStream} connected to the given {@code PipedOutputStream},
117     * with the given buffer size. Any data written to the output stream can be read from this
118     * input stream.
119     *
120     * @param out the {@code PipedOutputStream} to connect to.
121     * @param pipeSize the size of the buffer in bytes.
122     * @throws IOException if an I/O error occurs.
123     * @throws IllegalArgumentException if pipeSize is less than or equal to zero.
124     * @since 1.6
125     */
126    public PipedInputStream(PipedOutputStream out, int pipeSize) throws IOException {
127        this(pipeSize);
128        connect(out);
129    }
130
131    /**
132     * {@inheritDoc}
133     *
134     * <p>Unlike most streams, {@code PipedInputStream} returns 0 rather than throwing
135     * {@code IOException} if the stream has been closed. Unconnected and broken pipes also
136     * return 0.
137     *
138     * @throws IOException if an I/O error occurs
139     */
140    @Override
141    public synchronized int available() throws IOException {
142        if (buffer == null || in == -1) {
143            return 0;
144        }
145        return in <= out ? buffer.length - out + in : in - out;
146    }
147
148    /**
149     * Closes this stream. This implementation releases the buffer used for the
150     * pipe and notifies all threads waiting to read or write.
151     *
152     * @throws IOException
153     *             if an error occurs while closing this stream.
154     */
155    @Override
156    public synchronized void close() throws IOException {
157        buffer = null;
158        notifyAll();
159    }
160
161    /**
162     * Connects this {@code PipedInputStream} to a {@link PipedOutputStream}.
163     * Any data written to the output stream becomes readable in this input
164     * stream.
165     *
166     * @param src
167     *            the source output stream.
168     * @throws IOException
169     *             if either stream is already connected.
170     */
171    public void connect(PipedOutputStream src) throws IOException {
172        src.connect(this);
173    }
174
175    /**
176     * Establishes the connection to the PipedOutputStream.
177     *
178     * @throws IOException
179     *             If this Reader is already connected.
180     */
181    synchronized void establishConnection() throws IOException {
182        if (isConnected) {
183            throw new IOException("Pipe already connected");
184        }
185        if (buffer == null) { // We may already have allocated the buffer.
186            buffer = new byte[PipedInputStream.PIPE_SIZE];
187        }
188        isConnected = true;
189    }
190
191    /**
192     * Reads a single byte from this stream and returns it as an integer in the
193     * range from 0 to 255. Returns -1 if the end of this stream has been
194     * reached. If there is no data in the pipe, this method blocks until data
195     * is available, the end of the stream is detected or an exception is
196     * thrown.
197     * <p>
198     * Separate threads should be used to read from a {@code PipedInputStream}
199     * and to write to the connected {@link PipedOutputStream}. If the same
200     * thread is used, a deadlock may occur.
201     *
202     * @return the byte read or -1 if the end of the source stream has been
203     *         reached.
204     * @throws IOException
205     *             if this stream is closed or not connected to an output
206     *             stream, or if the thread writing to the connected output
207     *             stream is no longer alive.
208     */
209    @Override
210    public synchronized int read() throws IOException {
211        if (!isConnected) {
212            throw new IOException("Not connected");
213        }
214        if (buffer == null) {
215            throw new IOException("InputStream is closed");
216        }
217
218        /**
219         * Set the last thread to be reading on this PipedInputStream. If
220         * lastReader dies while someone is waiting to write an IOException of
221         * "Pipe broken" will be thrown in receive()
222         */
223        lastReader = Thread.currentThread();
224        try {
225            int attempts = 3;
226            while (in == -1) {
227                // Are we at end of stream?
228                if (isClosed) {
229                    return -1;
230                }
231                if ((attempts-- <= 0) && lastWriter != null && !lastWriter.isAlive()) {
232                    throw new IOException("Pipe broken");
233                }
234                // Notify callers of receive()
235                notifyAll();
236                wait(1000);
237            }
238        } catch (InterruptedException e) {
239            IoUtils.throwInterruptedIoException();
240        }
241
242        int result = buffer[out++] & 0xff;
243        if (out == buffer.length) {
244            out = 0;
245        }
246        if (out == in) {
247            // empty buffer
248            in = -1;
249            out = 0;
250        }
251
252        // let blocked writers write to the newly available buffer space
253        notifyAll();
254
255        return result;
256    }
257
258    /**
259     * Reads at most {@code byteCount} bytes from this stream and stores them in the
260     * byte array {@code bytes} starting at {@code offset}. Blocks until at
261     * least one byte has been read, the end of the stream is detected or an
262     * exception is thrown.
263     * <p>
264     * Separate threads should be used to read from a {@code PipedInputStream}
265     * and to write to the connected {@link PipedOutputStream}. If the same
266     * thread is used, a deadlock may occur.
267     *
268     * @return the number of bytes actually read or -1 if the end of the stream
269     *         has been reached.
270     * @throws IndexOutOfBoundsException
271     *             if {@code offset < 0} or {@code byteCount < 0}, or if {@code
272     *             offset + byteCount} is greater than the size of {@code bytes}.
273     * @throws InterruptedIOException
274     *             if the thread reading from this stream is interrupted.
275     * @throws IOException
276     *             if this stream is closed or not connected to an output
277     *             stream, or if the thread writing to the connected output
278     *             stream is no longer alive.
279     * @throws NullPointerException
280     *             if {@code bytes} is {@code null}.
281     */
282    @Override
283    public synchronized int read(byte[] bytes, int offset, int byteCount) throws IOException {
284        Arrays.checkOffsetAndCount(bytes.length, offset, byteCount);
285        if (byteCount == 0) {
286            return 0;
287        }
288
289        if (!isConnected) {
290            throw new IOException("Not connected");
291        }
292
293        if (buffer == null) {
294            throw new IOException("InputStream is closed");
295        }
296
297        /*
298         * Set the last thread to be reading on this PipedInputStream. If
299         * lastReader dies while someone is waiting to write an IOException of
300         * "Pipe broken" will be thrown in receive()
301         */
302        lastReader = Thread.currentThread();
303        try {
304            int attempts = 3;
305            while (in == -1) {
306                // Are we at end of stream?
307                if (isClosed) {
308                    return -1;
309                }
310                if ((attempts-- <= 0) && lastWriter != null && !lastWriter.isAlive()) {
311                    throw new IOException("Pipe broken");
312                }
313                // Notify callers of receive()
314                notifyAll();
315                wait(1000);
316            }
317        } catch (InterruptedException e) {
318            IoUtils.throwInterruptedIoException();
319        }
320
321        int totalCopied = 0;
322
323        // copy bytes from out thru the end of buffer
324        if (out >= in) {
325            int leftInBuffer = buffer.length - out;
326            int length = leftInBuffer < byteCount ? leftInBuffer : byteCount;
327            System.arraycopy(buffer, out, bytes, offset, length);
328            out += length;
329            if (out == buffer.length) {
330                out = 0;
331            }
332            if (out == in) {
333                // empty buffer
334                in = -1;
335                out = 0;
336            }
337            totalCopied += length;
338        }
339
340        // copy bytes from out thru in
341        if (totalCopied < byteCount && in != -1) {
342            int leftInBuffer = in - out;
343            int leftToCopy = byteCount - totalCopied;
344            int length = leftToCopy < leftInBuffer ? leftToCopy : leftInBuffer;
345            System.arraycopy(buffer, out, bytes, offset + totalCopied, length);
346            out += length;
347            if (out == in) {
348                // empty buffer
349                in = -1;
350                out = 0;
351            }
352            totalCopied += length;
353        }
354
355        // let blocked writers write to the newly available buffer space
356        notifyAll();
357
358        return totalCopied;
359    }
360
361    /**
362     * Receives a byte and stores it in this stream's {@code buffer}. This
363     * method is called by {@link PipedOutputStream#write(int)}. The least
364     * significant byte of the integer {@code oneByte} is stored at index
365     * {@code in} in the {@code buffer}.
366     * <p>
367     * This method blocks as long as {@code buffer} is full.
368     *
369     * @param oneByte
370     *            the byte to store in this pipe.
371     * @throws InterruptedIOException
372     *             if the {@code buffer} is full and the thread that has called
373     *             this method is interrupted.
374     * @throws IOException
375     *             if this stream is closed or the thread that has last read
376     *             from this stream is no longer alive.
377     */
378    protected synchronized void receive(int oneByte) throws IOException {
379        if (buffer == null || isClosed) {
380            throw new IOException("Pipe is closed");
381        }
382
383        /*
384         * Set the last thread to be writing on this PipedInputStream. If
385         * lastWriter dies while someone is waiting to read an IOException of
386         * "Pipe broken" will be thrown in read()
387         */
388        lastWriter = Thread.currentThread();
389        try {
390            while (buffer != null && out == in) {
391                if (lastReader != null && !lastReader.isAlive()) {
392                    throw new IOException("Pipe broken");
393                }
394                notifyAll();
395                wait(1000);
396            }
397        } catch (InterruptedException e) {
398            IoUtils.throwInterruptedIoException();
399        }
400        if (buffer == null) {
401            throw new IOException("Pipe is closed");
402        }
403        if (in == -1) {
404            in = 0;
405        }
406        buffer[in++] = (byte) oneByte;
407        if (in == buffer.length) {
408            in = 0;
409        }
410
411        // let blocked readers read the newly available data
412        notifyAll();
413    }
414
415    synchronized void done() {
416        isClosed = true;
417        notifyAll();
418    }
419}
420