SocketChannelImpl.java revision c6ad01d286af243fd300dd105eb2e4437e0b6b16
1/*
2 *  Licensed to the Apache Software Foundation (ASF) under one or more
3 *  contributor license agreements.  See the NOTICE file distributed with
4 *  this work for additional information regarding copyright ownership.
5 *  The ASF licenses this file to You under the Apache License, Version 2.0
6 *  (the "License"); you may not use this file except in compliance with
7 *  the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *  Unless required by applicable law or agreed to in writing, software
12 *  distributed under the License is distributed on an "AS IS" BASIS,
13 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  See the License for the specific language governing permissions and
15 *  limitations under the License.
16 */
17
18package java.nio;
19
20import java.io.FileDescriptor;
21import java.io.IOException;
22import java.io.InputStream;
23import java.io.OutputStream;
24import java.net.ConnectException;
25import java.net.Inet4Address;
26import java.net.InetAddress;
27import java.net.InetSocketAddress;
28import java.net.PlainSocketImpl;
29import java.net.Socket;
30import java.net.SocketAddress;
31import java.net.SocketException;
32import java.net.SocketUtils;
33import java.net.UnknownHostException;
34import java.nio.channels.AlreadyConnectedException;
35import java.nio.channels.ClosedChannelException;
36import java.nio.channels.ConnectionPendingException;
37import java.nio.channels.IllegalBlockingModeException;
38import java.nio.channels.NoConnectionPendingException;
39import java.nio.channels.NotYetConnectedException;
40import java.nio.channels.SocketChannel;
41import java.nio.channels.UnresolvedAddressException;
42import java.nio.channels.UnsupportedAddressTypeException;
43import java.nio.channels.spi.SelectorProvider;
44import java.util.Arrays;
45import libcore.io.ErrnoException;
46import libcore.io.Libcore;
47import libcore.io.IoBridge;
48import libcore.io.IoUtils;
49import static libcore.io.OsConstants.*;
50
51/*
52 * The default implementation class of java.nio.channels.SocketChannel.
53 */
54class SocketChannelImpl extends SocketChannel implements FileDescriptorChannel {
55    private static final int SOCKET_STATUS_UNINITIALIZED = -1;
56
57    // Status before connect.
58    private static final int SOCKET_STATUS_UNCONNECTED = 0;
59
60    // Status connection pending.
61    private static final int SOCKET_STATUS_PENDING = 1;
62
63    // Status after connection success.
64    private static final int SOCKET_STATUS_CONNECTED = 2;
65
66    // Status closed.
67    private static final int SOCKET_STATUS_CLOSED = 3;
68
69    private final FileDescriptor fd;
70
71    // Our internal Socket.
72    private SocketAdapter socket = null;
73
74    // The address to be connected.
75    private InetSocketAddress connectAddress = null;
76
77    private InetAddress localAddress = null;
78    private int localPort;
79
80    private int status = SOCKET_STATUS_UNINITIALIZED;
81
82    // Whether the socket is bound.
83    private volatile boolean isBound = false;
84
85    private final Object readLock = new Object();
86
87    private final Object writeLock = new Object();
88
89    /*
90     * Constructor for creating a connected socket channel.
91     */
92    public SocketChannelImpl(SelectorProvider selectorProvider) throws IOException {
93        this(selectorProvider, true);
94    }
95
96    /*
97     * Constructor for creating an optionally connected socket channel.
98     */
99    public SocketChannelImpl(SelectorProvider selectorProvider, boolean connect) throws IOException {
100        super(selectorProvider);
101        status = SOCKET_STATUS_UNCONNECTED;
102        fd = (connect ? IoBridge.socket(true) : new FileDescriptor());
103    }
104
105    /*
106     * Constructor for use by Pipe.SinkChannel and Pipe.SourceChannel.
107     */
108    public SocketChannelImpl(SelectorProvider selectorProvider, FileDescriptor existingFd) throws IOException {
109        super(selectorProvider);
110        status = SOCKET_STATUS_CONNECTED;
111        fd = existingFd;
112    }
113
114    /*
115     * Getting the internal Socket If we have not the socket, we create a new
116     * one.
117     */
118    @Override
119    synchronized public Socket socket() {
120        if (socket == null) {
121            try {
122                InetAddress addr = null;
123                int port = 0;
124                if (connectAddress != null) {
125                    addr = connectAddress.getAddress();
126                    port = connectAddress.getPort();
127                }
128                socket = new SocketAdapter(new PlainSocketImpl(fd, localPort, addr, port), this);
129            } catch (SocketException e) {
130                return null;
131            }
132        }
133        return socket;
134    }
135
136    @Override
137    synchronized public boolean isConnected() {
138        return status == SOCKET_STATUS_CONNECTED;
139    }
140
141    /*
142     * Status setting used by other class.
143     */
144    synchronized void setConnected() {
145        status = SOCKET_STATUS_CONNECTED;
146    }
147
148    void setBound(boolean flag) {
149        isBound = flag;
150    }
151
152    @Override
153    synchronized public boolean isConnectionPending() {
154        return status == SOCKET_STATUS_PENDING;
155    }
156
157    @Override
158    public boolean connect(SocketAddress socketAddress) throws IOException {
159        // status must be open and unconnected
160        checkUnconnected();
161
162        // check the address
163        InetSocketAddress inetSocketAddress = validateAddress(socketAddress);
164        InetAddress normalAddr = inetSocketAddress.getAddress();
165        int port = inetSocketAddress.getPort();
166
167        // When connecting, map ANY address to localhost
168        if (normalAddr.isAnyLocalAddress()) {
169            normalAddr = InetAddress.getLocalHost();
170        }
171
172        boolean finished = false;
173        try {
174            if (isBlocking()) {
175                begin();
176            }
177            finished = IoBridge.connect(fd, normalAddr, port);
178            isBound = finished;
179        } catch (IOException e) {
180            if (isEINPROGRESS(e)) {
181                status = SOCKET_STATUS_PENDING;
182            } else {
183                if (isOpen()) {
184                    close();
185                    finished = true;
186                }
187                throw e;
188            }
189        } finally {
190            if (isBlocking()) {
191                end(finished);
192            }
193        }
194
195        initLocalAddressAndPort();
196        connectAddress = inetSocketAddress;
197        if (socket != null) {
198            socket.socketImpl().initRemoteAddressAndPort(connectAddress.getAddress(),
199                    connectAddress.getPort());
200        }
201
202        synchronized (this) {
203            if (isBlocking()) {
204                status = (finished ? SOCKET_STATUS_CONNECTED : SOCKET_STATUS_UNCONNECTED);
205            } else {
206                status = SOCKET_STATUS_PENDING;
207            }
208        }
209        return finished;
210    }
211
212    private boolean isEINPROGRESS(IOException e) {
213        if (isBlocking()) {
214            return false;
215        }
216        if (e instanceof ConnectException) {
217            Throwable cause = e.getCause();
218            if (cause instanceof ErrnoException) {
219                return ((ErrnoException) cause).errno == EINPROGRESS;
220            }
221        }
222        return false;
223    }
224
225    private void initLocalAddressAndPort() {
226        SocketAddress sa;
227        try {
228            sa = Libcore.os.getsockname(fd);
229        } catch (ErrnoException errnoException) {
230            throw new AssertionError(errnoException);
231        }
232        InetSocketAddress isa = (InetSocketAddress) sa;
233        localAddress = isa.getAddress();
234        localPort = isa.getPort();
235        if (socket != null) {
236            socket.socketImpl().initLocalPort(localPort);
237        }
238    }
239
240    @Override
241    public boolean finishConnect() throws IOException {
242        synchronized (this) {
243            if (!isOpen()) {
244                throw new ClosedChannelException();
245            }
246            if (status == SOCKET_STATUS_CONNECTED) {
247                return true;
248            }
249            if (status != SOCKET_STATUS_PENDING) {
250                throw new NoConnectionPendingException();
251            }
252        }
253
254        boolean finished = false;
255        try {
256            begin();
257            InetAddress inetAddress = connectAddress.getAddress();
258            int port = connectAddress.getPort();
259            finished = IoBridge.isConnected(fd, inetAddress, port, 0, 0); // Return immediately.
260            isBound = finished;
261        } catch (ConnectException e) {
262            if (isOpen()) {
263                close();
264                finished = true;
265            }
266            throw e;
267        } finally {
268            end(finished);
269        }
270
271        synchronized (this) {
272            status = (finished ? SOCKET_STATUS_CONNECTED : status);
273            isBound = finished;
274        }
275        return finished;
276    }
277
278    void finishAccept() {
279        initLocalAddressAndPort();
280    }
281
282    @Override
283    public int read(ByteBuffer dst) throws IOException {
284        dst.checkWritable();
285        checkOpenConnected();
286        if (!dst.hasRemaining()) {
287            return 0;
288        }
289        return readImpl(dst);
290    }
291
292    @Override
293    public long read(ByteBuffer[] targets, int offset, int length) throws IOException {
294        Arrays.checkOffsetAndCount(targets.length, offset, length);
295        checkOpenConnected();
296        int totalCount = FileChannelImpl.calculateTotalRemaining(targets, offset, length, true);
297        if (totalCount == 0) {
298            return 0;
299        }
300        byte[] readArray = new byte[totalCount];
301        ByteBuffer readBuffer = ByteBuffer.wrap(readArray);
302        int readCount;
303        // read data to readBuffer, and then transfer data from readBuffer to targets.
304        readCount = readImpl(readBuffer);
305        readBuffer.flip();
306        if (readCount > 0) {
307            int left = readCount;
308            int index = offset;
309            // transfer data from readArray to targets
310            while (left > 0) {
311                int putLength = Math.min(targets[index].remaining(), left);
312                targets[index].put(readArray, readCount - left, putLength);
313                index++;
314                left -= putLength;
315            }
316        }
317        return readCount;
318    }
319
320    private int readImpl(ByteBuffer dst) throws IOException {
321        synchronized (readLock) {
322            int readCount = 0;
323            try {
324                if (isBlocking()) {
325                    begin();
326                }
327                readCount = IoBridge.recvfrom(true, fd, dst, 0, null, false);
328                if (readCount > 0) {
329                    dst.position(dst.position() + readCount);
330                }
331            } finally {
332                if (isBlocking()) {
333                    end(readCount > 0);
334                }
335            }
336            return readCount;
337        }
338    }
339
340    @Override
341    public int write(ByteBuffer src) throws IOException {
342        if (src == null) {
343            throw new NullPointerException("src == null");
344        }
345        checkOpenConnected();
346        if (!src.hasRemaining()) {
347            return 0;
348        }
349        return writeImpl(src);
350    }
351
352    @Override
353    public long write(ByteBuffer[] sources, int offset, int length) throws IOException {
354        Arrays.checkOffsetAndCount(sources.length, offset, length);
355        checkOpenConnected();
356        int count = FileChannelImpl.calculateTotalRemaining(sources, offset, length, false);
357        if (count == 0) {
358            return 0;
359        }
360        ByteBuffer writeBuf = ByteBuffer.allocate(count);
361        for (int val = offset; val < length + offset; val++) {
362            ByteBuffer source = sources[val];
363            int oldPosition = source.position();
364            writeBuf.put(source);
365            source.position(oldPosition);
366        }
367        writeBuf.flip();
368        int result = writeImpl(writeBuf);
369        int val = offset;
370        int written = result;
371        while (result > 0) {
372            ByteBuffer source = sources[val];
373            int gap = Math.min(result, source.remaining());
374            source.position(source.position() + gap);
375            val++;
376            result -= gap;
377        }
378        return written;
379    }
380
381    private int writeImpl(ByteBuffer src) throws IOException {
382        synchronized (writeLock) {
383            if (!src.hasRemaining()) {
384                return 0;
385            }
386            int writeCount = 0;
387            try {
388                if (isBlocking()) {
389                    begin();
390                }
391                writeCount = IoBridge.sendto(fd, src, 0, null, 0);
392                if (writeCount > 0) {
393                    src.position(src.position() + writeCount);
394                }
395            } finally {
396                if (isBlocking()) {
397                    end(writeCount >= 0);
398                }
399            }
400            return writeCount;
401        }
402    }
403
404    /*
405     * Status check, open and "connected", when read and write.
406     */
407    synchronized private void checkOpenConnected() throws ClosedChannelException {
408        if (!isOpen()) {
409            throw new ClosedChannelException();
410        }
411        if (!isConnected()) {
412            throw new NotYetConnectedException();
413        }
414    }
415
416    /*
417     * Status check, open and "unconnected", before connection.
418     */
419    synchronized private void checkUnconnected() throws IOException {
420        if (!isOpen()) {
421            throw new ClosedChannelException();
422        }
423        if (status == SOCKET_STATUS_CONNECTED) {
424            throw new AlreadyConnectedException();
425        }
426        if (status == SOCKET_STATUS_PENDING) {
427            throw new ConnectionPendingException();
428        }
429    }
430
431    /*
432     * Shared by this class and DatagramChannelImpl, to do the address transfer
433     * and check.
434     */
435    static InetSocketAddress validateAddress(SocketAddress socketAddress) {
436        if (socketAddress == null) {
437            throw new IllegalArgumentException("socketAddress == null");
438        }
439        if (!(socketAddress instanceof InetSocketAddress)) {
440            throw new UnsupportedAddressTypeException();
441        }
442        InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
443        if (inetSocketAddress.isUnresolved()) {
444            throw new UnresolvedAddressException();
445        }
446        return inetSocketAddress;
447    }
448
449    /*
450     * Get local address.
451     */
452    public InetAddress getLocalAddress() throws UnknownHostException {
453        return isBound ? localAddress : Inet4Address.ANY;
454    }
455
456    /*
457     * Do really closing action here.
458     */
459    @Override
460    protected synchronized void implCloseSelectableChannel() throws IOException {
461        if (status != SOCKET_STATUS_CLOSED) {
462            status = SOCKET_STATUS_CLOSED;
463            if (socket != null && !socket.isClosed()) {
464                socket.close();
465            } else {
466                IoBridge.closeSocket(fd);
467            }
468        }
469    }
470
471    @Override protected void implConfigureBlocking(boolean blocking) throws IOException {
472        synchronized (blockingLock()) {
473            IoUtils.setBlocking(fd, blocking);
474        }
475    }
476
477    /*
478     * Get the fd.
479     */
480    public FileDescriptor getFD() {
481        return fd;
482    }
483
484    /*
485     * Adapter classes for internal socket.
486     */
487    private static class SocketAdapter extends Socket {
488        private final SocketChannelImpl channel;
489        private final PlainSocketImpl socketImpl;
490
491        SocketAdapter(PlainSocketImpl socketImpl, SocketChannelImpl channel) throws SocketException {
492            super(socketImpl);
493            this.socketImpl = socketImpl;
494            this.channel = channel;
495            SocketUtils.setCreated(this);
496        }
497
498        PlainSocketImpl socketImpl() {
499            return socketImpl;
500        }
501
502        @Override
503        public SocketChannel getChannel() {
504            return channel;
505        }
506
507        @Override
508        public boolean isBound() {
509            return channel.isBound;
510        }
511
512        @Override
513        public boolean isConnected() {
514            return channel.isConnected();
515        }
516
517        @Override
518        public InetAddress getLocalAddress() {
519            try {
520                return channel.getLocalAddress();
521            } catch (UnknownHostException e) {
522                return null;
523            }
524        }
525
526        @Override
527        public void connect(SocketAddress remoteAddr, int timeout) throws IOException {
528            if (!channel.isBlocking()) {
529                throw new IllegalBlockingModeException();
530            }
531            if (isConnected()) {
532                throw new AlreadyConnectedException();
533            }
534            super.connect(remoteAddr, timeout);
535            channel.initLocalAddressAndPort();
536            if (super.isConnected()) {
537                channel.setConnected();
538                channel.isBound = super.isBound();
539            }
540        }
541
542        @Override
543        public void bind(SocketAddress localAddr) throws IOException {
544            if (channel.isConnected()) {
545                throw new AlreadyConnectedException();
546            }
547            if (SocketChannelImpl.SOCKET_STATUS_PENDING == channel.status) {
548                throw new ConnectionPendingException();
549            }
550            super.bind(localAddr);
551            channel.initLocalAddressAndPort();
552            channel.isBound = true;
553        }
554
555        @Override
556        public void close() throws IOException {
557            synchronized (channel) {
558                if (channel.isOpen()) {
559                    channel.close();
560                } else {
561                    super.close();
562                }
563                channel.status = SocketChannelImpl.SOCKET_STATUS_CLOSED;
564            }
565        }
566
567        @Override
568        public OutputStream getOutputStream() throws IOException {
569            checkOpenAndConnected();
570            if (isOutputShutdown()) {
571                throw new SocketException("Socket output is shutdown");
572            }
573            return new SocketChannelOutputStream(channel);
574        }
575
576        @Override
577        public InputStream getInputStream() throws IOException {
578            checkOpenAndConnected();
579            if (isInputShutdown()) {
580                throw new SocketException("Socket input is shutdown");
581            }
582            return new SocketChannelInputStream(channel);
583        }
584
585        private void checkOpenAndConnected() throws SocketException {
586            if (!channel.isOpen()) {
587                throw new SocketException("Socket is closed");
588            }
589            if (!channel.isConnected()) {
590                throw new SocketException("Socket is not connected");
591            }
592        }
593
594        @Override
595        public FileDescriptor getFileDescriptor$() {
596            return socketImpl.getFD$();
597        }
598    }
599
600    /*
601     * This output stream delegates all operations to the associated channel.
602     * Throws an IllegalBlockingModeException if the channel is in non-blocking
603     * mode when performing write operations.
604     */
605    private static class SocketChannelOutputStream extends OutputStream {
606        private final SocketChannel channel;
607
608        public SocketChannelOutputStream(SocketChannel channel) {
609            this.channel = channel;
610        }
611
612        /*
613         * Closes this stream and channel.
614         *
615         * @exception IOException thrown if an error occurs during the close
616         */
617        @Override
618        public void close() throws IOException {
619            channel.close();
620        }
621
622        @Override
623        public void write(byte[] buffer, int offset, int byteCount) throws IOException {
624            Arrays.checkOffsetAndCount(buffer.length, offset, byteCount);
625            ByteBuffer buf = ByteBuffer.wrap(buffer, offset, byteCount);
626            if (!channel.isBlocking()) {
627                throw new IllegalBlockingModeException();
628            }
629            channel.write(buf);
630        }
631
632        @Override
633        public void write(int oneByte) throws IOException {
634            if (!channel.isBlocking()) {
635                throw new IllegalBlockingModeException();
636            }
637            ByteBuffer buffer = ByteBuffer.allocate(1);
638            buffer.put(0, (byte) (oneByte & 0xFF));
639            channel.write(buffer);
640        }
641    }
642
643    /*
644     * This input stream delegates all operations to the associated channel.
645     * Throws an IllegalBlockingModeException if the channel is in non-blocking
646     * mode when performing read operations.
647     */
648    private static class SocketChannelInputStream extends InputStream {
649        private final SocketChannel channel;
650
651        public SocketChannelInputStream(SocketChannel channel) {
652            this.channel = channel;
653        }
654
655        /*
656         * Closes this stream and channel.
657         */
658        @Override
659        public void close() throws IOException {
660            channel.close();
661        }
662
663        @Override
664        public int read() throws IOException {
665            if (!channel.isBlocking()) {
666                throw new IllegalBlockingModeException();
667            }
668            ByteBuffer buf = ByteBuffer.allocate(1);
669            int result = channel.read(buf);
670            return (result == -1) ? result : (buf.get(0) & 0xff);
671        }
672
673        @Override
674        public int read(byte[] buffer, int offset, int byteCount) throws IOException {
675            Arrays.checkOffsetAndCount(buffer.length, offset, byteCount);
676            if (!channel.isBlocking()) {
677                throw new IllegalBlockingModeException();
678            }
679            ByteBuffer buf = ByteBuffer.wrap(buffer, offset, byteCount);
680            return channel.read(buf);
681        }
682    }
683}
684