1/*
2 *  Licensed to the Apache Software Foundation (ASF) under one or more
3 *  contributor license agreements.  See the NOTICE file distributed with
4 *  this work for additional information regarding copyright ownership.
5 *  The ASF licenses this file to You under the Apache License, Version 2.0
6 *  (the "License"); you may not use this file except in compliance with
7 *  the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *  Unless required by applicable law or agreed to in writing, software
12 *  distributed under the License is distributed on an "AS IS" BASIS,
13 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  See the License for the specific language governing permissions and
15 *  limitations under the License.
16 */
17
18package java.io;
19
20// BEGIN android-note
21// We've made several changes including:
22//  - throw an IOException when a pipe is closed during a write
23//  - fix shallow concurrency problems, always lock on 'this'
24//  - improved consistency with PipedInputStream
25// END android-note
26
27import org.apache.harmony.luni.util.Msg;
28
29/**
30 * Receives information on a communications pipe. When two threads want to pass
31 * data back and forth, one creates a piped writer and the other creates a piped
32 * reader.
33 *
34 * @see PipedWriter
35 */
36public class PipedReader extends Reader {
37
38    private Thread lastReader;
39
40    private Thread lastWriter;
41
42    private boolean isClosed;
43
44    /**
45     * The circular buffer through which data is passed. Data is read from the
46     * range {@code [out, in)} and written to the range {@code [in, out)}.
47     * Data in the buffer is either sequential: <pre>
48     *     { - - - X X X X X X X - - - - - }
49     *             ^             ^
50     *             |             |
51     *            out           in</pre>
52     * ...or wrapped around the buffer's end: <pre>
53     *     { X X X X - - - - - - - - X X X }
54     *               ^               ^
55     *               |               |
56     *              in              out</pre>
57     * When the buffer is empty, {@code in == -1}. Reading when the buffer is
58     * empty will block until data is available. When the buffer is full,
59     * {@code in == out}. Writing when the buffer is full will block until free
60     * space is available.
61     */
62    private char[] buffer;
63
64    /**
65     * The index in {@code buffer} where the next character will be written.
66     */
67    private int in = -1;
68
69    /**
70     * The index in {@code buffer} where the next character will be read.
71     */
72    private int out;
73
74    /**
75     * The size of the default pipe in characters
76     */
77    private static final int PIPE_SIZE = 1024;
78
79    /**
80     * Indicates if this pipe is connected
81     */
82    boolean isConnected;
83
84    /**
85     * Constructs a new unconnected {@code PipedReader}. The resulting reader
86     * must be connected to a {@code PipedWriter} before data may be read from
87     * it.
88     */
89    public PipedReader() {}
90
91    /**
92     * Constructs a new {@code PipedReader} connected to the {@link PipedWriter}
93     * {@code out}. Any data written to the writer can be read from the this
94     * reader.
95     *
96     * @param out
97     *            the {@code PipedWriter} to connect to.
98     * @throws IOException
99     *             if {@code out} is already connected.
100     */
101    public PipedReader(PipedWriter out) throws IOException {
102        connect(out);
103    }
104
105    /**
106     * Closes this reader. This implementation releases the buffer used for
107     * the pipe and notifies all threads waiting to read or write.
108     *
109     * @throws IOException
110     *             if an error occurs while closing this reader.
111     */
112    @Override
113    public synchronized void close() throws IOException {
114        buffer = null;
115        notifyAll();
116    }
117
118    /**
119     * Connects this {@code PipedReader} to a {@link PipedWriter}. Any data
120     * written to the writer becomes readable in this reader.
121     *
122     * @param src
123     *            the writer to connect to.
124     * @throws IOException
125     *             if this reader is closed or already connected, or if {@code
126     *             src} is already connected.
127     */
128    public void connect(PipedWriter src) throws IOException {
129        src.connect(this);
130    }
131
132    /**
133     * Establishes the connection to the PipedWriter.
134     *
135     * @throws IOException
136     *             If this Reader is already connected.
137     */
138    synchronized void establishConnection() throws IOException {
139        if (isConnected) {
140            throw new IOException(Msg.getString("K007a")); //$NON-NLS-1$
141        }
142        buffer = new char[PIPE_SIZE];
143        isConnected = true;
144    }
145
146    /**
147     * Reads a single character from this reader and returns it as an integer
148     * with the two higher-order bytes set to 0. Returns -1 if the end of the
149     * reader has been reached. If there is no data in the pipe, this method
150     * blocks until data is available, the end of the reader is detected or an
151     * exception is thrown.
152     * <p>
153     * Separate threads should be used to read from a {@code PipedReader} and to
154     * write to the connected {@link PipedWriter}. If the same thread is used,
155     * a deadlock may occur.
156     *
157     * @return the character read or -1 if the end of the reader has been
158     *         reached.
159     * @throws IOException
160     *             if this reader is closed or some other I/O error occurs.
161     */
162    @Override
163    public int read() throws IOException {
164        char[] carray = new char[1];
165        int result = read(carray, 0, 1);
166        return result != -1 ? carray[0] : result;
167    }
168
169    /**
170     * Reads at most {@code count} characters from this reader and stores them
171     * in the character array {@code buffer} starting at {@code offset}. If
172     * there is no data in the pipe, this method blocks until at least one byte
173     * has been read, the end of the reader is detected or an exception is
174     * thrown.
175     * <p>
176     * Separate threads should be used to read from a {@code PipedReader} and to
177     * write to the connected {@link PipedWriter}. If the same thread is used, a
178     * deadlock may occur.
179     *
180     * @param buffer
181     *            the character array in which to store the characters read.
182     * @param offset
183     *            the initial position in {@code bytes} to store the characters
184     *            read from this reader.
185     * @param count
186     *            the maximum number of characters to store in {@code buffer}.
187     * @return the number of characters read or -1 if the end of the reader has
188     *         been reached.
189     * @throws IndexOutOfBoundsException
190     *             if {@code offset < 0} or {@code count < 0}, or if {@code
191     *             offset + count} is greater than the size of {@code buffer}.
192     * @throws InterruptedIOException
193     *             if the thread reading from this reader is interrupted.
194     * @throws IOException
195     *             if this reader is closed or not connected to a writer, or if
196     *             the thread writing to the connected writer is no longer
197     *             alive.
198     */
199    @Override
200    public synchronized int read(char[] buffer, int offset, int count) throws IOException {
201        if (!isConnected) {
202            throw new IOException(Msg.getString("K007b")); //$NON-NLS-1$
203        }
204        if (this.buffer == null) {
205            throw new IOException(Msg.getString("K0078")); //$NON-NLS-1$
206        }
207        // avoid int overflow
208        // BEGIN android-changed
209        // Exception priorities (in case of multiple errors) differ from
210        // RI, but are spec-compliant.
211        // made implicit null check explicit,
212        // used (offset | count) < 0 instead of (offset < 0) || (count < 0)
213        // to safe one operation
214        if (buffer == null) {
215            throw new NullPointerException(Msg.getString("K0047")); //$NON-NLS-1$
216        }
217        if ((offset | count) < 0 || count > buffer.length - offset) {
218            throw new IndexOutOfBoundsException(Msg.getString("K002f")); //$NON-NLS-1$
219        }
220        // END android-changed
221        if (count == 0) {
222            return 0;
223        }
224        /**
225         * Set the last thread to be reading on this PipedReader. If
226         * lastReader dies while someone is waiting to write an IOException
227         * of "Pipe broken" will be thrown in receive()
228         */
229        lastReader = Thread.currentThread();
230        try {
231            boolean first = true;
232            while (in == -1) {
233                // Are we at end of stream?
234                if (isClosed) {
235                    return -1;
236                }
237                if (!first && lastWriter != null && !lastWriter.isAlive()) {
238                    throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$
239                }
240                first = false;
241                // Notify callers of receive()
242                notifyAll();
243                wait(1000);
244            }
245        } catch (InterruptedException e) {
246            throw new InterruptedIOException();
247        }
248
249        int copyLength = 0;
250        /* Copy chars from out to end of buffer first */
251        if (out >= in) {
252            copyLength = count > this.buffer.length - out ? this.buffer.length - out
253                    : count;
254            System.arraycopy(this.buffer, out, buffer, offset, copyLength);
255            out += copyLength;
256            if (out == this.buffer.length) {
257                out = 0;
258            }
259            if (out == in) {
260                // empty buffer
261                in = -1;
262                out = 0;
263            }
264        }
265
266        /*
267         * Did the read fully succeed in the previous copy or is the buffer
268         * empty?
269         */
270        if (copyLength == count || in == -1) {
271            return copyLength;
272        }
273
274        int charsCopied = copyLength;
275        /* Copy bytes from 0 to the number of available bytes */
276        copyLength = in - out > count - copyLength ? count - copyLength
277                : in - out;
278        System.arraycopy(this.buffer, out, buffer, offset + charsCopied,
279                copyLength);
280        out += copyLength;
281        if (out == in) {
282            // empty buffer
283            in = -1;
284            out = 0;
285        }
286        return charsCopied + copyLength;
287    }
288
289    /**
290     * Indicates whether this reader is ready to be read without blocking.
291     * Returns {@code true} if this reader will not block when {@code read} is
292     * called, {@code false} if unknown or blocking will occur. This
293     * implementation returns {@code true} if the internal buffer contains
294     * characters that can be read.
295     *
296     * @return always {@code false}.
297     * @throws IOException
298     *             if this reader is closed or not connected, or if some other
299     *             I/O error occurs.
300     * @see #read()
301     * @see #read(char[], int, int)
302     */
303    @Override
304    public synchronized boolean ready() throws IOException {
305        if (!isConnected) {
306            throw new IOException(Msg.getString("K007b")); //$NON-NLS-1$
307        }
308        if (buffer == null) {
309            throw new IOException(Msg.getString("K0078")); //$NON-NLS-1$
310        }
311        return in != -1;
312    }
313
314    /**
315     * Receives a char and stores it into the PipedReader. This called by
316     * PipedWriter.write() when writes occur.
317     * <P>
318     * If the buffer is full and the thread sending #receive is interrupted, the
319     * InterruptedIOException will be thrown.
320     *
321     * @param oneChar
322     *            the char to store into the pipe.
323     *
324     * @throws IOException
325     *             If the stream is already closed or another IOException
326     *             occurs.
327     */
328    synchronized void receive(char oneChar) throws IOException {
329        if (buffer == null) {
330            throw new IOException(Msg.getString("K0078")); //$NON-NLS-1$
331        }
332        if (lastReader != null && !lastReader.isAlive()) {
333            throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$
334        }
335        /*
336        * Set the last thread to be writing on this PipedWriter. If
337        * lastWriter dies while someone is waiting to read an IOException
338        * of "Pipe broken" will be thrown in read()
339        */
340        lastWriter = Thread.currentThread();
341        try {
342            while (buffer != null && out == in) {
343                notifyAll();
344                // BEGIN android-changed
345                wait(1000);
346                // END android-changed
347                if (lastReader != null && !lastReader.isAlive()) {
348                    throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$
349                }
350            }
351        } catch (InterruptedException e) {
352            throw new InterruptedIOException();
353        }
354        if (buffer == null) {
355            throw new IOException(Msg.getString("K0078")); //$NON-NLS-1$
356        }
357        if (in == -1) {
358            in = 0;
359        }
360        buffer[in++] = oneChar;
361        if (in == buffer.length) {
362            in = 0;
363        }
364    }
365
366    /**
367     * Receives a char array and stores it into the PipedReader. This called by
368     * PipedWriter.write() when writes occur.
369     * <P>
370     * If the buffer is full and the thread sending #receive is interrupted, the
371     * InterruptedIOException will be thrown.
372     *
373     * @param chars
374     *            the char array to store into the pipe.
375     * @param offset
376     *            offset to start reading from
377     * @param count
378     *            total characters to read
379     *
380     * @throws IOException
381     *             If the stream is already closed or another IOException
382     *             occurs.
383     */
384    synchronized void receive(char[] chars, int offset, int count) throws IOException {
385        if (chars == null) {
386            throw new NullPointerException(Msg.getString("K0047")); //$NON-NLS-1$
387        }
388        if ((offset | count) < 0 || count > chars.length - offset) {
389            throw new IndexOutOfBoundsException(Msg.getString("K002f")); //$NON-NLS-1$
390        }
391        if (buffer == null) {
392            throw new IOException(Msg.getString("K0078")); //$NON-NLS-1$
393        }
394        if (lastReader != null && !lastReader.isAlive()) {
395            throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$
396        }
397        /**
398         * Set the last thread to be writing on this PipedWriter. If
399         * lastWriter dies while someone is waiting to read an IOException
400         * of "Pipe broken" will be thrown in read()
401         */
402        lastWriter = Thread.currentThread();
403        while (count > 0) {
404            try {
405                while (buffer != null && out == in) {
406                    notifyAll();
407                    // BEGIN android-changed
408                    wait(1000);
409                    // END android-changed
410                    if (lastReader != null && !lastReader.isAlive()) {
411                        throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$
412                    }
413                }
414            } catch (InterruptedException e) {
415                throw new InterruptedIOException();
416            }
417            if (buffer == null) {
418                throw new IOException(Msg.getString("K0078")); //$NON-NLS-1$
419            }
420            if (in == -1) {
421                in = 0;
422            }
423            if (in >= out) {
424                int length = buffer.length - in;
425                if (count < length) {
426                    length = count;
427                }
428                System.arraycopy(chars, offset, buffer, in, length);
429                offset += length;
430                count -= length;
431                in += length;
432                if (in == buffer.length) {
433                    in = 0;
434                }
435            }
436            if (count > 0 && in != out) {
437                int length = out - in;
438                if (count < length) {
439                    length = count;
440                }
441                System.arraycopy(chars, offset, buffer, in, length);
442                offset += length;
443                count -= length;
444                in += length;
445            }
446        }
447    }
448
449    synchronized void done() {
450        isClosed = true;
451        notifyAll();
452    }
453}
454