Channels.java revision 1d857a6e70606d1cec7330edfde0e43111d8d7c1
1/*
2 * Copyright (c) 2000, 2009, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package java.nio.channels;
27
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Reader;
import java.io.Writer;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.UnsupportedCharsetException;
import java.nio.channels.spi.AbstractInterruptibleChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import sun.nio.ch.ChannelInputStream;
import sun.nio.cs.StreamDecoder;
import sun.nio.cs.StreamEncoder;
45
46
47/**
48 * Utility methods for channels and streams.
49 *
50 * <p> This class defines static methods that support the interoperation of the
51 * stream classes of the <tt>{@link java.io}</tt> package with the channel
52 * classes of this package.  </p>
53 *
54 *
55 * @author Mark Reinhold
56 * @author Mike McCloskey
57 * @author JSR-51 Expert Group
58 * @since 1.4
59 */
60
61public final class Channels {
62
63    private Channels() { }              // No instantiation
64
65    private static void checkNotNull(Object o, String name) {
66        if (o == null)
67            throw new NullPointerException("\"" + name + "\" is null!");
68    }
69
70    /**
71     * Write all remaining bytes in buffer to the given channel.
72     * If the channel is selectable then it must be configured blocking.
73     */
74    private static void writeFullyImpl(WritableByteChannel ch, ByteBuffer bb)
75        throws IOException
76    {
77        while (bb.remaining() > 0) {
78            int n = ch.write(bb);
79            if (n <= 0)
80                throw new RuntimeException("no bytes written");
81        }
82    }
83
84    /**
85     * Write all remaining bytes in buffer to the given channel.
86     *
87     * @throws  IllegalBlockingException
88     *          If the channel is selectable and configured non-blocking.
89     */
90    private static void writeFully(WritableByteChannel ch, ByteBuffer bb)
91        throws IOException
92    {
93        if (ch instanceof SelectableChannel) {
94            SelectableChannel sc = (SelectableChannel)ch;
95            synchronized (sc.blockingLock()) {
96                if (!sc.isBlocking())
97                    throw new IllegalBlockingModeException();
98                writeFullyImpl(ch, bb);
99            }
100        } else {
101            writeFullyImpl(ch, bb);
102        }
103    }
104
105    // -- Byte streams from channels --
106
107    /**
108     * Constructs a stream that reads bytes from the given channel.
109     *
110     * <p> The <tt>read</tt> methods of the resulting stream will throw an
111     * {@link IllegalBlockingModeException} if invoked while the underlying
112     * channel is in non-blocking mode.  The stream will not be buffered, and
113     * it will not support the {@link InputStream#mark mark} or {@link
114     * InputStream#reset reset} methods.  The stream will be safe for access by
115     * multiple concurrent threads.  Closing the stream will in turn cause the
116     * channel to be closed.  </p>
117     *
118     * @param  ch
119     *         The channel from which bytes will be read
120     *
121     * @return  A new input stream
122     */
123    public static InputStream newInputStream(ReadableByteChannel ch) {
124        checkNotNull(ch, "ch");
125        return new sun.nio.ch.ChannelInputStream(ch);
126    }
127
128    /**
129     * Constructs a stream that writes bytes to the given channel.
130     *
131     * <p> The <tt>write</tt> methods of the resulting stream will throw an
132     * {@link IllegalBlockingModeException} if invoked while the underlying
133     * channel is in non-blocking mode.  The stream will not be buffered.  The
134     * stream will be safe for access by multiple concurrent threads.  Closing
135     * the stream will in turn cause the channel to be closed.  </p>
136     *
137     * @param  ch
138     *         The channel to which bytes will be written
139     *
140     * @return  A new output stream
141     */
142    public static OutputStream newOutputStream(final WritableByteChannel ch) {
143        checkNotNull(ch, "ch");
144
145        return new OutputStream() {
146
147                private ByteBuffer bb = null;
148                private byte[] bs = null;       // Invoker's previous array
149                private byte[] b1 = null;
150
151                public synchronized void write(int b) throws IOException {
152                   if (b1 == null)
153                        b1 = new byte[1];
154                    b1[0] = (byte)b;
155                    this.write(b1);
156                }
157
158                public synchronized void write(byte[] bs, int off, int len)
159                    throws IOException
160                {
161                    if ((off < 0) || (off > bs.length) || (len < 0) ||
162                        ((off + len) > bs.length) || ((off + len) < 0)) {
163                        throw new IndexOutOfBoundsException();
164                    } else if (len == 0) {
165                        return;
166                    }
167                    ByteBuffer bb = ((this.bs == bs)
168                                     ? this.bb
169                                     : ByteBuffer.wrap(bs));
170                    bb.limit(Math.min(off + len, bb.capacity()));
171                    bb.position(off);
172                    this.bb = bb;
173                    this.bs = bs;
174                    Channels.writeFully(ch, bb);
175                }
176
177                public void close() throws IOException {
178                    ch.close();
179                }
180
181            };
182    }
183
184    /**
185     * Constructs a stream that reads bytes from the given channel.
186     *
187     * <p> The stream will not be buffered, and it will not support the {@link
188     * InputStream#mark mark} or {@link InputStream#reset reset} methods.  The
189     * stream will be safe for access by multiple concurrent threads.  Closing
190     * the stream will in turn cause the channel to be closed.  </p>
191     *
192     * @param  ch
193     *         The channel from which bytes will be read
194     *
195     * @return  A new input stream
196     *
197     * @since 1.7
198     */
199    public static InputStream newInputStream(final AsynchronousByteChannel ch) {
200        checkNotNull(ch, "ch");
201        return new InputStream() {
202
203            private ByteBuffer bb = null;
204            private byte[] bs = null;           // Invoker's previous array
205            private byte[] b1 = null;
206
207            @Override
208            public synchronized int read() throws IOException {
209                if (b1 == null)
210                    b1 = new byte[1];
211                int n = this.read(b1);
212                if (n == 1)
213                    return b1[0] & 0xff;
214                return -1;
215            }
216
217            @Override
218            public synchronized int read(byte[] bs, int off, int len)
219                throws IOException
220            {
221                if ((off < 0) || (off > bs.length) || (len < 0) ||
222                    ((off + len) > bs.length) || ((off + len) < 0)) {
223                    throw new IndexOutOfBoundsException();
224                } else if (len == 0)
225                    return 0;
226
227                ByteBuffer bb = ((this.bs == bs)
228                                 ? this.bb
229                                 : ByteBuffer.wrap(bs));
230                bb.position(off);
231                bb.limit(Math.min(off + len, bb.capacity()));
232                this.bb = bb;
233                this.bs = bs;
234
235                boolean interrupted = false;
236                try {
237                    for (;;) {
238                        try {
239                            return ch.read(bb).get();
240                        } catch (ExecutionException ee) {
241                            throw new IOException(ee.getCause());
242                        } catch (InterruptedException ie) {
243                            interrupted = true;
244                        }
245                    }
246                } finally {
247                    if (interrupted)
248                        Thread.currentThread().interrupt();
249                }
250            }
251
252            @Override
253            public void close() throws IOException {
254                ch.close();
255            }
256        };
257    }
258
259    /**
260     * Constructs a stream that writes bytes to the given channel.
261     *
262     * <p> The stream will not be buffered. The stream will be safe for access
263     * by multiple concurrent threads.  Closing the stream will in turn cause
264     * the channel to be closed.  </p>
265     *
266     * @param  ch
267     *         The channel to which bytes will be written
268     *
269     * @return  A new output stream
270     *
271     * @since 1.7
272     */
273    public static OutputStream newOutputStream(final AsynchronousByteChannel ch) {
274        checkNotNull(ch, "ch");
275        return new OutputStream() {
276
277            private ByteBuffer bb = null;
278            private byte[] bs = null;   // Invoker's previous array
279            private byte[] b1 = null;
280
281            @Override
282            public synchronized void write(int b) throws IOException {
283               if (b1 == null)
284                    b1 = new byte[1];
285                b1[0] = (byte)b;
286                this.write(b1);
287            }
288
289            @Override
290            public synchronized void write(byte[] bs, int off, int len)
291                throws IOException
292            {
293                if ((off < 0) || (off > bs.length) || (len < 0) ||
294                    ((off + len) > bs.length) || ((off + len) < 0)) {
295                    throw new IndexOutOfBoundsException();
296                } else if (len == 0) {
297                    return;
298                }
299                ByteBuffer bb = ((this.bs == bs)
300                                 ? this.bb
301                                 : ByteBuffer.wrap(bs));
302                bb.limit(Math.min(off + len, bb.capacity()));
303                bb.position(off);
304                this.bb = bb;
305                this.bs = bs;
306
307                boolean interrupted = false;
308                try {
309                    while (bb.remaining() > 0) {
310                        try {
311                            ch.write(bb).get();
312                        } catch (ExecutionException ee) {
313                            throw new IOException(ee.getCause());
314                        } catch (InterruptedException ie) {
315                            interrupted = true;
316                        }
317                    }
318                } finally {
319                    if (interrupted)
320                        Thread.currentThread().interrupt();
321                }
322            }
323
324            @Override
325            public void close() throws IOException {
326                ch.close();
327            }
328        };
329    }
330
331
332    // -- Channels from streams --
333
334    /**
335     * Constructs a channel that reads bytes from the given stream.
336     *
337     * <p> The resulting channel will not be buffered; it will simply redirect
338     * its I/O operations to the given stream.  Closing the channel will in
339     * turn cause the stream to be closed.  </p>
340     *
341     * @param  in
342     *         The stream from which bytes are to be read
343     *
344     * @return  A new readable byte channel
345     */
346    public static ReadableByteChannel newChannel(final InputStream in) {
347        checkNotNull(in, "in");
348
349        if (in instanceof FileInputStream &&
350            FileInputStream.class.equals(in.getClass())) {
351            return ((FileInputStream)in).getChannel();
352        }
353
354        return new ReadableByteChannelImpl(in);
355    }
356
357    private static class ReadableByteChannelImpl
358        extends AbstractInterruptibleChannel    // Not really interruptible
359        implements ReadableByteChannel
360    {
361        InputStream in;
362        private static final int TRANSFER_SIZE = 8192;
363        private byte buf[] = new byte[0];
364        private boolean open = true;
365        private Object readLock = new Object();
366
367        ReadableByteChannelImpl(InputStream in) {
368            this.in = in;
369        }
370
371        public int read(ByteBuffer dst) throws IOException {
372            int len = dst.remaining();
373            int totalRead = 0;
374            int bytesRead = 0;
375            synchronized (readLock) {
376                while (totalRead < len) {
377                    int bytesToRead = Math.min((len - totalRead),
378                                               TRANSFER_SIZE);
379                    if (buf.length < bytesToRead)
380                        buf = new byte[bytesToRead];
381                    if ((totalRead > 0) && !(in.available() > 0))
382                        break; // block at most once
383                    try {
384                        begin();
385                        bytesRead = in.read(buf, 0, bytesToRead);
386                    } finally {
387                        end(bytesRead > 0);
388                    }
389                    if (bytesRead < 0)
390                        break;
391                    else
392                        totalRead += bytesRead;
393                    dst.put(buf, 0, bytesRead);
394                }
395                if ((bytesRead < 0) && (totalRead == 0))
396                    return -1;
397
398                return totalRead;
399            }
400        }
401
402        protected void implCloseChannel() throws IOException {
403            in.close();
404            open = false;
405        }
406    }
407
408
409    /**
410     * Constructs a channel that writes bytes to the given stream.
411     *
412     * <p> The resulting channel will not be buffered; it will simply redirect
413     * its I/O operations to the given stream.  Closing the channel will in
414     * turn cause the stream to be closed.  </p>
415     *
416     * @param  out
417     *         The stream to which bytes are to be written
418     *
419     * @return  A new writable byte channel
420     */
421    public static WritableByteChannel newChannel(final OutputStream out) {
422        checkNotNull(out, "out");
423
424        /* ----- BEGIN android -----
425           if (out instanceof FileOutputStream &&
426           FileOutputStream.class.equals(out.getClass())) {
427           return ((FileOutputStream)out).getChannel();
428        }*/
429
430        return new WritableByteChannelImpl(out);
431    }
432
433    private static class WritableByteChannelImpl
434        extends AbstractInterruptibleChannel    // Not really interruptible
435        implements WritableByteChannel
436    {
437        OutputStream out;
438        private static final int TRANSFER_SIZE = 8192;
439        private byte buf[] = new byte[0];
440        private boolean open = true;
441        private Object writeLock = new Object();
442
443        WritableByteChannelImpl(OutputStream out) {
444            this.out = out;
445        }
446
447        public int write(ByteBuffer src) throws IOException {
448            int len = src.remaining();
449            int totalWritten = 0;
450            synchronized (writeLock) {
451                while (totalWritten < len) {
452                    int bytesToWrite = Math.min((len - totalWritten),
453                                                TRANSFER_SIZE);
454                    if (buf.length < bytesToWrite)
455                        buf = new byte[bytesToWrite];
456                    src.get(buf, 0, bytesToWrite);
457                    try {
458                        begin();
459                        out.write(buf, 0, bytesToWrite);
460                    } finally {
461                        end(bytesToWrite > 0);
462                    }
463                    totalWritten += bytesToWrite;
464                }
465                return totalWritten;
466            }
467        }
468
469        protected void implCloseChannel() throws IOException {
470            out.close();
471            open = false;
472        }
473    }
474
475
476    // -- Character streams from channels --
477
478    /**
479     * Constructs a reader that decodes bytes from the given channel using the
480     * given decoder.
481     *
482     * <p> The resulting stream will contain an internal input buffer of at
483     * least <tt>minBufferCap</tt> bytes.  The stream's <tt>read</tt> methods
484     * will, as needed, fill the buffer by reading bytes from the underlying
485     * channel; if the channel is in non-blocking mode when bytes are to be
486     * read then an {@link IllegalBlockingModeException} will be thrown.  The
487     * resulting stream will not otherwise be buffered, and it will not support
488     * the {@link Reader#mark mark} or {@link Reader#reset reset} methods.
489     * Closing the stream will in turn cause the channel to be closed.  </p>
490     *
491     * @param  ch
492     *         The channel from which bytes will be read
493     *
494     * @param  dec
495     *         The charset decoder to be used
496     *
497     * @param  minBufferCap
498     *         The minimum capacity of the internal byte buffer,
499     *         or <tt>-1</tt> if an implementation-dependent
500     *         default capacity is to be used
501     *
502     * @return  A new reader
503     */
504    public static Reader newReader(ReadableByteChannel ch,
505                                   CharsetDecoder dec,
506                                   int minBufferCap)
507    {
508        checkNotNull(ch, "ch");
509        return StreamDecoder.forDecoder(ch, dec.reset(), minBufferCap);
510    }
511
512    /**
513     * Constructs a reader that decodes bytes from the given channel according
514     * to the named charset.
515     *
516     * <p> An invocation of this method of the form
517     *
518     * <blockquote><pre>
519     * Channels.newReader(ch, csname)</pre></blockquote>
520     *
521     * behaves in exactly the same way as the expression
522     *
523     * <blockquote><pre>
524     * Channels.newReader(ch,
525     *                    Charset.forName(csName)
526     *                        .newDecoder(),
527     *                    -1);</pre></blockquote>
528     *
529     * @param  ch
530     *         The channel from which bytes will be read
531     *
532     * @param  csName
533     *         The name of the charset to be used
534     *
535     * @return  A new reader
536     *
537     * @throws  UnsupportedCharsetException
538     *          If no support for the named charset is available
539     *          in this instance of the Java virtual machine
540     */
541    public static Reader newReader(ReadableByteChannel ch,
542                                   String csName)
543    {
544        checkNotNull(csName, "csName");
545        return newReader(ch, Charset.forName(csName).newDecoder(), -1);
546    }
547
548    /**
549     * Constructs a writer that encodes characters using the given encoder and
550     * writes the resulting bytes to the given channel.
551     *
552     * <p> The resulting stream will contain an internal output buffer of at
553     * least <tt>minBufferCap</tt> bytes.  The stream's <tt>write</tt> methods
554     * will, as needed, flush the buffer by writing bytes to the underlying
555     * channel; if the channel is in non-blocking mode when bytes are to be
556     * written then an {@link IllegalBlockingModeException} will be thrown.
557     * The resulting stream will not otherwise be buffered.  Closing the stream
558     * will in turn cause the channel to be closed.  </p>
559     *
560     * @param  ch
561     *         The channel to which bytes will be written
562     *
563     * @param  enc
564     *         The charset encoder to be used
565     *
566     * @param  minBufferCap
567     *         The minimum capacity of the internal byte buffer,
568     *         or <tt>-1</tt> if an implementation-dependent
569     *         default capacity is to be used
570     *
571     * @return  A new writer
572     */
573    public static Writer newWriter(final WritableByteChannel ch,
574                                   final CharsetEncoder enc,
575                                   final int minBufferCap)
576    {
577        checkNotNull(ch, "ch");
578        return StreamEncoder.forEncoder(ch, enc.reset(), minBufferCap);
579    }
580
581    /**
582     * Constructs a writer that encodes characters according to the named
583     * charset and writes the resulting bytes to the given channel.
584     *
585     * <p> An invocation of this method of the form
586     *
587     * <blockquote><pre>
588     * Channels.newWriter(ch, csname)</pre></blockquote>
589     *
590     * behaves in exactly the same way as the expression
591     *
592     * <blockquote><pre>
593     * Channels.newWriter(ch,
594     *                    Charset.forName(csName)
595     *                        .newEncoder(),
596     *                    -1);</pre></blockquote>
597     *
598     * @param  ch
599     *         The channel to which bytes will be written
600     *
601     * @param  csName
602     *         The name of the charset to be used
603     *
604     * @return  A new writer
605     *
606     * @throws  UnsupportedCharsetException
607     *          If no support for the named charset is available
608     *          in this instance of the Java virtual machine
609     */
610    public static Writer newWriter(WritableByteChannel ch,
611                                   String csName)
612    {
613        checkNotNull(csName, "csName");
614        return newWriter(ch, Charset.forName(csName).newEncoder(), -1);
615    }
616}
617