1/* 2 * Copyright (C) 2014 The Android Open Source Project 3 * Copyright (c) 2000, 2012, Oracle and/or its affiliates. All rights reserved. 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This code is free software; you can redistribute it and/or modify it 7 * under the terms of the GNU General Public License version 2 only, as 8 * published by the Free Software Foundation. Oracle designates this 9 * particular file as subject to the "Classpath" exception as provided 10 * by Oracle in the LICENSE file that accompanied this code. 11 * 12 * This code is distributed in the hope that it will be useful, but WITHOUT 13 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 14 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 15 * version 2 for more details (a copy is included in the LICENSE file that 16 * accompanied this code). 17 * 18 * You should have received a copy of the GNU General Public License version 19 * 2 along with this work; if not, write to the Free Software Foundation, 20 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 21 * 22 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 23 * or visit www.oracle.com if you need additional information or have any 24 * questions. 25 */ 26 27package java.nio.channels; 28 29import java.io.FileInputStream; 30import java.io.FileOutputStream; 31import java.io.InputStream; 32import java.io.OutputStream; 33import java.io.Reader; 34import java.io.Writer; 35import java.io.IOException; 36import java.nio.ByteBuffer; 37import java.nio.charset.Charset; 38import java.nio.charset.CharsetDecoder; 39import java.nio.charset.CharsetEncoder; 40import java.nio.charset.UnsupportedCharsetException; 41import java.nio.channels.spi.AbstractInterruptibleChannel; 42import java.util.concurrent.ExecutionException; 43import sun.nio.ch.ChannelInputStream; 44import sun.nio.cs.StreamDecoder; 45import sun.nio.cs.StreamEncoder; 46 47 48/** 49 * Utility methods for channels and streams. 50 * 51 * <p> This class defines static methods that support the interoperation of the 52 * stream classes of the <tt>{@link java.io}</tt> package with the channel 53 * classes of this package. </p> 54 * 55 * 56 * @author Mark Reinhold 57 * @author Mike McCloskey 58 * @author JSR-51 Expert Group 59 * @since 1.4 60 */ 61 62public final class Channels { 63 64 private Channels() { } // No instantiation 65 66 private static void checkNotNull(Object o, String name) { 67 if (o == null) 68 throw new NullPointerException("\"" + name + "\" is null!"); 69 } 70 71 /** 72 * Write all remaining bytes in buffer to the given channel. 73 * If the channel is selectable then it must be configured blocking. 74 */ 75 private static void writeFullyImpl(WritableByteChannel ch, ByteBuffer bb) 76 throws IOException 77 { 78 while (bb.remaining() > 0) { 79 int n = ch.write(bb); 80 if (n <= 0) 81 throw new RuntimeException("no bytes written"); 82 } 83 } 84 85 /** 86 * Write all remaining bytes in buffer to the given channel. 87 * 88 * @throws IllegalBlockingModeException 89 * If the channel is selectable and configured non-blocking. 90 */ 91 private static void writeFully(WritableByteChannel ch, ByteBuffer bb) 92 throws IOException 93 { 94 if (ch instanceof SelectableChannel) { 95 SelectableChannel sc = (SelectableChannel)ch; 96 synchronized (sc.blockingLock()) { 97 if (!sc.isBlocking()) 98 throw new IllegalBlockingModeException(); 99 writeFullyImpl(ch, bb); 100 } 101 } else { 102 writeFullyImpl(ch, bb); 103 } 104 } 105 106 // -- Byte streams from channels -- 107 108 /** 109 * Constructs a stream that reads bytes from the given channel. 110 * 111 * <p> The <tt>read</tt> methods of the resulting stream will throw an 112 * {@link IllegalBlockingModeException} if invoked while the underlying 113 * channel is in non-blocking mode. The stream will not be buffered, and 114 * it will not support the {@link InputStream#mark mark} or {@link 115 * InputStream#reset reset} methods. The stream will be safe for access by 116 * multiple concurrent threads. Closing the stream will in turn cause the 117 * channel to be closed. </p> 118 * 119 * @param ch 120 * The channel from which bytes will be read 121 * 122 * @return A new input stream 123 */ 124 public static InputStream newInputStream(ReadableByteChannel ch) { 125 checkNotNull(ch, "ch"); 126 return new sun.nio.ch.ChannelInputStream(ch); 127 } 128 129 /** 130 * Constructs a stream that writes bytes to the given channel. 131 * 132 * <p> The <tt>write</tt> methods of the resulting stream will throw an 133 * {@link IllegalBlockingModeException} if invoked while the underlying 134 * channel is in non-blocking mode. The stream will not be buffered. The 135 * stream will be safe for access by multiple concurrent threads. Closing 136 * the stream will in turn cause the channel to be closed. </p> 137 * 138 * @param ch 139 * The channel to which bytes will be written 140 * 141 * @return A new output stream 142 */ 143 public static OutputStream newOutputStream(final WritableByteChannel ch) { 144 checkNotNull(ch, "ch"); 145 146 return new OutputStream() { 147 148 private ByteBuffer bb = null; 149 private byte[] bs = null; // Invoker's previous array 150 private byte[] b1 = null; 151 152 public synchronized void write(int b) throws IOException { 153 if (b1 == null) 154 b1 = new byte[1]; 155 b1[0] = (byte)b; 156 this.write(b1); 157 } 158 159 public synchronized void write(byte[] bs, int off, int len) 160 throws IOException 161 { 162 if ((off < 0) || (off > bs.length) || (len < 0) || 163 ((off + len) > bs.length) || ((off + len) < 0)) { 164 throw new IndexOutOfBoundsException(); 165 } else if (len == 0) { 166 return; 167 } 168 ByteBuffer bb = ((this.bs == bs) 169 ? this.bb 170 : ByteBuffer.wrap(bs)); 171 bb.limit(Math.min(off + len, bb.capacity())); 172 bb.position(off); 173 this.bb = bb; 174 this.bs = bs; 175 Channels.writeFully(ch, bb); 176 } 177 178 public void close() throws IOException { 179 ch.close(); 180 } 181 182 }; 183 } 184 185 /** 186 * Constructs a stream that reads bytes from the given channel. 187 * 188 * <p> The stream will not be buffered, and it will not support the {@link 189 * InputStream#mark mark} or {@link InputStream#reset reset} methods. The 190 * stream will be safe for access by multiple concurrent threads. Closing 191 * the stream will in turn cause the channel to be closed. </p> 192 * 193 * @param ch 194 * The channel from which bytes will be read 195 * 196 * @return A new input stream 197 * 198 * @since 1.7 199 */ 200 public static InputStream newInputStream(final AsynchronousByteChannel ch) { 201 checkNotNull(ch, "ch"); 202 return new InputStream() { 203 204 private ByteBuffer bb = null; 205 private byte[] bs = null; // Invoker's previous array 206 private byte[] b1 = null; 207 208 @Override 209 public synchronized int read() throws IOException { 210 if (b1 == null) 211 b1 = new byte[1]; 212 int n = this.read(b1); 213 if (n == 1) 214 return b1[0] & 0xff; 215 return -1; 216 } 217 218 @Override 219 public synchronized int read(byte[] bs, int off, int len) 220 throws IOException 221 { 222 if ((off < 0) || (off > bs.length) || (len < 0) || 223 ((off + len) > bs.length) || ((off + len) < 0)) { 224 throw new IndexOutOfBoundsException(); 225 } else if (len == 0) 226 return 0; 227 228 ByteBuffer bb = ((this.bs == bs) 229 ? this.bb 230 : ByteBuffer.wrap(bs)); 231 bb.position(off); 232 bb.limit(Math.min(off + len, bb.capacity())); 233 this.bb = bb; 234 this.bs = bs; 235 236 boolean interrupted = false; 237 try { 238 for (;;) { 239 try { 240 return ch.read(bb).get(); 241 } catch (ExecutionException ee) { 242 throw new IOException(ee.getCause()); 243 } catch (InterruptedException ie) { 244 interrupted = true; 245 } 246 } 247 } finally { 248 if (interrupted) 249 Thread.currentThread().interrupt(); 250 } 251 } 252 253 @Override 254 public void close() throws IOException { 255 ch.close(); 256 } 257 }; 258 } 259 260 /** 261 * Constructs a stream that writes bytes to the given channel. 262 * 263 * <p> The stream will not be buffered. The stream will be safe for access 264 * by multiple concurrent threads. Closing the stream will in turn cause 265 * the channel to be closed. </p> 266 * 267 * @param ch 268 * The channel to which bytes will be written 269 * 270 * @return A new output stream 271 * 272 * @since 1.7 273 */ 274 public static OutputStream newOutputStream(final AsynchronousByteChannel ch) { 275 checkNotNull(ch, "ch"); 276 return new OutputStream() { 277 278 private ByteBuffer bb = null; 279 private byte[] bs = null; // Invoker's previous array 280 private byte[] b1 = null; 281 282 @Override 283 public synchronized void write(int b) throws IOException { 284 if (b1 == null) 285 b1 = new byte[1]; 286 b1[0] = (byte)b; 287 this.write(b1); 288 } 289 290 @Override 291 public synchronized void write(byte[] bs, int off, int len) 292 throws IOException 293 { 294 if ((off < 0) || (off > bs.length) || (len < 0) || 295 ((off + len) > bs.length) || ((off + len) < 0)) { 296 throw new IndexOutOfBoundsException(); 297 } else if (len == 0) { 298 return; 299 } 300 ByteBuffer bb = ((this.bs == bs) 301 ? this.bb 302 : ByteBuffer.wrap(bs)); 303 bb.limit(Math.min(off + len, bb.capacity())); 304 bb.position(off); 305 this.bb = bb; 306 this.bs = bs; 307 308 boolean interrupted = false; 309 try { 310 while (bb.remaining() > 0) { 311 try { 312 ch.write(bb).get(); 313 } catch (ExecutionException ee) { 314 throw new IOException(ee.getCause()); 315 } catch (InterruptedException ie) { 316 interrupted = true; 317 } 318 } 319 } finally { 320 if (interrupted) 321 Thread.currentThread().interrupt(); 322 } 323 } 324 325 @Override 326 public void close() throws IOException { 327 ch.close(); 328 } 329 }; 330 } 331 332 333 // -- Channels from streams -- 334 335 /** 336 * Constructs a channel that reads bytes from the given stream. 337 * 338 * <p> The resulting channel will not be buffered; it will simply redirect 339 * its I/O operations to the given stream. Closing the channel will in 340 * turn cause the stream to be closed. </p> 341 * 342 * @param in 343 * The stream from which bytes are to be read 344 * 345 * @return A new readable byte channel 346 */ 347 public static ReadableByteChannel newChannel(final InputStream in) { 348 checkNotNull(in, "in"); 349 350 if (in instanceof FileInputStream && 351 FileInputStream.class.equals(in.getClass())) { 352 return ((FileInputStream)in).getChannel(); 353 } 354 355 return new ReadableByteChannelImpl(in); 356 } 357 358 private static class ReadableByteChannelImpl 359 extends AbstractInterruptibleChannel // Not really interruptible 360 implements ReadableByteChannel 361 { 362 InputStream in; 363 private static final int TRANSFER_SIZE = 8192; 364 private byte buf[] = new byte[0]; 365 private boolean open = true; 366 private Object readLock = new Object(); 367 368 ReadableByteChannelImpl(InputStream in) { 369 this.in = in; 370 } 371 372 public int read(ByteBuffer dst) throws IOException { 373 int len = dst.remaining(); 374 int totalRead = 0; 375 int bytesRead = 0; 376 synchronized (readLock) { 377 while (totalRead < len) { 378 int bytesToRead = Math.min((len - totalRead), 379 TRANSFER_SIZE); 380 if (buf.length < bytesToRead) 381 buf = new byte[bytesToRead]; 382 if ((totalRead > 0) && !(in.available() > 0)) 383 break; // block at most once 384 try { 385 begin(); 386 bytesRead = in.read(buf, 0, bytesToRead); 387 } finally { 388 end(bytesRead > 0); 389 } 390 if (bytesRead < 0) 391 break; 392 else 393 totalRead += bytesRead; 394 dst.put(buf, 0, bytesRead); 395 } 396 if ((bytesRead < 0) && (totalRead == 0)) 397 return -1; 398 399 return totalRead; 400 } 401 } 402 403 protected void implCloseChannel() throws IOException { 404 in.close(); 405 open = false; 406 } 407 } 408 409 410 /** 411 * Constructs a channel that writes bytes to the given stream. 412 * 413 * <p> The resulting channel will not be buffered; it will simply redirect 414 * its I/O operations to the given stream. Closing the channel will in 415 * turn cause the stream to be closed. </p> 416 * 417 * @param out 418 * The stream to which bytes are to be written 419 * 420 * @return A new writable byte channel 421 */ 422 public static WritableByteChannel newChannel(final OutputStream out) { 423 checkNotNull(out, "out"); 424 return new WritableByteChannelImpl(out); 425 } 426 427 private static class WritableByteChannelImpl 428 extends AbstractInterruptibleChannel // Not really interruptible 429 implements WritableByteChannel 430 { 431 OutputStream out; 432 private static final int TRANSFER_SIZE = 8192; 433 private byte buf[] = new byte[0]; 434 private boolean open = true; 435 private Object writeLock = new Object(); 436 437 WritableByteChannelImpl(OutputStream out) { 438 this.out = out; 439 } 440 441 public int write(ByteBuffer src) throws IOException { 442 int len = src.remaining(); 443 int totalWritten = 0; 444 synchronized (writeLock) { 445 while (totalWritten < len) { 446 int bytesToWrite = Math.min((len - totalWritten), 447 TRANSFER_SIZE); 448 if (buf.length < bytesToWrite) 449 buf = new byte[bytesToWrite]; 450 src.get(buf, 0, bytesToWrite); 451 try { 452 begin(); 453 out.write(buf, 0, bytesToWrite); 454 } finally { 455 end(bytesToWrite > 0); 456 } 457 totalWritten += bytesToWrite; 458 } 459 return totalWritten; 460 } 461 } 462 463 protected void implCloseChannel() throws IOException { 464 out.close(); 465 open = false; 466 } 467 } 468 469 470 // -- Character streams from channels -- 471 472 /** 473 * Constructs a reader that decodes bytes from the given channel using the 474 * given decoder. 475 * 476 * <p> The resulting stream will contain an internal input buffer of at 477 * least <tt>minBufferCap</tt> bytes. The stream's <tt>read</tt> methods 478 * will, as needed, fill the buffer by reading bytes from the underlying 479 * channel; if the channel is in non-blocking mode when bytes are to be 480 * read then an {@link IllegalBlockingModeException} will be thrown. The 481 * resulting stream will not otherwise be buffered, and it will not support 482 * the {@link Reader#mark mark} or {@link Reader#reset reset} methods. 483 * Closing the stream will in turn cause the channel to be closed. </p> 484 * 485 * @param ch 486 * The channel from which bytes will be read 487 * 488 * @param dec 489 * The charset decoder to be used 490 * 491 * @param minBufferCap 492 * The minimum capacity of the internal byte buffer, 493 * or <tt>-1</tt> if an implementation-dependent 494 * default capacity is to be used 495 * 496 * @return A new reader 497 */ 498 public static Reader newReader(ReadableByteChannel ch, 499 CharsetDecoder dec, 500 int minBufferCap) 501 { 502 checkNotNull(ch, "ch"); 503 return StreamDecoder.forDecoder(ch, dec.reset(), minBufferCap); 504 } 505 506 /** 507 * Constructs a reader that decodes bytes from the given channel according 508 * to the named charset. 509 * 510 * <p> An invocation of this method of the form 511 * 512 * <blockquote><pre> 513 * Channels.newReader(ch, csname)</pre></blockquote> 514 * 515 * behaves in exactly the same way as the expression 516 * 517 * <blockquote><pre> 518 * Channels.newReader(ch, 519 * Charset.forName(csName) 520 * .newDecoder(), 521 * -1);</pre></blockquote> 522 * 523 * @param ch 524 * The channel from which bytes will be read 525 * 526 * @param csName 527 * The name of the charset to be used 528 * 529 * @return A new reader 530 * 531 * @throws UnsupportedCharsetException 532 * If no support for the named charset is available 533 * in this instance of the Java virtual machine 534 */ 535 public static Reader newReader(ReadableByteChannel ch, 536 String csName) 537 { 538 checkNotNull(csName, "csName"); 539 return newReader(ch, Charset.forName(csName).newDecoder(), -1); 540 } 541 542 /** 543 * Constructs a writer that encodes characters using the given encoder and 544 * writes the resulting bytes to the given channel. 545 * 546 * <p> The resulting stream will contain an internal output buffer of at 547 * least <tt>minBufferCap</tt> bytes. The stream's <tt>write</tt> methods 548 * will, as needed, flush the buffer by writing bytes to the underlying 549 * channel; if the channel is in non-blocking mode when bytes are to be 550 * written then an {@link IllegalBlockingModeException} will be thrown. 551 * The resulting stream will not otherwise be buffered. Closing the stream 552 * will in turn cause the channel to be closed. </p> 553 * 554 * @param ch 555 * The channel to which bytes will be written 556 * 557 * @param enc 558 * The charset encoder to be used 559 * 560 * @param minBufferCap 561 * The minimum capacity of the internal byte buffer, 562 * or <tt>-1</tt> if an implementation-dependent 563 * default capacity is to be used 564 * 565 * @return A new writer 566 */ 567 public static Writer newWriter(final WritableByteChannel ch, 568 final CharsetEncoder enc, 569 final int minBufferCap) 570 { 571 checkNotNull(ch, "ch"); 572 return StreamEncoder.forEncoder(ch, enc.reset(), minBufferCap); 573 } 574 575 /** 576 * Constructs a writer that encodes characters according to the named 577 * charset and writes the resulting bytes to the given channel. 578 * 579 * <p> An invocation of this method of the form 580 * 581 * <blockquote><pre> 582 * Channels.newWriter(ch, csname)</pre></blockquote> 583 * 584 * behaves in exactly the same way as the expression 585 * 586 * <blockquote><pre> 587 * Channels.newWriter(ch, 588 * Charset.forName(csName) 589 * .newEncoder(), 590 * -1);</pre></blockquote> 591 * 592 * @param ch 593 * The channel to which bytes will be written 594 * 595 * @param csName 596 * The name of the charset to be used 597 * 598 * @return A new writer 599 * 600 * @throws UnsupportedCharsetException 601 * If no support for the named charset is available 602 * in this instance of the Java virtual machine 603 */ 604 public static Writer newWriter(WritableByteChannel ch, 605 String csName) 606 { 607 checkNotNull(csName, "csName"); 608 return newWriter(ch, Charset.forName(csName).newEncoder(), -1); 609 } 610} 611