SocketChannelImpl.java revision 47ae0b5a1d96c8030e0963ccc5b44c3ee66aaec3
1/*
2 *  Licensed to the Apache Software Foundation (ASF) under one or more
3 *  contributor license agreements.  See the NOTICE file distributed with
4 *  this work for additional information regarding copyright ownership.
5 *  The ASF licenses this file to You under the Apache License, Version 2.0
6 *  (the "License"); you may not use this file except in compliance with
7 *  the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *  Unless required by applicable law or agreed to in writing, software
12 *  distributed under the License is distributed on an "AS IS" BASIS,
13 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  See the License for the specific language governing permissions and
15 *  limitations under the License.
16 */
17
18package java.nio;
19
20import java.io.FileDescriptor;
21import java.io.FilterInputStream;
22import java.io.FilterOutputStream;
23import java.io.IOException;
24import java.io.InputStream;
25import java.io.OutputStream;
26import java.net.ConnectException;
27import java.net.Inet4Address;
28import java.net.InetAddress;
29import java.net.InetSocketAddress;
30import java.net.PlainSocketImpl;
31import java.net.Socket;
32import java.net.SocketAddress;
33import java.net.SocketException;
34import java.net.SocketUtils;
35import java.nio.channels.AlreadyBoundException;
36import java.nio.channels.AlreadyConnectedException;
37import java.nio.channels.ClosedChannelException;
38import java.nio.channels.ConnectionPendingException;
39import java.nio.channels.IllegalBlockingModeException;
40import java.nio.channels.NoConnectionPendingException;
41import java.nio.channels.NotYetConnectedException;
42import java.nio.channels.SocketChannel;
43import java.nio.channels.UnresolvedAddressException;
44import java.nio.channels.UnsupportedAddressTypeException;
45import java.nio.channels.spi.SelectorProvider;
46import java.util.Arrays;
47import libcore.io.ErrnoException;
48import libcore.io.Libcore;
49import libcore.io.IoBridge;
50import libcore.io.IoUtils;
51import static libcore.io.OsConstants.*;
52
53/*
54 * The default implementation class of java.nio.channels.SocketChannel.
55 */
56class SocketChannelImpl extends SocketChannel implements FileDescriptorChannel {
57    private static final int SOCKET_STATUS_UNINITIALIZED = -1;
58
59    // Status before connect.
60    private static final int SOCKET_STATUS_UNCONNECTED = 0;
61
62    // Status connection pending.
63    private static final int SOCKET_STATUS_PENDING = 1;
64
65    // Status after connection success.
66    private static final int SOCKET_STATUS_CONNECTED = 2;
67
68    // Status closed.
69    private static final int SOCKET_STATUS_CLOSED = 3;
70
71    private final FileDescriptor fd;
72
73    // Our internal Socket.
74    private SocketAdapter socket = null;
75
76    // The address to be connected.
77    private InetSocketAddress connectAddress = null;
78
79    // The local address the socket is bound to.
80    private InetAddress localAddress = null;
81    private int localPort;
82
83    private int status = SOCKET_STATUS_UNINITIALIZED;
84
85    // Whether the socket is bound.
86    private volatile boolean isBound = false;
87
88    private final Object readLock = new Object();
89
90    private final Object writeLock = new Object();
91
92    /*
93     * Constructor for creating a connected socket channel.
94     */
95    public SocketChannelImpl(SelectorProvider selectorProvider) throws IOException {
96        this(selectorProvider, true);
97    }
98
99    /*
100     * Constructor for creating an optionally connected socket channel.
101     */
102    public SocketChannelImpl(SelectorProvider selectorProvider, boolean connect) throws IOException {
103        super(selectorProvider);
104        status = SOCKET_STATUS_UNCONNECTED;
105        fd = (connect ? IoBridge.socket(true) : new FileDescriptor());
106    }
107
108    /*
109     * Constructor for use by Pipe.SinkChannel and Pipe.SourceChannel.
110     */
111    public SocketChannelImpl(SelectorProvider selectorProvider, FileDescriptor existingFd) throws IOException {
112        super(selectorProvider);
113        status = SOCKET_STATUS_CONNECTED;
114        fd = existingFd;
115    }
116
117    /*
118     * Getting the internal Socket If we have not the socket, we create a new
119     * one.
120     */
121    @Override
122    synchronized public Socket socket() {
123        if (socket == null) {
124            try {
125                InetAddress addr = null;
126                int port = 0;
127                if (connectAddress != null) {
128                    addr = connectAddress.getAddress();
129                    port = connectAddress.getPort();
130                }
131                socket = new SocketAdapter(new PlainSocketImpl(fd, localPort, addr, port), this);
132            } catch (SocketException e) {
133                return null;
134            }
135        }
136        return socket;
137    }
138
139    /** @hide Until ready for a public API change */
140    @Override
141    synchronized public final SocketChannel bind(SocketAddress local) throws IOException {
142        if (!isOpen()) {
143            throw new ClosedChannelException();
144        }
145        if (isBound) {
146            throw new AlreadyBoundException();
147        }
148
149        if (local == null) {
150            local = new InetSocketAddress(Inet4Address.ANY, 0);
151        } else if (!(local instanceof InetSocketAddress)) {
152            throw new UnsupportedAddressTypeException();
153        }
154
155        InetSocketAddress localAddress = (InetSocketAddress) local;
156        IoBridge.bind(fd, localAddress.getAddress(), localAddress.getPort());
157        onBind(true /* updateSocketState */);
158        return this;
159    }
160
161    /**
162     * Initialise the isBound, localAddress and localPort state from the file descriptor. Used when
163     * some or all of the bound state has been left to the OS to decide, or when the Socket handled
164     * bind() or connect().
165     *
166     * @param updateSocketState
167     *      if the associated socket (if present) needs to be updated
168     * @hide package visible for other nio classes
169     */
170    void onBind(boolean updateSocketState) {
171        SocketAddress sa;
172        try {
173            sa = Libcore.os.getsockname(fd);
174        } catch (ErrnoException errnoException) {
175            throw new AssertionError(errnoException);
176        }
177        isBound = true;
178        InetSocketAddress localSocketAddress = (InetSocketAddress) sa;
179        localAddress = localSocketAddress.getAddress();
180        localPort = localSocketAddress.getPort();
181        if (updateSocketState && socket != null) {
182            socket.onBind(localAddress, localPort);
183        }
184    }
185
186    /** @hide Until ready for a public API change */
187    @Override
188    synchronized public SocketAddress getLocalAddress() throws IOException {
189        if (!isOpen()) {
190            throw new ClosedChannelException();
191        }
192        return isBound ? new InetSocketAddress(localAddress, localPort) : null;
193    }
194
195    @Override
196    synchronized public boolean isConnected() {
197        return status == SOCKET_STATUS_CONNECTED;
198    }
199
200    @Override
201    synchronized public boolean isConnectionPending() {
202        return status == SOCKET_STATUS_PENDING;
203    }
204
205    @Override
206    public boolean connect(SocketAddress socketAddress) throws IOException {
207        // status must be open and unconnected
208        checkUnconnected();
209
210        // check the address
211        InetSocketAddress inetSocketAddress = validateAddress(socketAddress);
212        InetAddress normalAddr = inetSocketAddress.getAddress();
213        int port = inetSocketAddress.getPort();
214
215        // When connecting, map ANY address to localhost
216        if (normalAddr.isAnyLocalAddress()) {
217            normalAddr = InetAddress.getLocalHost();
218        }
219
220        boolean isBlocking = isBlocking();
221        boolean finished = false;
222        int newStatus;
223        try {
224            if (isBlocking) {
225                begin();
226            }
227            // When in blocking mode, IoBridge.connect() will return without an exception when the
228            // socket is connected. When in non-blocking mode it will return without an exception
229            // without knowing the result of the connection attempt, which could still be going on.
230            IoBridge.connect(fd, normalAddr, port);
231            newStatus = isBlocking ? SOCKET_STATUS_CONNECTED : SOCKET_STATUS_PENDING;
232            finished = true;
233        } catch (IOException e) {
234            if (isEINPROGRESS(e)) {
235                newStatus = SOCKET_STATUS_PENDING;
236            } else {
237                if (isOpen()) {
238                    close();
239                    finished = true;
240                }
241                throw e;
242            }
243        } finally {
244            if (isBlocking) {
245                end(finished);
246            }
247        }
248
249        // If the channel was not bound, a connection attempt will have caused an implicit bind() to
250        // take place. Keep the local address state held by the channel and the socket up to date.
251        if (!isBound) {
252            onBind(true /* updateSocketState */);
253        }
254
255        // Keep the connected state held by the channel and the socket up to date.
256        onConnectStatusChanged(inetSocketAddress, newStatus, true /* updateSocketState */);
257
258        return status == SOCKET_STATUS_CONNECTED;
259    }
260
261    /**
262     * Initialise the connect() state with the supplied information.
263     *
264     * @param updateSocketState
265     *     if the associated socket (if present) needs to be updated
266     * @hide package visible for other nio classes
267     */
268    void onConnectStatusChanged(InetSocketAddress address, int status, boolean updateSocketState) {
269        this.status = status;
270        connectAddress = address;
271        if (status == SOCKET_STATUS_CONNECTED && updateSocketState && socket != null) {
272            socket.onConnect(connectAddress.getAddress(), connectAddress.getPort());
273        }
274    }
275
276    private boolean isEINPROGRESS(IOException e) {
277        if (isBlocking()) {
278            return false;
279        }
280        if (e instanceof ConnectException) {
281            Throwable cause = e.getCause();
282            if (cause instanceof ErrnoException) {
283                return ((ErrnoException) cause).errno == EINPROGRESS;
284            }
285        }
286        return false;
287    }
288
289    @Override
290    public boolean finishConnect() throws IOException {
291        synchronized (this) {
292            if (!isOpen()) {
293                throw new ClosedChannelException();
294            }
295            if (status == SOCKET_STATUS_CONNECTED) {
296                return true;
297            }
298            if (status != SOCKET_STATUS_PENDING) {
299                throw new NoConnectionPendingException();
300            }
301        }
302
303        boolean finished = false;
304        try {
305            begin();
306            InetAddress inetAddress = connectAddress.getAddress();
307            int port = connectAddress.getPort();
308            finished = IoBridge.isConnected(fd, inetAddress, port, 0, 0); // Return immediately.
309        } catch (ConnectException e) {
310            if (isOpen()) {
311                close();
312                finished = true;
313            }
314            throw e;
315        } finally {
316            end(finished);
317        }
318
319        synchronized (this) {
320            status = (finished ? SOCKET_STATUS_CONNECTED : status);
321            if (finished && socket != null) {
322                socket.onConnect(connectAddress.getAddress(), connectAddress.getPort());
323            }
324        }
325        return finished;
326    }
327
328    @Override
329    public int read(ByteBuffer dst) throws IOException {
330        dst.checkWritable();
331        checkOpenConnected();
332        if (!dst.hasRemaining()) {
333            return 0;
334        }
335        return readImpl(dst);
336    }
337
338    @Override
339    public long read(ByteBuffer[] targets, int offset, int length) throws IOException {
340        Arrays.checkOffsetAndCount(targets.length, offset, length);
341        checkOpenConnected();
342        int totalCount = FileChannelImpl.calculateTotalRemaining(targets, offset, length, true);
343        if (totalCount == 0) {
344            return 0;
345        }
346        byte[] readArray = new byte[totalCount];
347        ByteBuffer readBuffer = ByteBuffer.wrap(readArray);
348        int readCount;
349        // read data to readBuffer, and then transfer data from readBuffer to targets.
350        readCount = readImpl(readBuffer);
351        readBuffer.flip();
352        if (readCount > 0) {
353            int left = readCount;
354            int index = offset;
355            // transfer data from readArray to targets
356            while (left > 0) {
357                int putLength = Math.min(targets[index].remaining(), left);
358                targets[index].put(readArray, readCount - left, putLength);
359                index++;
360                left -= putLength;
361            }
362        }
363        return readCount;
364    }
365
366    private int readImpl(ByteBuffer dst) throws IOException {
367        synchronized (readLock) {
368            int readCount = 0;
369            try {
370                if (isBlocking()) {
371                    begin();
372                }
373                readCount = IoBridge.recvfrom(true, fd, dst, 0, null, false);
374                if (readCount > 0) {
375                    dst.position(dst.position() + readCount);
376                }
377            } finally {
378                if (isBlocking()) {
379                    end(readCount > 0);
380                }
381            }
382            return readCount;
383        }
384    }
385
386    @Override
387    public int write(ByteBuffer src) throws IOException {
388        if (src == null) {
389            throw new NullPointerException("src == null");
390        }
391        checkOpenConnected();
392        if (!src.hasRemaining()) {
393            return 0;
394        }
395        return writeImpl(src);
396    }
397
398    @Override
399    public long write(ByteBuffer[] sources, int offset, int length) throws IOException {
400        Arrays.checkOffsetAndCount(sources.length, offset, length);
401        checkOpenConnected();
402        int count = FileChannelImpl.calculateTotalRemaining(sources, offset, length, false);
403        if (count == 0) {
404            return 0;
405        }
406        ByteBuffer writeBuf = ByteBuffer.allocate(count);
407        for (int val = offset; val < length + offset; val++) {
408            ByteBuffer source = sources[val];
409            int oldPosition = source.position();
410            writeBuf.put(source);
411            source.position(oldPosition);
412        }
413        writeBuf.flip();
414        int result = writeImpl(writeBuf);
415        int val = offset;
416        int written = result;
417        while (result > 0) {
418            ByteBuffer source = sources[val];
419            int gap = Math.min(result, source.remaining());
420            source.position(source.position() + gap);
421            val++;
422            result -= gap;
423        }
424        return written;
425    }
426
427    private int writeImpl(ByteBuffer src) throws IOException {
428        synchronized (writeLock) {
429            if (!src.hasRemaining()) {
430                return 0;
431            }
432            int writeCount = 0;
433            try {
434                if (isBlocking()) {
435                    begin();
436                }
437                writeCount = IoBridge.sendto(fd, src, 0, null, 0);
438                if (writeCount > 0) {
439                    src.position(src.position() + writeCount);
440                }
441            } finally {
442                if (isBlocking()) {
443                    end(writeCount >= 0);
444                }
445            }
446            return writeCount;
447        }
448    }
449
450    /*
451     * Status check, open and "connected", when read and write.
452     */
453    synchronized private void checkOpenConnected() throws ClosedChannelException {
454        if (!isOpen()) {
455            throw new ClosedChannelException();
456        }
457        if (!isConnected()) {
458            throw new NotYetConnectedException();
459        }
460    }
461
462    /*
463     * Status check, open and "unconnected", before connection.
464     */
465    synchronized private void checkUnconnected() throws IOException {
466        if (!isOpen()) {
467            throw new ClosedChannelException();
468        }
469        if (status == SOCKET_STATUS_CONNECTED) {
470            throw new AlreadyConnectedException();
471        }
472        if (status == SOCKET_STATUS_PENDING) {
473            throw new ConnectionPendingException();
474        }
475    }
476
477    /*
478     * Shared by this class and DatagramChannelImpl, to do the address transfer
479     * and check.
480     */
481    static InetSocketAddress validateAddress(SocketAddress socketAddress) {
482        if (socketAddress == null) {
483            throw new IllegalArgumentException("socketAddress == null");
484        }
485        if (!(socketAddress instanceof InetSocketAddress)) {
486            throw new UnsupportedAddressTypeException();
487        }
488        InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
489        if (inetSocketAddress.isUnresolved()) {
490            throw new UnresolvedAddressException();
491        }
492        return inetSocketAddress;
493    }
494
495    /*
496     * Do really closing action here.
497     */
498    @Override
499    protected synchronized void implCloseSelectableChannel() throws IOException {
500        if (status != SOCKET_STATUS_CLOSED) {
501            status = SOCKET_STATUS_CLOSED;
502            // IoBridge.closeSocket(fd) is idempotent: It is safe to call on an already-closed file
503            // descriptor.
504            IoBridge.closeSocket(fd);
505            if (socket != null && !socket.isClosed()) {
506                socket.onClose();
507            }
508        }
509    }
510
511    @Override protected void implConfigureBlocking(boolean blocking) throws IOException {
512        IoUtils.setBlocking(fd, blocking);
513    }
514
515    /*
516     * Get the fd.
517     */
518    public FileDescriptor getFD() {
519        return fd;
520    }
521
522    /* @hide used by ServerSocketChannelImpl to sync channel state during accept() */
523    public void onAccept(InetSocketAddress remoteAddress, boolean updateSocketState) {
524        onBind(updateSocketState);
525        onConnectStatusChanged(remoteAddress, SOCKET_STATUS_CONNECTED, updateSocketState);
526    }
527
528    /*
529     * Adapter classes for internal socket.
530     */
531    private static class SocketAdapter extends Socket {
532        private final SocketChannelImpl channel;
533        private final PlainSocketImpl socketImpl;
534
535        SocketAdapter(PlainSocketImpl socketImpl, SocketChannelImpl channel)
536                throws SocketException {
537            super(socketImpl);
538            this.socketImpl = socketImpl;
539            this.channel = channel;
540            SocketUtils.setCreated(this);
541
542            // Sync state socket state with the channel it is being created from
543            if (channel.isBound) {
544                onBind(channel.localAddress, channel.localPort);
545            }
546            if (channel.isConnected()) {
547                onConnect(channel.connectAddress.getAddress(), channel.connectAddress.getPort());
548            }
549            if (!channel.isOpen()) {
550                onClose();
551            }
552
553        }
554
555        @Override
556        public SocketChannel getChannel() {
557            return channel;
558        }
559
560        @Override
561        public void connect(SocketAddress remoteAddr, int timeout) throws IOException {
562            if (!channel.isBlocking()) {
563                throw new IllegalBlockingModeException();
564            }
565            if (isConnected()) {
566                throw new AlreadyConnectedException();
567            }
568            super.connect(remoteAddr, timeout);
569            channel.onBind(false);
570            if (super.isConnected()) {
571                InetSocketAddress remoteInetAddress = (InetSocketAddress) remoteAddr;
572                channel.onConnectStatusChanged(
573                        remoteInetAddress, SOCKET_STATUS_CONNECTED, false /* updateSocketState */);
574            }
575        }
576
577        @Override
578        public void bind(SocketAddress localAddr) throws IOException {
579            if (channel.isConnected()) {
580                throw new AlreadyConnectedException();
581            }
582            if (SocketChannelImpl.SOCKET_STATUS_PENDING == channel.status) {
583                throw new ConnectionPendingException();
584            }
585            super.bind(localAddr);
586            channel.onBind(false);
587        }
588
589        @Override
590        public void close() throws IOException {
591            synchronized (channel) {
592                super.close();
593                if (channel.isOpen()) {
594                    // channel.close() recognizes the socket is closed and avoids recursion. There
595                    // is no channel.onClose() because the "closed" field is private.
596                    channel.close();
597                }
598            }
599        }
600
601        @Override
602        public OutputStream getOutputStream() throws IOException {
603            return new BlockingCheckOutputStream(super.getOutputStream(), channel);
604        }
605
606        @Override
607        public InputStream getInputStream() throws IOException {
608            return new BlockingCheckInputStream(super.getInputStream(), channel);
609        }
610
611        @Override
612        public FileDescriptor getFileDescriptor$() {
613            return socketImpl.getFD$();
614        }
615    }
616
617    /*
618     * Throws an IllegalBlockingModeException if the channel is in non-blocking
619     * mode when performing write operations.
620     */
621    private static class BlockingCheckOutputStream extends FilterOutputStream {
622        private final SocketChannel channel;
623
624        public BlockingCheckOutputStream(OutputStream out, SocketChannel channel) {
625            super(out);
626            this.channel = channel;
627        }
628
629        @Override
630        public void write(byte[] buffer, int offset, int byteCount) throws IOException {
631            checkBlocking();
632            out.write(buffer, offset, byteCount);
633        }
634
635        @Override
636        public void write(int oneByte) throws IOException {
637            checkBlocking();
638            out.write(oneByte);
639        }
640
641        @Override
642        public void write(byte[] buffer) throws IOException {
643            checkBlocking();
644            out.write(buffer);
645        }
646
647        @Override
648        public void close() throws IOException {
649            super.close();
650            // channel.close() recognizes the socket is closed and avoids recursion. There is no
651            // channel.onClose() because the "closed" field is private.
652            channel.close();
653        }
654
655        private void checkBlocking() {
656            if (!channel.isBlocking()) {
657                throw new IllegalBlockingModeException();
658            }
659        }
660    }
661
662    /*
663     * Throws an IllegalBlockingModeException if the channel is in non-blocking
664     * mode when performing read operations.
665     */
666    private static class BlockingCheckInputStream extends FilterInputStream {
667        private final SocketChannel channel;
668
669        public BlockingCheckInputStream(InputStream in, SocketChannel channel) {
670            super(in);
671            this.channel = channel;
672        }
673
674        @Override
675        public int read() throws IOException {
676            checkBlocking();
677            return in.read();
678        }
679
680        @Override
681        public int read(byte[] buffer, int byteOffset, int byteCount) throws IOException {
682            checkBlocking();
683            return in.read(buffer, byteOffset, byteCount);
684        }
685
686        @Override
687        public int read(byte[] buffer) throws IOException {
688            checkBlocking();
689            return in.read(buffer);
690        }
691
692        @Override
693        public void close() throws IOException {
694            super.close();
695            // channel.close() recognizes the socket is closed and avoids recursion. There is no
696            // channel.onClose() because the "closed" field is private.
697            channel.close();
698        }
699
700        private void checkBlocking() {
701            if (!channel.isBlocking()) {
702                throw new IllegalBlockingModeException();
703            }
704        }
705    }
706}
707