Channels.java revision 2c87ad3a45cecf9e344487cad1abfdebe79f2c7c
1/*
2 * Copyright (C) 2014 The Android Open Source Project
3 * Copyright (c) 2000, 2009, Oracle and/or its affiliates. All rights reserved.
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This code is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License version 2 only, as
8 * published by the Free Software Foundation.  Oracle designates this
9 * particular file as subject to the "Classpath" exception as provided
10 * by Oracle in the LICENSE file that accompanied this code.
11 *
12 * This code is distributed in the hope that it will be useful, but WITHOUT
13 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 * version 2 for more details (a copy is included in the LICENSE file that
16 * accompanied this code).
17 *
18 * You should have received a copy of the GNU General Public License version
19 * 2 along with this work; if not, write to the Free Software Foundation,
20 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
21 *
22 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
23 * or visit www.oracle.com if you need additional information or have any
24 * questions.
25 */
26
27package java.nio.channels;
28
29import java.io.FileInputStream;
30import java.io.FileOutputStream;
31import java.io.InputStream;
32import java.io.OutputStream;
33import java.io.Reader;
34import java.io.Writer;
35import java.io.IOException;
36import java.nio.ByteBuffer;
37import java.nio.charset.Charset;
38import java.nio.charset.CharsetDecoder;
39import java.nio.charset.CharsetEncoder;
40import java.nio.charset.UnsupportedCharsetException;
41import java.nio.channels.spi.AbstractInterruptibleChannel;
42import java.util.concurrent.ExecutionException;
43import sun.nio.ch.ChannelInputStream;
44import sun.nio.cs.StreamDecoder;
45import sun.nio.cs.StreamEncoder;
46
47
48/**
49 * Utility methods for channels and streams.
50 *
51 * <p> This class defines static methods that support the interoperation of the
52 * stream classes of the <tt>{@link java.io}</tt> package with the channel
53 * classes of this package.  </p>
54 *
55 *
56 * @author Mark Reinhold
57 * @author Mike McCloskey
58 * @author JSR-51 Expert Group
59 * @since 1.4
60 */
61
62public final class Channels {
63
64    private Channels() { }              // No instantiation
65
66    private static void checkNotNull(Object o, String name) {
67        if (o == null)
68            throw new NullPointerException("\"" + name + "\" is null!");
69    }
70
71    /**
72     * Write all remaining bytes in buffer to the given channel.
73     * If the channel is selectable then it must be configured blocking.
74     */
75    private static void writeFullyImpl(WritableByteChannel ch, ByteBuffer bb)
76        throws IOException
77    {
78        while (bb.remaining() > 0) {
79            int n = ch.write(bb);
80            if (n <= 0)
81                throw new RuntimeException("no bytes written");
82        }
83    }
84
85    /**
86     * Write all remaining bytes in buffer to the given channel.
87     *
88     * @throws  IllegalBlockingException
89     *          If the channel is selectable and configured non-blocking.
90     */
91    private static void writeFully(WritableByteChannel ch, ByteBuffer bb)
92        throws IOException
93    {
94        if (ch instanceof SelectableChannel) {
95            SelectableChannel sc = (SelectableChannel)ch;
96            synchronized (sc.blockingLock()) {
97                if (!sc.isBlocking())
98                    throw new IllegalBlockingModeException();
99                writeFullyImpl(ch, bb);
100            }
101        } else {
102            writeFullyImpl(ch, bb);
103        }
104    }
105
106    // -- Byte streams from channels --
107
108    /**
109     * Constructs a stream that reads bytes from the given channel.
110     *
111     * <p> The <tt>read</tt> methods of the resulting stream will throw an
112     * {@link IllegalBlockingModeException} if invoked while the underlying
113     * channel is in non-blocking mode.  The stream will not be buffered, and
114     * it will not support the {@link InputStream#mark mark} or {@link
115     * InputStream#reset reset} methods.  The stream will be safe for access by
116     * multiple concurrent threads.  Closing the stream will in turn cause the
117     * channel to be closed.  </p>
118     *
119     * @param  ch
120     *         The channel from which bytes will be read
121     *
122     * @return  A new input stream
123     */
124    public static InputStream newInputStream(ReadableByteChannel ch) {
125        checkNotNull(ch, "ch");
126        return new sun.nio.ch.ChannelInputStream(ch);
127    }
128
129    /**
130     * Constructs a stream that writes bytes to the given channel.
131     *
132     * <p> The <tt>write</tt> methods of the resulting stream will throw an
133     * {@link IllegalBlockingModeException} if invoked while the underlying
134     * channel is in non-blocking mode.  The stream will not be buffered.  The
135     * stream will be safe for access by multiple concurrent threads.  Closing
136     * the stream will in turn cause the channel to be closed.  </p>
137     *
138     * @param  ch
139     *         The channel to which bytes will be written
140     *
141     * @return  A new output stream
142     */
143    public static OutputStream newOutputStream(final WritableByteChannel ch) {
144        checkNotNull(ch, "ch");
145
146        return new OutputStream() {
147
148                private ByteBuffer bb = null;
149                private byte[] bs = null;       // Invoker's previous array
150                private byte[] b1 = null;
151
152                public synchronized void write(int b) throws IOException {
153                   if (b1 == null)
154                        b1 = new byte[1];
155                    b1[0] = (byte)b;
156                    this.write(b1);
157                }
158
159                public synchronized void write(byte[] bs, int off, int len)
160                    throws IOException
161                {
162                    if ((off < 0) || (off > bs.length) || (len < 0) ||
163                        ((off + len) > bs.length) || ((off + len) < 0)) {
164                        throw new IndexOutOfBoundsException();
165                    } else if (len == 0) {
166                        return;
167                    }
168                    ByteBuffer bb = ((this.bs == bs)
169                                     ? this.bb
170                                     : ByteBuffer.wrap(bs));
171                    bb.limit(Math.min(off + len, bb.capacity()));
172                    bb.position(off);
173                    this.bb = bb;
174                    this.bs = bs;
175                    Channels.writeFully(ch, bb);
176                }
177
178                public void close() throws IOException {
179                    ch.close();
180                }
181
182            };
183    }
184
185    /**
186     * Constructs a stream that reads bytes from the given channel.
187     *
188     * <p> The stream will not be buffered, and it will not support the {@link
189     * InputStream#mark mark} or {@link InputStream#reset reset} methods.  The
190     * stream will be safe for access by multiple concurrent threads.  Closing
191     * the stream will in turn cause the channel to be closed.  </p>
192     *
193     * @param  ch
194     *         The channel from which bytes will be read
195     *
196     * @return  A new input stream
197     *
198     * @since 1.7
199     */
200    public static InputStream newInputStream(final AsynchronousByteChannel ch) {
201        checkNotNull(ch, "ch");
202        return new InputStream() {
203
204            private ByteBuffer bb = null;
205            private byte[] bs = null;           // Invoker's previous array
206            private byte[] b1 = null;
207
208            @Override
209            public synchronized int read() throws IOException {
210                if (b1 == null)
211                    b1 = new byte[1];
212                int n = this.read(b1);
213                if (n == 1)
214                    return b1[0] & 0xff;
215                return -1;
216            }
217
218            @Override
219            public synchronized int read(byte[] bs, int off, int len)
220                throws IOException
221            {
222                if ((off < 0) || (off > bs.length) || (len < 0) ||
223                    ((off + len) > bs.length) || ((off + len) < 0)) {
224                    throw new IndexOutOfBoundsException();
225                } else if (len == 0)
226                    return 0;
227
228                ByteBuffer bb = ((this.bs == bs)
229                                 ? this.bb
230                                 : ByteBuffer.wrap(bs));
231                bb.position(off);
232                bb.limit(Math.min(off + len, bb.capacity()));
233                this.bb = bb;
234                this.bs = bs;
235
236                boolean interrupted = false;
237                try {
238                    for (;;) {
239                        try {
240                            return ch.read(bb).get();
241                        } catch (ExecutionException ee) {
242                            throw new IOException(ee.getCause());
243                        } catch (InterruptedException ie) {
244                            interrupted = true;
245                        }
246                    }
247                } finally {
248                    if (interrupted)
249                        Thread.currentThread().interrupt();
250                }
251            }
252
253            @Override
254            public void close() throws IOException {
255                ch.close();
256            }
257        };
258    }
259
260    /**
261     * Constructs a stream that writes bytes to the given channel.
262     *
263     * <p> The stream will not be buffered. The stream will be safe for access
264     * by multiple concurrent threads.  Closing the stream will in turn cause
265     * the channel to be closed.  </p>
266     *
267     * @param  ch
268     *         The channel to which bytes will be written
269     *
270     * @return  A new output stream
271     *
272     * @since 1.7
273     */
274    public static OutputStream newOutputStream(final AsynchronousByteChannel ch) {
275        checkNotNull(ch, "ch");
276        return new OutputStream() {
277
278            private ByteBuffer bb = null;
279            private byte[] bs = null;   // Invoker's previous array
280            private byte[] b1 = null;
281
282            @Override
283            public synchronized void write(int b) throws IOException {
284               if (b1 == null)
285                    b1 = new byte[1];
286                b1[0] = (byte)b;
287                this.write(b1);
288            }
289
290            @Override
291            public synchronized void write(byte[] bs, int off, int len)
292                throws IOException
293            {
294                if ((off < 0) || (off > bs.length) || (len < 0) ||
295                    ((off + len) > bs.length) || ((off + len) < 0)) {
296                    throw new IndexOutOfBoundsException();
297                } else if (len == 0) {
298                    return;
299                }
300                ByteBuffer bb = ((this.bs == bs)
301                                 ? this.bb
302                                 : ByteBuffer.wrap(bs));
303                bb.limit(Math.min(off + len, bb.capacity()));
304                bb.position(off);
305                this.bb = bb;
306                this.bs = bs;
307
308                boolean interrupted = false;
309                try {
310                    while (bb.remaining() > 0) {
311                        try {
312                            ch.write(bb).get();
313                        } catch (ExecutionException ee) {
314                            throw new IOException(ee.getCause());
315                        } catch (InterruptedException ie) {
316                            interrupted = true;
317                        }
318                    }
319                } finally {
320                    if (interrupted)
321                        Thread.currentThread().interrupt();
322                }
323            }
324
325            @Override
326            public void close() throws IOException {
327                ch.close();
328            }
329        };
330    }
331
332
333    // -- Channels from streams --
334
335    /**
336     * Constructs a channel that reads bytes from the given stream.
337     *
338     * <p> The resulting channel will not be buffered; it will simply redirect
339     * its I/O operations to the given stream.  Closing the channel will in
340     * turn cause the stream to be closed.  </p>
341     *
342     * @param  in
343     *         The stream from which bytes are to be read
344     *
345     * @return  A new readable byte channel
346     */
347    public static ReadableByteChannel newChannel(final InputStream in) {
348        checkNotNull(in, "in");
349
350        if (in instanceof FileInputStream &&
351            FileInputStream.class.equals(in.getClass())) {
352            return ((FileInputStream)in).getChannel();
353        }
354
355        return new ReadableByteChannelImpl(in);
356    }
357
358    private static class ReadableByteChannelImpl
359        extends AbstractInterruptibleChannel    // Not really interruptible
360        implements ReadableByteChannel
361    {
362        InputStream in;
363        private static final int TRANSFER_SIZE = 8192;
364        private byte buf[] = new byte[0];
365        private boolean open = true;
366        private Object readLock = new Object();
367
368        ReadableByteChannelImpl(InputStream in) {
369            this.in = in;
370        }
371
372        public int read(ByteBuffer dst) throws IOException {
373            int len = dst.remaining();
374            int totalRead = 0;
375            int bytesRead = 0;
376            synchronized (readLock) {
377                while (totalRead < len) {
378                    int bytesToRead = Math.min((len - totalRead),
379                                               TRANSFER_SIZE);
380                    if (buf.length < bytesToRead)
381                        buf = new byte[bytesToRead];
382                    if ((totalRead > 0) && !(in.available() > 0))
383                        break; // block at most once
384                    try {
385                        begin();
386                        bytesRead = in.read(buf, 0, bytesToRead);
387                    } finally {
388                        end(bytesRead > 0);
389                    }
390                    if (bytesRead < 0)
391                        break;
392                    else
393                        totalRead += bytesRead;
394                    dst.put(buf, 0, bytesRead);
395                }
396                if ((bytesRead < 0) && (totalRead == 0))
397                    return -1;
398
399                return totalRead;
400            }
401        }
402
403        protected void implCloseChannel() throws IOException {
404            in.close();
405            open = false;
406        }
407    }
408
409
410    /**
411     * Constructs a channel that writes bytes to the given stream.
412     *
413     * <p> The resulting channel will not be buffered; it will simply redirect
414     * its I/O operations to the given stream.  Closing the channel will in
415     * turn cause the stream to be closed.  </p>
416     *
417     * @param  out
418     *         The stream to which bytes are to be written
419     *
420     * @return  A new writable byte channel
421     */
422    public static WritableByteChannel newChannel(final OutputStream out) {
423        checkNotNull(out, "out");
424
425        /* ----- BEGIN android -----
426           if (out instanceof FileOutputStream &&
427           FileOutputStream.class.equals(out.getClass())) {
428           return ((FileOutputStream)out).getChannel();
429        }*/
430
431        return new WritableByteChannelImpl(out);
432    }
433
434    private static class WritableByteChannelImpl
435        extends AbstractInterruptibleChannel    // Not really interruptible
436        implements WritableByteChannel
437    {
438        OutputStream out;
439        private static final int TRANSFER_SIZE = 8192;
440        private byte buf[] = new byte[0];
441        private boolean open = true;
442        private Object writeLock = new Object();
443
444        WritableByteChannelImpl(OutputStream out) {
445            this.out = out;
446        }
447
448        public int write(ByteBuffer src) throws IOException {
449            int len = src.remaining();
450            int totalWritten = 0;
451            synchronized (writeLock) {
452                while (totalWritten < len) {
453                    int bytesToWrite = Math.min((len - totalWritten),
454                                                TRANSFER_SIZE);
455                    if (buf.length < bytesToWrite)
456                        buf = new byte[bytesToWrite];
457                    src.get(buf, 0, bytesToWrite);
458                    try {
459                        begin();
460                        out.write(buf, 0, bytesToWrite);
461                    } finally {
462                        end(bytesToWrite > 0);
463                    }
464                    totalWritten += bytesToWrite;
465                }
466                return totalWritten;
467            }
468        }
469
470        protected void implCloseChannel() throws IOException {
471            out.close();
472            open = false;
473        }
474    }
475
476
477    // -- Character streams from channels --
478
479    /**
480     * Constructs a reader that decodes bytes from the given channel using the
481     * given decoder.
482     *
483     * <p> The resulting stream will contain an internal input buffer of at
484     * least <tt>minBufferCap</tt> bytes.  The stream's <tt>read</tt> methods
485     * will, as needed, fill the buffer by reading bytes from the underlying
486     * channel; if the channel is in non-blocking mode when bytes are to be
487     * read then an {@link IllegalBlockingModeException} will be thrown.  The
488     * resulting stream will not otherwise be buffered, and it will not support
489     * the {@link Reader#mark mark} or {@link Reader#reset reset} methods.
490     * Closing the stream will in turn cause the channel to be closed.  </p>
491     *
492     * @param  ch
493     *         The channel from which bytes will be read
494     *
495     * @param  dec
496     *         The charset decoder to be used
497     *
498     * @param  minBufferCap
499     *         The minimum capacity of the internal byte buffer,
500     *         or <tt>-1</tt> if an implementation-dependent
501     *         default capacity is to be used
502     *
503     * @return  A new reader
504     */
505    public static Reader newReader(ReadableByteChannel ch,
506                                   CharsetDecoder dec,
507                                   int minBufferCap)
508    {
509        checkNotNull(ch, "ch");
510        return StreamDecoder.forDecoder(ch, dec.reset(), minBufferCap);
511    }
512
513    /**
514     * Constructs a reader that decodes bytes from the given channel according
515     * to the named charset.
516     *
517     * <p> An invocation of this method of the form
518     *
519     * <blockquote><pre>
520     * Channels.newReader(ch, csname)</pre></blockquote>
521     *
522     * behaves in exactly the same way as the expression
523     *
524     * <blockquote><pre>
525     * Channels.newReader(ch,
526     *                    Charset.forName(csName)
527     *                        .newDecoder(),
528     *                    -1);</pre></blockquote>
529     *
530     * @param  ch
531     *         The channel from which bytes will be read
532     *
533     * @param  csName
534     *         The name of the charset to be used
535     *
536     * @return  A new reader
537     *
538     * @throws  UnsupportedCharsetException
539     *          If no support for the named charset is available
540     *          in this instance of the Java virtual machine
541     */
542    public static Reader newReader(ReadableByteChannel ch,
543                                   String csName)
544    {
545        checkNotNull(csName, "csName");
546        return newReader(ch, Charset.forName(csName).newDecoder(), -1);
547    }
548
549    /**
550     * Constructs a writer that encodes characters using the given encoder and
551     * writes the resulting bytes to the given channel.
552     *
553     * <p> The resulting stream will contain an internal output buffer of at
554     * least <tt>minBufferCap</tt> bytes.  The stream's <tt>write</tt> methods
555     * will, as needed, flush the buffer by writing bytes to the underlying
556     * channel; if the channel is in non-blocking mode when bytes are to be
557     * written then an {@link IllegalBlockingModeException} will be thrown.
558     * The resulting stream will not otherwise be buffered.  Closing the stream
559     * will in turn cause the channel to be closed.  </p>
560     *
561     * @param  ch
562     *         The channel to which bytes will be written
563     *
564     * @param  enc
565     *         The charset encoder to be used
566     *
567     * @param  minBufferCap
568     *         The minimum capacity of the internal byte buffer,
569     *         or <tt>-1</tt> if an implementation-dependent
570     *         default capacity is to be used
571     *
572     * @return  A new writer
573     */
574    public static Writer newWriter(final WritableByteChannel ch,
575                                   final CharsetEncoder enc,
576                                   final int minBufferCap)
577    {
578        checkNotNull(ch, "ch");
579        return StreamEncoder.forEncoder(ch, enc.reset(), minBufferCap);
580    }
581
582    /**
583     * Constructs a writer that encodes characters according to the named
584     * charset and writes the resulting bytes to the given channel.
585     *
586     * <p> An invocation of this method of the form
587     *
588     * <blockquote><pre>
589     * Channels.newWriter(ch, csname)</pre></blockquote>
590     *
591     * behaves in exactly the same way as the expression
592     *
593     * <blockquote><pre>
594     * Channels.newWriter(ch,
595     *                    Charset.forName(csName)
596     *                        .newEncoder(),
597     *                    -1);</pre></blockquote>
598     *
599     * @param  ch
600     *         The channel to which bytes will be written
601     *
602     * @param  csName
603     *         The name of the charset to be used
604     *
605     * @return  A new writer
606     *
607     * @throws  UnsupportedCharsetException
608     *          If no support for the named charset is available
609     *          in this instance of the Java virtual machine
610     */
611    public static Writer newWriter(WritableByteChannel ch,
612                                   String csName)
613    {
614        checkNotNull(csName, "csName");
615        return newWriter(ch, Charset.forName(csName).newEncoder(), -1);
616    }
617}
618