Channels.java revision 2c87ad3a45cecf9e344487cad1abfdebe79f2c7c
1/* 2 * Copyright (C) 2014 The Android Open Source Project 3 * Copyright (c) 2000, 2009, Oracle and/or its affiliates. All rights reserved. 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This code is free software; you can redistribute it and/or modify it 7 * under the terms of the GNU General Public License version 2 only, as 8 * published by the Free Software Foundation. Oracle designates this 9 * particular file as subject to the "Classpath" exception as provided 10 * by Oracle in the LICENSE file that accompanied this code. 11 * 12 * This code is distributed in the hope that it will be useful, but WITHOUT 13 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 14 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 15 * version 2 for more details (a copy is included in the LICENSE file that 16 * accompanied this code). 17 * 18 * You should have received a copy of the GNU General Public License version 19 * 2 along with this work; if not, write to the Free Software Foundation, 20 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 21 * 22 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 23 * or visit www.oracle.com if you need additional information or have any 24 * questions. 25 */ 26 27package java.nio.channels; 28 29import java.io.FileInputStream; 30import java.io.FileOutputStream; 31import java.io.InputStream; 32import java.io.OutputStream; 33import java.io.Reader; 34import java.io.Writer; 35import java.io.IOException; 36import java.nio.ByteBuffer; 37import java.nio.charset.Charset; 38import java.nio.charset.CharsetDecoder; 39import java.nio.charset.CharsetEncoder; 40import java.nio.charset.UnsupportedCharsetException; 41import java.nio.channels.spi.AbstractInterruptibleChannel; 42import java.util.concurrent.ExecutionException; 43import sun.nio.ch.ChannelInputStream; 44import sun.nio.cs.StreamDecoder; 45import sun.nio.cs.StreamEncoder; 46 47 48/** 49 * Utility methods for channels and streams. 50 * 51 * <p> This class defines static methods that support the interoperation of the 52 * stream classes of the <tt>{@link java.io}</tt> package with the channel 53 * classes of this package. </p> 54 * 55 * 56 * @author Mark Reinhold 57 * @author Mike McCloskey 58 * @author JSR-51 Expert Group 59 * @since 1.4 60 */ 61 62public final class Channels { 63 64 private Channels() { } // No instantiation 65 66 private static void checkNotNull(Object o, String name) { 67 if (o == null) 68 throw new NullPointerException("\"" + name + "\" is null!"); 69 } 70 71 /** 72 * Write all remaining bytes in buffer to the given channel. 73 * If the channel is selectable then it must be configured blocking. 74 */ 75 private static void writeFullyImpl(WritableByteChannel ch, ByteBuffer bb) 76 throws IOException 77 { 78 while (bb.remaining() > 0) { 79 int n = ch.write(bb); 80 if (n <= 0) 81 throw new RuntimeException("no bytes written"); 82 } 83 } 84 85 /** 86 * Write all remaining bytes in buffer to the given channel. 87 * 88 * @throws IllegalBlockingException 89 * If the channel is selectable and configured non-blocking. 90 */ 91 private static void writeFully(WritableByteChannel ch, ByteBuffer bb) 92 throws IOException 93 { 94 if (ch instanceof SelectableChannel) { 95 SelectableChannel sc = (SelectableChannel)ch; 96 synchronized (sc.blockingLock()) { 97 if (!sc.isBlocking()) 98 throw new IllegalBlockingModeException(); 99 writeFullyImpl(ch, bb); 100 } 101 } else { 102 writeFullyImpl(ch, bb); 103 } 104 } 105 106 // -- Byte streams from channels -- 107 108 /** 109 * Constructs a stream that reads bytes from the given channel. 110 * 111 * <p> The <tt>read</tt> methods of the resulting stream will throw an 112 * {@link IllegalBlockingModeException} if invoked while the underlying 113 * channel is in non-blocking mode. The stream will not be buffered, and 114 * it will not support the {@link InputStream#mark mark} or {@link 115 * InputStream#reset reset} methods. The stream will be safe for access by 116 * multiple concurrent threads. Closing the stream will in turn cause the 117 * channel to be closed. </p> 118 * 119 * @param ch 120 * The channel from which bytes will be read 121 * 122 * @return A new input stream 123 */ 124 public static InputStream newInputStream(ReadableByteChannel ch) { 125 checkNotNull(ch, "ch"); 126 return new sun.nio.ch.ChannelInputStream(ch); 127 } 128 129 /** 130 * Constructs a stream that writes bytes to the given channel. 131 * 132 * <p> The <tt>write</tt> methods of the resulting stream will throw an 133 * {@link IllegalBlockingModeException} if invoked while the underlying 134 * channel is in non-blocking mode. The stream will not be buffered. The 135 * stream will be safe for access by multiple concurrent threads. Closing 136 * the stream will in turn cause the channel to be closed. </p> 137 * 138 * @param ch 139 * The channel to which bytes will be written 140 * 141 * @return A new output stream 142 */ 143 public static OutputStream newOutputStream(final WritableByteChannel ch) { 144 checkNotNull(ch, "ch"); 145 146 return new OutputStream() { 147 148 private ByteBuffer bb = null; 149 private byte[] bs = null; // Invoker's previous array 150 private byte[] b1 = null; 151 152 public synchronized void write(int b) throws IOException { 153 if (b1 == null) 154 b1 = new byte[1]; 155 b1[0] = (byte)b; 156 this.write(b1); 157 } 158 159 public synchronized void write(byte[] bs, int off, int len) 160 throws IOException 161 { 162 if ((off < 0) || (off > bs.length) || (len < 0) || 163 ((off + len) > bs.length) || ((off + len) < 0)) { 164 throw new IndexOutOfBoundsException(); 165 } else if (len == 0) { 166 return; 167 } 168 ByteBuffer bb = ((this.bs == bs) 169 ? this.bb 170 : ByteBuffer.wrap(bs)); 171 bb.limit(Math.min(off + len, bb.capacity())); 172 bb.position(off); 173 this.bb = bb; 174 this.bs = bs; 175 Channels.writeFully(ch, bb); 176 } 177 178 public void close() throws IOException { 179 ch.close(); 180 } 181 182 }; 183 } 184 185 /** 186 * Constructs a stream that reads bytes from the given channel. 187 * 188 * <p> The stream will not be buffered, and it will not support the {@link 189 * InputStream#mark mark} or {@link InputStream#reset reset} methods. The 190 * stream will be safe for access by multiple concurrent threads. Closing 191 * the stream will in turn cause the channel to be closed. </p> 192 * 193 * @param ch 194 * The channel from which bytes will be read 195 * 196 * @return A new input stream 197 * 198 * @since 1.7 199 */ 200 public static InputStream newInputStream(final AsynchronousByteChannel ch) { 201 checkNotNull(ch, "ch"); 202 return new InputStream() { 203 204 private ByteBuffer bb = null; 205 private byte[] bs = null; // Invoker's previous array 206 private byte[] b1 = null; 207 208 @Override 209 public synchronized int read() throws IOException { 210 if (b1 == null) 211 b1 = new byte[1]; 212 int n = this.read(b1); 213 if (n == 1) 214 return b1[0] & 0xff; 215 return -1; 216 } 217 218 @Override 219 public synchronized int read(byte[] bs, int off, int len) 220 throws IOException 221 { 222 if ((off < 0) || (off > bs.length) || (len < 0) || 223 ((off + len) > bs.length) || ((off + len) < 0)) { 224 throw new IndexOutOfBoundsException(); 225 } else if (len == 0) 226 return 0; 227 228 ByteBuffer bb = ((this.bs == bs) 229 ? this.bb 230 : ByteBuffer.wrap(bs)); 231 bb.position(off); 232 bb.limit(Math.min(off + len, bb.capacity())); 233 this.bb = bb; 234 this.bs = bs; 235 236 boolean interrupted = false; 237 try { 238 for (;;) { 239 try { 240 return ch.read(bb).get(); 241 } catch (ExecutionException ee) { 242 throw new IOException(ee.getCause()); 243 } catch (InterruptedException ie) { 244 interrupted = true; 245 } 246 } 247 } finally { 248 if (interrupted) 249 Thread.currentThread().interrupt(); 250 } 251 } 252 253 @Override 254 public void close() throws IOException { 255 ch.close(); 256 } 257 }; 258 } 259 260 /** 261 * Constructs a stream that writes bytes to the given channel. 262 * 263 * <p> The stream will not be buffered. The stream will be safe for access 264 * by multiple concurrent threads. Closing the stream will in turn cause 265 * the channel to be closed. </p> 266 * 267 * @param ch 268 * The channel to which bytes will be written 269 * 270 * @return A new output stream 271 * 272 * @since 1.7 273 */ 274 public static OutputStream newOutputStream(final AsynchronousByteChannel ch) { 275 checkNotNull(ch, "ch"); 276 return new OutputStream() { 277 278 private ByteBuffer bb = null; 279 private byte[] bs = null; // Invoker's previous array 280 private byte[] b1 = null; 281 282 @Override 283 public synchronized void write(int b) throws IOException { 284 if (b1 == null) 285 b1 = new byte[1]; 286 b1[0] = (byte)b; 287 this.write(b1); 288 } 289 290 @Override 291 public synchronized void write(byte[] bs, int off, int len) 292 throws IOException 293 { 294 if ((off < 0) || (off > bs.length) || (len < 0) || 295 ((off + len) > bs.length) || ((off + len) < 0)) { 296 throw new IndexOutOfBoundsException(); 297 } else if (len == 0) { 298 return; 299 } 300 ByteBuffer bb = ((this.bs == bs) 301 ? this.bb 302 : ByteBuffer.wrap(bs)); 303 bb.limit(Math.min(off + len, bb.capacity())); 304 bb.position(off); 305 this.bb = bb; 306 this.bs = bs; 307 308 boolean interrupted = false; 309 try { 310 while (bb.remaining() > 0) { 311 try { 312 ch.write(bb).get(); 313 } catch (ExecutionException ee) { 314 throw new IOException(ee.getCause()); 315 } catch (InterruptedException ie) { 316 interrupted = true; 317 } 318 } 319 } finally { 320 if (interrupted) 321 Thread.currentThread().interrupt(); 322 } 323 } 324 325 @Override 326 public void close() throws IOException { 327 ch.close(); 328 } 329 }; 330 } 331 332 333 // -- Channels from streams -- 334 335 /** 336 * Constructs a channel that reads bytes from the given stream. 337 * 338 * <p> The resulting channel will not be buffered; it will simply redirect 339 * its I/O operations to the given stream. Closing the channel will in 340 * turn cause the stream to be closed. </p> 341 * 342 * @param in 343 * The stream from which bytes are to be read 344 * 345 * @return A new readable byte channel 346 */ 347 public static ReadableByteChannel newChannel(final InputStream in) { 348 checkNotNull(in, "in"); 349 350 if (in instanceof FileInputStream && 351 FileInputStream.class.equals(in.getClass())) { 352 return ((FileInputStream)in).getChannel(); 353 } 354 355 return new ReadableByteChannelImpl(in); 356 } 357 358 private static class ReadableByteChannelImpl 359 extends AbstractInterruptibleChannel // Not really interruptible 360 implements ReadableByteChannel 361 { 362 InputStream in; 363 private static final int TRANSFER_SIZE = 8192; 364 private byte buf[] = new byte[0]; 365 private boolean open = true; 366 private Object readLock = new Object(); 367 368 ReadableByteChannelImpl(InputStream in) { 369 this.in = in; 370 } 371 372 public int read(ByteBuffer dst) throws IOException { 373 int len = dst.remaining(); 374 int totalRead = 0; 375 int bytesRead = 0; 376 synchronized (readLock) { 377 while (totalRead < len) { 378 int bytesToRead = Math.min((len - totalRead), 379 TRANSFER_SIZE); 380 if (buf.length < bytesToRead) 381 buf = new byte[bytesToRead]; 382 if ((totalRead > 0) && !(in.available() > 0)) 383 break; // block at most once 384 try { 385 begin(); 386 bytesRead = in.read(buf, 0, bytesToRead); 387 } finally { 388 end(bytesRead > 0); 389 } 390 if (bytesRead < 0) 391 break; 392 else 393 totalRead += bytesRead; 394 dst.put(buf, 0, bytesRead); 395 } 396 if ((bytesRead < 0) && (totalRead == 0)) 397 return -1; 398 399 return totalRead; 400 } 401 } 402 403 protected void implCloseChannel() throws IOException { 404 in.close(); 405 open = false; 406 } 407 } 408 409 410 /** 411 * Constructs a channel that writes bytes to the given stream. 412 * 413 * <p> The resulting channel will not be buffered; it will simply redirect 414 * its I/O operations to the given stream. Closing the channel will in 415 * turn cause the stream to be closed. </p> 416 * 417 * @param out 418 * The stream to which bytes are to be written 419 * 420 * @return A new writable byte channel 421 */ 422 public static WritableByteChannel newChannel(final OutputStream out) { 423 checkNotNull(out, "out"); 424 425 /* ----- BEGIN android ----- 426 if (out instanceof FileOutputStream && 427 FileOutputStream.class.equals(out.getClass())) { 428 return ((FileOutputStream)out).getChannel(); 429 }*/ 430 431 return new WritableByteChannelImpl(out); 432 } 433 434 private static class WritableByteChannelImpl 435 extends AbstractInterruptibleChannel // Not really interruptible 436 implements WritableByteChannel 437 { 438 OutputStream out; 439 private static final int TRANSFER_SIZE = 8192; 440 private byte buf[] = new byte[0]; 441 private boolean open = true; 442 private Object writeLock = new Object(); 443 444 WritableByteChannelImpl(OutputStream out) { 445 this.out = out; 446 } 447 448 public int write(ByteBuffer src) throws IOException { 449 int len = src.remaining(); 450 int totalWritten = 0; 451 synchronized (writeLock) { 452 while (totalWritten < len) { 453 int bytesToWrite = Math.min((len - totalWritten), 454 TRANSFER_SIZE); 455 if (buf.length < bytesToWrite) 456 buf = new byte[bytesToWrite]; 457 src.get(buf, 0, bytesToWrite); 458 try { 459 begin(); 460 out.write(buf, 0, bytesToWrite); 461 } finally { 462 end(bytesToWrite > 0); 463 } 464 totalWritten += bytesToWrite; 465 } 466 return totalWritten; 467 } 468 } 469 470 protected void implCloseChannel() throws IOException { 471 out.close(); 472 open = false; 473 } 474 } 475 476 477 // -- Character streams from channels -- 478 479 /** 480 * Constructs a reader that decodes bytes from the given channel using the 481 * given decoder. 482 * 483 * <p> The resulting stream will contain an internal input buffer of at 484 * least <tt>minBufferCap</tt> bytes. The stream's <tt>read</tt> methods 485 * will, as needed, fill the buffer by reading bytes from the underlying 486 * channel; if the channel is in non-blocking mode when bytes are to be 487 * read then an {@link IllegalBlockingModeException} will be thrown. The 488 * resulting stream will not otherwise be buffered, and it will not support 489 * the {@link Reader#mark mark} or {@link Reader#reset reset} methods. 490 * Closing the stream will in turn cause the channel to be closed. </p> 491 * 492 * @param ch 493 * The channel from which bytes will be read 494 * 495 * @param dec 496 * The charset decoder to be used 497 * 498 * @param minBufferCap 499 * The minimum capacity of the internal byte buffer, 500 * or <tt>-1</tt> if an implementation-dependent 501 * default capacity is to be used 502 * 503 * @return A new reader 504 */ 505 public static Reader newReader(ReadableByteChannel ch, 506 CharsetDecoder dec, 507 int minBufferCap) 508 { 509 checkNotNull(ch, "ch"); 510 return StreamDecoder.forDecoder(ch, dec.reset(), minBufferCap); 511 } 512 513 /** 514 * Constructs a reader that decodes bytes from the given channel according 515 * to the named charset. 516 * 517 * <p> An invocation of this method of the form 518 * 519 * <blockquote><pre> 520 * Channels.newReader(ch, csname)</pre></blockquote> 521 * 522 * behaves in exactly the same way as the expression 523 * 524 * <blockquote><pre> 525 * Channels.newReader(ch, 526 * Charset.forName(csName) 527 * .newDecoder(), 528 * -1);</pre></blockquote> 529 * 530 * @param ch 531 * The channel from which bytes will be read 532 * 533 * @param csName 534 * The name of the charset to be used 535 * 536 * @return A new reader 537 * 538 * @throws UnsupportedCharsetException 539 * If no support for the named charset is available 540 * in this instance of the Java virtual machine 541 */ 542 public static Reader newReader(ReadableByteChannel ch, 543 String csName) 544 { 545 checkNotNull(csName, "csName"); 546 return newReader(ch, Charset.forName(csName).newDecoder(), -1); 547 } 548 549 /** 550 * Constructs a writer that encodes characters using the given encoder and 551 * writes the resulting bytes to the given channel. 552 * 553 * <p> The resulting stream will contain an internal output buffer of at 554 * least <tt>minBufferCap</tt> bytes. The stream's <tt>write</tt> methods 555 * will, as needed, flush the buffer by writing bytes to the underlying 556 * channel; if the channel is in non-blocking mode when bytes are to be 557 * written then an {@link IllegalBlockingModeException} will be thrown. 558 * The resulting stream will not otherwise be buffered. Closing the stream 559 * will in turn cause the channel to be closed. </p> 560 * 561 * @param ch 562 * The channel to which bytes will be written 563 * 564 * @param enc 565 * The charset encoder to be used 566 * 567 * @param minBufferCap 568 * The minimum capacity of the internal byte buffer, 569 * or <tt>-1</tt> if an implementation-dependent 570 * default capacity is to be used 571 * 572 * @return A new writer 573 */ 574 public static Writer newWriter(final WritableByteChannel ch, 575 final CharsetEncoder enc, 576 final int minBufferCap) 577 { 578 checkNotNull(ch, "ch"); 579 return StreamEncoder.forEncoder(ch, enc.reset(), minBufferCap); 580 } 581 582 /** 583 * Constructs a writer that encodes characters according to the named 584 * charset and writes the resulting bytes to the given channel. 585 * 586 * <p> An invocation of this method of the form 587 * 588 * <blockquote><pre> 589 * Channels.newWriter(ch, csname)</pre></blockquote> 590 * 591 * behaves in exactly the same way as the expression 592 * 593 * <blockquote><pre> 594 * Channels.newWriter(ch, 595 * Charset.forName(csName) 596 * .newEncoder(), 597 * -1);</pre></blockquote> 598 * 599 * @param ch 600 * The channel to which bytes will be written 601 * 602 * @param csName 603 * The name of the charset to be used 604 * 605 * @return A new writer 606 * 607 * @throws UnsupportedCharsetException 608 * If no support for the named charset is available 609 * in this instance of the Java virtual machine 610 */ 611 public static Writer newWriter(WritableByteChannel ch, 612 String csName) 613 { 614 checkNotNull(csName, "csName"); 615 return newWriter(ch, Charset.forName(csName).newEncoder(), -1); 616 } 617} 618