1/*
2 *  Licensed to the Apache Software Foundation (ASF) under one or more
3 *  contributor license agreements.  See the NOTICE file distributed with
4 *  this work for additional information regarding copyright ownership.
5 *  The ASF licenses this file to You under the Apache License, Version 2.0
6 *  (the "License"); you may not use this file except in compliance with
7 *  the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *  Unless required by applicable law or agreed to in writing, software
12 *  distributed under the License is distributed on an "AS IS" BASIS,
13 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  See the License for the specific language governing permissions and
15 *  limitations under the License.
16 */
17
18package java.nio.channels;
19
20import java.io.IOException;
21import java.io.InputStream;
22import java.io.OutputStream;
23import java.io.Reader;
24import java.io.Writer;
25import java.nio.ByteBuffer;
26import java.nio.CharBuffer;
27import java.nio.channels.spi.AbstractInterruptibleChannel;
28import java.nio.charset.Charset;
29import java.nio.charset.CharsetDecoder;
30import java.nio.charset.CharsetEncoder;
31import org.apache.harmony.nio.internal.IOUtil;
32
33/**
34 * This class provides several utilities to get I/O streams from channels.
35 */
36public final class Channels {
37
38    /*
39     * Not intended to be instantiated.
40     */
41    private Channels() {
42        super();
43    }
44
45    /**
46     * Returns an input stream on the given channel. The resulting stream has
47     * the following properties:
48     * <ul>
49     * <li>If the stream is closed, then the underlying channel is closed as
50     * well.</li>
51     * <li>It is thread safe.</li>
52     * <li>It throws an {@link IllegalBlockingModeException} if the channel is
53     * in non-blocking mode and {@code read} is called.</li>
54     * <li>Neither {@code mark} nor {@code reset} is supported.</li>
55     * <li>It is not buffered.</li>
56     * </ul>
57     *
58     * @param channel
59     *            the channel to be wrapped by an InputStream.
60     * @return an InputStream that takes bytes from the given byte channel.
61     */
62    public static InputStream newInputStream(ReadableByteChannel channel) {
63        return new ReadableByteChannelInputStream(channel);
64    }
65
66    /**
67     * Returns an output stream on the given channel. The resulting stream has
68     * the following properties:
69     * <ul>
70     * <li>If the stream is closed, then the underlying channel is closed as
71     * well.</li>
72     * <li>It is thread safe.</li>
73     * <li>It throws an {@link IllegalBlockingModeException} if the channel is
74     * in non-blocking mode and {@code write} is called.</li>
75     * <li>It is not buffered.</li>
76     * </ul>
77     *
78     * @param channel
79     *            the channel to be wrapped by an OutputStream.
80     * @return an OutputStream that puts bytes onto the given byte channel.
81     */
82    public static OutputStream newOutputStream(WritableByteChannel channel) {
83        return new WritableByteChannelOutputStream(channel);
84    }
85
86    /**
87     * Returns a readable channel on the given input stream. The resulting
88     * channel has the following properties:
89     * <ul>
90     * <li>If the channel is closed, then the underlying stream is closed as
91     * well.</li>
92     * <li>It is not buffered.</li>
93     * </ul>
94     *
95     * @param inputStream
96     *            the stream to be wrapped by a byte channel.
97     * @return a byte channel that reads bytes from the input stream.
98     */
99    public static ReadableByteChannel newChannel(InputStream inputStream) {
100        return new ReadableByteChannelImpl(inputStream);
101    }
102
103    /**
104     * Returns a writable channel on the given output stream.
105     *
106     * The resulting channel has following properties:
107     * <ul>
108     * <li>If the channel is closed, then the underlying stream is closed as
109     * well.</li>
110     * <li>It is not buffered.</li>
111     * </ul>
112     *
113     * @param outputStream
114     *            the stream to be wrapped by a byte channel.
115     * @return a byte channel that writes bytes to the output stream.
116     */
117    public static WritableByteChannel newChannel(OutputStream outputStream) {
118        return new WritableByteChannelImpl(outputStream);
119    }
120
121    /**
122     * Returns a reader that decodes bytes from a channel.
123     *
124     * @param channel
125     *            the Channel to be read.
126     * @param decoder
127     *            the Charset decoder to be used.
128     * @param minBufferCapacity
129     *            The minimum size of the byte buffer, -1 means to use the
130     *            default size.
131     * @return the reader.
132     */
133    public static Reader newReader(ReadableByteChannel channel,
134            CharsetDecoder decoder, int minBufferCapacity) {
135        return new ByteChannelReader(new ReaderInputStream(channel), decoder,
136                minBufferCapacity);
137    }
138
139    /**
140     * Returns a reader that decodes bytes from a channel. This method creates a
141     * reader with a buffer of default size.
142     *
143     * @param channel
144     *            the Channel to be read.
145     * @param charsetName
146     *            the name of the charset.
147     * @return the reader.
148     * @throws java.nio.charset.UnsupportedCharsetException
149     *             if the given charset name is not supported.
150     */
151    public static Reader newReader(ReadableByteChannel channel,
152            String charsetName) {
153        return newReader(channel, Charset.forName(charsetName).newDecoder(), -1);
154    }
155
156    /**
157     * Returns a writer that encodes characters with the specified
158     * {@code encoder} and sends the bytes to the specified channel.
159     *
160     * @param channel
161     *            the Channel to write to.
162     * @param encoder
163     *            the CharsetEncoder to be used.
164     * @param minBufferCapacity
165     *            the minimum size of the byte buffer, -1 means to use the
166     *            default size.
167     * @return the writer.
168     */
169    public static Writer newWriter(WritableByteChannel channel,
170            CharsetEncoder encoder, int minBufferCapacity) {
171        return new ByteChannelWriter(new WritableByteChannelOutputStream(
172                channel), encoder, minBufferCapacity);
173    }
174
175    /**
176     * Returns a writer that encodes characters with the specified
177     * {@code encoder} and sends the bytes to the specified channel. This method
178     * creates a writer with a buffer of default size.
179     *
180     * @param channel
181     *            the Channel to be written to.
182     * @param charsetName
183     *            the name of the charset.
184     * @return the writer.
185     * @throws java.nio.charset.UnsupportedCharsetException
186     *             if the given charset name is not supported.
187     */
188    public static Writer newWriter(WritableByteChannel channel,
189            String charsetName) {
190        return newWriter(channel, Charset.forName(charsetName).newEncoder(), -1);
191    }
192
193    /*
194     * wrap a byte array to a ByteBuffer
195     */
196    static ByteBuffer wrapByteBuffer(byte[] bytes, int offset, int length) {
197        ByteBuffer buffer = ByteBuffer.wrap(bytes);
198        int newLimit = offset + length <= buffer.capacity() ? offset + length
199                : buffer.capacity();
200        buffer.limit(newLimit);
201        buffer.position(offset);
202        return buffer;
203    }
204
205    private static class ChannelInputStream extends InputStream {
206
207        protected ReadableByteChannel channel;
208
209        public ChannelInputStream(ReadableByteChannel aChannel) {
210            super();
211            channel = aChannel;
212        }
213
214        @Override
215        public synchronized int read() throws IOException {
216            byte[] oneByte = new byte[1];
217            int n = read(oneByte);
218            if (n == 1) {
219                // reads a single byte 0-255
220                return oneByte[0] & 0xff;
221            }
222            return -1;
223        }
224
225        @Override
226        public synchronized void close() throws IOException {
227            channel.close();
228        }
229    }
230
231    /*
232     * Wrapper class used for newInputStream(ReadableByteChannel channel)
233     */
234    private static class ReadableByteChannelInputStream extends
235            ChannelInputStream {
236
237        public ReadableByteChannelInputStream(ReadableByteChannel aChannel) {
238            super(aChannel);
239        }
240
241        @Override
242        public synchronized int read(byte[] target, int offset, int length)
243                throws IOException {
244            // avoid int overflow, check null target
245            if (length + offset > target.length || length < 0 || offset < 0) {
246                throw new ArrayIndexOutOfBoundsException();
247            }
248            if (0 == length) {
249                return 0;
250            }
251            if (channel instanceof SelectableChannel) {
252                if (!((SelectableChannel) channel).isBlocking()) {
253                    throw new IllegalBlockingModeException();
254                }
255            }
256            ByteBuffer buffer = ByteBuffer.wrap(target, offset, length);
257            return channel.read(buffer);
258        }
259    }
260
261    /*
262     * Wrapper class used for newReader(ReadableByteChannel channel,
263     * CharsetDecoder decoder, int minBufferCapacity)
264     */
265    private static class ReaderInputStream extends ChannelInputStream {
266
267        public ReaderInputStream(ReadableByteChannel aChannel) {
268            super(aChannel);
269        }
270
271        @Override
272        public synchronized int read(byte[] target, int offset, int length)
273                throws IOException {
274            // avoid int overflow, check null target
275            if (length + offset > target.length || length < 0 || offset < 0) {
276                throw new ArrayIndexOutOfBoundsException();
277            }
278            if (0 == length) {
279                return 0;
280            }
281            ByteBuffer buffer = ByteBuffer.wrap(target, offset, length);
282            return channel.read(buffer);
283        }
284    }
285
286    /*
287     * Wrapper class used for newOutputStream(WritableByteChannel channel)
288     */
289    private static class WritableByteChannelOutputStream extends OutputStream {
290
291        private WritableByteChannel channel;
292
293        public WritableByteChannelOutputStream(WritableByteChannel aChannel) {
294            super();
295            channel = aChannel;
296        }
297
298        @Override
299        public synchronized void write(int oneByte) throws IOException {
300            byte[] wrappedByte = new byte[1];
301            wrappedByte[0] = (byte) oneByte;
302            write(wrappedByte);
303        }
304
305        @Override
306        public synchronized void write(byte[] source, int offset, int length)
307                throws IOException {
308            // avoid int overflow, check null source
309            if (length + offset > source.length || length < 0 || offset < 0) {
310                throw new ArrayIndexOutOfBoundsException();
311            }
312            if (0 == length) {
313                return;
314            }
315            if (channel instanceof SelectableChannel) {
316                if (!((SelectableChannel) channel).isBlocking()) {
317                    throw new IllegalBlockingModeException();
318                }
319            }
320            ByteBuffer buffer = ByteBuffer.wrap(source, offset, length);
321            channel.write(buffer);
322        }
323
324        @Override
325        public synchronized void close() throws IOException {
326            channel.close();
327        }
328    }
329
330    /*
331     * Wrapper class used for newChannel(InputStream inputStream)
332     */
333    private static class ReadableByteChannelImpl extends
334            AbstractInterruptibleChannel implements ReadableByteChannel {
335        private InputStream inputStream;
336
337        ReadableByteChannelImpl(InputStream aInputStream) {
338            super();
339            inputStream = aInputStream;
340        }
341
342        public synchronized int read(ByteBuffer target) throws IOException {
343            if (!isOpen()) {
344                throw new ClosedChannelException();
345            }
346            int bytesRemain = target.remaining();
347            byte[] bytes = new byte[bytesRemain];
348            int readCount = 0;
349            try {
350                begin();
351                readCount = inputStream.read(bytes);
352            } finally {
353                end(readCount >= 0);
354            }
355            if (readCount > 0) {
356                target.put(bytes, 0, readCount);
357            }
358            return readCount;
359        }
360
361        @Override
362        protected void implCloseChannel() throws IOException {
363            inputStream.close();
364        }
365    }
366
367    /*
368     * Wrapper class used for newChannel(OutputStream outputStream)
369     */
370    private static class WritableByteChannelImpl extends
371            AbstractInterruptibleChannel implements WritableByteChannel {
372        private OutputStream outputStream;
373
374        WritableByteChannelImpl(OutputStream aOutputStream) {
375            super();
376            outputStream = aOutputStream;
377        }
378
379        public synchronized int write(ByteBuffer source) throws IOException {
380            if (!isOpen()) {
381                throw new ClosedChannelException();
382            }
383            int bytesRemain = source.remaining();
384            if (bytesRemain == 0) {
385                return 0;
386            }
387            byte[] buf = new byte[bytesRemain];
388            source.get(buf);
389            try {
390                begin();
391                outputStream.write(buf, 0, bytesRemain);
392            } finally {
393                end(bytesRemain >= 0);
394            }
395            return bytesRemain;
396        }
397
398        @Override
399        protected void implCloseChannel() throws IOException {
400            outputStream.close();
401        }
402    }
403
404    /*
405     * Wrapper class used for newReader(ReadableByteChannel channel,
406     * CharsetDecoder decoder, int minBufferCapacity)
407     */
408    private static class ByteChannelReader extends Reader {
409
410        private InputStream inputStream;
411
412        private static final int BUFFER_SIZE = 8192;
413
414        CharsetDecoder decoder;
415
416        ByteBuffer bytes;
417
418        CharBuffer chars;
419
420        public ByteChannelReader(InputStream aInputStream,
421                CharsetDecoder aDecoder, int minBufferCapacity) {
422            super(aInputStream);
423            aDecoder.reset();
424            inputStream = aInputStream;
425            int bufferSize = Math.max(minBufferCapacity, BUFFER_SIZE);
426            bytes = ByteBuffer.allocate(bufferSize);
427            chars = CharBuffer.allocate(bufferSize);
428            decoder = aDecoder;
429            chars.limit(0);
430        }
431
432        @Override
433        public void close() throws IOException {
434            synchronized (lock) {
435                decoder = null;
436                if (inputStream != null) {
437                    inputStream.close();
438                    inputStream = null;
439                }
440            }
441        }
442
443        @Override
444        public boolean ready() {
445            synchronized (lock) {
446                if (null == inputStream) {
447                    return false;
448                }
449                try {
450                    return chars.limit() > chars.position()
451                            || inputStream.available() > 0;
452                } catch (IOException e) {
453                    return false;
454                }
455            }
456        }
457
458        @Override
459        public int read() throws IOException {
460            return IOUtil.readInputStreamReader(inputStream, bytes, chars,
461                    decoder, lock);
462        }
463
464        @Override
465        public int read(char[] buf, int offset, int length) throws IOException {
466            return IOUtil.readInputStreamReader(buf, offset, length,
467                    inputStream, bytes, chars, decoder, lock);
468        }
469    }
470
471    /*
472     * Wrapper class used for newWriter(WritableByteChannel channel,
473     * CharsetEncoder encoder, int minBufferCapacity)
474     */
475    private static class ByteChannelWriter extends Writer {
476
477        private static final int BUFFER_SIZE = 8192;
478
479        private OutputStream outputStream;
480
481        private CharsetEncoder encoder;
482
483        private ByteBuffer byteBuf;
484
485        public ByteChannelWriter(OutputStream aOutputStream,
486                CharsetEncoder aEncoder, int minBufferCap) {
487            super(aOutputStream);
488            aEncoder.charset();
489            outputStream = aOutputStream;
490            byteBuf = ByteBuffer.allocate(Math.max(minBufferCap, BUFFER_SIZE));
491            encoder = aEncoder;
492        }
493
494        @Override
495        public void close() throws IOException {
496            synchronized (lock) {
497                if (encoder != null) {
498                    flush();
499                    outputStream.flush();
500                    outputStream.close();
501                    encoder = null;
502                    byteBuf = null;
503                }
504            }
505        }
506
507        @Override
508        public void flush() throws IOException {
509            IOUtil
510                    .flushOutputStreamWriter(outputStream, byteBuf, encoder,
511                            lock);
512        }
513
514        @Override
515        public void write(char[] buf, int offset, int count) throws IOException {
516            IOUtil.writeOutputStreamWriter(buf, offset, count, outputStream,
517                    byteBuf, encoder, lock);
518        }
519
520        @Override
521        public void write(int oneChar) throws IOException {
522            IOUtil.writeOutputStreamWriter(oneChar, outputStream, byteBuf,
523                    encoder, lock);
524        }
525
526        @Override
527        public void write(String str, int offset, int count) throws IOException {
528            IOUtil.writeOutputStreamWriter(str, offset, count, outputStream,
529                    byteBuf, encoder, lock);
530        }
531    }
532}
533