Channels.java revision ae2e19f0088a92c7a2a24385a6383ba7749413d3
1/* 2 * Copyright (C) 2014 The Android Open Source Project 3 * Copyright (c) 2000, 2009, Oracle and/or its affiliates. All rights reserved. 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This code is free software; you can redistribute it and/or modify it 7 * under the terms of the GNU General Public License version 2 only, as 8 * published by the Free Software Foundation. Oracle designates this 9 * particular file as subject to the "Classpath" exception as provided 10 * by Oracle in the LICENSE file that accompanied this code. 11 * 12 * This code is distributed in the hope that it will be useful, but WITHOUT 13 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 14 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 15 * version 2 for more details (a copy is included in the LICENSE file that 16 * accompanied this code). 17 * 18 * You should have received a copy of the GNU General Public License version 19 * 2 along with this work; if not, write to the Free Software Foundation, 20 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 21 * 22 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 23 * or visit www.oracle.com if you need additional information or have any 24 * questions. 25 */ 26 27package java.nio.channels; 28 29import java.io.FileInputStream; 30import java.io.FileOutputStream; 31import java.io.InputStream; 32import java.io.OutputStream; 33import java.io.Reader; 34import java.io.Writer; 35import java.io.IOException; 36import java.nio.ByteBuffer; 37import java.nio.charset.Charset; 38import java.nio.charset.CharsetDecoder; 39import java.nio.charset.CharsetEncoder; 40import java.nio.charset.UnsupportedCharsetException; 41import java.nio.channels.spi.AbstractInterruptibleChannel; 42import java.util.concurrent.ExecutionException; 43import sun.nio.ch.ChannelInputStream; 44import sun.nio.cs.StreamDecoder; 45import sun.nio.cs.StreamEncoder; 46 47 48/** 49 * Utility methods for channels and streams. 50 * 51 * <p> This class defines static methods that support the interoperation of the 52 * stream classes of the <tt>{@link java.io}</tt> package with the channel 53 * classes of this package. </p> 54 * 55 * 56 * @author Mark Reinhold 57 * @author Mike McCloskey 58 * @author JSR-51 Expert Group 59 * @since 1.4 60 */ 61 62public final class Channels { 63 64 private Channels() { } // No instantiation 65 66 private static void checkNotNull(Object o, String name) { 67 if (o == null) 68 throw new NullPointerException("\"" + name + "\" is null!"); 69 } 70 71 /** 72 * Write all remaining bytes in buffer to the given channel. 73 * If the channel is selectable then it must be configured blocking. 74 */ 75 private static void writeFullyImpl(WritableByteChannel ch, ByteBuffer bb) 76 throws IOException 77 { 78 while (bb.remaining() > 0) { 79 int n = ch.write(bb); 80 if (n <= 0) 81 throw new RuntimeException("no bytes written"); 82 } 83 } 84 85 /** 86 * Write all remaining bytes in buffer to the given channel. 87 * 88 * @throws IllegalBlockingException 89 * If the channel is selectable and configured non-blocking. 90 */ 91 private static void writeFully(WritableByteChannel ch, ByteBuffer bb) 92 throws IOException 93 { 94 if (ch instanceof SelectableChannel) { 95 SelectableChannel sc = (SelectableChannel)ch; 96 synchronized (sc.blockingLock()) { 97 if (!sc.isBlocking()) 98 throw new IllegalBlockingModeException(); 99 writeFullyImpl(ch, bb); 100 } 101 } else { 102 writeFullyImpl(ch, bb); 103 } 104 } 105 106 // -- Byte streams from channels -- 107 108 /** 109 * Constructs a stream that reads bytes from the given channel. 110 * 111 * <p> The <tt>read</tt> methods of the resulting stream will throw an 112 * {@link IllegalBlockingModeException} if invoked while the underlying 113 * channel is in non-blocking mode. The stream will not be buffered, and 114 * it will not support the {@link InputStream#mark mark} or {@link 115 * InputStream#reset reset} methods. The stream will be safe for access by 116 * multiple concurrent threads. Closing the stream will in turn cause the 117 * channel to be closed. </p> 118 * 119 * @param ch 120 * The channel from which bytes will be read 121 * 122 * @return A new input stream 123 */ 124 public static InputStream newInputStream(ReadableByteChannel ch) { 125 checkNotNull(ch, "ch"); 126 return new sun.nio.ch.ChannelInputStream(ch); 127 } 128 129 /** 130 * Constructs a stream that writes bytes to the given channel. 131 * 132 * <p> The <tt>write</tt> methods of the resulting stream will throw an 133 * {@link IllegalBlockingModeException} if invoked while the underlying 134 * channel is in non-blocking mode. The stream will not be buffered. The 135 * stream will be safe for access by multiple concurrent threads. Closing 136 * the stream will in turn cause the channel to be closed. </p> 137 * 138 * @param ch 139 * The channel to which bytes will be written 140 * 141 * @return A new output stream 142 */ 143 public static OutputStream newOutputStream(final WritableByteChannel ch) { 144 checkNotNull(ch, "ch"); 145 146 return new OutputStream() { 147 148 private ByteBuffer bb = null; 149 private byte[] bs = null; // Invoker's previous array 150 private byte[] b1 = null; 151 152 public synchronized void write(int b) throws IOException { 153 if (b1 == null) 154 b1 = new byte[1]; 155 b1[0] = (byte)b; 156 this.write(b1); 157 } 158 159 public synchronized void write(byte[] bs, int off, int len) 160 throws IOException 161 { 162 if ((off < 0) || (off > bs.length) || (len < 0) || 163 ((off + len) > bs.length) || ((off + len) < 0)) { 164 throw new IndexOutOfBoundsException(); 165 } else if (len == 0) { 166 return; 167 } 168 ByteBuffer bb = ((this.bs == bs) 169 ? this.bb 170 : ByteBuffer.wrap(bs)); 171 bb.limit(Math.min(off + len, bb.capacity())); 172 bb.position(off); 173 this.bb = bb; 174 this.bs = bs; 175 Channels.writeFully(ch, bb); 176 } 177 178 public void close() throws IOException { 179 ch.close(); 180 } 181 182 }; 183 } 184 // -- Channels from streams -- 185 186 /** 187 * Constructs a channel that reads bytes from the given stream. 188 * 189 * <p> The resulting channel will not be buffered; it will simply redirect 190 * its I/O operations to the given stream. Closing the channel will in 191 * turn cause the stream to be closed. </p> 192 * 193 * @param in 194 * The stream from which bytes are to be read 195 * 196 * @return A new readable byte channel 197 */ 198 public static ReadableByteChannel newChannel(final InputStream in) { 199 checkNotNull(in, "in"); 200 201 if (in instanceof FileInputStream && 202 FileInputStream.class.equals(in.getClass())) { 203 return ((FileInputStream)in).getChannel(); 204 } 205 206 return new ReadableByteChannelImpl(in); 207 } 208 209 private static class ReadableByteChannelImpl 210 extends AbstractInterruptibleChannel // Not really interruptible 211 implements ReadableByteChannel 212 { 213 InputStream in; 214 private static final int TRANSFER_SIZE = 8192; 215 private byte buf[] = new byte[0]; 216 private boolean open = true; 217 private Object readLock = new Object(); 218 219 ReadableByteChannelImpl(InputStream in) { 220 this.in = in; 221 } 222 223 public int read(ByteBuffer dst) throws IOException { 224 int len = dst.remaining(); 225 int totalRead = 0; 226 int bytesRead = 0; 227 synchronized (readLock) { 228 while (totalRead < len) { 229 int bytesToRead = Math.min((len - totalRead), 230 TRANSFER_SIZE); 231 if (buf.length < bytesToRead) 232 buf = new byte[bytesToRead]; 233 if ((totalRead > 0) && !(in.available() > 0)) 234 break; // block at most once 235 try { 236 begin(); 237 bytesRead = in.read(buf, 0, bytesToRead); 238 } finally { 239 end(bytesRead > 0); 240 } 241 if (bytesRead < 0) 242 break; 243 else 244 totalRead += bytesRead; 245 dst.put(buf, 0, bytesRead); 246 } 247 if ((bytesRead < 0) && (totalRead == 0)) 248 return -1; 249 250 return totalRead; 251 } 252 } 253 254 protected void implCloseChannel() throws IOException { 255 in.close(); 256 open = false; 257 } 258 } 259 260 261 /** 262 * Constructs a channel that writes bytes to the given stream. 263 * 264 * <p> The resulting channel will not be buffered; it will simply redirect 265 * its I/O operations to the given stream. Closing the channel will in 266 * turn cause the stream to be closed. </p> 267 * 268 * @param out 269 * The stream to which bytes are to be written 270 * 271 * @return A new writable byte channel 272 */ 273 public static WritableByteChannel newChannel(final OutputStream out) { 274 checkNotNull(out, "out"); 275 return new WritableByteChannelImpl(out); 276 } 277 278 private static class WritableByteChannelImpl 279 extends AbstractInterruptibleChannel // Not really interruptible 280 implements WritableByteChannel 281 { 282 OutputStream out; 283 private static final int TRANSFER_SIZE = 8192; 284 private byte buf[] = new byte[0]; 285 private boolean open = true; 286 private Object writeLock = new Object(); 287 288 WritableByteChannelImpl(OutputStream out) { 289 this.out = out; 290 } 291 292 public int write(ByteBuffer src) throws IOException { 293 int len = src.remaining(); 294 int totalWritten = 0; 295 synchronized (writeLock) { 296 while (totalWritten < len) { 297 int bytesToWrite = Math.min((len - totalWritten), 298 TRANSFER_SIZE); 299 if (buf.length < bytesToWrite) 300 buf = new byte[bytesToWrite]; 301 src.get(buf, 0, bytesToWrite); 302 try { 303 begin(); 304 out.write(buf, 0, bytesToWrite); 305 } finally { 306 end(bytesToWrite > 0); 307 } 308 totalWritten += bytesToWrite; 309 } 310 return totalWritten; 311 } 312 } 313 314 protected void implCloseChannel() throws IOException { 315 out.close(); 316 open = false; 317 } 318 } 319 320 321 // -- Character streams from channels -- 322 323 /** 324 * Constructs a reader that decodes bytes from the given channel using the 325 * given decoder. 326 * 327 * <p> The resulting stream will contain an internal input buffer of at 328 * least <tt>minBufferCap</tt> bytes. The stream's <tt>read</tt> methods 329 * will, as needed, fill the buffer by reading bytes from the underlying 330 * channel; if the channel is in non-blocking mode when bytes are to be 331 * read then an {@link IllegalBlockingModeException} will be thrown. The 332 * resulting stream will not otherwise be buffered, and it will not support 333 * the {@link Reader#mark mark} or {@link Reader#reset reset} methods. 334 * Closing the stream will in turn cause the channel to be closed. </p> 335 * 336 * @param ch 337 * The channel from which bytes will be read 338 * 339 * @param dec 340 * The charset decoder to be used 341 * 342 * @param minBufferCap 343 * The minimum capacity of the internal byte buffer, 344 * or <tt>-1</tt> if an implementation-dependent 345 * default capacity is to be used 346 * 347 * @return A new reader 348 */ 349 public static Reader newReader(ReadableByteChannel ch, 350 CharsetDecoder dec, 351 int minBufferCap) 352 { 353 checkNotNull(ch, "ch"); 354 return StreamDecoder.forDecoder(ch, dec.reset(), minBufferCap); 355 } 356 357 /** 358 * Constructs a reader that decodes bytes from the given channel according 359 * to the named charset. 360 * 361 * <p> An invocation of this method of the form 362 * 363 * <blockquote><pre> 364 * Channels.newReader(ch, csname)</pre></blockquote> 365 * 366 * behaves in exactly the same way as the expression 367 * 368 * <blockquote><pre> 369 * Channels.newReader(ch, 370 * Charset.forName(csName) 371 * .newDecoder(), 372 * -1);</pre></blockquote> 373 * 374 * @param ch 375 * The channel from which bytes will be read 376 * 377 * @param csName 378 * The name of the charset to be used 379 * 380 * @return A new reader 381 * 382 * @throws UnsupportedCharsetException 383 * If no support for the named charset is available 384 * in this instance of the Java virtual machine 385 */ 386 public static Reader newReader(ReadableByteChannel ch, 387 String csName) 388 { 389 checkNotNull(csName, "csName"); 390 return newReader(ch, Charset.forName(csName).newDecoder(), -1); 391 } 392 393 /** 394 * Constructs a writer that encodes characters using the given encoder and 395 * writes the resulting bytes to the given channel. 396 * 397 * <p> The resulting stream will contain an internal output buffer of at 398 * least <tt>minBufferCap</tt> bytes. The stream's <tt>write</tt> methods 399 * will, as needed, flush the buffer by writing bytes to the underlying 400 * channel; if the channel is in non-blocking mode when bytes are to be 401 * written then an {@link IllegalBlockingModeException} will be thrown. 402 * The resulting stream will not otherwise be buffered. Closing the stream 403 * will in turn cause the channel to be closed. </p> 404 * 405 * @param ch 406 * The channel to which bytes will be written 407 * 408 * @param enc 409 * The charset encoder to be used 410 * 411 * @param minBufferCap 412 * The minimum capacity of the internal byte buffer, 413 * or <tt>-1</tt> if an implementation-dependent 414 * default capacity is to be used 415 * 416 * @return A new writer 417 */ 418 public static Writer newWriter(final WritableByteChannel ch, 419 final CharsetEncoder enc, 420 final int minBufferCap) 421 { 422 checkNotNull(ch, "ch"); 423 return StreamEncoder.forEncoder(ch, enc.reset(), minBufferCap); 424 } 425 426 /** 427 * Constructs a writer that encodes characters according to the named 428 * charset and writes the resulting bytes to the given channel. 429 * 430 * <p> An invocation of this method of the form 431 * 432 * <blockquote><pre> 433 * Channels.newWriter(ch, csname)</pre></blockquote> 434 * 435 * behaves in exactly the same way as the expression 436 * 437 * <blockquote><pre> 438 * Channels.newWriter(ch, 439 * Charset.forName(csName) 440 * .newEncoder(), 441 * -1);</pre></blockquote> 442 * 443 * @param ch 444 * The channel to which bytes will be written 445 * 446 * @param csName 447 * The name of the charset to be used 448 * 449 * @return A new writer 450 * 451 * @throws UnsupportedCharsetException 452 * If no support for the named charset is available 453 * in this instance of the Java virtual machine 454 */ 455 public static Writer newWriter(WritableByteChannel ch, 456 String csName) 457 { 458 checkNotNull(csName, "csName"); 459 return newWriter(ch, Charset.forName(csName).newEncoder(), -1); 460 } 461} 462