1/*
2 *  Licensed to the Apache Software Foundation (ASF) under one or more
3 *  contributor license agreements.  See the NOTICE file distributed with
4 *  this work for additional information regarding copyright ownership.
5 *  The ASF licenses this file to You under the Apache License, Version 2.0
6 *  (the "License"); you may not use this file except in compliance with
7 *  the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *  Unless required by applicable law or agreed to in writing, software
12 *  distributed under the License is distributed on an "AS IS" BASIS,
13 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  See the License for the specific language governing permissions and
15 *  limitations under the License.
16 */
17
18package java.nio.channels;
19
20import java.io.IOException;
21import java.io.InputStream;
22import java.io.InputStreamReader;
23import java.io.OutputStream;
24import java.io.OutputStreamWriter;
25import java.io.Reader;
26import java.io.Writer;
27import java.nio.ByteBuffer;
28import java.nio.channels.spi.AbstractInterruptibleChannel;
29import java.nio.charset.Charset;
30import java.nio.charset.CharsetDecoder;
31import java.nio.charset.CharsetEncoder;
32import libcore.io.Streams;
33
34/**
35 * This class provides several utilities to get I/O streams from channels.
36 */
37public final class Channels {
38
39    private Channels() {}
40
41    /**
42     * Returns an input stream on the given channel. The resulting stream has
43     * the following properties:
44     * <ul>
45     * <li>If the stream is closed, then the underlying channel is closed as
46     * well.</li>
47     * <li>It is thread safe.</li>
48     * <li>It throws an {@link IllegalBlockingModeException} if the channel is
49     * in non-blocking mode and {@code read} is called.</li>
50     * <li>Neither {@code mark} nor {@code reset} is supported.</li>
51     * <li>It is not buffered.</li>
52     * </ul>
53     *
54     * @param channel
55     *            the channel to be wrapped by an InputStream.
56     * @return an InputStream that takes bytes from the given byte channel.
57     */
58    public static InputStream newInputStream(ReadableByteChannel channel) {
59        return new ChannelInputStream(channel);
60    }
61
62    /**
63     * Returns an output stream on the given channel. The resulting stream has
64     * the following properties:
65     * <ul>
66     * <li>If the stream is closed, then the underlying channel is closed as
67     * well.</li>
68     * <li>It is thread safe.</li>
69     * <li>It throws an {@link IllegalBlockingModeException} if the channel is
70     * in non-blocking mode and {@code write} is called.</li>
71     * <li>It is not buffered.</li>
72     * </ul>
73     *
74     * @param channel
75     *            the channel to be wrapped by an OutputStream.
76     * @return an OutputStream that puts bytes onto the given byte channel.
77     */
78    public static OutputStream newOutputStream(WritableByteChannel channel) {
79        return new ChannelOutputStream(channel);
80    }
81
82    /**
83     * Returns a readable channel on the given input stream. The resulting
84     * channel has the following properties:
85     * <ul>
86     * <li>If the channel is closed, then the underlying stream is closed as
87     * well.</li>
88     * <li>It is not buffered.</li>
89     * </ul>
90     *
91     * @param inputStream
92     *            the stream to be wrapped by a byte channel.
93     * @return a byte channel that reads bytes from the input stream.
94     */
95    public static ReadableByteChannel newChannel(InputStream inputStream) {
96        return new InputStreamChannel(inputStream);
97    }
98
99    /**
100     * Returns a writable channel on the given output stream.
101     *
102     * The resulting channel has following properties:
103     * <ul>
104     * <li>If the channel is closed, then the underlying stream is closed as
105     * well.</li>
106     * <li>It is not buffered.</li>
107     * </ul>
108     *
109     * @param outputStream
110     *            the stream to be wrapped by a byte channel.
111     * @return a byte channel that writes bytes to the output stream.
112     */
113    public static WritableByteChannel newChannel(OutputStream outputStream) {
114        return new OutputStreamChannel(outputStream);
115    }
116
117    /**
118     * Returns a reader that decodes bytes from a channel.
119     *
120     * @param channel
121     *            the Channel to be read.
122     * @param decoder
123     *            the Charset decoder to be used.
124     * @param minBufferCapacity
125     *            The minimum size of the byte buffer, -1 means to use the
126     *            default size.
127     * @return the reader.
128     */
129    public static Reader newReader(ReadableByteChannel channel,
130            CharsetDecoder decoder, int minBufferCapacity) {
131        /*
132         * This method doesn't honor minBufferCapacity. Ignoring that parameter
133         * saves us from having to add a hidden constructor to InputStreamReader.
134         */
135        return new InputStreamReader(new ChannelInputStream(channel), decoder);
136    }
137
138    /**
139     * Returns a reader that decodes bytes from a channel. This method creates a
140     * reader with a buffer of default size.
141     *
142     * @param channel
143     *            the Channel to be read.
144     * @param charsetName
145     *            the name of the charset.
146     * @return the reader.
147     * @throws java.nio.charset.UnsupportedCharsetException
148     *             if the given charset name is not supported.
149     */
150    public static Reader newReader(ReadableByteChannel channel,
151            String charsetName) {
152        if (charsetName == null) {
153            throw new NullPointerException();
154        }
155        return newReader(channel, Charset.forName(charsetName).newDecoder(), -1);
156    }
157
158    /**
159     * Returns a writer that encodes characters with the specified
160     * {@code encoder} and sends the bytes to the specified channel.
161     *
162     * @param channel
163     *            the Channel to write to.
164     * @param encoder
165     *            the CharsetEncoder to be used.
166     * @param minBufferCapacity
167     *            the minimum size of the byte buffer, -1 means to use the
168     *            default size.
169     * @return the writer.
170     */
171    public static Writer newWriter(WritableByteChannel channel,
172            CharsetEncoder encoder, int minBufferCapacity) {
173        /*
174         * This method doesn't honor minBufferCapacity. Ignoring that parameter
175         * saves us from having to add a hidden constructor to OutputStreamWriter.
176         */
177        return new OutputStreamWriter(new ChannelOutputStream(channel), encoder);
178    }
179
180    /**
181     * Returns a writer that encodes characters with the specified
182     * {@code encoder} and sends the bytes to the specified channel. This method
183     * creates a writer with a buffer of default size.
184     *
185     * @param channel
186     *            the Channel to be written to.
187     * @param charsetName
188     *            the name of the charset.
189     * @return the writer.
190     * @throws java.nio.charset.UnsupportedCharsetException
191     *             if the given charset name is not supported.
192     */
193    public static Writer newWriter(WritableByteChannel channel,
194            String charsetName) {
195        if (charsetName == null) {
196            throw new NullPointerException();
197        }
198        return newWriter(channel, Charset.forName(charsetName).newEncoder(), -1);
199    }
200
201    /**
202     * An input stream that delegates to a readable channel.
203     */
204    private static class ChannelInputStream extends InputStream {
205
206        private final ReadableByteChannel channel;
207
208        ChannelInputStream(ReadableByteChannel channel) {
209            if (channel == null) {
210                throw new NullPointerException();
211            }
212            this.channel = channel;
213        }
214
215        @Override public synchronized int read() throws IOException {
216            return Streams.readSingleByte(this);
217        }
218
219        @Override public synchronized int read(byte[] target, int offset, int length) throws IOException {
220            ByteBuffer buffer = ByteBuffer.wrap(target, offset, length);
221            checkBlocking(channel);
222            return channel.read(buffer);
223        }
224
225        @Override public int available() throws IOException {
226            if (channel instanceof FileChannel) {
227                FileChannel fileChannel = (FileChannel) channel;
228                long result = fileChannel.size() - fileChannel.position();
229                return result > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) result;
230            } else {
231                return super.available();
232            }
233        }
234
235        @Override
236        public synchronized void close() throws IOException {
237            channel.close();
238        }
239    }
240
241    /**
242     * An output stream that delegates to a writable channel.
243     */
244    private static class ChannelOutputStream extends OutputStream {
245
246        private final WritableByteChannel channel;
247
248        ChannelOutputStream(WritableByteChannel channel) {
249            if (channel == null) {
250                throw new NullPointerException();
251            }
252            this.channel = channel;
253        }
254
255        @Override
256        public synchronized void write(int oneByte) throws IOException {
257            byte[] wrappedByte = { (byte) oneByte };
258            write(wrappedByte);
259        }
260
261        @Override
262        public synchronized void write(byte[] source, int offset, int length) throws IOException {
263            ByteBuffer buffer = ByteBuffer.wrap(source, offset, length);
264            checkBlocking(channel);
265            int total = 0;
266            while (total < length) {
267                total += channel.write(buffer);
268            }
269        }
270
271        @Override
272        public synchronized void close() throws IOException {
273            channel.close();
274        }
275    }
276
277    static void checkBlocking(Channel channel) {
278        if (channel instanceof SelectableChannel && !((SelectableChannel) channel).isBlocking()) {
279            throw new IllegalBlockingModeException();
280        }
281    }
282
283    /**
284     * A readable channel that delegates to an input stream.
285     */
286    private static class InputStreamChannel extends AbstractInterruptibleChannel
287            implements ReadableByteChannel {
288        private final InputStream inputStream;
289
290        InputStreamChannel(InputStream inputStream) {
291            if (inputStream == null) {
292                throw new NullPointerException();
293            }
294            this.inputStream = inputStream;
295        }
296
297        public synchronized int read(ByteBuffer target) throws IOException {
298            if (!isOpen()) {
299                throw new ClosedChannelException();
300            }
301            int bytesRemain = target.remaining();
302            byte[] bytes = new byte[bytesRemain];
303            int readCount = 0;
304            try {
305                begin();
306                readCount = inputStream.read(bytes);
307            } finally {
308                end(readCount >= 0);
309            }
310            if (readCount > 0) {
311                target.put(bytes, 0, readCount);
312            }
313            return readCount;
314        }
315
316        @Override
317        protected void implCloseChannel() throws IOException {
318            inputStream.close();
319        }
320    }
321
322    /**
323     * A writable channel that delegates to an output stream.
324     */
325    private static class OutputStreamChannel extends AbstractInterruptibleChannel
326            implements WritableByteChannel {
327        private final OutputStream outputStream;
328
329        OutputStreamChannel(OutputStream outputStream) {
330            if (outputStream == null) {
331                throw new NullPointerException();
332            }
333            this.outputStream = outputStream;
334        }
335
336        public synchronized int write(ByteBuffer source) throws IOException {
337            if (!isOpen()) {
338                throw new ClosedChannelException();
339            }
340            int bytesRemain = source.remaining();
341            if (bytesRemain == 0) {
342                return 0;
343            }
344            byte[] buf = new byte[bytesRemain];
345            source.get(buf);
346            try {
347                begin();
348                outputStream.write(buf, 0, bytesRemain);
349            } finally {
350                end(bytesRemain >= 0);
351            }
352            return bytesRemain;
353        }
354
355        @Override
356        protected void implCloseChannel() throws IOException {
357            outputStream.close();
358        }
359    }
360}
361