1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18// BEGIN android-note 19// We've made several changes including: 20// - delayed buffer creation until pipe connection 21// - throw an IOException when a pipe is closed during a write 22// - improved consistency with PipedReader 23// END android-note 24 25package java.io; 26 27import org.apache.harmony.luni.util.Msg; 28 29/** 30 * Receives information from a communications pipe. When two threads want to 31 * pass data back and forth, one creates a piped output stream and the other one 32 * creates a piped input stream. 33 * 34 * @see PipedOutputStream 35 */ 36public class PipedInputStream extends InputStream { 37 38 private Thread lastReader; 39 40 private Thread lastWriter; 41 42 private boolean isClosed; 43 44 /** 45 * The circular buffer through which data is passed. Data is read from the 46 * range {@code [out, in)} and written to the range {@code [in, out)}. 47 * Data in the buffer is either sequential: <pre> 48 * { - - - X X X X X X X - - - - - } 49 * ^ ^ 50 * | | 51 * out in</pre> 52 * ...or wrapped around the buffer's end: <pre> 53 * { X X X X - - - - - - - - X X X } 54 * ^ ^ 55 * | | 56 * in out</pre> 57 * When the buffer is empty, {@code in == -1}. Reading when the buffer is 58 * empty will block until data is available. When the buffer is full, 59 * {@code in == out}. Writing when the buffer is full will block until free 60 * space is available. 61 */ 62 protected byte buffer[]; 63 64 /** 65 * The index in {@code buffer} where the next byte will be written. 66 */ 67 protected int in = -1; 68 69 /** 70 * The index in {@code buffer} where the next byte will be read. 71 */ 72 protected int out; 73 74 /** 75 * The size of the default pipe in bytes. 76 */ 77 protected static final int PIPE_SIZE = 1024; 78 79 /** 80 * Indicates if this pipe is connected. 81 */ 82 boolean isConnected; 83 84 /** 85 * Constructs a new unconnected {@code PipedInputStream}. The resulting 86 * stream must be connected to a {@link PipedOutputStream} before data may 87 * be read from it. 88 */ 89 public PipedInputStream() {} 90 91 /** 92 * Constructs a new {@code PipedInputStream} connected to the 93 * {@link PipedOutputStream} {@code out}. Any data written to the output 94 * stream can be read from the this input stream. 95 * 96 * @param out 97 * the piped output stream to connect to. 98 * @throws IOException 99 * if this stream or {@code out} are already connected. 100 */ 101 public PipedInputStream(PipedOutputStream out) throws IOException { 102 connect(out); 103 } 104 105 /** 106 * {@inheritDoc} 107 * 108 * <p>Unlike most streams, {@code PipedInputStream} returns 0 rather than throwing 109 * {@code IOException} if the stream has been closed. Unconnected and broken pipes also 110 * return 0. 111 * 112 * @throws IOException if an I/O error occurs 113 */ 114 @Override 115 public synchronized int available() throws IOException { 116 if (buffer == null || in == -1) { 117 return 0; 118 } 119 return in <= out ? buffer.length - out + in : in - out; 120 } 121 122 /** 123 * Closes this stream. This implementation releases the buffer used for the 124 * pipe and notifies all threads waiting to read or write. 125 * 126 * @throws IOException 127 * if an error occurs while closing this stream. 128 */ 129 @Override 130 public synchronized void close() throws IOException { 131 buffer = null; 132 notifyAll(); 133 } 134 135 /** 136 * Connects this {@code PipedInputStream} to a {@link PipedOutputStream}. 137 * Any data written to the output stream becomes readable in this input 138 * stream. 139 * 140 * @param src 141 * the source output stream. 142 * @throws IOException 143 * if either stream is already connected. 144 */ 145 public void connect(PipedOutputStream src) throws IOException { 146 src.connect(this); 147 } 148 149 /** 150 * Establishes the connection to the PipedOutputStream. 151 * 152 * @throws IOException 153 * If this Reader is already connected. 154 */ 155 synchronized void establishConnection() throws IOException { 156 if (isConnected) { 157 throw new IOException(Msg.getString("K007a")); //$NON-NLS-1$ 158 } 159 buffer = new byte[PipedInputStream.PIPE_SIZE]; 160 isConnected = true; 161 } 162 163 /** 164 * Reads a single byte from this stream and returns it as an integer in the 165 * range from 0 to 255. Returns -1 if the end of this stream has been 166 * reached. If there is no data in the pipe, this method blocks until data 167 * is available, the end of the stream is detected or an exception is 168 * thrown. 169 * <p> 170 * Separate threads should be used to read from a {@code PipedInputStream} 171 * and to write to the connected {@link PipedOutputStream}. If the same 172 * thread is used, a deadlock may occur. 173 * 174 * @return the byte read or -1 if the end of the source stream has been 175 * reached. 176 * @throws IOException 177 * if this stream is closed or not connected to an output 178 * stream, or if the thread writing to the connected output 179 * stream is no longer alive. 180 */ 181 @Override 182 public synchronized int read() throws IOException { 183 if (!isConnected) { 184 // K0074=Not connected 185 throw new IOException(Msg.getString("K0074")); //$NON-NLS-1$ 186 } 187 if (buffer == null) { 188 // K0075=InputStream is closed 189 throw new IOException(Msg.getString("K0075")); //$NON-NLS-1$ 190 } 191 192 // BEGIN android-removed 193 // eagerly throwing prevents checking isClosed and returning normally 194 // if (lastWriter != null && !lastWriter.isAlive() && (in < 0)) { 195 // // KA030=Write end dead 196 // throw new IOException(Msg.getString("KA030")); //$NON-NLS-1$ 197 // } 198 // END android-removed 199 200 /** 201 * Set the last thread to be reading on this PipedInputStream. If 202 * lastReader dies while someone is waiting to write an IOException of 203 * "Pipe broken" will be thrown in receive() 204 */ 205 lastReader = Thread.currentThread(); 206 try { 207 int attempts = 3; 208 while (in == -1) { 209 // Are we at end of stream? 210 if (isClosed) { 211 return -1; 212 } 213 if ((attempts-- <= 0) && lastWriter != null && !lastWriter.isAlive()) { 214 // K0076=Pipe broken 215 throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$ 216 } 217 // Notify callers of receive() 218 notifyAll(); 219 wait(1000); 220 } 221 } catch (InterruptedException e) { 222 throw new InterruptedIOException(); 223 } 224 225 // BEGIN android-changed 226 int result = buffer[out++] & 0xff; 227 if (out == buffer.length) { 228 out = 0; 229 } 230 if (out == in) { 231 // empty buffer 232 in = -1; 233 out = 0; 234 } 235 236 // let blocked writers write to the newly available buffer space 237 notifyAll(); 238 239 return result; 240 // END android-changed 241 } 242 243 /** 244 * Reads at most {@code count} bytes from this stream and stores them in the 245 * byte array {@code bytes} starting at {@code offset}. Blocks until at 246 * least one byte has been read, the end of the stream is detected or an 247 * exception is thrown. 248 * <p> 249 * Separate threads should be used to read from a {@code PipedInputStream} 250 * and to write to the connected {@link PipedOutputStream}. If the same 251 * thread is used, a deadlock may occur. 252 * 253 * @param bytes 254 * the array in which to store the bytes read. 255 * @param offset 256 * the initial position in {@code bytes} to store the bytes 257 * read from this stream. 258 * @param count 259 * the maximum number of bytes to store in {@code bytes}. 260 * @return the number of bytes actually read or -1 if the end of the stream 261 * has been reached. 262 * @throws IndexOutOfBoundsException 263 * if {@code offset < 0} or {@code count < 0}, or if {@code 264 * offset + count} is greater than the size of {@code bytes}. 265 * @throws InterruptedIOException 266 * if the thread reading from this stream is interrupted. 267 * @throws IOException 268 * if this stream is closed or not connected to an output 269 * stream, or if the thread writing to the connected output 270 * stream is no longer alive. 271 * @throws NullPointerException 272 * if {@code bytes} is {@code null}. 273 */ 274 @Override 275 public synchronized int read(byte[] bytes, int offset, int count) 276 throws IOException { 277 // BEGIN android-changed 278 if (bytes == null) { 279 throw new NullPointerException(Msg.getString("K0047")); //$NON-NLS-1$ 280 } 281 282 // Exception priorities (in case of multiple errors) differ from 283 // RI, but are spec-compliant. 284 // removed redundant check, used (offset | count) < 0 285 // instead of (offset < 0) || (count < 0) to safe one operation 286 if ((offset | count) < 0 || count > bytes.length - offset) { 287 throw new IndexOutOfBoundsException(Msg.getString("K002f")); //$NON-NLS-1$ 288 } 289 // END android-changed 290 291 if (count == 0) { 292 return 0; 293 } 294 295 if (!isConnected) { 296 // K0074=Not connected 297 throw new IOException(Msg.getString("K0074")); //$NON-NLS-1$ 298 } 299 300 if (buffer == null) { 301 // K0075=InputStream is closed 302 throw new IOException(Msg.getString("K0075")); //$NON-NLS-1$ 303 } 304 305 // BEGIN android-removed 306 // eagerly throwing prevents checking isClosed and returning normally 307 // if (lastWriter != null && !lastWriter.isAlive() && (in < 0)) { 308 // // KA030=Write end dead 309 // throw new IOException(Msg.getString("KA030")); //$NON-NLS-1$ 310 // } 311 // END android-removed 312 313 /** 314 * Set the last thread to be reading on this PipedInputStream. If 315 * lastReader dies while someone is waiting to write an IOException of 316 * "Pipe broken" will be thrown in receive() 317 */ 318 lastReader = Thread.currentThread(); 319 try { 320 int attempts = 3; 321 while (in == -1) { 322 // Are we at end of stream? 323 if (isClosed) { 324 return -1; 325 } 326 if ((attempts-- <= 0) && lastWriter != null && !lastWriter.isAlive()) { 327 // K0076=Pipe broken 328 throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$ 329 } 330 // Notify callers of receive() 331 notifyAll(); 332 wait(1000); 333 } 334 } catch (InterruptedException e) { 335 throw new InterruptedIOException(); 336 } 337 338 // BEGIN android-changed 339 int totalCopied = 0; 340 341 // copy bytes from out thru the end of buffer 342 if (out >= in) { 343 int leftInBuffer = buffer.length - out; 344 int length = leftInBuffer < count ? leftInBuffer : count; 345 System.arraycopy(buffer, out, bytes, offset, length); 346 out += length; 347 if (out == buffer.length) { 348 out = 0; 349 } 350 if (out == in) { 351 // empty buffer 352 in = -1; 353 out = 0; 354 } 355 totalCopied += length; 356 } 357 358 // copy bytes from out thru in 359 if (totalCopied < count && in != -1) { 360 int leftInBuffer = in - out; 361 int leftToCopy = count - totalCopied; 362 int length = leftToCopy < leftInBuffer ? leftToCopy : leftInBuffer; 363 System.arraycopy(buffer, out, bytes, offset + totalCopied, length); 364 out += length; 365 if (out == in) { 366 // empty buffer 367 in = -1; 368 out = 0; 369 } 370 totalCopied += length; 371 } 372 373 // let blocked writers write to the newly available buffer space 374 notifyAll(); 375 376 return totalCopied; 377 // END android-changed 378 } 379 380 /** 381 * Receives a byte and stores it in this stream's {@code buffer}. This 382 * method is called by {@link PipedOutputStream#write(int)}. The least 383 * significant byte of the integer {@code oneByte} is stored at index 384 * {@code in} in the {@code buffer}. 385 * <p> 386 * This method blocks as long as {@code buffer} is full. 387 * 388 * @param oneByte 389 * the byte to store in this pipe. 390 * @throws InterruptedIOException 391 * if the {@code buffer} is full and the thread that has called 392 * this method is interrupted. 393 * @throws IOException 394 * if this stream is closed or the thread that has last read 395 * from this stream is no longer alive. 396 */ 397 protected synchronized void receive(int oneByte) throws IOException { 398 if (buffer == null || isClosed) { 399 throw new IOException(Msg.getString("K0078")); //$NON-NLS-1$ 400 } 401 // BEGIN android-removed 402 // eagerly throwing causes us to fail even if the buffer's not full 403 // if (lastReader != null && !lastReader.isAlive()) { 404 // throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$ 405 // } 406 // END android-removed 407 /** 408 * Set the last thread to be writing on this PipedInputStream. If 409 * lastWriter dies while someone is waiting to read an IOException of 410 * "Pipe broken" will be thrown in read() 411 */ 412 lastWriter = Thread.currentThread(); 413 try { 414 while (buffer != null && out == in) { 415 // BEGIN android-changed 416 // moved has-last-reader-died check to be before wait() 417 if (lastReader != null && !lastReader.isAlive()) { 418 throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$ 419 } 420 notifyAll(); 421 wait(1000); 422 // END android-changed 423 } 424 } catch (InterruptedException e) { 425 throw new InterruptedIOException(); 426 } 427 if (buffer == null) { 428 throw new IOException(Msg.getString("K0078")); //$NON-NLS-1$ 429 } 430 if (in == -1) { 431 in = 0; 432 } 433 buffer[in++] = (byte) oneByte; 434 if (in == buffer.length) { 435 in = 0; 436 } 437 438 // BEGIN android-added 439 // let blocked readers read the newly available data 440 notifyAll(); 441 // END android-added 442 } 443 444 synchronized void done() { 445 isClosed = true; 446 notifyAll(); 447 } 448} 449