1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package java.io; 19 20import java.util.Arrays; 21 22/** 23 * Receives information from a communications pipe. When two threads want to 24 * pass data back and forth, one creates a piped output stream and the other one 25 * creates a piped input stream. 26 * 27 * @see PipedOutputStream 28 */ 29public class PipedInputStream extends InputStream { 30 31 private Thread lastReader; 32 33 private Thread lastWriter; 34 35 private boolean isClosed; 36 37 /** 38 * The circular buffer through which data is passed. Data is read from the 39 * range {@code [out, in)} and written to the range {@code [in, out)}. 40 * Data in the buffer is either sequential: <pre> 41 * { - - - X X X X X X X - - - - - } 42 * ^ ^ 43 * | | 44 * out in</pre> 45 * ...or wrapped around the buffer's end: <pre> 46 * { X X X X - - - - - - - - X X X } 47 * ^ ^ 48 * | | 49 * in out</pre> 50 * When the buffer is empty, {@code in == -1}. Reading when the buffer is 51 * empty will block until data is available. When the buffer is full, 52 * {@code in == out}. Writing when the buffer is full will block until free 53 * space is available. 54 */ 55 protected byte[] buffer; 56 57 /** 58 * The index in {@code buffer} where the next byte will be written. 59 */ 60 protected int in = -1; 61 62 /** 63 * The index in {@code buffer} where the next byte will be read. 64 */ 65 protected int out; 66 67 /** 68 * The size of the default pipe in bytes. 69 */ 70 protected static final int PIPE_SIZE = 1024; 71 72 /** 73 * Indicates if this pipe is connected. 74 */ 75 boolean isConnected; 76 77 /** 78 * Constructs a new unconnected {@code PipedInputStream}. The resulting 79 * stream must be connected to a {@link PipedOutputStream} before data may 80 * be read from it. 81 */ 82 public PipedInputStream() {} 83 84 /** 85 * Constructs a new {@code PipedInputStream} connected to the 86 * {@link PipedOutputStream} {@code out}. Any data written to the output 87 * stream can be read from the this input stream. 88 * 89 * @param out 90 * the piped output stream to connect to. 91 * @throws IOException 92 * if this stream or {@code out} are already connected. 93 */ 94 public PipedInputStream(PipedOutputStream out) throws IOException { 95 connect(out); 96 } 97 98 /** 99 * Constructs a new unconnected {@code PipedInputStream} with the given 100 * buffer size. The resulting stream must be connected to a 101 * {@code PipedOutputStream} before data may be read from it. 102 * 103 * @param pipeSize the size of the buffer in bytes. 104 * @throws IllegalArgumentException if pipeSize is less than or equal to zero. 105 * @since 1.6 106 */ 107 public PipedInputStream(int pipeSize) { 108 if (pipeSize <= 0) { 109 throw new IllegalArgumentException("pipe size " + pipeSize + " too small"); 110 } 111 buffer = new byte[pipeSize]; 112 } 113 114 /** 115 * Constructs a new {@code PipedInputStream} connected to the given {@code PipedOutputStream}, 116 * with the given buffer size. Any data written to the output stream can be read from this 117 * input stream. 118 * 119 * @param out the {@code PipedOutputStream} to connect to. 120 * @param pipeSize the size of the buffer in bytes. 121 * @throws IOException if an I/O error occurs. 122 * @throws IllegalArgumentException if pipeSize is less than or equal to zero. 123 * @since 1.6 124 */ 125 public PipedInputStream(PipedOutputStream out, int pipeSize) throws IOException { 126 this(pipeSize); 127 connect(out); 128 } 129 130 /** 131 * {@inheritDoc} 132 * 133 * <p>Unlike most streams, {@code PipedInputStream} returns 0 rather than throwing 134 * {@code IOException} if the stream has been closed. Unconnected and broken pipes also 135 * return 0. 136 * 137 * @throws IOException if an I/O error occurs 138 */ 139 @Override 140 public synchronized int available() throws IOException { 141 if (buffer == null || in == -1) { 142 return 0; 143 } 144 return in <= out ? buffer.length - out + in : in - out; 145 } 146 147 /** 148 * Closes this stream. This implementation releases the buffer used for the 149 * pipe and notifies all threads waiting to read or write. 150 * 151 * @throws IOException 152 * if an error occurs while closing this stream. 153 */ 154 @Override 155 public synchronized void close() throws IOException { 156 buffer = null; 157 notifyAll(); 158 } 159 160 /** 161 * Connects this {@code PipedInputStream} to a {@link PipedOutputStream}. 162 * Any data written to the output stream becomes readable in this input 163 * stream. 164 * 165 * @param src 166 * the source output stream. 167 * @throws IOException 168 * if either stream is already connected. 169 */ 170 public void connect(PipedOutputStream src) throws IOException { 171 src.connect(this); 172 } 173 174 /** 175 * Establishes the connection to the PipedOutputStream. 176 * 177 * @throws IOException 178 * If this Reader is already connected. 179 */ 180 synchronized void establishConnection() throws IOException { 181 if (isConnected) { 182 throw new IOException("Pipe already connected"); 183 } 184 if (buffer == null) { // We may already have allocated the buffer. 185 buffer = new byte[PipedInputStream.PIPE_SIZE]; 186 } 187 isConnected = true; 188 } 189 190 /** 191 * Reads a single byte from this stream and returns it as an integer in the 192 * range from 0 to 255. Returns -1 if the end of this stream has been 193 * reached. If there is no data in the pipe, this method blocks until data 194 * is available, the end of the stream is detected or an exception is 195 * thrown. 196 * <p> 197 * Separate threads should be used to read from a {@code PipedInputStream} 198 * and to write to the connected {@link PipedOutputStream}. If the same 199 * thread is used, a deadlock may occur. 200 * 201 * @return the byte read or -1 if the end of the source stream has been 202 * reached. 203 * @throws IOException 204 * if this stream is closed or not connected to an output 205 * stream, or if the thread writing to the connected output 206 * stream is no longer alive. 207 */ 208 @Override 209 public synchronized int read() throws IOException { 210 if (!isConnected) { 211 throw new IOException("Not connected"); 212 } 213 if (buffer == null) { 214 throw new IOException("InputStream is closed"); 215 } 216 217 /** 218 * Set the last thread to be reading on this PipedInputStream. If 219 * lastReader dies while someone is waiting to write an IOException of 220 * "Pipe broken" will be thrown in receive() 221 */ 222 lastReader = Thread.currentThread(); 223 try { 224 int attempts = 3; 225 while (in == -1) { 226 // Are we at end of stream? 227 if (isClosed) { 228 return -1; 229 } 230 if ((attempts-- <= 0) && lastWriter != null && !lastWriter.isAlive()) { 231 throw new IOException("Pipe broken"); 232 } 233 // Notify callers of receive() 234 notifyAll(); 235 wait(1000); 236 } 237 } catch (InterruptedException e) { 238 throw new InterruptedIOException(); 239 } 240 241 int result = buffer[out++] & 0xff; 242 if (out == buffer.length) { 243 out = 0; 244 } 245 if (out == in) { 246 // empty buffer 247 in = -1; 248 out = 0; 249 } 250 251 // let blocked writers write to the newly available buffer space 252 notifyAll(); 253 254 return result; 255 } 256 257 /** 258 * Reads at most {@code byteCount} bytes from this stream and stores them in the 259 * byte array {@code bytes} starting at {@code offset}. Blocks until at 260 * least one byte has been read, the end of the stream is detected or an 261 * exception is thrown. 262 * <p> 263 * Separate threads should be used to read from a {@code PipedInputStream} 264 * and to write to the connected {@link PipedOutputStream}. If the same 265 * thread is used, a deadlock may occur. 266 * 267 * @return the number of bytes actually read or -1 if the end of the stream 268 * has been reached. 269 * @throws IndexOutOfBoundsException 270 * if {@code offset < 0} or {@code byteCount < 0}, or if {@code 271 * offset + byteCount} is greater than the size of {@code bytes}. 272 * @throws InterruptedIOException 273 * if the thread reading from this stream is interrupted. 274 * @throws IOException 275 * if this stream is closed or not connected to an output 276 * stream, or if the thread writing to the connected output 277 * stream is no longer alive. 278 * @throws NullPointerException 279 * if {@code bytes} is {@code null}. 280 */ 281 @Override 282 public synchronized int read(byte[] bytes, int offset, int byteCount) throws IOException { 283 Arrays.checkOffsetAndCount(bytes.length, offset, byteCount); 284 if (byteCount == 0) { 285 return 0; 286 } 287 288 if (!isConnected) { 289 throw new IOException("Not connected"); 290 } 291 292 if (buffer == null) { 293 throw new IOException("InputStream is closed"); 294 } 295 296 /* 297 * Set the last thread to be reading on this PipedInputStream. If 298 * lastReader dies while someone is waiting to write an IOException of 299 * "Pipe broken" will be thrown in receive() 300 */ 301 lastReader = Thread.currentThread(); 302 try { 303 int attempts = 3; 304 while (in == -1) { 305 // Are we at end of stream? 306 if (isClosed) { 307 return -1; 308 } 309 if ((attempts-- <= 0) && lastWriter != null && !lastWriter.isAlive()) { 310 throw new IOException("Pipe broken"); 311 } 312 // Notify callers of receive() 313 notifyAll(); 314 wait(1000); 315 } 316 } catch (InterruptedException e) { 317 throw new InterruptedIOException(); 318 } 319 320 int totalCopied = 0; 321 322 // copy bytes from out thru the end of buffer 323 if (out >= in) { 324 int leftInBuffer = buffer.length - out; 325 int length = leftInBuffer < byteCount ? leftInBuffer : byteCount; 326 System.arraycopy(buffer, out, bytes, offset, length); 327 out += length; 328 if (out == buffer.length) { 329 out = 0; 330 } 331 if (out == in) { 332 // empty buffer 333 in = -1; 334 out = 0; 335 } 336 totalCopied += length; 337 } 338 339 // copy bytes from out thru in 340 if (totalCopied < byteCount && in != -1) { 341 int leftInBuffer = in - out; 342 int leftToCopy = byteCount - totalCopied; 343 int length = leftToCopy < leftInBuffer ? leftToCopy : leftInBuffer; 344 System.arraycopy(buffer, out, bytes, offset + totalCopied, length); 345 out += length; 346 if (out == in) { 347 // empty buffer 348 in = -1; 349 out = 0; 350 } 351 totalCopied += length; 352 } 353 354 // let blocked writers write to the newly available buffer space 355 notifyAll(); 356 357 return totalCopied; 358 } 359 360 /** 361 * Receives a byte and stores it in this stream's {@code buffer}. This 362 * method is called by {@link PipedOutputStream#write(int)}. The least 363 * significant byte of the integer {@code oneByte} is stored at index 364 * {@code in} in the {@code buffer}. 365 * <p> 366 * This method blocks as long as {@code buffer} is full. 367 * 368 * @param oneByte 369 * the byte to store in this pipe. 370 * @throws InterruptedIOException 371 * if the {@code buffer} is full and the thread that has called 372 * this method is interrupted. 373 * @throws IOException 374 * if this stream is closed or the thread that has last read 375 * from this stream is no longer alive. 376 */ 377 protected synchronized void receive(int oneByte) throws IOException { 378 if (buffer == null || isClosed) { 379 throw new IOException("Pipe is closed"); 380 } 381 382 /* 383 * Set the last thread to be writing on this PipedInputStream. If 384 * lastWriter dies while someone is waiting to read an IOException of 385 * "Pipe broken" will be thrown in read() 386 */ 387 lastWriter = Thread.currentThread(); 388 try { 389 while (buffer != null && out == in) { 390 if (lastReader != null && !lastReader.isAlive()) { 391 throw new IOException("Pipe broken"); 392 } 393 notifyAll(); 394 wait(1000); 395 } 396 } catch (InterruptedException e) { 397 throw new InterruptedIOException(); 398 } 399 if (buffer == null) { 400 throw new IOException("Pipe is closed"); 401 } 402 if (in == -1) { 403 in = 0; 404 } 405 buffer[in++] = (byte) oneByte; 406 if (in == buffer.length) { 407 in = 0; 408 } 409 410 // let blocked readers read the newly available data 411 notifyAll(); 412 } 413 414 synchronized void done() { 415 isClosed = true; 416 notifyAll(); 417 } 418} 419