1/*
2 * Copyright (C) 2014 The Android Open Source Project
3 * Copyright (c) 1995, 2013, Oracle and/or its affiliates. All rights reserved.
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This code is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License version 2 only, as
8 * published by the Free Software Foundation.  Oracle designates this
9 * particular file as subject to the "Classpath" exception as provided
10 * by Oracle in the LICENSE file that accompanied this code.
11 *
12 * This code is distributed in the hope that it will be useful, but WITHOUT
13 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 * version 2 for more details (a copy is included in the LICENSE file that
16 * accompanied this code).
17 *
18 * You should have received a copy of the GNU General Public License version
19 * 2 along with this work; if not, write to the Free Software Foundation,
20 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
21 *
22 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
23 * or visit www.oracle.com if you need additional information or have any
24 * questions.
25 */
26
27package java.io;
28
29import libcore.io.IoUtils;
30
31/**
32 * A piped input stream should be connected
33 * to a piped output stream; the piped  input
34 * stream then provides whatever data bytes
35 * are written to the piped output  stream.
36 * Typically, data is read from a <code>PipedInputStream</code>
37 * object by one thread  and data is written
38 * to the corresponding <code>PipedOutputStream</code>
39 * by some  other thread. Attempting to use
40 * both objects from a single thread is not
41 * recommended, as it may deadlock the thread.
42 * The piped input stream contains a buffer,
43 * decoupling read operations from write operations,
44 * within limits.
45 * A pipe is said to be <a name="BROKEN"> <i>broken</i> </a> if a
46 * thread that was providing data bytes to the connected
47 * piped output stream is no longer alive.
48 *
49 * @author  James Gosling
50 * @see     java.io.PipedOutputStream
51 * @since   JDK1.0
52 */
53public class PipedInputStream extends InputStream {
54    boolean closedByWriter = false;
55    volatile boolean closedByReader = false;
56    boolean connected = false;
57
58        /* REMIND: identification of the read and write sides needs to be
59           more sophisticated.  Either using thread groups (but what about
60           pipes within a thread?) or using finalization (but it may be a
61           long time until the next GC). */
62    Thread readSide;
63    Thread writeSide;
64
65    private static final int DEFAULT_PIPE_SIZE = 1024;
66
67    /**
68     * The default size of the pipe's circular input buffer.
69     * @since   JDK1.1
70     */
71    // This used to be a constant before the pipe size was allowed
72    // to change. This field will continue to be maintained
73    // for backward compatibility.
74    protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;
75
76    /**
77     * The circular buffer into which incoming data is placed.
78     * @since   JDK1.1
79     */
80    protected byte buffer[];
81
82    /**
83     * The index of the position in the circular buffer at which the
84     * next byte of data will be stored when received from the connected
85     * piped output stream. <code>in&lt;0</code> implies the buffer is empty,
86     * <code>in==out</code> implies the buffer is full
87     * @since   JDK1.1
88     */
89    protected int in = -1;
90
91    /**
92     * The index of the position in the circular buffer at which the next
93     * byte of data will be read by this piped input stream.
94     * @since   JDK1.1
95     */
96    protected int out = 0;
97
98    /**
99     * Creates a <code>PipedInputStream</code> so
100     * that it is connected to the piped output
101     * stream <code>src</code>. Data bytes written
102     * to <code>src</code> will then be  available
103     * as input from this stream.
104     *
105     * @param      src   the stream to connect to.
106     * @exception  IOException  if an I/O error occurs.
107     */
108    public PipedInputStream(PipedOutputStream src) throws IOException {
109        this(src, DEFAULT_PIPE_SIZE);
110    }
111
112    /**
113     * Creates a <code>PipedInputStream</code> so that it is
114     * connected to the piped output stream
115     * <code>src</code> and uses the specified pipe size for
116     * the pipe's buffer.
117     * Data bytes written to <code>src</code> will then
118     * be available as input from this stream.
119     *
120     * @param      src   the stream to connect to.
121     * @param      pipeSize the size of the pipe's buffer.
122     * @exception  IOException  if an I/O error occurs.
123     * @exception  IllegalArgumentException if {@code pipeSize <= 0}.
124     * @since      1.6
125     */
126    public PipedInputStream(PipedOutputStream src, int pipeSize)
127            throws IOException {
128         initPipe(pipeSize);
129         connect(src);
130    }
131
132    /**
133     * Creates a <code>PipedInputStream</code> so
134     * that it is not yet {@linkplain #connect(java.io.PipedOutputStream)
135     * connected}.
136     * It must be {@linkplain java.io.PipedOutputStream#connect(
137     * java.io.PipedInputStream) connected} to a
138     * <code>PipedOutputStream</code> before being used.
139     */
140    public PipedInputStream() {
141        initPipe(DEFAULT_PIPE_SIZE);
142    }
143
144    /**
145     * Creates a <code>PipedInputStream</code> so that it is not yet
146     * {@linkplain #connect(java.io.PipedOutputStream) connected} and
147     * uses the specified pipe size for the pipe's buffer.
148     * It must be {@linkplain java.io.PipedOutputStream#connect(
149     * java.io.PipedInputStream)
150     * connected} to a <code>PipedOutputStream</code> before being used.
151     *
152     * @param      pipeSize the size of the pipe's buffer.
153     * @exception  IllegalArgumentException if {@code pipeSize <= 0}.
154     * @since      1.6
155     */
156    public PipedInputStream(int pipeSize) {
157        initPipe(pipeSize);
158    }
159
160    private void initPipe(int pipeSize) {
161         if (pipeSize <= 0) {
162            throw new IllegalArgumentException("Pipe Size <= 0");
163         }
164         buffer = new byte[pipeSize];
165    }
166
167    /**
168     * Causes this piped input stream to be connected
169     * to the piped  output stream <code>src</code>.
170     * If this object is already connected to some
171     * other piped output  stream, an <code>IOException</code>
172     * is thrown.
173     * <p>
174     * If <code>src</code> is an
175     * unconnected piped output stream and <code>snk</code>
176     * is an unconnected piped input stream, they
177     * may be connected by either the call:
178     *
179     * <pre><code>snk.connect(src)</code> </pre>
180     * <p>
181     * or the call:
182     *
183     * <pre><code>src.connect(snk)</code> </pre>
184     * <p>
185     * The two calls have the same effect.
186     *
187     * @param      src   The piped output stream to connect to.
188     * @exception  IOException  if an I/O error occurs.
189     */
190    public void connect(PipedOutputStream src) throws IOException {
191        src.connect(this);
192    }
193
194    /**
195     * Receives a byte of data.  This method will block if no input is
196     * available.
197     * @param b the byte being received
198     * @exception IOException If the pipe is <a href="#BROKEN"> <code>broken</code></a>,
199     *          {@link #connect(java.io.PipedOutputStream) unconnected},
200     *          closed, or if an I/O error occurs.
201     * @since     JDK1.1
202     */
203    protected synchronized void receive(int b) throws IOException {
204        checkStateForReceive();
205        writeSide = Thread.currentThread();
206        if (in == out)
207            awaitSpace();
208        if (in < 0) {
209            in = 0;
210            out = 0;
211        }
212        buffer[in++] = (byte)(b & 0xFF);
213        if (in >= buffer.length) {
214            in = 0;
215        }
216    }
217
218    /**
219     * Receives data into an array of bytes.  This method will
220     * block until some input is available.
221     * @param b the buffer into which the data is received
222     * @param off the start offset of the data
223     * @param len the maximum number of bytes received
224     * @exception IOException If the pipe is <a href="#BROKEN"> broken</a>,
225     *           {@link #connect(java.io.PipedOutputStream) unconnected},
226     *           closed,or if an I/O error occurs.
227     */
228    synchronized void receive(byte b[], int off, int len)  throws IOException {
229        checkStateForReceive();
230        writeSide = Thread.currentThread();
231        int bytesToTransfer = len;
232        while (bytesToTransfer > 0) {
233            if (in == out)
234                awaitSpace();
235            int nextTransferAmount = 0;
236            if (out < in) {
237                nextTransferAmount = buffer.length - in;
238            } else if (in < out) {
239                if (in == -1) {
240                    in = out = 0;
241                    nextTransferAmount = buffer.length - in;
242                } else {
243                    nextTransferAmount = out - in;
244                }
245            }
246            if (nextTransferAmount > bytesToTransfer)
247                nextTransferAmount = bytesToTransfer;
248            assert(nextTransferAmount > 0);
249            System.arraycopy(b, off, buffer, in, nextTransferAmount);
250            bytesToTransfer -= nextTransferAmount;
251            off += nextTransferAmount;
252            in += nextTransferAmount;
253            if (in >= buffer.length) {
254                in = 0;
255            }
256        }
257    }
258
259    private void checkStateForReceive() throws IOException {
260        if (!connected) {
261            throw new IOException("Pipe not connected");
262        } else if (closedByWriter || closedByReader) {
263            throw new IOException("Pipe closed");
264        } else if (readSide != null && !readSide.isAlive()) {
265            throw new IOException("Read end dead");
266        }
267    }
268
269    private void awaitSpace() throws IOException {
270        while (in == out) {
271            checkStateForReceive();
272
273            /* full: kick any waiting readers */
274            notifyAll();
275            try {
276                wait(1000);
277            } catch (InterruptedException ex) {
278                // Android-changed: re-set the thread's interrupt status
279                // throw new java.io.InterruptedIOException();
280                IoUtils.throwInterruptedIoException();
281            }
282        }
283    }
284
285    /**
286     * Notifies all waiting threads that the last byte of data has been
287     * received.
288     */
289    synchronized void receivedLast() {
290        closedByWriter = true;
291        notifyAll();
292    }
293
294    /**
295     * Reads the next byte of data from this piped input stream. The
296     * value byte is returned as an <code>int</code> in the range
297     * <code>0</code> to <code>255</code>.
298     * This method blocks until input data is available, the end of the
299     * stream is detected, or an exception is thrown.
300     *
301     * @return     the next byte of data, or <code>-1</code> if the end of the
302     *             stream is reached.
303     * @exception  IOException  if the pipe is
304     *           {@link #connect(java.io.PipedOutputStream) unconnected},
305     *           <a href="#BROKEN"> <code>broken</code></a>, closed,
306     *           or if an I/O error occurs.
307     */
308    public synchronized int read()  throws IOException {
309        if (!connected) {
310            throw new IOException("Pipe not connected");
311        } else if (closedByReader) {
312            throw new IOException("Pipe closed");
313        } else if (writeSide != null && !writeSide.isAlive()
314                   && !closedByWriter && (in < 0)) {
315            throw new IOException("Write end dead");
316        }
317
318        readSide = Thread.currentThread();
319        int trials = 2;
320        while (in < 0) {
321            if (closedByWriter) {
322                /* closed by writer, return EOF */
323                return -1;
324            }
325            if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
326                throw new IOException("Pipe broken");
327            }
328            /* might be a writer waiting */
329            notifyAll();
330            try {
331                wait(1000);
332            } catch (InterruptedException ex) {
333                // Android-changed: re-set the thread's interrupt status
334                // throw new java.io.InterruptedIOException();
335                IoUtils.throwInterruptedIoException();
336            }
337        }
338        int ret = buffer[out++] & 0xFF;
339        if (out >= buffer.length) {
340            out = 0;
341        }
342        if (in == out) {
343            /* now empty */
344            in = -1;
345        }
346
347        return ret;
348    }
349
350    /**
351     * Reads up to <code>len</code> bytes of data from this piped input
352     * stream into an array of bytes. Less than <code>len</code> bytes
353     * will be read if the end of the data stream is reached or if
354     * <code>len</code> exceeds the pipe's buffer size.
355     * If <code>len </code> is zero, then no bytes are read and 0 is returned;
356     * otherwise, the method blocks until at least 1 byte of input is
357     * available, end of the stream has been detected, or an exception is
358     * thrown.
359     *
360     * @param      b     the buffer into which the data is read.
361     * @param      off   the start offset in the destination array <code>b</code>
362     * @param      len   the maximum number of bytes read.
363     * @return     the total number of bytes read into the buffer, or
364     *             <code>-1</code> if there is no more data because the end of
365     *             the stream has been reached.
366     * @exception  NullPointerException If <code>b</code> is <code>null</code>.
367     * @exception  IndexOutOfBoundsException If <code>off</code> is negative,
368     * <code>len</code> is negative, or <code>len</code> is greater than
369     * <code>b.length - off</code>
370     * @exception  IOException if the pipe is <a href="#BROKEN"> <code>broken</code></a>,
371     *           {@link #connect(java.io.PipedOutputStream) unconnected},
372     *           closed, or if an I/O error occurs.
373     */
374    public synchronized int read(byte b[], int off, int len)  throws IOException {
375        if (b == null) {
376            throw new NullPointerException();
377        } else if (off < 0 || len < 0 || len > b.length - off) {
378            throw new IndexOutOfBoundsException();
379        } else if (len == 0) {
380            return 0;
381        }
382
383        /* possibly wait on the first character */
384        int c = read();
385        if (c < 0) {
386            return -1;
387        }
388        b[off] = (byte) c;
389        int rlen = 1;
390        while ((in >= 0) && (len > 1)) {
391
392            int available;
393
394            if (in > out) {
395                available = Math.min((buffer.length - out), (in - out));
396            } else {
397                available = buffer.length - out;
398            }
399
400            // A byte is read beforehand outside the loop
401            if (available > (len - 1)) {
402                available = len - 1;
403            }
404            System.arraycopy(buffer, out, b, off + rlen, available);
405            out += available;
406            rlen += available;
407            len -= available;
408
409            if (out >= buffer.length) {
410                out = 0;
411            }
412            if (in == out) {
413                /* now empty */
414                in = -1;
415            }
416        }
417        return rlen;
418    }
419
420    /**
421     * Returns the number of bytes that can be read from this input
422     * stream without blocking.
423     *
424     * @return the number of bytes that can be read from this input stream
425     *         without blocking, or {@code 0} if this input stream has been
426     *         closed by invoking its {@link #close()} method, or if the pipe
427     *         is {@link #connect(java.io.PipedOutputStream) unconnected}, or
428     *          <a href="#BROKEN"> <code>broken</code></a>.
429     *
430     * @exception  IOException  if an I/O error occurs.
431     * @since   JDK1.0.2
432     */
433    public synchronized int available() throws IOException {
434        if(in < 0)
435            return 0;
436        else if(in == out)
437            return buffer.length;
438        else if (in > out)
439            return in - out;
440        else
441            return in + buffer.length - out;
442    }
443
444    /**
445     * Closes this piped input stream and releases any system resources
446     * associated with the stream.
447     *
448     * @exception  IOException  if an I/O error occurs.
449     */
450    public void close()  throws IOException {
451        closedByReader = true;
452        synchronized (this) {
453            in = -1;
454        }
455    }
456}
457