1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package java.io; 19 20import java.util.Arrays; 21import libcore.io.IoUtils; 22 23/** 24 * Receives information from a communications pipe. When two threads want to 25 * pass data back and forth, one creates a piped output stream and the other one 26 * creates a piped input stream. 27 * 28 * @see PipedOutputStream 29 */ 30public class PipedInputStream extends InputStream { 31 32 private Thread lastReader; 33 34 private Thread lastWriter; 35 36 private boolean isClosed; 37 38 /** 39 * The circular buffer through which data is passed. Data is read from the 40 * range {@code [out, in)} and written to the range {@code [in, out)}. 41 * Data in the buffer is either sequential: <pre> 42 * { - - - X X X X X X X - - - - - } 43 * ^ ^ 44 * | | 45 * out in</pre> 46 * ...or wrapped around the buffer's end: <pre> 47 * { X X X X - - - - - - - - X X X } 48 * ^ ^ 49 * | | 50 * in out</pre> 51 * When the buffer is empty, {@code in == -1}. Reading when the buffer is 52 * empty will block until data is available. When the buffer is full, 53 * {@code in == out}. Writing when the buffer is full will block until free 54 * space is available. 55 */ 56 protected byte[] buffer; 57 58 /** 59 * The index in {@code buffer} where the next byte will be written. 60 */ 61 protected int in = -1; 62 63 /** 64 * The index in {@code buffer} where the next byte will be read. 65 */ 66 protected int out; 67 68 /** 69 * The size of the default pipe in bytes. 70 */ 71 protected static final int PIPE_SIZE = 1024; 72 73 /** 74 * Indicates if this pipe is connected. 75 */ 76 boolean isConnected; 77 78 /** 79 * Constructs a new unconnected {@code PipedInputStream}. The resulting 80 * stream must be connected to a {@link PipedOutputStream} before data may 81 * be read from it. 82 */ 83 public PipedInputStream() {} 84 85 /** 86 * Constructs a new {@code PipedInputStream} connected to the 87 * {@link PipedOutputStream} {@code out}. Any data written to the output 88 * stream can be read from the this input stream. 89 * 90 * @param out 91 * the piped output stream to connect to. 92 * @throws IOException 93 * if this stream or {@code out} are already connected. 94 */ 95 public PipedInputStream(PipedOutputStream out) throws IOException { 96 connect(out); 97 } 98 99 /** 100 * Constructs a new unconnected {@code PipedInputStream} with the given 101 * buffer size. The resulting stream must be connected to a 102 * {@code PipedOutputStream} before data may be read from it. 103 * 104 * @param pipeSize the size of the buffer in bytes. 105 * @throws IllegalArgumentException if pipeSize is less than or equal to zero. 106 * @since 1.6 107 */ 108 public PipedInputStream(int pipeSize) { 109 if (pipeSize <= 0) { 110 throw new IllegalArgumentException("pipe size " + pipeSize + " too small"); 111 } 112 buffer = new byte[pipeSize]; 113 } 114 115 /** 116 * Constructs a new {@code PipedInputStream} connected to the given {@code PipedOutputStream}, 117 * with the given buffer size. Any data written to the output stream can be read from this 118 * input stream. 119 * 120 * @param out the {@code PipedOutputStream} to connect to. 121 * @param pipeSize the size of the buffer in bytes. 122 * @throws IOException if an I/O error occurs. 123 * @throws IllegalArgumentException if pipeSize is less than or equal to zero. 124 * @since 1.6 125 */ 126 public PipedInputStream(PipedOutputStream out, int pipeSize) throws IOException { 127 this(pipeSize); 128 connect(out); 129 } 130 131 /** 132 * {@inheritDoc} 133 * 134 * <p>Unlike most streams, {@code PipedInputStream} returns 0 rather than throwing 135 * {@code IOException} if the stream has been closed. Unconnected and broken pipes also 136 * return 0. 137 * 138 * @throws IOException if an I/O error occurs 139 */ 140 @Override 141 public synchronized int available() throws IOException { 142 if (buffer == null || in == -1) { 143 return 0; 144 } 145 return in <= out ? buffer.length - out + in : in - out; 146 } 147 148 /** 149 * Closes this stream. This implementation releases the buffer used for the 150 * pipe and notifies all threads waiting to read or write. 151 * 152 * @throws IOException 153 * if an error occurs while closing this stream. 154 */ 155 @Override 156 public synchronized void close() throws IOException { 157 buffer = null; 158 notifyAll(); 159 } 160 161 /** 162 * Connects this {@code PipedInputStream} to a {@link PipedOutputStream}. 163 * Any data written to the output stream becomes readable in this input 164 * stream. 165 * 166 * @param src 167 * the source output stream. 168 * @throws IOException 169 * if either stream is already connected. 170 */ 171 public void connect(PipedOutputStream src) throws IOException { 172 src.connect(this); 173 } 174 175 /** 176 * Establishes the connection to the PipedOutputStream. 177 * 178 * @throws IOException 179 * If this Reader is already connected. 180 */ 181 synchronized void establishConnection() throws IOException { 182 if (isConnected) { 183 throw new IOException("Pipe already connected"); 184 } 185 if (buffer == null) { // We may already have allocated the buffer. 186 buffer = new byte[PipedInputStream.PIPE_SIZE]; 187 } 188 isConnected = true; 189 } 190 191 /** 192 * Reads a single byte from this stream and returns it as an integer in the 193 * range from 0 to 255. Returns -1 if the end of this stream has been 194 * reached. If there is no data in the pipe, this method blocks until data 195 * is available, the end of the stream is detected or an exception is 196 * thrown. 197 * <p> 198 * Separate threads should be used to read from a {@code PipedInputStream} 199 * and to write to the connected {@link PipedOutputStream}. If the same 200 * thread is used, a deadlock may occur. 201 * 202 * @return the byte read or -1 if the end of the source stream has been 203 * reached. 204 * @throws IOException 205 * if this stream is closed or not connected to an output 206 * stream, or if the thread writing to the connected output 207 * stream is no longer alive. 208 */ 209 @Override 210 public synchronized int read() throws IOException { 211 if (!isConnected) { 212 throw new IOException("Not connected"); 213 } 214 if (buffer == null) { 215 throw new IOException("InputStream is closed"); 216 } 217 218 /** 219 * Set the last thread to be reading on this PipedInputStream. If 220 * lastReader dies while someone is waiting to write an IOException of 221 * "Pipe broken" will be thrown in receive() 222 */ 223 lastReader = Thread.currentThread(); 224 try { 225 int attempts = 3; 226 while (in == -1) { 227 // Are we at end of stream? 228 if (isClosed) { 229 return -1; 230 } 231 if ((attempts-- <= 0) && lastWriter != null && !lastWriter.isAlive()) { 232 throw new IOException("Pipe broken"); 233 } 234 // Notify callers of receive() 235 notifyAll(); 236 wait(1000); 237 } 238 } catch (InterruptedException e) { 239 IoUtils.throwInterruptedIoException(); 240 } 241 242 int result = buffer[out++] & 0xff; 243 if (out == buffer.length) { 244 out = 0; 245 } 246 if (out == in) { 247 // empty buffer 248 in = -1; 249 out = 0; 250 } 251 252 // let blocked writers write to the newly available buffer space 253 notifyAll(); 254 255 return result; 256 } 257 258 /** 259 * Reads at most {@code byteCount} bytes from this stream and stores them in the 260 * byte array {@code bytes} starting at {@code offset}. Blocks until at 261 * least one byte has been read, the end of the stream is detected or an 262 * exception is thrown. 263 * <p> 264 * Separate threads should be used to read from a {@code PipedInputStream} 265 * and to write to the connected {@link PipedOutputStream}. If the same 266 * thread is used, a deadlock may occur. 267 * 268 * @return the number of bytes actually read or -1 if the end of the stream 269 * has been reached. 270 * @throws IndexOutOfBoundsException 271 * if {@code offset < 0} or {@code byteCount < 0}, or if {@code 272 * offset + byteCount} is greater than the size of {@code bytes}. 273 * @throws InterruptedIOException 274 * if the thread reading from this stream is interrupted. 275 * @throws IOException 276 * if this stream is closed or not connected to an output 277 * stream, or if the thread writing to the connected output 278 * stream is no longer alive. 279 * @throws NullPointerException 280 * if {@code bytes} is {@code null}. 281 */ 282 @Override 283 public synchronized int read(byte[] bytes, int offset, int byteCount) throws IOException { 284 Arrays.checkOffsetAndCount(bytes.length, offset, byteCount); 285 if (byteCount == 0) { 286 return 0; 287 } 288 289 if (!isConnected) { 290 throw new IOException("Not connected"); 291 } 292 293 if (buffer == null) { 294 throw new IOException("InputStream is closed"); 295 } 296 297 /* 298 * Set the last thread to be reading on this PipedInputStream. If 299 * lastReader dies while someone is waiting to write an IOException of 300 * "Pipe broken" will be thrown in receive() 301 */ 302 lastReader = Thread.currentThread(); 303 try { 304 int attempts = 3; 305 while (in == -1) { 306 // Are we at end of stream? 307 if (isClosed) { 308 return -1; 309 } 310 if ((attempts-- <= 0) && lastWriter != null && !lastWriter.isAlive()) { 311 throw new IOException("Pipe broken"); 312 } 313 // Notify callers of receive() 314 notifyAll(); 315 wait(1000); 316 } 317 } catch (InterruptedException e) { 318 IoUtils.throwInterruptedIoException(); 319 } 320 321 int totalCopied = 0; 322 323 // copy bytes from out thru the end of buffer 324 if (out >= in) { 325 int leftInBuffer = buffer.length - out; 326 int length = leftInBuffer < byteCount ? leftInBuffer : byteCount; 327 System.arraycopy(buffer, out, bytes, offset, length); 328 out += length; 329 if (out == buffer.length) { 330 out = 0; 331 } 332 if (out == in) { 333 // empty buffer 334 in = -1; 335 out = 0; 336 } 337 totalCopied += length; 338 } 339 340 // copy bytes from out thru in 341 if (totalCopied < byteCount && in != -1) { 342 int leftInBuffer = in - out; 343 int leftToCopy = byteCount - totalCopied; 344 int length = leftToCopy < leftInBuffer ? leftToCopy : leftInBuffer; 345 System.arraycopy(buffer, out, bytes, offset + totalCopied, length); 346 out += length; 347 if (out == in) { 348 // empty buffer 349 in = -1; 350 out = 0; 351 } 352 totalCopied += length; 353 } 354 355 // let blocked writers write to the newly available buffer space 356 notifyAll(); 357 358 return totalCopied; 359 } 360 361 /** 362 * Receives a byte and stores it in this stream's {@code buffer}. This 363 * method is called by {@link PipedOutputStream#write(int)}. The least 364 * significant byte of the integer {@code oneByte} is stored at index 365 * {@code in} in the {@code buffer}. 366 * <p> 367 * This method blocks as long as {@code buffer} is full. 368 * 369 * @param oneByte 370 * the byte to store in this pipe. 371 * @throws InterruptedIOException 372 * if the {@code buffer} is full and the thread that has called 373 * this method is interrupted. 374 * @throws IOException 375 * if this stream is closed or the thread that has last read 376 * from this stream is no longer alive. 377 */ 378 protected synchronized void receive(int oneByte) throws IOException { 379 if (buffer == null || isClosed) { 380 throw new IOException("Pipe is closed"); 381 } 382 383 /* 384 * Set the last thread to be writing on this PipedInputStream. If 385 * lastWriter dies while someone is waiting to read an IOException of 386 * "Pipe broken" will be thrown in read() 387 */ 388 lastWriter = Thread.currentThread(); 389 try { 390 while (buffer != null && out == in) { 391 if (lastReader != null && !lastReader.isAlive()) { 392 throw new IOException("Pipe broken"); 393 } 394 notifyAll(); 395 wait(1000); 396 } 397 } catch (InterruptedException e) { 398 IoUtils.throwInterruptedIoException(); 399 } 400 if (buffer == null) { 401 throw new IOException("Pipe is closed"); 402 } 403 if (in == -1) { 404 in = 0; 405 } 406 buffer[in++] = (byte) oneByte; 407 if (in == buffer.length) { 408 in = 0; 409 } 410 411 // let blocked readers read the newly available data 412 notifyAll(); 413 } 414 415 synchronized void done() { 416 isClosed = true; 417 notifyAll(); 418 } 419} 420