SocketChannelImpl.java revision 62e34a21db1cbef5d2cec186ee4f15c5ec39d187
1/*
2 *  Licensed to the Apache Software Foundation (ASF) under one or more
3 *  contributor license agreements.  See the NOTICE file distributed with
4 *  this work for additional information regarding copyright ownership.
5 *  The ASF licenses this file to You under the Apache License, Version 2.0
6 *  (the "License"); you may not use this file except in compliance with
7 *  the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *  Unless required by applicable law or agreed to in writing, software
12 *  distributed under the License is distributed on an "AS IS" BASIS,
13 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  See the License for the specific language governing permissions and
15 *  limitations under the License.
16 */
17
18package java.nio;
19
20import java.io.FileDescriptor;
21import java.io.IOException;
22import java.io.InputStream;
23import java.io.OutputStream;
24import java.net.ConnectException;
25import java.net.InetAddress;
26import java.net.InetSocketAddress;
27import java.net.Socket;
28import java.net.SocketAddress;
29import java.net.SocketException;
30import java.net.SocketUtils;
31import java.net.UnknownHostException;
32import java.nio.channels.AlreadyConnectedException;
33import java.nio.channels.ClosedChannelException;
34import java.nio.channels.ConnectionPendingException;
35import java.nio.channels.IllegalBlockingModeException;
36import java.nio.channels.NoConnectionPendingException;
37import java.nio.channels.NotYetConnectedException;
38import java.nio.channels.SocketChannel;
39import java.nio.channels.UnresolvedAddressException;
40import java.nio.channels.UnsupportedAddressTypeException;
41import java.nio.channels.spi.SelectorProvider;
42import java.util.Arrays;
43import libcore.io.IoUtils;
44import org.apache.harmony.luni.net.PlainSocketImpl;
45import org.apache.harmony.luni.platform.FileDescriptorHandler;
46import org.apache.harmony.luni.platform.Platform;
47
48/*
49 * The default implementation class of java.nio.channels.SocketChannel.
50 */
51class SocketChannelImpl extends SocketChannel implements FileDescriptorHandler {
52
53    private static final int EOF = -1;
54
55    // Status un-init, not initialized.
56    static final int SOCKET_STATUS_UNINIT = EOF;
57
58    // Status before connect.
59    static final int SOCKET_STATUS_UNCONNECTED = 0;
60
61    // Status connection pending.
62    static final int SOCKET_STATUS_PENDING = 1;
63
64    // Status after connection success.
65    static final int SOCKET_STATUS_CONNECTED = 2;
66
67    // Status closed.
68    static final int SOCKET_STATUS_CLOSED = 3;
69
70    // The descriptor to interact with native code.
71    final FileDescriptor fd;
72
73    // Our internal Socket.
74    private SocketAdapter socket = null;
75
76    // The address to be connected.
77    InetSocketAddress connectAddress = null;
78
79    // Local address of the this socket (package private for adapter).
80    InetAddress localAddress = null;
81
82    // Local port number.
83    int localPort;
84
85    // At first, uninitialized.
86    int status = SOCKET_STATUS_UNINIT;
87
88    // Whether the socket is bound.
89    volatile boolean isBound = false;
90
91    private final Object readLock = new Object();
92
93    private final Object writeLock = new Object();
94
/*
 * Creates a socket channel for the given provider. Delegates to the
 * two-argument constructor with the connect flag set to true.
 */
public SocketChannelImpl(SelectorProvider selectorProvider) throws IOException {
    this(selectorProvider, true);
}
101
/*
 * Creates a socket channel with a fresh FileDescriptor and the status
 * SOCKET_STATUS_UNCONNECTED. Platform.NETWORK.socket() is only invoked on
 * that descriptor when the connect flag is true.
 */
public SocketChannelImpl(SelectorProvider selectorProvider, boolean connect) throws IOException {
    super(selectorProvider);
    fd = new FileDescriptor();
    status = SOCKET_STATUS_UNCONNECTED;
    if (!connect) {
        return;
    }
    Platform.NETWORK.socket(fd, true);
}
113
114    /*
115     * Getting the internal Socket If we have not the socket, we create a new
116     * one.
117     */
118    @Override
119    synchronized public Socket socket() {
120        if (socket == null) {
121            try {
122                InetAddress addr = null;
123                int port = 0;
124                if (connectAddress != null) {
125                    addr = connectAddress.getAddress();
126                    port = connectAddress.getPort();
127                }
128                socket = new SocketAdapter(new PlainSocketImpl(fd, localPort, addr, port), this);
129            } catch (SocketException e) {
130                return null;
131            }
132        }
133        return socket;
134    }
135
136    @Override
137    synchronized public boolean isConnected() {
138        return status == SOCKET_STATUS_CONNECTED;
139    }
140
141    /*
142     * Status setting used by other class.
143     */
144    synchronized void setConnected() {
145        status = SOCKET_STATUS_CONNECTED;
146    }
147
148    void setBound(boolean flag) {
149        isBound = flag;
150    }
151
152    @Override
153    synchronized public boolean isConnectionPending() {
154        return status == SOCKET_STATUS_PENDING;
155    }
156
157    @Override
158    public boolean connect(SocketAddress socketAddress) throws IOException {
159        // status must be open and unconnected
160        checkUnconnected();
161
162        // check the address
163        InetSocketAddress inetSocketAddress = validateAddress(socketAddress);
164        InetAddress normalAddr = inetSocketAddress.getAddress();
165        int port = inetSocketAddress.getPort();
166
167        // When connecting, map ANY address to Localhost
168        if (normalAddr.isAnyLocalAddress()) {
169            normalAddr = InetAddress.getLocalHost();
170        }
171
172        // connect result
173        int result = EOF;
174        boolean finished = false;
175
176        try {
177            if (isBlocking()) {
178                begin();
179                Platform.NETWORK.connect(fd, normalAddr, port, 0);
180                finished = true; // Or we'd have thrown an exception.
181            } else {
182                finished = Platform.NETWORK.connectNonBlocking(fd, normalAddr, port);
183                // set back to nonblocking to work around with a bug in portlib
184                if (!isBlocking()) {
185                    IoUtils.setBlocking(fd, false);
186                }
187            }
188            isBound = finished;
189        } catch (IOException e) {
190            if (e instanceof ConnectException && !isBlocking()) {
191                status = SOCKET_STATUS_PENDING;
192            } else {
193                if (isOpen()) {
194                    close();
195                    finished = true;
196                }
197                throw e;
198            }
199        } finally {
200            if (isBlocking()) {
201                end(finished);
202            }
203        }
204
205        initLocalAddressAndPort();
206        connectAddress = inetSocketAddress;
207        if (socket != null) {
208            socket.socketImpl().initRemoteAddressAndPort(connectAddress.getAddress(),
209                    connectAddress.getPort());
210        }
211
212        synchronized (this) {
213            if (isBlocking()) {
214                status = (finished ? SOCKET_STATUS_CONNECTED : SOCKET_STATUS_UNCONNECTED);
215            } else {
216                status = SOCKET_STATUS_PENDING;
217            }
218        }
219        return finished;
220    }
221
222    private void initLocalAddressAndPort() {
223        localAddress = Platform.NETWORK.getSocketLocalAddress(fd);
224        localPort = Platform.NETWORK.getSocketLocalPort(fd);
225        if (socket != null) {
226            socket.socketImpl().initLocalPort(localPort);
227        }
228    }
229
230    @Override
231    public boolean finishConnect() throws IOException {
232        // status check
233        synchronized (this) {
234            if (!isOpen()) {
235                throw new ClosedChannelException();
236            }
237            if (status == SOCKET_STATUS_CONNECTED) {
238                return true;
239            }
240            if (status != SOCKET_STATUS_PENDING) {
241                throw new NoConnectionPendingException();
242            }
243        }
244
245        boolean finished = false;
246        try {
247            begin();
248            final int WAIT_FOREVER = -1;
249            final int POLL = 0;
250            finished = Platform.NETWORK.isConnected(fd, isBlocking() ? WAIT_FOREVER : POLL);
251            isBound = finished;
252            initLocalAddressAndPort();
253        } catch (ConnectException e) {
254            if (isOpen()) {
255                close();
256                finished = true;
257            }
258            throw e;
259        } finally {
260            end(finished);
261        }
262
263        synchronized (this) {
264            status = (finished ? SOCKET_STATUS_CONNECTED : status);
265            isBound = finished;
266            // TPE: Workaround for bug that turns socket back to blocking
267            if (!isBlocking()) implConfigureBlocking(false);
268        }
269        return finished;
270    }
271
272    void finishAccept() {
273        initLocalAddressAndPort();
274    }
275
276    @Override
277    public int read(ByteBuffer target) throws IOException {
278        FileChannelImpl.checkWritable(target);
279        checkOpenConnected();
280        if (!target.hasRemaining()) {
281            return 0;
282        }
283
284        int readCount;
285        if (target.isDirect() || target.hasArray()) {
286            readCount = readImpl(target);
287            if (readCount > 0) {
288                target.position(target.position() + readCount);
289            }
290        } else {
291            ByteBuffer readBuffer = null;
292            byte[] readArray = null;
293            readArray = new byte[target.remaining()];
294            readBuffer = ByteBuffer.wrap(readArray);
295            readCount = readImpl(readBuffer);
296            if (readCount > 0) {
297                target.put(readArray, 0, readCount);
298            }
299        }
300        return readCount;
301    }
302
303    @Override
304    public long read(ByteBuffer[] targets, int offset, int length) throws IOException {
305        Arrays.checkOffsetAndCount(targets.length, offset, length);
306        checkOpenConnected();
307        int totalCount = FileChannelImpl.calculateTotalRemaining(targets, offset, length, true);
308        if (totalCount == 0) {
309            return 0;
310        }
311        byte[] readArray = new byte[totalCount];
312        ByteBuffer readBuffer = ByteBuffer.wrap(readArray);
313        int readCount;
314        // read data to readBuffer, and then transfer data from readBuffer to
315        // targets.
316        readCount = readImpl(readBuffer);
317        if (readCount > 0) {
318            int left = readCount;
319            int index = offset;
320            // transfer data from readArray to targets
321            while (left > 0) {
322                int putLength = Math.min(targets[index].remaining(), left);
323                targets[index].put(readArray, readCount - left, putLength);
324                index++;
325                left -= putLength;
326            }
327        }
328        return readCount;
329    }
330
331    /**
332     * Read from channel, and store the result in the target.
333     *
334     * @param target
335     *            output parameter
336     */
337    private int readImpl(ByteBuffer target) throws IOException {
338        synchronized (readLock) {
339            int readCount = 0;
340            try {
341                if (isBlocking()) {
342                    begin();
343                }
344                int offset = target.position();
345                int length = target.remaining();
346                if (target.isDirect()) {
347                    int address = NioUtils.getDirectBufferAddress(target);
348                    readCount = Platform.NETWORK.readDirect(fd, address + offset, length);
349                } else {
350                    // target is assured to have array.
351                    byte[] array = target.array();
352                    offset += target.arrayOffset();
353                    readCount = Platform.NETWORK.read(fd, array, offset, length);
354                }
355                return readCount;
356            } finally {
357                if (isBlocking()) {
358                    end(readCount > 0);
359                }
360            }
361        }
362    }
363
364    @Override
365    public int write(ByteBuffer source) throws IOException {
366        if (source == null) {
367            throw new NullPointerException();
368        }
369        checkOpenConnected();
370        if (!source.hasRemaining()) {
371            return 0;
372        }
373        return writeImpl(source);
374    }
375
376    @Override
377    public long write(ByteBuffer[] sources, int offset, int length) throws IOException {
378        Arrays.checkOffsetAndCount(sources.length, offset, length);
379        checkOpenConnected();
380        int count = FileChannelImpl.calculateTotalRemaining(sources, offset, length, false);
381        if (count == 0) {
382            return 0;
383        }
384        ByteBuffer writeBuf = ByteBuffer.allocate(count);
385        for (int val = offset; val < length + offset; val++) {
386            ByteBuffer source = sources[val];
387            int oldPosition = source.position();
388            writeBuf.put(source);
389            source.position(oldPosition);
390        }
391        writeBuf.flip();
392        int result = writeImpl(writeBuf);
393        int val = offset;
394        int written = result;
395        while (result > 0) {
396            ByteBuffer source = sources[val];
397            int gap = Math.min(result, source.remaining());
398            source.position(source.position() + gap);
399            val++;
400            result -= gap;
401        }
402        return written;
403    }
404
405    /*
406     * Write the source. return the count of bytes written.
407     */
408    private int writeImpl(ByteBuffer source) throws IOException {
409        synchronized (writeLock) {
410            if (!source.hasRemaining()) {
411                return 0;
412            }
413            int writeCount = 0;
414            try {
415                int pos = source.position();
416                int length = source.remaining();
417                if (isBlocking()) {
418                    begin();
419                }
420                if (source.isDirect()) {
421                    int address = NioUtils.getDirectBufferAddress(source);
422                    writeCount = Platform.NETWORK.writeDirect(fd, address, pos, length);
423                } else if (source.hasArray()) {
424                    pos += source.arrayOffset();
425                    writeCount = Platform.NETWORK.write(fd, source.array(), pos, length);
426                } else {
427                    byte[] array = new byte[length];
428                    source.get(array);
429                    writeCount = Platform.NETWORK.write(fd, array, 0, length);
430                }
431                source.position(pos + writeCount);
432            } finally {
433                if (isBlocking()) {
434                    end(writeCount >= 0);
435                }
436            }
437            return writeCount;
438        }
439    }
440
441    /*
442     * Status check, open and "connected", when read and write.
443     */
444    synchronized private void checkOpenConnected() throws ClosedChannelException {
445        if (!isOpen()) {
446            throw new ClosedChannelException();
447        }
448        if (!isConnected()) {
449            throw new NotYetConnectedException();
450        }
451    }
452
453    /*
454     * Status check, open and "unconnected", before connection.
455     */
456    synchronized private void checkUnconnected() throws IOException {
457        if (!isOpen()) {
458            throw new ClosedChannelException();
459        }
460        if (status == SOCKET_STATUS_CONNECTED) {
461            throw new AlreadyConnectedException();
462        }
463        if (status == SOCKET_STATUS_PENDING) {
464            throw new ConnectionPendingException();
465        }
466    }
467
468    /*
469     * Shared by this class and DatagramChannelImpl, to do the address transfer
470     * and check.
471     */
472    static InetSocketAddress validateAddress(SocketAddress socketAddress) {
473        if (socketAddress == null) {
474            throw new IllegalArgumentException();
475        }
476        if (!(socketAddress instanceof InetSocketAddress)) {
477            throw new UnsupportedAddressTypeException();
478        }
479        InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
480        if (inetSocketAddress.isUnresolved()) {
481            throw new UnresolvedAddressException();
482        }
483        return inetSocketAddress;
484    }
485
486    /*
487     * Get local address.
488     */
489    public InetAddress getLocalAddress() throws UnknownHostException {
490        if (!isBound) {
491            byte[] any_bytes = { 0, 0, 0, 0 };
492            return InetAddress.getByAddress(any_bytes);
493        }
494        return localAddress;
495    }
496
497    /*
498     * Do really closing action here.
499     */
500    @Override
501    protected synchronized void implCloseSelectableChannel() throws IOException {
502        if (status != SOCKET_STATUS_CLOSED) {
503            status = SOCKET_STATUS_CLOSED;
504            if (socket != null && !socket.isClosed()) {
505                socket.close();
506            } else {
507                Platform.NETWORK.close(fd);
508            }
509        }
510    }
511
512    @Override
513    protected void implConfigureBlocking(boolean blockMode) throws IOException {
514        synchronized (blockingLock()) {
515            IoUtils.setBlocking(fd, blockMode);
516        }
517    }
518
519    /*
520     * Get the fd.
521     */
522    public FileDescriptor getFD() {
523        return fd;
524    }
525
526    /*
527     * Adapter classes for internal socket.
528     */
529    private static class SocketAdapter extends Socket {
530        private final SocketChannelImpl channel;
531        private final PlainSocketImpl socketImpl;
532
533        SocketAdapter(PlainSocketImpl socketImpl, SocketChannelImpl channel) throws SocketException {
534            super(socketImpl);
535            this.socketImpl = socketImpl;
536            this.channel = channel;
537            SocketUtils.setCreated(this);
538        }
539
540        PlainSocketImpl socketImpl() {
541            return socketImpl;
542        }
543
544        @Override
545        public SocketChannel getChannel() {
546            return channel;
547        }
548
549        @Override
550        public boolean isBound() {
551            return channel.isBound;
552        }
553
554        @Override
555        public boolean isConnected() {
556            return channel.isConnected();
557        }
558
559        @Override
560        public InetAddress getLocalAddress() {
561            try {
562                return channel.getLocalAddress();
563            } catch (UnknownHostException e) {
564                return null;
565            }
566        }
567
568        @Override
569        public void connect(SocketAddress remoteAddr, int timeout) throws IOException {
570            if (!channel.isBlocking()) {
571                throw new IllegalBlockingModeException();
572            }
573            if (isConnected()) {
574                throw new AlreadyConnectedException();
575            }
576            super.connect(remoteAddr, timeout);
577            channel.initLocalAddressAndPort();
578            if (super.isConnected()) {
579                channel.setConnected();
580                channel.isBound = super.isBound();
581            }
582        }
583
584        @Override
585        public void bind(SocketAddress localAddr) throws IOException {
586            if (channel.isConnected()) {
587                throw new AlreadyConnectedException();
588            }
589            if (SocketChannelImpl.SOCKET_STATUS_PENDING == channel.status) {
590                throw new ConnectionPendingException();
591            }
592            super.bind(localAddr);
593            // keep here to see if need next version
594            // channel.Address = getLocalSocketAddress();
595            // channel.localport = getLocalPort();
596            channel.isBound = true;
597        }
598
599        @Override
600        public void close() throws IOException {
601            synchronized (channel) {
602                if (channel.isOpen()) {
603                    channel.close();
604                } else {
605                    super.close();
606                }
607                channel.status = SocketChannelImpl.SOCKET_STATUS_CLOSED;
608            }
609        }
610
611        @Override
612        public OutputStream getOutputStream() throws IOException {
613            checkOpenAndConnected();
614            if (isOutputShutdown()) {
615                throw new SocketException("Socket output is shutdown");
616            }
617            return new SocketChannelOutputStream(channel);
618        }
619
620        @Override
621        public InputStream getInputStream() throws IOException {
622            checkOpenAndConnected();
623            if (isInputShutdown()) {
624                throw new SocketException("Socket input is shutdown");
625            }
626            return new SocketChannelInputStream(channel);
627        }
628
629        private void checkOpenAndConnected() throws SocketException {
630            if (!channel.isOpen()) {
631                throw new SocketException("Socket is closed");
632            }
633            if (!channel.isConnected()) {
634                throw new SocketException("Socket is not connected");
635            }
636        }
637    }
638
639    /*
640     * This output stream delegates all operations to the associated channel.
641     * Throws an IllegalBlockingModeException if the channel is in non-blocking
642     * mode when performing write operations.
643     */
644    private static class SocketChannelOutputStream extends OutputStream {
645        private final SocketChannel channel;
646
647        public SocketChannelOutputStream(SocketChannel channel) {
648            this.channel = channel;
649        }
650
651        /*
652         * Closes this stream and channel.
653         *
654         * @exception IOException thrown if an error occurs during the close
655         */
656        @Override
657        public void close() throws IOException {
658            channel.close();
659        }
660
661        @Override
662        public void write(byte[] buffer, int offset, int byteCount) throws IOException {
663            Arrays.checkOffsetAndCount(buffer.length, offset, byteCount);
664            ByteBuffer buf = ByteBuffer.wrap(buffer, offset, byteCount);
665            if (!channel.isBlocking()) {
666                throw new IllegalBlockingModeException();
667            }
668            channel.write(buf);
669        }
670
671        @Override
672        public void write(int oneByte) throws IOException {
673            if (!channel.isBlocking()) {
674                throw new IllegalBlockingModeException();
675            }
676            ByteBuffer buffer = ByteBuffer.allocate(1);
677            buffer.put(0, (byte) (oneByte & 0xFF));
678            channel.write(buffer);
679        }
680    }
681
682    /*
683     * This input stream delegates all operations to the associated channel.
684     * Throws an IllegalBlockingModeException if the channel is in non-blocking
685     * mode when performing read operations.
686     */
687    private static class SocketChannelInputStream extends InputStream {
688        private final SocketChannel channel;
689
690        public SocketChannelInputStream(SocketChannel channel) {
691            this.channel = channel;
692        }
693
694        /*
695         * Closes this stream and channel.
696         */
697        @Override
698        public void close() throws IOException {
699            channel.close();
700        }
701
702        @Override
703        public int read() throws IOException {
704            if (!channel.isBlocking()) {
705                throw new IllegalBlockingModeException();
706            }
707            ByteBuffer buf = ByteBuffer.allocate(1);
708            int result = channel.read(buf);
709            return (result == -1) ? result : (buf.get(0) & 0xff);
710        }
711
712        @Override
713        public int read(byte[] buffer, int offset, int byteCount) throws IOException {
714            Arrays.checkOffsetAndCount(buffer.length, offset, byteCount);
715            if (!channel.isBlocking()) {
716                throw new IllegalBlockingModeException();
717            }
718            ByteBuffer buf = ByteBuffer.wrap(buffer, offset, byteCount);
719            return channel.read(buf);
720        }
721    }
722}
723