/*
 *  Licensed to the Apache Software Foundation (ASF) under one or more
 *  contributor license agreements.  See the NOTICE file distributed with
 *  this work for additional information regarding copyright ownership.
 *  The ASF licenses this file to You under the Apache License, Version 2.0
 *  (the "License"); you may not use this file except in compliance with
 *  the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package java.nio.channels;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Reader;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.spi.AbstractInterruptibleChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import org.apache.harmony.nio.internal.IOUtil;

/**
 * This class provides several utilities to get I/O streams from channels.
 */
public final class Channels {

    /*
     * Not intended to be instantiated.
     */
    private Channels() {
        super();
    }

    /**
     * Returns an input stream on the given channel. The resulting stream has
     * the following properties:
     * <ul>
     * <li>If the stream is closed, then the underlying channel is closed as
     * well.</li>
     * <li>It is thread safe.</li>
     * <li>It throws an {@link IllegalBlockingModeException} if the channel is
     * in non-blocking mode and {@code read} is called.</li>
     * <li>Neither {@code mark} nor {@code reset} is supported.</li>
     * <li>It is not buffered.</li>
     * </ul>
     *
     * @param channel
     *            the channel to be wrapped by an InputStream.
     * @return an InputStream that takes bytes from the given byte channel.
     */
    public static InputStream newInputStream(ReadableByteChannel channel) {
        return new ReadableByteChannelInputStream(channel);
    }

    /**
     * Returns an output stream on the given channel. The resulting stream has
     * the following properties:
     * <ul>
     * <li>If the stream is closed, then the underlying channel is closed as
     * well.</li>
     * <li>It is thread safe.</li>
     * <li>It throws an {@link IllegalBlockingModeException} if the channel is
     * in non-blocking mode and {@code write} is called.</li>
     * <li>It is not buffered.</li>
     * </ul>
     *
     * @param channel
     *            the channel to be wrapped by an OutputStream.
     * @return an OutputStream that puts bytes onto the given byte channel.
     */
    public static OutputStream newOutputStream(WritableByteChannel channel) {
        return new WritableByteChannelOutputStream(channel);
    }

    /**
     * Returns a readable channel on the given input stream. The resulting
     * channel has the following properties:
     * <ul>
     * <li>If the channel is closed, then the underlying stream is closed as
     * well.</li>
     * <li>It is not buffered.</li>
     * </ul>
     *
     * @param inputStream
     *            the stream to be wrapped by a byte channel.
     * @return a byte channel that reads bytes from the input stream.
     */
    public static ReadableByteChannel newChannel(InputStream inputStream) {
        return new ReadableByteChannelImpl(inputStream);
    }

    /**
     * Returns a writable channel on the given output stream. The resulting
     * channel has the following properties:
     * <ul>
     * <li>If the channel is closed, then the underlying stream is closed as
     * well.</li>
     * <li>It is not buffered.</li>
     * </ul>
     *
     * @param outputStream
     *            the stream to be wrapped by a byte channel.
     * @return a byte channel that writes bytes to the output stream.
     */
    public static WritableByteChannel newChannel(OutputStream outputStream) {
        return new WritableByteChannelImpl(outputStream);
    }

    /**
     * Returns a reader that decodes bytes from a channel.
     *
     * @param channel
     *            the Channel to be read.
     * @param decoder
     *            the Charset decoder to be used.
     * @param minBufferCapacity
     *            the minimum size of the byte buffer; any value smaller than
     *            the default size (including -1) selects the default size.
     * @return the reader.
     */
    public static Reader newReader(ReadableByteChannel channel,
            CharsetDecoder decoder, int minBufferCapacity) {
        return new ByteChannelReader(new ReaderInputStream(channel), decoder,
                minBufferCapacity);
    }

    /**
     * Returns a reader that decodes bytes from a channel. This method creates a
     * reader with a buffer of default size.
     *
     * @param channel
     *            the Channel to be read.
     * @param charsetName
     *            the name of the charset.
     * @return the reader.
     * @throws java.nio.charset.UnsupportedCharsetException
     *             if the given charset name is not supported.
     */
    public static Reader newReader(ReadableByteChannel channel,
            String charsetName) {
        return newReader(channel, Charset.forName(charsetName).newDecoder(), -1);
    }

