SocketChannelImpl.java revision be0b5bd592bed1edaed2447e7ab1764b93eceaab
1/*
2 *  Licensed to the Apache Software Foundation (ASF) under one or more
3 *  contributor license agreements.  See the NOTICE file distributed with
4 *  this work for additional information regarding copyright ownership.
5 *  The ASF licenses this file to You under the Apache License, Version 2.0
6 *  (the "License"); you may not use this file except in compliance with
7 *  the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *  Unless required by applicable law or agreed to in writing, software
12 *  distributed under the License is distributed on an "AS IS" BASIS,
13 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  See the License for the specific language governing permissions and
15 *  limitations under the License.
16 */
17
18package java.nio;
19
20import java.io.FileDescriptor;
21import java.io.IOException;
22import java.io.InputStream;
23import java.io.OutputStream;
24import java.net.ConnectException;
25import java.net.Inet4Address;
26import java.net.InetAddress;
27import java.net.InetSocketAddress;
28import java.net.PlainSocketImpl;
29import java.net.Socket;
30import java.net.SocketAddress;
31import java.net.SocketException;
32import java.net.SocketUtils;
33import java.net.UnknownHostException;
34import java.nio.channels.AlreadyConnectedException;
35import java.nio.channels.ClosedChannelException;
36import java.nio.channels.ConnectionPendingException;
37import java.nio.channels.IllegalBlockingModeException;
38import java.nio.channels.NoConnectionPendingException;
39import java.nio.channels.NotYetConnectedException;
40import java.nio.channels.SocketChannel;
41import java.nio.channels.UnresolvedAddressException;
42import java.nio.channels.UnsupportedAddressTypeException;
43import java.nio.channels.spi.SelectorProvider;
44import java.util.Arrays;
45import libcore.io.ErrnoException;
46import libcore.io.Libcore;
47import libcore.io.IoBridge;
48import libcore.io.IoUtils;
49import static libcore.io.OsConstants.*;
50
51/*
52 * The default implementation class of java.nio.channels.SocketChannel.
53 */
54class SocketChannelImpl extends SocketChannel implements FileDescriptorChannel {
55    private static final int SOCKET_STATUS_UNINITIALIZED = -1;
56
57    // Status before connect.
58    private static final int SOCKET_STATUS_UNCONNECTED = 0;
59
60    // Status connection pending.
61    private static final int SOCKET_STATUS_PENDING = 1;
62
63    // Status after connection success.
64    private static final int SOCKET_STATUS_CONNECTED = 2;
65
66    // Status closed.
67    private static final int SOCKET_STATUS_CLOSED = 3;
68
69    private final FileDescriptor fd;
70
71    // Our internal Socket.
72    private SocketAdapter socket = null;
73
74    // The address to be connected.
75    private InetSocketAddress connectAddress = null;
76
77    private InetAddress localAddress = null;
78    private int localPort;
79
80    private int status = SOCKET_STATUS_UNINITIALIZED;
81
82    // Whether the socket is bound.
83    private volatile boolean isBound = false;
84
85    private final Object readLock = new Object();
86
87    private final Object writeLock = new Object();
88
/*
 * Creates an unconnected channel whose descriptor is obtained from
 * IoBridge.socket(true). Same as calling the two-argument constructor with
 * connect == true.
 */
public SocketChannelImpl(SelectorProvider selectorProvider) throws IOException {
    this(selectorProvider, true);
}
95
/*
 * Creates an unconnected channel.
 *
 * When 'connect' is true the descriptor comes from IoBridge.socket(true);
 * otherwise a fresh, empty FileDescriptor is used and is expected to be
 * filled in later by the caller (presumably the accept path - confirm
 * against ServerSocketChannelImpl).
 */
public SocketChannelImpl(SelectorProvider selectorProvider, boolean connect) throws IOException {
    super(selectorProvider);
    if (connect) {
        fd = IoBridge.socket(true);
    } else {
        fd = new FileDescriptor();
    }
    status = SOCKET_STATUS_UNCONNECTED;
}
104
105    /*
106     * Getting the internal Socket If we have not the socket, we create a new
107     * one.
108     */
109    @Override
110    synchronized public Socket socket() {
111        if (socket == null) {
112            try {
113                InetAddress addr = null;
114                int port = 0;
115                if (connectAddress != null) {
116                    addr = connectAddress.getAddress();
117                    port = connectAddress.getPort();
118                }
119                socket = new SocketAdapter(new PlainSocketImpl(fd, localPort, addr, port), this);
120            } catch (SocketException e) {
121                return null;
122            }
123        }
124        return socket;
125    }
126
127    @Override
128    synchronized public boolean isConnected() {
129        return status == SOCKET_STATUS_CONNECTED;
130    }
131
132    /*
133     * Status setting used by other class.
134     */
135    synchronized void setConnected() {
136        status = SOCKET_STATUS_CONNECTED;
137    }
138
139    void setBound(boolean flag) {
140        isBound = flag;
141    }
142
143    @Override
144    synchronized public boolean isConnectionPending() {
145        return status == SOCKET_STATUS_PENDING;
146    }
147
148    @Override
149    public boolean connect(SocketAddress socketAddress) throws IOException {
150        // status must be open and unconnected
151        checkUnconnected();
152
153        // check the address
154        InetSocketAddress inetSocketAddress = validateAddress(socketAddress);
155        InetAddress normalAddr = inetSocketAddress.getAddress();
156        int port = inetSocketAddress.getPort();
157
158        // When connecting, map ANY address to localhost
159        if (normalAddr.isAnyLocalAddress()) {
160            normalAddr = InetAddress.getLocalHost();
161        }
162
163        boolean finished = false;
164        try {
165            if (isBlocking()) {
166                begin();
167            }
168            finished = IoBridge.connect(fd, normalAddr, port);
169            isBound = finished;
170        } catch (IOException e) {
171            if (e instanceof ConnectException && !isBlocking()) {
172                status = SOCKET_STATUS_PENDING;
173            } else {
174                if (isOpen()) {
175                    close();
176                    finished = true;
177                }
178                throw e;
179            }
180        } finally {
181            if (isBlocking()) {
182                end(finished);
183            }
184        }
185
186        initLocalAddressAndPort();
187        connectAddress = inetSocketAddress;
188        if (socket != null) {
189            socket.socketImpl().initRemoteAddressAndPort(connectAddress.getAddress(),
190                    connectAddress.getPort());
191        }
192
193        synchronized (this) {
194            if (isBlocking()) {
195                status = (finished ? SOCKET_STATUS_CONNECTED : SOCKET_STATUS_UNCONNECTED);
196            } else {
197                status = SOCKET_STATUS_PENDING;
198            }
199        }
200        return finished;
201    }
202
203    private void initLocalAddressAndPort() {
204        SocketAddress sa;
205        try {
206            sa = Libcore.os.getsockname(fd);
207        } catch (ErrnoException errnoException) {
208            throw new AssertionError(errnoException);
209        }
210        InetSocketAddress isa = (InetSocketAddress) sa;
211        localAddress = isa.getAddress();
212        localPort = isa.getPort();
213        if (socket != null) {
214            socket.socketImpl().initLocalPort(localPort);
215        }
216    }
217
218    @Override
219    public boolean finishConnect() throws IOException {
220        synchronized (this) {
221            if (!isOpen()) {
222                throw new ClosedChannelException();
223            }
224            if (status == SOCKET_STATUS_CONNECTED) {
225                return true;
226            }
227            if (status != SOCKET_STATUS_PENDING) {
228                throw new NoConnectionPendingException();
229            }
230        }
231
232        boolean finished = false;
233        try {
234            begin();
235            InetAddress inetAddress = connectAddress.getAddress();
236            int port = connectAddress.getPort();
237            finished = IoBridge.isConnected(fd, inetAddress, port, 0, 0); // Return immediately.
238            isBound = finished;
239        } catch (ConnectException e) {
240            if (isOpen()) {
241                close();
242                finished = true;
243            }
244            throw e;
245        } finally {
246            end(finished);
247        }
248
249        synchronized (this) {
250            status = (finished ? SOCKET_STATUS_CONNECTED : status);
251            isBound = finished;
252        }
253        return finished;
254    }
255
256    void finishAccept() {
257        initLocalAddressAndPort();
258    }
259
260    @Override
261    public int read(ByteBuffer dst) throws IOException {
262        dst.checkWritable();
263        checkOpenConnected();
264        if (!dst.hasRemaining()) {
265            return 0;
266        }
267        return readImpl(dst);
268    }
269
270    @Override
271    public long read(ByteBuffer[] targets, int offset, int length) throws IOException {
272        Arrays.checkOffsetAndCount(targets.length, offset, length);
273        checkOpenConnected();
274        int totalCount = FileChannelImpl.calculateTotalRemaining(targets, offset, length, true);
275        if (totalCount == 0) {
276            return 0;
277        }
278        byte[] readArray = new byte[totalCount];
279        ByteBuffer readBuffer = ByteBuffer.wrap(readArray);
280        int readCount;
281        // read data to readBuffer, and then transfer data from readBuffer to targets.
282        readCount = readImpl(readBuffer);
283        readBuffer.flip();
284        if (readCount > 0) {
285            int left = readCount;
286            int index = offset;
287            // transfer data from readArray to targets
288            while (left > 0) {
289                int putLength = Math.min(targets[index].remaining(), left);
290                targets[index].put(readArray, readCount - left, putLength);
291                index++;
292                left -= putLength;
293            }
294        }
295        return readCount;
296    }
297
298    private int readImpl(ByteBuffer dst) throws IOException {
299        synchronized (readLock) {
300            int readCount = 0;
301            try {
302                if (isBlocking()) {
303                    begin();
304                }
305                readCount = IoBridge.recvfrom(true, fd, dst, 0, null, false);
306                if (readCount > 0) {
307                    dst.position(dst.position() + readCount);
308                }
309            } finally {
310                if (isBlocking()) {
311                    end(readCount > 0);
312                }
313            }
314            return readCount;
315        }
316    }
317
318    @Override
319    public int write(ByteBuffer src) throws IOException {
320        if (src == null) {
321            throw new NullPointerException();
322        }
323        checkOpenConnected();
324        if (!src.hasRemaining()) {
325            return 0;
326        }
327        return writeImpl(src);
328    }
329
330    @Override
331    public long write(ByteBuffer[] sources, int offset, int length) throws IOException {
332        Arrays.checkOffsetAndCount(sources.length, offset, length);
333        checkOpenConnected();
334        int count = FileChannelImpl.calculateTotalRemaining(sources, offset, length, false);
335        if (count == 0) {
336            return 0;
337        }
338        ByteBuffer writeBuf = ByteBuffer.allocate(count);
339        for (int val = offset; val < length + offset; val++) {
340            ByteBuffer source = sources[val];
341            int oldPosition = source.position();
342            writeBuf.put(source);
343            source.position(oldPosition);
344        }
345        writeBuf.flip();
346        int result = writeImpl(writeBuf);
347        int val = offset;
348        int written = result;
349        while (result > 0) {
350            ByteBuffer source = sources[val];
351            int gap = Math.min(result, source.remaining());
352            source.position(source.position() + gap);
353            val++;
354            result -= gap;
355        }
356        return written;
357    }
358
359    private int writeImpl(ByteBuffer src) throws IOException {
360        synchronized (writeLock) {
361            if (!src.hasRemaining()) {
362                return 0;
363            }
364            int writeCount = 0;
365            try {
366                if (isBlocking()) {
367                    begin();
368                }
369                writeCount = IoBridge.sendto(fd, src, 0, null, 0);
370                if (writeCount > 0) {
371                    src.position(src.position() + writeCount);
372                }
373            } finally {
374                if (isBlocking()) {
375                    end(writeCount >= 0);
376                }
377            }
378            return writeCount;
379        }
380    }
381
382    /*
383     * Status check, open and "connected", when read and write.
384     */
385    synchronized private void checkOpenConnected() throws ClosedChannelException {
386        if (!isOpen()) {
387            throw new ClosedChannelException();
388        }
389        if (!isConnected()) {
390            throw new NotYetConnectedException();
391        }
392    }
393
394    /*
395     * Status check, open and "unconnected", before connection.
396     */
397    synchronized private void checkUnconnected() throws IOException {
398        if (!isOpen()) {
399            throw new ClosedChannelException();
400        }
401        if (status == SOCKET_STATUS_CONNECTED) {
402            throw new AlreadyConnectedException();
403        }
404        if (status == SOCKET_STATUS_PENDING) {
405            throw new ConnectionPendingException();
406        }
407    }
408
409    /*
410     * Shared by this class and DatagramChannelImpl, to do the address transfer
411     * and check.
412     */
413    static InetSocketAddress validateAddress(SocketAddress socketAddress) {
414        if (socketAddress == null) {
415            throw new IllegalArgumentException();
416        }
417        if (!(socketAddress instanceof InetSocketAddress)) {
418            throw new UnsupportedAddressTypeException();
419        }
420        InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
421        if (inetSocketAddress.isUnresolved()) {
422            throw new UnresolvedAddressException();
423        }
424        return inetSocketAddress;
425    }
426
427    /*
428     * Get local address.
429     */
430    public InetAddress getLocalAddress() throws UnknownHostException {
431        return isBound ? localAddress : Inet4Address.ANY;
432    }
433
434    /*
435     * Do really closing action here.
436     */
437    @Override
438    protected synchronized void implCloseSelectableChannel() throws IOException {
439        if (status != SOCKET_STATUS_CLOSED) {
440            status = SOCKET_STATUS_CLOSED;
441            if (socket != null && !socket.isClosed()) {
442                socket.close();
443            } else {
444                IoBridge.closeSocket(fd);
445            }
446        }
447    }
448
449    @Override protected void implConfigureBlocking(boolean blocking) throws IOException {
450        synchronized (blockingLock()) {
451            IoUtils.setBlocking(fd, blocking);
452        }
453    }
454
455    /*
456     * Get the fd.
457     */
458    public FileDescriptor getFD() {
459        return fd;
460    }
461
462    /*
463     * Adapter classes for internal socket.
464     */
465    private static class SocketAdapter extends Socket {
466        private final SocketChannelImpl channel;
467        private final PlainSocketImpl socketImpl;
468
469        SocketAdapter(PlainSocketImpl socketImpl, SocketChannelImpl channel) throws SocketException {
470            super(socketImpl);
471            this.socketImpl = socketImpl;
472            this.channel = channel;
473            SocketUtils.setCreated(this);
474        }
475
476        PlainSocketImpl socketImpl() {
477            return socketImpl;
478        }
479
480        @Override
481        public SocketChannel getChannel() {
482            return channel;
483        }
484
485        @Override
486        public boolean isBound() {
487            return channel.isBound;
488        }
489
490        @Override
491        public boolean isConnected() {
492            return channel.isConnected();
493        }
494
495        @Override
496        public InetAddress getLocalAddress() {
497            try {
498                return channel.getLocalAddress();
499            } catch (UnknownHostException e) {
500                return null;
501            }
502        }
503
504        @Override
505        public void connect(SocketAddress remoteAddr, int timeout) throws IOException {
506            if (!channel.isBlocking()) {
507                throw new IllegalBlockingModeException();
508            }
509            if (isConnected()) {
510                throw new AlreadyConnectedException();
511            }
512            super.connect(remoteAddr, timeout);
513            channel.initLocalAddressAndPort();
514            if (super.isConnected()) {
515                channel.setConnected();
516                channel.isBound = super.isBound();
517            }
518        }
519
520        @Override
521        public void bind(SocketAddress localAddr) throws IOException {
522            if (channel.isConnected()) {
523                throw new AlreadyConnectedException();
524            }
525            if (SocketChannelImpl.SOCKET_STATUS_PENDING == channel.status) {
526                throw new ConnectionPendingException();
527            }
528            super.bind(localAddr);
529            channel.initLocalAddressAndPort();
530            channel.isBound = true;
531        }
532
533        @Override
534        public void close() throws IOException {
535            synchronized (channel) {
536                if (channel.isOpen()) {
537                    channel.close();
538                } else {
539                    super.close();
540                }
541                channel.status = SocketChannelImpl.SOCKET_STATUS_CLOSED;
542            }
543        }
544
545        @Override
546        public OutputStream getOutputStream() throws IOException {
547            checkOpenAndConnected();
548            if (isOutputShutdown()) {
549                throw new SocketException("Socket output is shutdown");
550            }
551            return new SocketChannelOutputStream(channel);
552        }
553
554        @Override
555        public InputStream getInputStream() throws IOException {
556            checkOpenAndConnected();
557            if (isInputShutdown()) {
558                throw new SocketException("Socket input is shutdown");
559            }
560            return new SocketChannelInputStream(channel);
561        }
562
563        private void checkOpenAndConnected() throws SocketException {
564            if (!channel.isOpen()) {
565                throw new SocketException("Socket is closed");
566            }
567            if (!channel.isConnected()) {
568                throw new SocketException("Socket is not connected");
569            }
570        }
571
572        @Override
573        public FileDescriptor getFileDescriptor$() {
574            return socketImpl.getFD$();
575        }
576    }
577
578    /*
579     * This output stream delegates all operations to the associated channel.
580     * Throws an IllegalBlockingModeException if the channel is in non-blocking
581     * mode when performing write operations.
582     */
583    private static class SocketChannelOutputStream extends OutputStream {
584        private final SocketChannel channel;
585
586        public SocketChannelOutputStream(SocketChannel channel) {
587            this.channel = channel;
588        }
589
590        /*
591         * Closes this stream and channel.
592         *
593         * @exception IOException thrown if an error occurs during the close
594         */
595        @Override
596        public void close() throws IOException {
597            channel.close();
598        }
599
600        @Override
601        public void write(byte[] buffer, int offset, int byteCount) throws IOException {
602            Arrays.checkOffsetAndCount(buffer.length, offset, byteCount);
603            ByteBuffer buf = ByteBuffer.wrap(buffer, offset, byteCount);
604            if (!channel.isBlocking()) {
605                throw new IllegalBlockingModeException();
606            }
607            channel.write(buf);
608        }
609
610        @Override
611        public void write(int oneByte) throws IOException {
612            if (!channel.isBlocking()) {
613                throw new IllegalBlockingModeException();
614            }
615            ByteBuffer buffer = ByteBuffer.allocate(1);
616            buffer.put(0, (byte) (oneByte & 0xFF));
617            channel.write(buffer);
618        }
619    }
620
621    /*
622     * This input stream delegates all operations to the associated channel.
623     * Throws an IllegalBlockingModeException if the channel is in non-blocking
624     * mode when performing read operations.
625     */
626    private static class SocketChannelInputStream extends InputStream {
627        private final SocketChannel channel;
628
629        public SocketChannelInputStream(SocketChannel channel) {
630            this.channel = channel;
631        }
632
633        /*
634         * Closes this stream and channel.
635         */
636        @Override
637        public void close() throws IOException {
638            channel.close();
639        }
640
641        @Override
642        public int read() throws IOException {
643            if (!channel.isBlocking()) {
644                throw new IllegalBlockingModeException();
645            }
646            ByteBuffer buf = ByteBuffer.allocate(1);
647            int result = channel.read(buf);
648            return (result == -1) ? result : (buf.get(0) & 0xff);
649        }
650
651        @Override
652        public int read(byte[] buffer, int offset, int byteCount) throws IOException {
653            Arrays.checkOffsetAndCount(buffer.length, offset, byteCount);
654            if (!channel.isBlocking()) {
655                throw new IllegalBlockingModeException();
656            }
657            ByteBuffer buf = ByteBuffer.wrap(buffer, offset, byteCount);
658            return channel.read(buf);
659        }
660    }
661}
662