1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package java.io; 19 20// BEGIN android-note 21// We've made several changes including: 22// - throw an IOException when a pipe is closed during a write 23// - fix shallow concurrency problems, always lock on 'this' 24// - improved consistency with PipedInputStream 25// END android-note 26 27import org.apache.harmony.luni.util.Msg; 28 29/** 30 * Receives information on a communications pipe. When two threads want to pass 31 * data back and forth, one creates a piped writer and the other creates a piped 32 * reader. 33 * 34 * @see PipedWriter 35 */ 36public class PipedReader extends Reader { 37 38 private Thread lastReader; 39 40 private Thread lastWriter; 41 42 private boolean isClosed; 43 44 /** 45 * The circular buffer through which data is passed. Data is read from the 46 * range {@code [out, in)} and written to the range {@code [in, out)}. 47 * Data in the buffer is either sequential: <pre> 48 * { - - - X X X X X X X - - - - - } 49 * ^ ^ 50 * | | 51 * out in</pre> 52 * ...or wrapped around the buffer's end: <pre> 53 * { X X X X - - - - - - - - X X X } 54 * ^ ^ 55 * | | 56 * in out</pre> 57 * When the buffer is empty, {@code in == -1}. Reading when the buffer is 58 * empty will block until data is available. When the buffer is full, 59 * {@code in == out}. Writing when the buffer is full will block until free 60 * space is available. 61 */ 62 private char[] buffer; 63 64 /** 65 * The index in {@code buffer} where the next character will be written. 66 */ 67 private int in = -1; 68 69 /** 70 * The index in {@code buffer} where the next character will be read. 71 */ 72 private int out; 73 74 /** 75 * The size of the default pipe in characters 76 */ 77 private static final int PIPE_SIZE = 1024; 78 79 /** 80 * Indicates if this pipe is connected 81 */ 82 boolean isConnected; 83 84 /** 85 * Constructs a new unconnected {@code PipedReader}. The resulting reader 86 * must be connected to a {@code PipedWriter} before data may be read from 87 * it. 88 */ 89 public PipedReader() {} 90 91 /** 92 * Constructs a new {@code PipedReader} connected to the {@link PipedWriter} 93 * {@code out}. Any data written to the writer can be read from the this 94 * reader. 95 * 96 * @param out 97 * the {@code PipedWriter} to connect to. 98 * @throws IOException 99 * if {@code out} is already connected. 100 */ 101 public PipedReader(PipedWriter out) throws IOException { 102 connect(out); 103 } 104 105 /** 106 * Closes this reader. This implementation releases the buffer used for 107 * the pipe and notifies all threads waiting to read or write. 108 * 109 * @throws IOException 110 * if an error occurs while closing this reader. 111 */ 112 @Override 113 public synchronized void close() throws IOException { 114 buffer = null; 115 notifyAll(); 116 } 117 118 /** 119 * Connects this {@code PipedReader} to a {@link PipedWriter}. Any data 120 * written to the writer becomes readable in this reader. 121 * 122 * @param src 123 * the writer to connect to. 124 * @throws IOException 125 * if this reader is closed or already connected, or if {@code 126 * src} is already connected. 127 */ 128 public void connect(PipedWriter src) throws IOException { 129 src.connect(this); 130 } 131 132 /** 133 * Establishes the connection to the PipedWriter. 134 * 135 * @throws IOException 136 * If this Reader is already connected. 137 */ 138 synchronized void establishConnection() throws IOException { 139 if (isConnected) { 140 throw new IOException(Msg.getString("K007a")); //$NON-NLS-1$ 141 } 142 buffer = new char[PIPE_SIZE]; 143 isConnected = true; 144 } 145 146 /** 147 * Reads a single character from this reader and returns it as an integer 148 * with the two higher-order bytes set to 0. Returns -1 if the end of the 149 * reader has been reached. If there is no data in the pipe, this method 150 * blocks until data is available, the end of the reader is detected or an 151 * exception is thrown. 152 * <p> 153 * Separate threads should be used to read from a {@code PipedReader} and to 154 * write to the connected {@link PipedWriter}. If the same thread is used, 155 * a deadlock may occur. 156 * 157 * @return the character read or -1 if the end of the reader has been 158 * reached. 159 * @throws IOException 160 * if this reader is closed or some other I/O error occurs. 161 */ 162 @Override 163 public int read() throws IOException { 164 char[] carray = new char[1]; 165 int result = read(carray, 0, 1); 166 return result != -1 ? carray[0] : result; 167 } 168 169 /** 170 * Reads at most {@code count} characters from this reader and stores them 171 * in the character array {@code buffer} starting at {@code offset}. If 172 * there is no data in the pipe, this method blocks until at least one byte 173 * has been read, the end of the reader is detected or an exception is 174 * thrown. 175 * <p> 176 * Separate threads should be used to read from a {@code PipedReader} and to 177 * write to the connected {@link PipedWriter}. If the same thread is used, a 178 * deadlock may occur. 179 * 180 * @param buffer 181 * the character array in which to store the characters read. 182 * @param offset 183 * the initial position in {@code bytes} to store the characters 184 * read from this reader. 185 * @param count 186 * the maximum number of characters to store in {@code buffer}. 187 * @return the number of characters read or -1 if the end of the reader has 188 * been reached. 189 * @throws IndexOutOfBoundsException 190 * if {@code offset < 0} or {@code count < 0}, or if {@code 191 * offset + count} is greater than the size of {@code buffer}. 192 * @throws InterruptedIOException 193 * if the thread reading from this reader is interrupted. 194 * @throws IOException 195 * if this reader is closed or not connected to a writer, or if 196 * the thread writing to the connected writer is no longer 197 * alive. 198 */ 199 @Override 200 public synchronized int read(char[] buffer, int offset, int count) throws IOException { 201 if (!isConnected) { 202 throw new IOException(Msg.getString("K007b")); //$NON-NLS-1$ 203 } 204 if (this.buffer == null) { 205 throw new IOException(Msg.getString("K0078")); //$NON-NLS-1$ 206 } 207 // avoid int overflow 208 // BEGIN android-changed 209 // Exception priorities (in case of multiple errors) differ from 210 // RI, but are spec-compliant. 211 // made implicit null check explicit, 212 // used (offset | count) < 0 instead of (offset < 0) || (count < 0) 213 // to safe one operation 214 if (buffer == null) { 215 throw new NullPointerException(Msg.getString("K0047")); //$NON-NLS-1$ 216 } 217 if ((offset | count) < 0 || count > buffer.length - offset) { 218 throw new IndexOutOfBoundsException(Msg.getString("K002f")); //$NON-NLS-1$ 219 } 220 // END android-changed 221 if (count == 0) { 222 return 0; 223 } 224 /** 225 * Set the last thread to be reading on this PipedReader. If 226 * lastReader dies while someone is waiting to write an IOException 227 * of "Pipe broken" will be thrown in receive() 228 */ 229 lastReader = Thread.currentThread(); 230 try { 231 boolean first = true; 232 while (in == -1) { 233 // Are we at end of stream? 234 if (isClosed) { 235 return -1; 236 } 237 if (!first && lastWriter != null && !lastWriter.isAlive()) { 238 throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$ 239 } 240 first = false; 241 // Notify callers of receive() 242 notifyAll(); 243 wait(1000); 244 } 245 } catch (InterruptedException e) { 246 throw new InterruptedIOException(); 247 } 248 249 int copyLength = 0; 250 /* Copy chars from out to end of buffer first */ 251 if (out >= in) { 252 copyLength = count > this.buffer.length - out ? this.buffer.length - out 253 : count; 254 System.arraycopy(this.buffer, out, buffer, offset, copyLength); 255 out += copyLength; 256 if (out == this.buffer.length) { 257 out = 0; 258 } 259 if (out == in) { 260 // empty buffer 261 in = -1; 262 out = 0; 263 } 264 } 265 266 /* 267 * Did the read fully succeed in the previous copy or is the buffer 268 * empty? 269 */ 270 if (copyLength == count || in == -1) { 271 return copyLength; 272 } 273 274 int charsCopied = copyLength; 275 /* Copy bytes from 0 to the number of available bytes */ 276 copyLength = in - out > count - copyLength ? count - copyLength 277 : in - out; 278 System.arraycopy(this.buffer, out, buffer, offset + charsCopied, 279 copyLength); 280 out += copyLength; 281 if (out == in) { 282 // empty buffer 283 in = -1; 284 out = 0; 285 } 286 return charsCopied + copyLength; 287 } 288 289 /** 290 * Indicates whether this reader is ready to be read without blocking. 291 * Returns {@code true} if this reader will not block when {@code read} is 292 * called, {@code false} if unknown or blocking will occur. This 293 * implementation returns {@code true} if the internal buffer contains 294 * characters that can be read. 295 * 296 * @return always {@code false}. 297 * @throws IOException 298 * if this reader is closed or not connected, or if some other 299 * I/O error occurs. 300 * @see #read() 301 * @see #read(char[], int, int) 302 */ 303 @Override 304 public synchronized boolean ready() throws IOException { 305 if (!isConnected) { 306 throw new IOException(Msg.getString("K007b")); //$NON-NLS-1$ 307 } 308 if (buffer == null) { 309 throw new IOException(Msg.getString("K0078")); //$NON-NLS-1$ 310 } 311 return in != -1; 312 } 313 314 /** 315 * Receives a char and stores it into the PipedReader. This called by 316 * PipedWriter.write() when writes occur. 317 * <P> 318 * If the buffer is full and the thread sending #receive is interrupted, the 319 * InterruptedIOException will be thrown. 320 * 321 * @param oneChar 322 * the char to store into the pipe. 323 * 324 * @throws IOException 325 * If the stream is already closed or another IOException 326 * occurs. 327 */ 328 synchronized void receive(char oneChar) throws IOException { 329 if (buffer == null) { 330 throw new IOException(Msg.getString("K0078")); //$NON-NLS-1$ 331 } 332 if (lastReader != null && !lastReader.isAlive()) { 333 throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$ 334 } 335 /* 336 * Set the last thread to be writing on this PipedWriter. If 337 * lastWriter dies while someone is waiting to read an IOException 338 * of "Pipe broken" will be thrown in read() 339 */ 340 lastWriter = Thread.currentThread(); 341 try { 342 while (buffer != null && out == in) { 343 notifyAll(); 344 // BEGIN android-changed 345 wait(1000); 346 // END android-changed 347 if (lastReader != null && !lastReader.isAlive()) { 348 throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$ 349 } 350 } 351 } catch (InterruptedException e) { 352 throw new InterruptedIOException(); 353 } 354 if (buffer == null) { 355 throw new IOException(Msg.getString("K0078")); //$NON-NLS-1$ 356 } 357 if (in == -1) { 358 in = 0; 359 } 360 buffer[in++] = oneChar; 361 if (in == buffer.length) { 362 in = 0; 363 } 364 } 365 366 /** 367 * Receives a char array and stores it into the PipedReader. This called by 368 * PipedWriter.write() when writes occur. 369 * <P> 370 * If the buffer is full and the thread sending #receive is interrupted, the 371 * InterruptedIOException will be thrown. 372 * 373 * @param chars 374 * the char array to store into the pipe. 375 * @param offset 376 * offset to start reading from 377 * @param count 378 * total characters to read 379 * 380 * @throws IOException 381 * If the stream is already closed or another IOException 382 * occurs. 383 */ 384 synchronized void receive(char[] chars, int offset, int count) throws IOException { 385 if (chars == null) { 386 throw new NullPointerException(Msg.getString("K0047")); //$NON-NLS-1$ 387 } 388 if ((offset | count) < 0 || count > chars.length - offset) { 389 throw new IndexOutOfBoundsException(Msg.getString("K002f")); //$NON-NLS-1$ 390 } 391 if (buffer == null) { 392 throw new IOException(Msg.getString("K0078")); //$NON-NLS-1$ 393 } 394 if (lastReader != null && !lastReader.isAlive()) { 395 throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$ 396 } 397 /** 398 * Set the last thread to be writing on this PipedWriter. If 399 * lastWriter dies while someone is waiting to read an IOException 400 * of "Pipe broken" will be thrown in read() 401 */ 402 lastWriter = Thread.currentThread(); 403 while (count > 0) { 404 try { 405 while (buffer != null && out == in) { 406 notifyAll(); 407 // BEGIN android-changed 408 wait(1000); 409 // END android-changed 410 if (lastReader != null && !lastReader.isAlive()) { 411 throw new IOException(Msg.getString("K0076")); //$NON-NLS-1$ 412 } 413 } 414 } catch (InterruptedException e) { 415 throw new InterruptedIOException(); 416 } 417 if (buffer == null) { 418 throw new IOException(Msg.getString("K0078")); //$NON-NLS-1$ 419 } 420 if (in == -1) { 421 in = 0; 422 } 423 if (in >= out) { 424 int length = buffer.length - in; 425 if (count < length) { 426 length = count; 427 } 428 System.arraycopy(chars, offset, buffer, in, length); 429 offset += length; 430 count -= length; 431 in += length; 432 if (in == buffer.length) { 433 in = 0; 434 } 435 } 436 if (count > 0 && in != out) { 437 int length = out - in; 438 if (count < length) { 439 length = count; 440 } 441 System.arraycopy(chars, offset, buffer, in, length); 442 offset += length; 443 count -= length; 444 in += length; 445 } 446 } 447 } 448 449 synchronized void done() { 450 isClosed = true; 451 notifyAll(); 452 } 453} 454