1/*
2 *  Licensed to the Apache Software Foundation (ASF) under one or more
3 *  contributor license agreements.  See the NOTICE file distributed with
4 *  this work for additional information regarding copyright ownership.
5 *  The ASF licenses this file to You under the Apache License, Version 2.0
6 *  (the "License"); you may not use this file except in compliance with
7 *  the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *  Unless required by applicable law or agreed to in writing, software
12 *  distributed under the License is distributed on an "AS IS" BASIS,
13 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  See the License for the specific language governing permissions and
15 *  limitations under the License.
16 */
17
18// BEGIN android-note
19// We've made several changes including:
20//  - delayed buffer creation until pipe connection
21//  - throw an IOException when a pipe is closed during a write
22//  - improved consistency with PipedReader
23// END android-note
24
25package java.io;
26
27import org.apache.harmony.luni.util.Msg;
28
29/**
30 * Receives information from a communications pipe. When two threads want to
31 * pass data back and forth, one creates a piped output stream and the other one
32 * creates a piped input stream.
33 *
34 * @see PipedOutputStream
35 */
36public class PipedInputStream extends InputStream {
37
38    private Thread lastReader;
39
40    private Thread lastWriter;
41
42    private boolean isClosed;
43
44    /**
45     * The circular buffer through which data is passed. Data is read from the
46     * range {@code [out, in)} and written to the range {@code [in, out)}.
47     * Data in the buffer is either sequential: <pre>
48     *     { - - - X X X X X X X - - - - - }
49     *             ^             ^
50     *             |             |
51     *            out           in</pre>
52     * ...or wrapped around the buffer's end: <pre>
53     *     { X X X X - - - - - - - - X X X }
54     *               ^               ^
55     *               |               |
56     *              in              out</pre>
57     * When the buffer is empty, {@code in == -1}. Reading when the buffer is
58     * empty will block until data is available. When the buffer is full,
59     * {@code in == out}. Writing when the buffer is full will block until free
60     * space is available.
61     */
62    protected byte buffer[];
63
64    /**
65     * The index in {@code buffer} where the next byte will be written.
66     */
67    protected int in = -1;
68
69    /**
70     * The index in {@code buffer} where the next byte will be read.
71     */
72    protected int out;
73
74    /**
75     * The size of the default pipe in bytes.
76     */
77    protected static final int PIPE_SIZE = 1024;
78
79    /**
80     * Indicates if this pipe is connected.
81     */
82    boolean isConnected;
83
84    /**
85     * Constructs a new unconnected {@code PipedInputStream}. The resulting
86     * stream must be connected to a {@link PipedOutputStream} before data may
87     * be read from it.
88     */
89    public PipedInputStream() {}
90
91    /**
92     * Constructs a new {@code PipedInputStream} connected to the
93     * {@link PipedOutputStream} {@code out}. Any data written to the output
94     * stream can be read from the this input stream.
95     *
96     * @param out
97     *            the piped output stream to connect to.
98     * @throws IOException
99     *             if this stream or {@code out} are already connected.
100     */
101    public PipedInputStream(PipedOutputStream out) throws IOException {
102        connect(out);
103    }
104
105    /**
106     * {@inheritDoc}
107     *
108     * <p>Unlike most streams, {@code PipedInputStream} returns 0 rather than throwing
109     * {@code IOException} if the stream has been closed. Unconnected and broken pipes also
110     * return 0.
111     *
112     * @throws IOException if an I/O error occurs
113     */
114    @Override
115    public synchronized int available() throws IOException {
116        if (buffer == null || in == -1) {
117            return 0;
118        }
119        return in <= out ? buffer.length - out + in : in - out;
120    }
121
122    /**
123     * Closes this stream. This implementation releases the buffer used for the
124     * pipe and notifies all threads waiting to read or write.
125     *
126     * @throws IOException
127     *             if an error occurs while closing this stream.
128     */
129    @Override
130    public synchronized void close() throws IOException {
131        buffer = null;
132        notifyAll();
133    }
134
135    /**
136     * Connects this {@code PipedInputStream} to a {@link PipedOutputStream}.
137     * Any data written to the output stream becomes readable in this input
138     * stream.
139     *
140     * @param src
141     *            the source output stream.
142     * @throws IOException
143     *             if either stream is already connected.
144     */
145    public void connect(PipedOutputStream src) throws IOException {
146        src.connect(this);
147    }
148
149    /**
150     * Establishes the connection to the PipedOutputStream.
151     *
152     * @throws IOException
153     *             If this Reader is already connected.
154     */
155    synchronized void establishConnection() throws IOException {
156        if (isConnected) {
157            throw new IOException(Msg.getString("K007a")); //$NON-NLS-1$
158        }
159        buffer = new byte[PipedInputStream.PIPE_SIZE];
160        isConnected = true;
161    }
162
163    /**
164     * Reads a single byte from this stream and returns it as an integer in the
165     * range from 0 to 255. Returns -1 if the end of this stream has been
166     * reached. If there is no data in the pipe, this method blocks until data
167     * is available, the end of the stream is detected or an exception is
168     * thrown.
169     * <p>
170     * Separate threads should be used to read from a {@code PipedInputStream}
171     * and to write to the connected {@link PipedOutputStream}. If the same
172     * thread is used, a deadlock may occur.
173     *
174     * @return the byte read or -1 if the end of the source stream has been
175     *         reached.
176     * @throws IOException
177     *             if this stream is closed or not connected to an output
178     *             stream, or if the thread writing to the connected output
179     *             stream is no longer alive.
180     */
181    @Override
182    public synchronized int read() throws IOException {
183        if (!isConnected) {
184            // K0074=Not connected
185            throw new IOException(Msg.getString("K0074")); //$NON-NLS-1$
186        }
187        if (buffer == null) {
188            // K0075=InputStream is closed
189            throw new IOException(Msg.getString("K0075")); //$NON-NLS-1$
190        }
191
192        // BEGIN android-removed
193        // eagerly throwing prevents checking isClosed and returning normally
194        // if (lastWriter != null && !lastWriter.isAlive() && (in < 0)) {
195        //     // KA030=Write end dead
196        //     throw new IOException(Msg.getString("KA030")); //$NON-NLS-1$
197        // }
198        // END android-removed
199
200        /**
201         * Set the last thread to be reading on this PipedInputStream. If
202         * lastReader dies while someone is waiting to write an IOException of
203         * "Pipe broken" will be thrown in receive()
204         */
205        lastReader = Thread.currentThread();
206        try {
207            int attempts = 3;
208            while (in == -1) {
209                // Are we at end of stream?
210                if (isClosed) {
211                    return -1;
212                }
213                if ((attempts-- <= 0) && lastWriter != null && !lastWriter.isAlive()) {
214                    // K0076=Pipe broken
215                    throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$
216                }
217                // Notify callers of receive()
218                notifyAll();
219                wait(1000);
220            }
221        } catch (InterruptedException e) {
222            throw new InterruptedIOException();
223        }
224
225        // BEGIN android-changed
226        int result = buffer[out++] & 0xff;
227        if (out == buffer.length) {
228            out = 0;
229        }
230        if (out == in) {
231            // empty buffer
232            in = -1;
233            out = 0;
234        }
235
236        // let blocked writers write to the newly available buffer space
237        notifyAll();
238
239        return result;
240        // END android-changed
241    }
242
243    /**
244     * Reads at most {@code count} bytes from this stream and stores them in the
245     * byte array {@code bytes} starting at {@code offset}. Blocks until at
246     * least one byte has been read, the end of the stream is detected or an
247     * exception is thrown.
248     * <p>
249     * Separate threads should be used to read from a {@code PipedInputStream}
250     * and to write to the connected {@link PipedOutputStream}. If the same
251     * thread is used, a deadlock may occur.
252     *
253     * @param bytes
254     *            the array in which to store the bytes read.
255     * @param offset
256     *            the initial position in {@code bytes} to store the bytes
257     *            read from this stream.
258     * @param count
259     *            the maximum number of bytes to store in {@code bytes}.
260     * @return the number of bytes actually read or -1 if the end of the stream
261     *         has been reached.
262     * @throws IndexOutOfBoundsException
263     *             if {@code offset < 0} or {@code count < 0}, or if {@code
264     *             offset + count} is greater than the size of {@code bytes}.
265     * @throws InterruptedIOException
266     *             if the thread reading from this stream is interrupted.
267     * @throws IOException
268     *             if this stream is closed or not connected to an output
269     *             stream, or if the thread writing to the connected output
270     *             stream is no longer alive.
271     * @throws NullPointerException
272     *             if {@code bytes} is {@code null}.
273     */
274    @Override
275    public synchronized int read(byte[] bytes, int offset, int count)
276            throws IOException {
277        // BEGIN android-changed
278        if (bytes == null) {
279            throw new NullPointerException(Msg.getString("K0047")); //$NON-NLS-1$
280        }
281
282        // Exception priorities (in case of multiple errors) differ from
283        // RI, but are spec-compliant.
284        // removed redundant check, used (offset | count) < 0
285        // instead of (offset < 0) || (count < 0) to safe one operation
286        if ((offset | count) < 0 || count > bytes.length - offset) {
287            throw new IndexOutOfBoundsException(Msg.getString("K002f")); //$NON-NLS-1$
288        }
289        // END android-changed
290
291        if (count == 0) {
292            return 0;
293        }
294
295        if (!isConnected) {
296            // K0074=Not connected
297            throw new IOException(Msg.getString("K0074")); //$NON-NLS-1$
298        }
299
300        if (buffer == null) {
301            // K0075=InputStream is closed
302            throw new IOException(Msg.getString("K0075")); //$NON-NLS-1$
303        }
304
305        // BEGIN android-removed
306        // eagerly throwing prevents checking isClosed and returning normally
307        // if (lastWriter != null && !lastWriter.isAlive() && (in < 0)) {
308        //     // KA030=Write end dead
309        //     throw new IOException(Msg.getString("KA030")); //$NON-NLS-1$
310        // }
311        // END android-removed
312
313        /**
314         * Set the last thread to be reading on this PipedInputStream. If
315         * lastReader dies while someone is waiting to write an IOException of
316         * "Pipe broken" will be thrown in receive()
317         */
318        lastReader = Thread.currentThread();
319        try {
320            int attempts = 3;
321            while (in == -1) {
322                // Are we at end of stream?
323                if (isClosed) {
324                    return -1;
325                }
326                if ((attempts-- <= 0) && lastWriter != null && !lastWriter.isAlive()) {
327                    // K0076=Pipe broken
328                    throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$
329                }
330                // Notify callers of receive()
331                notifyAll();
332                wait(1000);
333            }
334        } catch (InterruptedException e) {
335            throw new InterruptedIOException();
336        }
337
338        // BEGIN android-changed
339        int totalCopied = 0;
340
341        // copy bytes from out thru the end of buffer
342        if (out >= in) {
343            int leftInBuffer = buffer.length - out;
344            int length = leftInBuffer < count ? leftInBuffer : count;
345            System.arraycopy(buffer, out, bytes, offset, length);
346            out += length;
347            if (out == buffer.length) {
348                out = 0;
349            }
350            if (out == in) {
351                // empty buffer
352                in = -1;
353                out = 0;
354            }
355            totalCopied += length;
356        }
357
358        // copy bytes from out thru in
359        if (totalCopied < count && in != -1) {
360            int leftInBuffer = in - out;
361            int leftToCopy = count - totalCopied;
362            int length = leftToCopy < leftInBuffer ? leftToCopy : leftInBuffer;
363            System.arraycopy(buffer, out, bytes, offset + totalCopied, length);
364            out += length;
365            if (out == in) {
366                // empty buffer
367                in = -1;
368                out = 0;
369            }
370            totalCopied += length;
371        }
372
373        // let blocked writers write to the newly available buffer space
374        notifyAll();
375
376        return totalCopied;
377        // END android-changed
378    }
379
380    /**
381     * Receives a byte and stores it in this stream's {@code buffer}. This
382     * method is called by {@link PipedOutputStream#write(int)}. The least
383     * significant byte of the integer {@code oneByte} is stored at index
384     * {@code in} in the {@code buffer}.
385     * <p>
386     * This method blocks as long as {@code buffer} is full.
387     *
388     * @param oneByte
389     *            the byte to store in this pipe.
390     * @throws InterruptedIOException
391     *             if the {@code buffer} is full and the thread that has called
392     *             this method is interrupted.
393     * @throws IOException
394     *             if this stream is closed or the thread that has last read
395     *             from this stream is no longer alive.
396     */
397    protected synchronized void receive(int oneByte) throws IOException {
398        if (buffer == null || isClosed) {
399            throw new IOException(Msg.getString("K0078")); //$NON-NLS-1$
400        }
401        // BEGIN android-removed
402        // eagerly throwing causes us to fail even if the buffer's not full
403        // if (lastReader != null && !lastReader.isAlive()) {
404        //     throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$
405        // }
406        // END android-removed
407        /**
408         * Set the last thread to be writing on this PipedInputStream. If
409         * lastWriter dies while someone is waiting to read an IOException of
410         * "Pipe broken" will be thrown in read()
411         */
412        lastWriter = Thread.currentThread();
413        try {
414            while (buffer != null && out == in) {
415                // BEGIN android-changed
416                // moved has-last-reader-died check to be before wait()
417                if (lastReader != null && !lastReader.isAlive()) {
418                    throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$
419                }
420                notifyAll();
421                wait(1000);
422                // END android-changed
423            }
424        } catch (InterruptedException e) {
425            throw new InterruptedIOException();
426        }
427        if (buffer == null) {
428            throw new IOException(Msg.getString("K0078")); //$NON-NLS-1$
429        }
430        if (in == -1) {
431            in = 0;
432        }
433        buffer[in++] = (byte) oneByte;
434        if (in == buffer.length) {
435            in = 0;
436        }
437
438        // BEGIN android-added
439        // let blocked readers read the newly available data
440        notifyAll();
441        // END android-added
442    }
443
444    synchronized void done() {
445        isClosed = true;
446        notifyAll();
447    }
448}
449