SocketChannelImpl.java revision 3218082325b6b8713a8ac15731482e3da86a7df9
1/*
2 *  Licensed to the Apache Software Foundation (ASF) under one or more
3 *  contributor license agreements.  See the NOTICE file distributed with
4 *  this work for additional information regarding copyright ownership.
5 *  The ASF licenses this file to You under the Apache License, Version 2.0
6 *  (the "License"); you may not use this file except in compliance with
7 *  the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *  Unless required by applicable law or agreed to in writing, software
12 *  distributed under the License is distributed on an "AS IS" BASIS,
13 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  See the License for the specific language governing permissions and
15 *  limitations under the License.
16 */
17
18package java.nio;
19
20import java.io.FileDescriptor;
21import java.io.IOException;
22import java.io.InputStream;
23import java.io.OutputStream;
24import java.net.ConnectException;
25import java.net.Inet4Address;
26import java.net.InetAddress;
27import java.net.InetSocketAddress;
28import java.net.PlainSocketImpl;
29import java.net.Socket;
30import java.net.SocketAddress;
31import java.net.SocketException;
32import java.net.SocketUtils;
33import java.net.UnknownHostException;
34import java.nio.channels.AlreadyConnectedException;
35import java.nio.channels.ClosedChannelException;
36import java.nio.channels.ConnectionPendingException;
37import java.nio.channels.IllegalBlockingModeException;
38import java.nio.channels.NoConnectionPendingException;
39import java.nio.channels.NotYetConnectedException;
40import java.nio.channels.SocketChannel;
41import java.nio.channels.UnresolvedAddressException;
42import java.nio.channels.UnsupportedAddressTypeException;
43import java.nio.channels.spi.SelectorProvider;
44import java.util.Arrays;
45import libcore.io.ErrnoException;
46import libcore.io.Libcore;
47import libcore.io.IoBridge;
48import libcore.io.IoUtils;
49import static libcore.io.OsConstants.*;
50
51/*
52 * The default implementation class of java.nio.channels.SocketChannel.
53 */
54class SocketChannelImpl extends SocketChannel implements FileDescriptorChannel {
55    private static final int SOCKET_STATUS_UNINITIALIZED = -1;
56
57    // Status before connect.
58    private static final int SOCKET_STATUS_UNCONNECTED = 0;
59
60    // Status connection pending.
61    private static final int SOCKET_STATUS_PENDING = 1;
62
63    // Status after connection success.
64    private static final int SOCKET_STATUS_CONNECTED = 2;
65
66    // Status closed.
67    private static final int SOCKET_STATUS_CLOSED = 3;
68
69    private final FileDescriptor fd;
70
71    // Our internal Socket.
72    private SocketAdapter socket = null;
73
74    // The address to be connected.
75    private InetSocketAddress connectAddress = null;
76
77    private InetAddress localAddress = null;
78    private int localPort;
79
80    private int status = SOCKET_STATUS_UNINITIALIZED;
81
82    // Whether the socket is bound.
83    private volatile boolean isBound = false;
84
85    private final Object readLock = new Object();
86
87    private final Object writeLock = new Object();
88
89    /*
90     * Constructor for creating a connected socket channel.
91     */
92    public SocketChannelImpl(SelectorProvider selectorProvider) throws IOException {
93        this(selectorProvider, true);
94    }
95
96    /*
97     * Constructor for creating an optionally connected socket channel.
98     */
99    public SocketChannelImpl(SelectorProvider selectorProvider, boolean connect) throws IOException {
100        super(selectorProvider);
101        status = SOCKET_STATUS_UNCONNECTED;
102        fd = (connect ? IoBridge.socket(true) : new FileDescriptor());
103    }
104
105    /*
106     * Constructor for use by Pipe.SinkChannel and Pipe.SourceChannel.
107     */
108    public SocketChannelImpl(SelectorProvider selectorProvider, FileDescriptor existingFd) throws IOException {
109        super(selectorProvider);
110        status = SOCKET_STATUS_CONNECTED;
111        fd = existingFd;
112    }
113
114    /*
115     * Getting the internal Socket If we have not the socket, we create a new
116     * one.
117     */
118    @Override
119    synchronized public Socket socket() {
120        if (socket == null) {
121            try {
122                InetAddress addr = null;
123                int port = 0;
124                if (connectAddress != null) {
125                    addr = connectAddress.getAddress();
126                    port = connectAddress.getPort();
127                }
128                socket = new SocketAdapter(new PlainSocketImpl(fd, localPort, addr, port), this);
129            } catch (SocketException e) {
130                return null;
131            }
132        }
133        return socket;
134    }
135
136    @Override
137    synchronized public boolean isConnected() {
138        return status == SOCKET_STATUS_CONNECTED;
139    }
140
141    /*
142     * Status setting used by other class.
143     */
144    synchronized void setConnected() {
145        status = SOCKET_STATUS_CONNECTED;
146    }
147
148    void setBound(boolean flag) {
149        isBound = flag;
150    }
151
152    @Override
153    synchronized public boolean isConnectionPending() {
154        return status == SOCKET_STATUS_PENDING;
155    }
156
157    @Override
158    public boolean connect(SocketAddress socketAddress) throws IOException {
159        // status must be open and unconnected
160        checkUnconnected();
161
162        // check the address
163        InetSocketAddress inetSocketAddress = validateAddress(socketAddress);
164        InetAddress normalAddr = inetSocketAddress.getAddress();
165        int port = inetSocketAddress.getPort();
166
167        // When connecting, map ANY address to localhost
168        if (normalAddr.isAnyLocalAddress()) {
169            normalAddr = InetAddress.getLocalHost();
170        }
171
172        boolean finished = false;
173        try {
174            if (isBlocking()) {
175                begin();
176            }
177            finished = IoBridge.connect(fd, normalAddr, port);
178            isBound = finished;
179        } catch (IOException e) {
180            if (e instanceof ConnectException && !isBlocking()) {
181                status = SOCKET_STATUS_PENDING;
182            } else {
183                if (isOpen()) {
184                    close();
185                    finished = true;
186                }
187                throw e;
188            }
189        } finally {
190            if (isBlocking()) {
191                end(finished);
192            }
193        }
194
195        initLocalAddressAndPort();
196        connectAddress = inetSocketAddress;
197        if (socket != null) {
198            socket.socketImpl().initRemoteAddressAndPort(connectAddress.getAddress(),
199                    connectAddress.getPort());
200        }
201
202        synchronized (this) {
203            if (isBlocking()) {
204                status = (finished ? SOCKET_STATUS_CONNECTED : SOCKET_STATUS_UNCONNECTED);
205            } else {
206                status = SOCKET_STATUS_PENDING;
207            }
208        }
209        return finished;
210    }
211
212    private void initLocalAddressAndPort() {
213        SocketAddress sa;
214        try {
215            sa = Libcore.os.getsockname(fd);
216        } catch (ErrnoException errnoException) {
217            throw new AssertionError(errnoException);
218        }
219        InetSocketAddress isa = (InetSocketAddress) sa;
220        localAddress = isa.getAddress();
221        localPort = isa.getPort();
222        if (socket != null) {
223            socket.socketImpl().initLocalPort(localPort);
224        }
225    }
226
227    @Override
228    public boolean finishConnect() throws IOException {
229        synchronized (this) {
230            if (!isOpen()) {
231                throw new ClosedChannelException();
232            }
233            if (status == SOCKET_STATUS_CONNECTED) {
234                return true;
235            }
236            if (status != SOCKET_STATUS_PENDING) {
237                throw new NoConnectionPendingException();
238            }
239        }
240
241        boolean finished = false;
242        try {
243            begin();
244            InetAddress inetAddress = connectAddress.getAddress();
245            int port = connectAddress.getPort();
246            finished = IoBridge.isConnected(fd, inetAddress, port, 0, 0); // Return immediately.
247            isBound = finished;
248        } catch (ConnectException e) {
249            if (isOpen()) {
250                close();
251                finished = true;
252            }
253            throw e;
254        } finally {
255            end(finished);
256        }
257
258        synchronized (this) {
259            status = (finished ? SOCKET_STATUS_CONNECTED : status);
260            isBound = finished;
261        }
262        return finished;
263    }
264
265    void finishAccept() {
266        initLocalAddressAndPort();
267    }
268
269    @Override
270    public int read(ByteBuffer dst) throws IOException {
271        dst.checkWritable();
272        checkOpenConnected();
273        if (!dst.hasRemaining()) {
274            return 0;
275        }
276        return readImpl(dst);
277    }
278
279    @Override
280    public long read(ByteBuffer[] targets, int offset, int length) throws IOException {
281        Arrays.checkOffsetAndCount(targets.length, offset, length);
282        checkOpenConnected();
283        int totalCount = FileChannelImpl.calculateTotalRemaining(targets, offset, length, true);
284        if (totalCount == 0) {
285            return 0;
286        }
287        byte[] readArray = new byte[totalCount];
288        ByteBuffer readBuffer = ByteBuffer.wrap(readArray);
289        int readCount;
290        // read data to readBuffer, and then transfer data from readBuffer to targets.
291        readCount = readImpl(readBuffer);
292        readBuffer.flip();
293        if (readCount > 0) {
294            int left = readCount;
295            int index = offset;
296            // transfer data from readArray to targets
297            while (left > 0) {
298                int putLength = Math.min(targets[index].remaining(), left);
299                targets[index].put(readArray, readCount - left, putLength);
300                index++;
301                left -= putLength;
302            }
303        }
304        return readCount;
305    }
306
307    private int readImpl(ByteBuffer dst) throws IOException {
308        synchronized (readLock) {
309            int readCount = 0;
310            try {
311                if (isBlocking()) {
312                    begin();
313                }
314                readCount = IoBridge.recvfrom(true, fd, dst, 0, null, false);
315                if (readCount > 0) {
316                    dst.position(dst.position() + readCount);
317                }
318            } finally {
319                if (isBlocking()) {
320                    end(readCount > 0);
321                }
322            }
323            return readCount;
324        }
325    }
326
327    @Override
328    public int write(ByteBuffer src) throws IOException {
329        if (src == null) {
330            throw new NullPointerException("src == null");
331        }
332        checkOpenConnected();
333        if (!src.hasRemaining()) {
334            return 0;
335        }
336        return writeImpl(src);
337    }
338
339    @Override
340    public long write(ByteBuffer[] sources, int offset, int length) throws IOException {
341        Arrays.checkOffsetAndCount(sources.length, offset, length);
342        checkOpenConnected();
343        int count = FileChannelImpl.calculateTotalRemaining(sources, offset, length, false);
344        if (count == 0) {
345            return 0;
346        }
347        ByteBuffer writeBuf = ByteBuffer.allocate(count);
348        for (int val = offset; val < length + offset; val++) {
349            ByteBuffer source = sources[val];
350            int oldPosition = source.position();
351            writeBuf.put(source);
352            source.position(oldPosition);
353        }
354        writeBuf.flip();
355        int result = writeImpl(writeBuf);
356        int val = offset;
357        int written = result;
358        while (result > 0) {
359            ByteBuffer source = sources[val];
360            int gap = Math.min(result, source.remaining());
361            source.position(source.position() + gap);
362            val++;
363            result -= gap;
364        }
365        return written;
366    }
367
368    private int writeImpl(ByteBuffer src) throws IOException {
369        synchronized (writeLock) {
370            if (!src.hasRemaining()) {
371                return 0;
372            }
373            int writeCount = 0;
374            try {
375                if (isBlocking()) {
376                    begin();
377                }
378                writeCount = IoBridge.sendto(fd, src, 0, null, 0);
379                if (writeCount > 0) {
380                    src.position(src.position() + writeCount);
381                }
382            } finally {
383                if (isBlocking()) {
384                    end(writeCount >= 0);
385                }
386            }
387            return writeCount;
388        }
389    }
390
391    /*
392     * Status check, open and "connected", when read and write.
393     */
394    synchronized private void checkOpenConnected() throws ClosedChannelException {
395        if (!isOpen()) {
396            throw new ClosedChannelException();
397        }
398        if (!isConnected()) {
399            throw new NotYetConnectedException();
400        }
401    }
402
403    /*
404     * Status check, open and "unconnected", before connection.
405     */
406    synchronized private void checkUnconnected() throws IOException {
407        if (!isOpen()) {
408            throw new ClosedChannelException();
409        }
410        if (status == SOCKET_STATUS_CONNECTED) {
411            throw new AlreadyConnectedException();
412        }
413        if (status == SOCKET_STATUS_PENDING) {
414            throw new ConnectionPendingException();
415        }
416    }
417
418    /*
419     * Shared by this class and DatagramChannelImpl, to do the address transfer
420     * and check.
421     */
422    static InetSocketAddress validateAddress(SocketAddress socketAddress) {
423        if (socketAddress == null) {
424            throw new IllegalArgumentException("socketAddress == null");
425        }
426        if (!(socketAddress instanceof InetSocketAddress)) {
427            throw new UnsupportedAddressTypeException();
428        }
429        InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
430        if (inetSocketAddress.isUnresolved()) {
431            throw new UnresolvedAddressException();
432        }
433        return inetSocketAddress;
434    }
435
436    /*
437     * Get local address.
438     */
439    public InetAddress getLocalAddress() throws UnknownHostException {
440        return isBound ? localAddress : Inet4Address.ANY;
441    }
442
443    /*
444     * Do really closing action here.
445     */
446    @Override
447    protected synchronized void implCloseSelectableChannel() throws IOException {
448        if (status != SOCKET_STATUS_CLOSED) {
449            status = SOCKET_STATUS_CLOSED;
450            if (socket != null && !socket.isClosed()) {
451                socket.close();
452            } else {
453                IoBridge.closeSocket(fd);
454            }
455        }
456    }
457
458    @Override protected void implConfigureBlocking(boolean blocking) throws IOException {
459        synchronized (blockingLock()) {
460            IoUtils.setBlocking(fd, blocking);
461        }
462    }
463
464    /*
465     * Get the fd.
466     */
467    public FileDescriptor getFD() {
468        return fd;
469    }
470
471    /*
472     * Adapter classes for internal socket.
473     */
474    private static class SocketAdapter extends Socket {
475        private final SocketChannelImpl channel;
476        private final PlainSocketImpl socketImpl;
477
478        SocketAdapter(PlainSocketImpl socketImpl, SocketChannelImpl channel) throws SocketException {
479            super(socketImpl);
480            this.socketImpl = socketImpl;
481            this.channel = channel;
482            SocketUtils.setCreated(this);
483        }
484
485        PlainSocketImpl socketImpl() {
486            return socketImpl;
487        }
488
489        @Override
490        public SocketChannel getChannel() {
491            return channel;
492        }
493
494        @Override
495        public boolean isBound() {
496            return channel.isBound;
497        }
498
499        @Override
500        public boolean isConnected() {
501            return channel.isConnected();
502        }
503
504        @Override
505        public InetAddress getLocalAddress() {
506            try {
507                return channel.getLocalAddress();
508            } catch (UnknownHostException e) {
509                return null;
510            }
511        }
512
513        @Override
514        public void connect(SocketAddress remoteAddr, int timeout) throws IOException {
515            if (!channel.isBlocking()) {
516                throw new IllegalBlockingModeException();
517            }
518            if (isConnected()) {
519                throw new AlreadyConnectedException();
520            }
521            super.connect(remoteAddr, timeout);
522            channel.initLocalAddressAndPort();
523            if (super.isConnected()) {
524                channel.setConnected();
525                channel.isBound = super.isBound();
526            }
527        }
528
529        @Override
530        public void bind(SocketAddress localAddr) throws IOException {
531            if (channel.isConnected()) {
532                throw new AlreadyConnectedException();
533            }
534            if (SocketChannelImpl.SOCKET_STATUS_PENDING == channel.status) {
535                throw new ConnectionPendingException();
536            }
537            super.bind(localAddr);
538            channel.initLocalAddressAndPort();
539            channel.isBound = true;
540        }
541
542        @Override
543        public void close() throws IOException {
544            synchronized (channel) {
545                if (channel.isOpen()) {
546                    channel.close();
547                } else {
548                    super.close();
549                }
550                channel.status = SocketChannelImpl.SOCKET_STATUS_CLOSED;
551            }
552        }
553
554        @Override
555        public OutputStream getOutputStream() throws IOException {
556            checkOpenAndConnected();
557            if (isOutputShutdown()) {
558                throw new SocketException("Socket output is shutdown");
559            }
560            return new SocketChannelOutputStream(channel);
561        }
562
563        @Override
564        public InputStream getInputStream() throws IOException {
565            checkOpenAndConnected();
566            if (isInputShutdown()) {
567                throw new SocketException("Socket input is shutdown");
568            }
569            return new SocketChannelInputStream(channel);
570        }
571
572        private void checkOpenAndConnected() throws SocketException {
573            if (!channel.isOpen()) {
574                throw new SocketException("Socket is closed");
575            }
576            if (!channel.isConnected()) {
577                throw new SocketException("Socket is not connected");
578            }
579        }
580
581        @Override
582        public FileDescriptor getFileDescriptor$() {
583            return socketImpl.getFD$();
584        }
585    }
586
587    /*
588     * This output stream delegates all operations to the associated channel.
589     * Throws an IllegalBlockingModeException if the channel is in non-blocking
590     * mode when performing write operations.
591     */
592    private static class SocketChannelOutputStream extends OutputStream {
593        private final SocketChannel channel;
594
595        public SocketChannelOutputStream(SocketChannel channel) {
596            this.channel = channel;
597        }
598
599        /*
600         * Closes this stream and channel.
601         *
602         * @exception IOException thrown if an error occurs during the close
603         */
604        @Override
605        public void close() throws IOException {
606            channel.close();
607        }
608
609        @Override
610        public void write(byte[] buffer, int offset, int byteCount) throws IOException {
611            Arrays.checkOffsetAndCount(buffer.length, offset, byteCount);
612            ByteBuffer buf = ByteBuffer.wrap(buffer, offset, byteCount);
613            if (!channel.isBlocking()) {
614                throw new IllegalBlockingModeException();
615            }
616            channel.write(buf);
617        }
618
619        @Override
620        public void write(int oneByte) throws IOException {
621            if (!channel.isBlocking()) {
622                throw new IllegalBlockingModeException();
623            }
624            ByteBuffer buffer = ByteBuffer.allocate(1);
625            buffer.put(0, (byte) (oneByte & 0xFF));
626            channel.write(buffer);
627        }
628    }
629
630    /*
631     * This input stream delegates all operations to the associated channel.
632     * Throws an IllegalBlockingModeException if the channel is in non-blocking
633     * mode when performing read operations.
634     */
635    private static class SocketChannelInputStream extends InputStream {
636        private final SocketChannel channel;
637
638        public SocketChannelInputStream(SocketChannel channel) {
639            this.channel = channel;
640        }
641
642        /*
643         * Closes this stream and channel.
644         */
645        @Override
646        public void close() throws IOException {
647            channel.close();
648        }
649
650        @Override
651        public int read() throws IOException {
652            if (!channel.isBlocking()) {
653                throw new IllegalBlockingModeException();
654            }
655            ByteBuffer buf = ByteBuffer.allocate(1);
656            int result = channel.read(buf);
657            return (result == -1) ? result : (buf.get(0) & 0xff);
658        }
659
660        @Override
661        public int read(byte[] buffer, int offset, int byteCount) throws IOException {
662            Arrays.checkOffsetAndCount(buffer.length, offset, byteCount);
663            if (!channel.isBlocking()) {
664                throw new IllegalBlockingModeException();
665            }
666            ByteBuffer buf = ByteBuffer.wrap(buffer, offset, byteCount);
667            return channel.read(buf);
668        }
669    }
670}
671