Channels.java revision 1d857a6e70606d1cec7330edfde0e43111d8d7c1
1/* 2 * Copyright (c) 2000, 2009, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26package java.nio.channels; 27 28import java.io.FileInputStream; 29import java.io.FileOutputStream; 30import java.io.InputStream; 31import java.io.OutputStream; 32import java.io.Reader; 33import java.io.Writer; 34import java.io.IOException; 35import java.nio.ByteBuffer; 36import java.nio.charset.Charset; 37import java.nio.charset.CharsetDecoder; 38import java.nio.charset.CharsetEncoder; 39import java.nio.charset.UnsupportedCharsetException; 40import java.nio.channels.spi.AbstractInterruptibleChannel; 41import java.util.concurrent.ExecutionException; 42import sun.nio.ch.ChannelInputStream; 43import sun.nio.cs.StreamDecoder; 44import sun.nio.cs.StreamEncoder; 45 46 47/** 48 * Utility methods for channels and streams. 49 * 50 * <p> This class defines static methods that support the interoperation of the 51 * stream classes of the <tt>{@link java.io}</tt> package with the channel 52 * classes of this package. </p> 53 * 54 * 55 * @author Mark Reinhold 56 * @author Mike McCloskey 57 * @author JSR-51 Expert Group 58 * @since 1.4 59 */ 60 61public final class Channels { 62 63 private Channels() { } // No instantiation 64 65 private static void checkNotNull(Object o, String name) { 66 if (o == null) 67 throw new NullPointerException("\"" + name + "\" is null!"); 68 } 69 70 /** 71 * Write all remaining bytes in buffer to the given channel. 72 * If the channel is selectable then it must be configured blocking. 73 */ 74 private static void writeFullyImpl(WritableByteChannel ch, ByteBuffer bb) 75 throws IOException 76 { 77 while (bb.remaining() > 0) { 78 int n = ch.write(bb); 79 if (n <= 0) 80 throw new RuntimeException("no bytes written"); 81 } 82 } 83 84 /** 85 * Write all remaining bytes in buffer to the given channel. 86 * 87 * @throws IllegalBlockingException 88 * If the channel is selectable and configured non-blocking. 89 */ 90 private static void writeFully(WritableByteChannel ch, ByteBuffer bb) 91 throws IOException 92 { 93 if (ch instanceof SelectableChannel) { 94 SelectableChannel sc = (SelectableChannel)ch; 95 synchronized (sc.blockingLock()) { 96 if (!sc.isBlocking()) 97 throw new IllegalBlockingModeException(); 98 writeFullyImpl(ch, bb); 99 } 100 } else { 101 writeFullyImpl(ch, bb); 102 } 103 } 104 105 // -- Byte streams from channels -- 106 107 /** 108 * Constructs a stream that reads bytes from the given channel. 109 * 110 * <p> The <tt>read</tt> methods of the resulting stream will throw an 111 * {@link IllegalBlockingModeException} if invoked while the underlying 112 * channel is in non-blocking mode. The stream will not be buffered, and 113 * it will not support the {@link InputStream#mark mark} or {@link 114 * InputStream#reset reset} methods. The stream will be safe for access by 115 * multiple concurrent threads. Closing the stream will in turn cause the 116 * channel to be closed. </p> 117 * 118 * @param ch 119 * The channel from which bytes will be read 120 * 121 * @return A new input stream 122 */ 123 public static InputStream newInputStream(ReadableByteChannel ch) { 124 checkNotNull(ch, "ch"); 125 return new sun.nio.ch.ChannelInputStream(ch); 126 } 127 128 /** 129 * Constructs a stream that writes bytes to the given channel. 130 * 131 * <p> The <tt>write</tt> methods of the resulting stream will throw an 132 * {@link IllegalBlockingModeException} if invoked while the underlying 133 * channel is in non-blocking mode. The stream will not be buffered. The 134 * stream will be safe for access by multiple concurrent threads. Closing 135 * the stream will in turn cause the channel to be closed. </p> 136 * 137 * @param ch 138 * The channel to which bytes will be written 139 * 140 * @return A new output stream 141 */ 142 public static OutputStream newOutputStream(final WritableByteChannel ch) { 143 checkNotNull(ch, "ch"); 144 145 return new OutputStream() { 146 147 private ByteBuffer bb = null; 148 private byte[] bs = null; // Invoker's previous array 149 private byte[] b1 = null; 150 151 public synchronized void write(int b) throws IOException { 152 if (b1 == null) 153 b1 = new byte[1]; 154 b1[0] = (byte)b; 155 this.write(b1); 156 } 157 158 public synchronized void write(byte[] bs, int off, int len) 159 throws IOException 160 { 161 if ((off < 0) || (off > bs.length) || (len < 0) || 162 ((off + len) > bs.length) || ((off + len) < 0)) { 163 throw new IndexOutOfBoundsException(); 164 } else if (len == 0) { 165 return; 166 } 167 ByteBuffer bb = ((this.bs == bs) 168 ? this.bb 169 : ByteBuffer.wrap(bs)); 170 bb.limit(Math.min(off + len, bb.capacity())); 171 bb.position(off); 172 this.bb = bb; 173 this.bs = bs; 174 Channels.writeFully(ch, bb); 175 } 176 177 public void close() throws IOException { 178 ch.close(); 179 } 180 181 }; 182 } 183 184 /** 185 * Constructs a stream that reads bytes from the given channel. 186 * 187 * <p> The stream will not be buffered, and it will not support the {@link 188 * InputStream#mark mark} or {@link InputStream#reset reset} methods. The 189 * stream will be safe for access by multiple concurrent threads. Closing 190 * the stream will in turn cause the channel to be closed. </p> 191 * 192 * @param ch 193 * The channel from which bytes will be read 194 * 195 * @return A new input stream 196 * 197 * @since 1.7 198 */ 199 public static InputStream newInputStream(final AsynchronousByteChannel ch) { 200 checkNotNull(ch, "ch"); 201 return new InputStream() { 202 203 private ByteBuffer bb = null; 204 private byte[] bs = null; // Invoker's previous array 205 private byte[] b1 = null; 206 207 @Override 208 public synchronized int read() throws IOException { 209 if (b1 == null) 210 b1 = new byte[1]; 211 int n = this.read(b1); 212 if (n == 1) 213 return b1[0] & 0xff; 214 return -1; 215 } 216 217 @Override 218 public synchronized int read(byte[] bs, int off, int len) 219 throws IOException 220 { 221 if ((off < 0) || (off > bs.length) || (len < 0) || 222 ((off + len) > bs.length) || ((off + len) < 0)) { 223 throw new IndexOutOfBoundsException(); 224 } else if (len == 0) 225 return 0; 226 227 ByteBuffer bb = ((this.bs == bs) 228 ? this.bb 229 : ByteBuffer.wrap(bs)); 230 bb.position(off); 231 bb.limit(Math.min(off + len, bb.capacity())); 232 this.bb = bb; 233 this.bs = bs; 234 235 boolean interrupted = false; 236 try { 237 for (;;) { 238 try { 239 return ch.read(bb).get(); 240 } catch (ExecutionException ee) { 241 throw new IOException(ee.getCause()); 242 } catch (InterruptedException ie) { 243 interrupted = true; 244 } 245 } 246 } finally { 247 if (interrupted) 248 Thread.currentThread().interrupt(); 249 } 250 } 251 252 @Override 253 public void close() throws IOException { 254 ch.close(); 255 } 256 }; 257 } 258 259 /** 260 * Constructs a stream that writes bytes to the given channel. 261 * 262 * <p> The stream will not be buffered. The stream will be safe for access 263 * by multiple concurrent threads. Closing the stream will in turn cause 264 * the channel to be closed. </p> 265 * 266 * @param ch 267 * The channel to which bytes will be written 268 * 269 * @return A new output stream 270 * 271 * @since 1.7 272 */ 273 public static OutputStream newOutputStream(final AsynchronousByteChannel ch) { 274 checkNotNull(ch, "ch"); 275 return new OutputStream() { 276 277 private ByteBuffer bb = null; 278 private byte[] bs = null; // Invoker's previous array 279 private byte[] b1 = null; 280 281 @Override 282 public synchronized void write(int b) throws IOException { 283 if (b1 == null) 284 b1 = new byte[1]; 285 b1[0] = (byte)b; 286 this.write(b1); 287 } 288 289 @Override 290 public synchronized void write(byte[] bs, int off, int len) 291 throws IOException 292 { 293 if ((off < 0) || (off > bs.length) || (len < 0) || 294 ((off + len) > bs.length) || ((off + len) < 0)) { 295 throw new IndexOutOfBoundsException(); 296 } else if (len == 0) { 297 return; 298 } 299 ByteBuffer bb = ((this.bs == bs) 300 ? this.bb 301 : ByteBuffer.wrap(bs)); 302 bb.limit(Math.min(off + len, bb.capacity())); 303 bb.position(off); 304 this.bb = bb; 305 this.bs = bs; 306 307 boolean interrupted = false; 308 try { 309 while (bb.remaining() > 0) { 310 try { 311 ch.write(bb).get(); 312 } catch (ExecutionException ee) { 313 throw new IOException(ee.getCause()); 314 } catch (InterruptedException ie) { 315 interrupted = true; 316 } 317 } 318 } finally { 319 if (interrupted) 320 Thread.currentThread().interrupt(); 321 } 322 } 323 324 @Override 325 public void close() throws IOException { 326 ch.close(); 327 } 328 }; 329 } 330 331 332 // -- Channels from streams -- 333 334 /** 335 * Constructs a channel that reads bytes from the given stream. 336 * 337 * <p> The resulting channel will not be buffered; it will simply redirect 338 * its I/O operations to the given stream. Closing the channel will in 339 * turn cause the stream to be closed. </p> 340 * 341 * @param in 342 * The stream from which bytes are to be read 343 * 344 * @return A new readable byte channel 345 */ 346 public static ReadableByteChannel newChannel(final InputStream in) { 347 checkNotNull(in, "in"); 348 349 if (in instanceof FileInputStream && 350 FileInputStream.class.equals(in.getClass())) { 351 return ((FileInputStream)in).getChannel(); 352 } 353 354 return new ReadableByteChannelImpl(in); 355 } 356 357 private static class ReadableByteChannelImpl 358 extends AbstractInterruptibleChannel // Not really interruptible 359 implements ReadableByteChannel 360 { 361 InputStream in; 362 private static final int TRANSFER_SIZE = 8192; 363 private byte buf[] = new byte[0]; 364 private boolean open = true; 365 private Object readLock = new Object(); 366 367 ReadableByteChannelImpl(InputStream in) { 368 this.in = in; 369 } 370 371 public int read(ByteBuffer dst) throws IOException { 372 int len = dst.remaining(); 373 int totalRead = 0; 374 int bytesRead = 0; 375 synchronized (readLock) { 376 while (totalRead < len) { 377 int bytesToRead = Math.min((len - totalRead), 378 TRANSFER_SIZE); 379 if (buf.length < bytesToRead) 380 buf = new byte[bytesToRead]; 381 if ((totalRead > 0) && !(in.available() > 0)) 382 break; // block at most once 383 try { 384 begin(); 385 bytesRead = in.read(buf, 0, bytesToRead); 386 } finally { 387 end(bytesRead > 0); 388 } 389 if (bytesRead < 0) 390 break; 391 else 392 totalRead += bytesRead; 393 dst.put(buf, 0, bytesRead); 394 } 395 if ((bytesRead < 0) && (totalRead == 0)) 396 return -1; 397 398 return totalRead; 399 } 400 } 401 402 protected void implCloseChannel() throws IOException { 403 in.close(); 404 open = false; 405 } 406 } 407 408 409 /** 410 * Constructs a channel that writes bytes to the given stream. 411 * 412 * <p> The resulting channel will not be buffered; it will simply redirect 413 * its I/O operations to the given stream. Closing the channel will in 414 * turn cause the stream to be closed. </p> 415 * 416 * @param out 417 * The stream to which bytes are to be written 418 * 419 * @return A new writable byte channel 420 */ 421 public static WritableByteChannel newChannel(final OutputStream out) { 422 checkNotNull(out, "out"); 423 424 /* ----- BEGIN android ----- 425 if (out instanceof FileOutputStream && 426 FileOutputStream.class.equals(out.getClass())) { 427 return ((FileOutputStream)out).getChannel(); 428 }*/ 429 430 return new WritableByteChannelImpl(out); 431 } 432 433 private static class WritableByteChannelImpl 434 extends AbstractInterruptibleChannel // Not really interruptible 435 implements WritableByteChannel 436 { 437 OutputStream out; 438 private static final int TRANSFER_SIZE = 8192; 439 private byte buf[] = new byte[0]; 440 private boolean open = true; 441 private Object writeLock = new Object(); 442 443 WritableByteChannelImpl(OutputStream out) { 444 this.out = out; 445 } 446 447 public int write(ByteBuffer src) throws IOException { 448 int len = src.remaining(); 449 int totalWritten = 0; 450 synchronized (writeLock) { 451 while (totalWritten < len) { 452 int bytesToWrite = Math.min((len - totalWritten), 453 TRANSFER_SIZE); 454 if (buf.length < bytesToWrite) 455 buf = new byte[bytesToWrite]; 456 src.get(buf, 0, bytesToWrite); 457 try { 458 begin(); 459 out.write(buf, 0, bytesToWrite); 460 } finally { 461 end(bytesToWrite > 0); 462 } 463 totalWritten += bytesToWrite; 464 } 465 return totalWritten; 466 } 467 } 468 469 protected void implCloseChannel() throws IOException { 470 out.close(); 471 open = false; 472 } 473 } 474 475 476 // -- Character streams from channels -- 477 478 /** 479 * Constructs a reader that decodes bytes from the given channel using the 480 * given decoder. 481 * 482 * <p> The resulting stream will contain an internal input buffer of at 483 * least <tt>minBufferCap</tt> bytes. The stream's <tt>read</tt> methods 484 * will, as needed, fill the buffer by reading bytes from the underlying 485 * channel; if the channel is in non-blocking mode when bytes are to be 486 * read then an {@link IllegalBlockingModeException} will be thrown. The 487 * resulting stream will not otherwise be buffered, and it will not support 488 * the {@link Reader#mark mark} or {@link Reader#reset reset} methods. 489 * Closing the stream will in turn cause the channel to be closed. </p> 490 * 491 * @param ch 492 * The channel from which bytes will be read 493 * 494 * @param dec 495 * The charset decoder to be used 496 * 497 * @param minBufferCap 498 * The minimum capacity of the internal byte buffer, 499 * or <tt>-1</tt> if an implementation-dependent 500 * default capacity is to be used 501 * 502 * @return A new reader 503 */ 504 public static Reader newReader(ReadableByteChannel ch, 505 CharsetDecoder dec, 506 int minBufferCap) 507 { 508 checkNotNull(ch, "ch"); 509 return StreamDecoder.forDecoder(ch, dec.reset(), minBufferCap); 510 } 511 512 /** 513 * Constructs a reader that decodes bytes from the given channel according 514 * to the named charset. 515 * 516 * <p> An invocation of this method of the form 517 * 518 * <blockquote><pre> 519 * Channels.newReader(ch, csname)</pre></blockquote> 520 * 521 * behaves in exactly the same way as the expression 522 * 523 * <blockquote><pre> 524 * Channels.newReader(ch, 525 * Charset.forName(csName) 526 * .newDecoder(), 527 * -1);</pre></blockquote> 528 * 529 * @param ch 530 * The channel from which bytes will be read 531 * 532 * @param csName 533 * The name of the charset to be used 534 * 535 * @return A new reader 536 * 537 * @throws UnsupportedCharsetException 538 * If no support for the named charset is available 539 * in this instance of the Java virtual machine 540 */ 541 public static Reader newReader(ReadableByteChannel ch, 542 String csName) 543 { 544 checkNotNull(csName, "csName"); 545 return newReader(ch, Charset.forName(csName).newDecoder(), -1); 546 } 547 548 /** 549 * Constructs a writer that encodes characters using the given encoder and 550 * writes the resulting bytes to the given channel. 551 * 552 * <p> The resulting stream will contain an internal output buffer of at 553 * least <tt>minBufferCap</tt> bytes. The stream's <tt>write</tt> methods 554 * will, as needed, flush the buffer by writing bytes to the underlying 555 * channel; if the channel is in non-blocking mode when bytes are to be 556 * written then an {@link IllegalBlockingModeException} will be thrown. 557 * The resulting stream will not otherwise be buffered. Closing the stream 558 * will in turn cause the channel to be closed. </p> 559 * 560 * @param ch 561 * The channel to which bytes will be written 562 * 563 * @param enc 564 * The charset encoder to be used 565 * 566 * @param minBufferCap 567 * The minimum capacity of the internal byte buffer, 568 * or <tt>-1</tt> if an implementation-dependent 569 * default capacity is to be used 570 * 571 * @return A new writer 572 */ 573 public static Writer newWriter(final WritableByteChannel ch, 574 final CharsetEncoder enc, 575 final int minBufferCap) 576 { 577 checkNotNull(ch, "ch"); 578 return StreamEncoder.forEncoder(ch, enc.reset(), minBufferCap); 579 } 580 581 /** 582 * Constructs a writer that encodes characters according to the named 583 * charset and writes the resulting bytes to the given channel. 584 * 585 * <p> An invocation of this method of the form 586 * 587 * <blockquote><pre> 588 * Channels.newWriter(ch, csname)</pre></blockquote> 589 * 590 * behaves in exactly the same way as the expression 591 * 592 * <blockquote><pre> 593 * Channels.newWriter(ch, 594 * Charset.forName(csName) 595 * .newEncoder(), 596 * -1);</pre></blockquote> 597 * 598 * @param ch 599 * The channel to which bytes will be written 600 * 601 * @param csName 602 * The name of the charset to be used 603 * 604 * @return A new writer 605 * 606 * @throws UnsupportedCharsetException 607 * If no support for the named charset is available 608 * in this instance of the Java virtual machine 609 */ 610 public static Writer newWriter(WritableByteChannel ch, 611 String csName) 612 { 613 checkNotNull(csName, "csName"); 614 return newWriter(ch, Charset.forName(csName).newEncoder(), -1); 615 } 616} 617