    /**
     * Returns a writer that encodes characters with the specified
     * {@code encoder} and sends the bytes to the specified channel.
     *
     * @param channel
     *            the Channel to write to.
     * @param encoder
     *            the CharsetEncoder to be used.
     * @param minBufferCapacity
     *            the minimum size of the byte buffer; any value smaller than
     *            the default size (including -1) selects the default size.
     * @return the writer.
     */
    public static Writer newWriter(WritableByteChannel channel,
            CharsetEncoder encoder, int minBufferCapacity) {
        return new ByteChannelWriter(new WritableByteChannelOutputStream(
                channel), encoder, minBufferCapacity);
    }

    /**
     * Returns a writer that encodes characters using a new encoder of the
     * charset named {@code charsetName} and sends the bytes to the specified
     * channel. This method creates a writer with a buffer of default size.
     *
     * @param channel
     *            the Channel to be written to.
     * @param charsetName
     *            the name of the charset.
     * @return the writer.
     * @throws java.nio.charset.UnsupportedCharsetException
     *             if the given charset name is not supported.
     */
    public static Writer newWriter(WritableByteChannel channel,
            String charsetName) {
        return newWriter(channel, Charset.forName(charsetName).newEncoder(), -1);
    }

    /*
     * Wraps a byte array in a ByteBuffer whose position is offset and whose
     * limit is offset + length, clamped to the array's capacity.
     */
    static ByteBuffer wrapByteBuffer(byte[] bytes, int offset, int length) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        int newLimit = Math.min(offset + length, buffer.capacity());
        buffer.limit(newLimit);
        buffer.position(offset);
        return buffer;
    }

    /*
     * Validates an (offset, length) window against an array of the given
     * length. The comparison is written as a subtraction so that a huge
     * offset/length pair cannot wrap around and slip past the check, which
     * the former "length + offset > arrayLength" test allowed.
     */
    private static void checkBounds(int arrayLength, int offset, int length) {
        if (offset < 0 || length < 0 || length > arrayLength - offset) {
            throw new ArrayIndexOutOfBoundsException();
        }
    }

    /*
     * Common base of the channel-backed input streams: provides the
     * single-byte read (in terms of the bulk read supplied by subclasses) and
     * closes the channel when the stream is closed.
     */
    private static class ChannelInputStream extends InputStream {

        protected ReadableByteChannel channel;

        public ChannelInputStream(ReadableByteChannel aChannel) {
            super();
            channel = aChannel;
        }

        @Override
        public synchronized int read() throws IOException {
            byte[] oneByte = new byte[1];
            int n = read(oneByte);
            if (n == 1) {
                // reads a single byte 0-255
                return oneByte[0] & 0xff;
            }
            return -1;
        }

        @Override
        public synchronized void close() throws IOException {
            channel.close();
        }
    }

    /*
     * Wrapper class used for newInputStream(ReadableByteChannel channel)
     */
    private static class ReadableByteChannelInputStream extends
            ChannelInputStream {

        public ReadableByteChannelInputStream(ReadableByteChannel aChannel) {
            super(aChannel);
        }

        @Override
        public synchronized int read(byte[] target, int offset, int length)
                throws IOException {
            // target.length is evaluated first, so a null target still
            // fails with a NullPointerException before the range check.
            checkBounds(target.length, offset, length);
            if (0 == length) {
                return 0;
            }
            if (channel instanceof SelectableChannel) {
                if (!((SelectableChannel) channel).isBlocking()) {
                    throw new IllegalBlockingModeException();
                }
            }
            ByteBuffer buffer = ByteBuffer.wrap(target, offset, length);
            return channel.read(buffer);
        }
    }

    /*
     * Wrapper class used for newReader(ReadableByteChannel channel,
     * CharsetDecoder decoder, int minBufferCapacity). Unlike
     * ReadableByteChannelInputStream it does not check the blocking mode.
     */
    private static class ReaderInputStream extends ChannelInputStream {

        public ReaderInputStream(ReadableByteChannel aChannel) {
            super(aChannel);
        }

        @Override
        public synchronized int read(byte[] target, int offset, int length)
                throws IOException {
            // target.length is evaluated first, so a null target still
            // fails with a NullPointerException before the range check.
            checkBounds(target.length, offset, length);
            if (0 == length) {
                return 0;
            }
            ByteBuffer buffer = ByteBuffer.wrap(target, offset, length);
            return channel.read(buffer);
        }
    }

    /*
     * Wrapper class used for newOutputStream(WritableByteChannel channel)
     */
    private static class WritableByteChannelOutputStream extends OutputStream {

        private WritableByteChannel channel;

        public WritableByteChannelOutputStream(WritableByteChannel aChannel) {
            super();
            channel = aChannel;
        }

        @Override
        public synchronized void write(int oneByte) throws IOException {
            byte[] wrappedByte = new byte[1];
            wrappedByte[0] = (byte) oneByte;
            write(wrappedByte);
        }

        @Override
        public synchronized void write(byte[] source, int offset, int length)
                throws IOException {
            // source.length is evaluated first, so a null source still
            // fails with a NullPointerException before the range check.
            checkBounds(source.length, offset, length);
            if (0 == length) {
                return;
            }
            if (channel instanceof SelectableChannel) {
                if (!((SelectableChannel) channel).isBlocking()) {
                    throw new IllegalBlockingModeException();
                }
            }
            ByteBuffer buffer = ByteBuffer.wrap(source, offset, length);
            // A channel may accept only part of the buffer per call; an
            // OutputStream must not silently drop the rest, so keep writing
            // until the buffer is drained.
            while (buffer.hasRemaining()) {
                if (channel.write(buffer) <= 0) {
                    // No progress: fail instead of spinning forever.
                    throw new IOException("no bytes written");
                }
            }
        }

        @Override
        public synchronized void close() throws IOException {
            channel.close();
        }
    }

    /*
     * Wrapper class used for newChannel(InputStream inputStream)
     */
    private static class ReadableByteChannelImpl extends
            AbstractInterruptibleChannel implements ReadableByteChannel {
        private InputStream inputStream;

        ReadableByteChannelImpl(InputStream aInputStream) {
            super();
            inputStream = aInputStream;
        }

        public synchronized int read(ByteBuffer target) throws IOException {
            if (!isOpen()) {
                throw new ClosedChannelException();
            }
            int bytesRemain = target.remaining();
            byte[] bytes = new byte[bytesRemain];
            int readCount = 0;
            // Stays false when the stream read throws, so end() is not told
            // that a failed operation succeeded.
            boolean completed = false;
            try {
                begin();
                readCount = inputStream.read(bytes);
                completed = readCount >= 0;
            } finally {
                end(completed);
            }
            if (readCount > 0) {
                target.put(bytes, 0, readCount);
            }
            return readCount;
        }

        @Override
        protected void implCloseChannel() throws IOException {
            inputStream.close();
        }
    }

    /*
     * Wrapper class used for newChannel(OutputStream outputStream)
     */
    private static class WritableByteChannelImpl extends
            AbstractInterruptibleChannel implements WritableByteChannel {
        private OutputStream outputStream;

        WritableByteChannelImpl(OutputStream aOutputStream) {
            super();
            outputStream = aOutputStream;
        }

        public synchronized int write(ByteBuffer source) throws IOException {
            if (!isOpen()) {
                throw new ClosedChannelException();
            }
            int bytesRemain = source.remaining();
            if (bytesRemain == 0) {
                return 0;
            }
            byte[] buf = new byte[bytesRemain];
            source.get(buf);
            // Stays false when the stream write throws, so end() is not told
            // that a failed operation succeeded.
            boolean completed = false;
            try {
                begin();
                outputStream.write(buf, 0, bytesRemain);
                completed = true;
            } finally {
                end(completed);
            }
            return bytesRemain;
        }

        @Override
        protected void implCloseChannel() throws IOException {
            outputStream.close();
        }
    }

    /*
     * Wrapper class used for newReader(ReadableByteChannel channel,
     * CharsetDecoder decoder, int minBufferCapacity)
     */
    private static class ByteChannelReader extends Reader {

        private InputStream inputStream;

        private static final int BUFFER_SIZE = 8192;

        CharsetDecoder decoder;

        ByteBuffer bytes;

        CharBuffer chars;

        public ByteChannelReader(InputStream aInputStream,
                CharsetDecoder aDecoder, int minBufferCapacity) {
            // The stream doubles as the Reader's lock object.
            super(aInputStream);
            aDecoder.reset();
            inputStream = aInputStream;
            // Never allocate less than the default size.
            int bufferSize = Math.max(minBufferCapacity, BUFFER_SIZE);
            bytes = ByteBuffer.allocate(bufferSize);
            chars = CharBuffer.allocate(bufferSize);
            decoder = aDecoder;
            // Start with no decoded characters available.
            chars.limit(0);
        }

        @Override
        public void close() throws IOException {
            synchronized (lock) {
                decoder = null;
                if (inputStream != null) {
                    inputStream.close();
                    inputStream = null;
                }
            }
        }

        @Override
        public boolean ready() {
            synchronized (lock) {
                if (null == inputStream) {
                    return false;
                }
                try {
                    // Ready when decoded chars are pending or the stream
                    // reports available bytes.
                    return chars.limit() > chars.position()
                            || inputStream.available() > 0;
                } catch (IOException e) {
                    return false;
                }
            }
        }

        @Override
        public int read() throws IOException {
            return IOUtil.readInputStreamReader(inputStream, bytes, chars,
                    decoder, lock);
        }

        @Override
        public int read(char[] buf, int offset, int length) throws IOException {
            return IOUtil.readInputStreamReader(buf, offset, length,
                    inputStream, bytes, chars, decoder, lock);
        }
    }

    /*
     * Wrapper class used for newWriter(WritableByteChannel channel,
     * CharsetEncoder encoder, int minBufferCapacity)
     */
    private static class ByteChannelWriter extends Writer {

        private static final int BUFFER_SIZE = 8192;

        private OutputStream outputStream;

        private CharsetEncoder encoder;

        private ByteBuffer byteBuf;

        public ByteChannelWriter(OutputStream aOutputStream,
                CharsetEncoder aEncoder, int minBufferCap) {
            // The stream doubles as the Writer's lock object.
            super(aOutputStream);
            // Result is discarded; the call fails fast on a null encoder.
            aEncoder.charset();
            outputStream = aOutputStream;
            // Never allocate less than the default size.
            byteBuf = ByteBuffer.allocate(Math.max(minBufferCap, BUFFER_SIZE));
            encoder = aEncoder;
        }

        @Override
        public void close() throws IOException {
            synchronized (lock) {
                // A null encoder marks the writer as already closed.
                if (encoder != null) {
                    try {
                        flush();
                        outputStream.flush();
                    } finally {
                        // Release the stream even when flushing fails, so a
                        // flush error cannot leak the underlying channel.
                        try {
                            outputStream.close();
                        } finally {
                            encoder = null;
                            byteBuf = null;
                        }
                    }
                }
            }
        }

        @Override
        public void flush() throws IOException {
            IOUtil.flushOutputStreamWriter(outputStream, byteBuf, encoder,
                    lock);
        }

        @Override
        public void write(char[] buf, int offset, int count) throws IOException {
            IOUtil.writeOutputStreamWriter(buf, offset, count, outputStream,
                    byteBuf, encoder, lock);
        }

        @Override
        public void write(int oneChar) throws IOException {
            IOUtil.writeOutputStreamWriter(oneChar, outputStream, byteBuf,
                    encoder, lock);
        }

        @Override
        public void write(String str, int offset, int count) throws IOException {
            IOUtil.writeOutputStreamWriter(str, offset, count, outputStream,
                    byteBuf, encoder, lock);
        }
    }
}