SocketChannelImpl.java revision e27d02044ec399884bfd4343d513bf270b9b9b11
1/*
2 *  Licensed to the Apache Software Foundation (ASF) under one or more
3 *  contributor license agreements.  See the NOTICE file distributed with
4 *  this work for additional information regarding copyright ownership.
5 *  The ASF licenses this file to You under the Apache License, Version 2.0
6 *  (the "License"); you may not use this file except in compliance with
7 *  the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *  Unless required by applicable law or agreed to in writing, software
12 *  distributed under the License is distributed on an "AS IS" BASIS,
13 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  See the License for the specific language governing permissions and
15 *  limitations under the License.
16 */
17
18package java.nio;
19
20import java.io.FileDescriptor;
21import java.io.IOException;
22import java.io.InputStream;
23import java.io.OutputStream;
24import java.net.ConnectException;
25import java.net.Inet4Address;
26import java.net.InetAddress;
27import java.net.InetSocketAddress;
28import java.net.PlainSocketImpl;
29import java.net.Socket;
30import java.net.SocketAddress;
31import java.net.SocketException;
32import java.net.SocketUtils;
33import java.net.UnknownHostException;
34import java.nio.channels.AlreadyConnectedException;
35import java.nio.channels.ClosedChannelException;
36import java.nio.channels.ConnectionPendingException;
37import java.nio.channels.IllegalBlockingModeException;
38import java.nio.channels.NoConnectionPendingException;
39import java.nio.channels.NotYetConnectedException;
40import java.nio.channels.SocketChannel;
41import java.nio.channels.UnresolvedAddressException;
42import java.nio.channels.UnsupportedAddressTypeException;
43import java.nio.channels.spi.SelectorProvider;
44import java.util.Arrays;
45import libcore.io.ErrnoException;
46import libcore.io.Libcore;
47import libcore.io.IoUtils;
48import org.apache.harmony.luni.platform.Platform;
49import static libcore.io.OsConstants.*;
50
51/*
52 * The default implementation class of java.nio.channels.SocketChannel.
53 */
54class SocketChannelImpl extends SocketChannel implements FileDescriptorChannel {
55    private static final int SOCKET_STATUS_UNINITIALIZED = -1;
56
57    // Status before connect.
58    private static final int SOCKET_STATUS_UNCONNECTED = 0;
59
60    // Status connection pending.
61    private static final int SOCKET_STATUS_PENDING = 1;
62
63    // Status after connection success.
64    private static final int SOCKET_STATUS_CONNECTED = 2;
65
66    // Status closed.
67    private static final int SOCKET_STATUS_CLOSED = 3;
68
69    private final FileDescriptor fd;
70
71    // Our internal Socket.
72    private SocketAdapter socket = null;
73
74    // The address to be connected.
75    private InetSocketAddress connectAddress = null;
76
77    private InetAddress localAddress = null;
78    private int localPort;
79
80    private int status = SOCKET_STATUS_UNINITIALIZED;
81
82    // Whether the socket is bound.
83    private volatile boolean isBound = false;
84
85    private final Object readLock = new Object();
86
87    private final Object writeLock = new Object();
88
89    /*
90     * Constructor for creating a connected socket channel.
91     */
92    public SocketChannelImpl(SelectorProvider selectorProvider) throws IOException {
93        this(selectorProvider, true);
94    }
95
96    /*
97     * Constructor for creating an optionally connected socket channel.
98     */
99    public SocketChannelImpl(SelectorProvider selectorProvider, boolean connect) throws IOException {
100        super(selectorProvider);
101        status = SOCKET_STATUS_UNCONNECTED;
102        fd = (connect ? IoUtils.socket(true) : new FileDescriptor());
103    }
104
105    /*
106     * Getting the internal Socket If we have not the socket, we create a new
107     * one.
108     */
109    @Override
110    synchronized public Socket socket() {
111        if (socket == null) {
112            try {
113                InetAddress addr = null;
114                int port = 0;
115                if (connectAddress != null) {
116                    addr = connectAddress.getAddress();
117                    port = connectAddress.getPort();
118                }
119                socket = new SocketAdapter(new PlainSocketImpl(fd, localPort, addr, port), this);
120            } catch (SocketException e) {
121                return null;
122            }
123        }
124        return socket;
125    }
126
127    @Override
128    synchronized public boolean isConnected() {
129        return status == SOCKET_STATUS_CONNECTED;
130    }
131
132    /*
133     * Status setting used by other class.
134     */
135    synchronized void setConnected() {
136        status = SOCKET_STATUS_CONNECTED;
137    }
138
139    void setBound(boolean flag) {
140        isBound = flag;
141    }
142
143    @Override
144    synchronized public boolean isConnectionPending() {
145        return status == SOCKET_STATUS_PENDING;
146    }
147
148    @Override
149    public boolean connect(SocketAddress socketAddress) throws IOException {
150        // status must be open and unconnected
151        checkUnconnected();
152
153        // check the address
154        InetSocketAddress inetSocketAddress = validateAddress(socketAddress);
155        InetAddress normalAddr = inetSocketAddress.getAddress();
156        int port = inetSocketAddress.getPort();
157
158        // When connecting, map ANY address to localhost
159        if (normalAddr.isAnyLocalAddress()) {
160            normalAddr = InetAddress.getLocalHost();
161        }
162
163        boolean finished = false;
164        try {
165            if (isBlocking()) {
166                begin();
167            }
168            finished = IoUtils.connect(fd, normalAddr, port);
169            isBound = finished;
170        } catch (IOException e) {
171            if (e instanceof ConnectException && !isBlocking()) {
172                status = SOCKET_STATUS_PENDING;
173            } else {
174                if (isOpen()) {
175                    close();
176                    finished = true;
177                }
178                throw e;
179            }
180        } finally {
181            if (isBlocking()) {
182                end(finished);
183            }
184        }
185
186        initLocalAddressAndPort();
187        connectAddress = inetSocketAddress;
188        if (socket != null) {
189            socket.socketImpl().initRemoteAddressAndPort(connectAddress.getAddress(),
190                    connectAddress.getPort());
191        }
192
193        synchronized (this) {
194            if (isBlocking()) {
195                status = (finished ? SOCKET_STATUS_CONNECTED : SOCKET_STATUS_UNCONNECTED);
196            } else {
197                status = SOCKET_STATUS_PENDING;
198            }
199        }
200        return finished;
201    }
202
203    private void initLocalAddressAndPort() {
204        SocketAddress sa = Libcore.os.getsockname(fd);
205        InetSocketAddress isa = (InetSocketAddress) sa;
206        localAddress = isa.getAddress();
207        localPort = isa.getPort();
208        if (socket != null) {
209            socket.socketImpl().initLocalPort(localPort);
210        }
211    }
212
213    @Override
214    public boolean finishConnect() throws IOException {
215        synchronized (this) {
216            if (!isOpen()) {
217                throw new ClosedChannelException();
218            }
219            if (status == SOCKET_STATUS_CONNECTED) {
220                return true;
221            }
222            if (status != SOCKET_STATUS_PENDING) {
223                throw new NoConnectionPendingException();
224            }
225        }
226
227        boolean finished = false;
228        try {
229            begin();
230            InetAddress inetAddress = connectAddress.getAddress();
231            int port = connectAddress.getPort();
232            finished = IoUtils.isConnected(fd, inetAddress, port, 0, 0); // Return immediately.
233            isBound = finished;
234        } catch (ConnectException e) {
235            if (isOpen()) {
236                close();
237                finished = true;
238            }
239            throw e;
240        } finally {
241            end(finished);
242        }
243
244        synchronized (this) {
245            status = (finished ? SOCKET_STATUS_CONNECTED : status);
246            isBound = finished;
247        }
248        return finished;
249    }
250
251    void finishAccept() {
252        initLocalAddressAndPort();
253    }
254
255    @Override
256    public int read(ByteBuffer target) throws IOException {
257        FileChannelImpl.checkWritable(target);
258        checkOpenConnected();
259        if (!target.hasRemaining()) {
260            return 0;
261        }
262
263        int readCount;
264        if (target.isDirect() || target.hasArray()) {
265            readCount = readImpl(target);
266            if (readCount > 0) {
267                target.position(target.position() + readCount);
268            }
269        } else {
270            ByteBuffer readBuffer = null;
271            byte[] readArray = null;
272            readArray = new byte[target.remaining()];
273            readBuffer = ByteBuffer.wrap(readArray);
274            readCount = readImpl(readBuffer);
275            if (readCount > 0) {
276                target.put(readArray, 0, readCount);
277            }
278        }
279        return readCount;
280    }
281
282    @Override
283    public long read(ByteBuffer[] targets, int offset, int length) throws IOException {
284        Arrays.checkOffsetAndCount(targets.length, offset, length);
285        checkOpenConnected();
286        int totalCount = FileChannelImpl.calculateTotalRemaining(targets, offset, length, true);
287        if (totalCount == 0) {
288            return 0;
289        }
290        byte[] readArray = new byte[totalCount];
291        ByteBuffer readBuffer = ByteBuffer.wrap(readArray);
292        int readCount;
293        // read data to readBuffer, and then transfer data from readBuffer to
294        // targets.
295        readCount = readImpl(readBuffer);
296        if (readCount > 0) {
297            int left = readCount;
298            int index = offset;
299            // transfer data from readArray to targets
300            while (left > 0) {
301                int putLength = Math.min(targets[index].remaining(), left);
302                targets[index].put(readArray, readCount - left, putLength);
303                index++;
304                left -= putLength;
305            }
306        }
307        return readCount;
308    }
309
310    /**
311     * Read from channel, and store the result in the target.
312     *
313     * @param target
314     *            output parameter
315     */
316    private int readImpl(ByteBuffer target) throws IOException {
317        synchronized (readLock) {
318            int readCount = 0;
319            try {
320                if (isBlocking()) {
321                    begin();
322                }
323                int offset = target.position();
324                int length = target.remaining();
325                if (target.isDirect()) {
326                    int address = NioUtils.getDirectBufferAddress(target);
327                    readCount = Platform.NETWORK.readDirect(fd, address + offset, length);
328                } else {
329                    // target is assured to have array.
330                    byte[] array = target.array();
331                    offset += target.arrayOffset();
332                    readCount = Platform.NETWORK.read(fd, array, offset, length);
333                }
334                return readCount;
335            } finally {
336                if (isBlocking()) {
337                    end(readCount > 0);
338                }
339            }
340        }
341    }
342
343    @Override
344    public int write(ByteBuffer source) throws IOException {
345        if (source == null) {
346            throw new NullPointerException();
347        }
348        checkOpenConnected();
349        if (!source.hasRemaining()) {
350            return 0;
351        }
352        return writeImpl(source);
353    }
354
355    @Override
356    public long write(ByteBuffer[] sources, int offset, int length) throws IOException {
357        Arrays.checkOffsetAndCount(sources.length, offset, length);
358        checkOpenConnected();
359        int count = FileChannelImpl.calculateTotalRemaining(sources, offset, length, false);
360        if (count == 0) {
361            return 0;
362        }
363        ByteBuffer writeBuf = ByteBuffer.allocate(count);
364        for (int val = offset; val < length + offset; val++) {
365            ByteBuffer source = sources[val];
366            int oldPosition = source.position();
367            writeBuf.put(source);
368            source.position(oldPosition);
369        }
370        writeBuf.flip();
371        int result = writeImpl(writeBuf);
372        int val = offset;
373        int written = result;
374        while (result > 0) {
375            ByteBuffer source = sources[val];
376            int gap = Math.min(result, source.remaining());
377            source.position(source.position() + gap);
378            val++;
379            result -= gap;
380        }
381        return written;
382    }
383
384    /*
385     * Write the source. return the count of bytes written.
386     */
387    private int writeImpl(ByteBuffer source) throws IOException {
388        synchronized (writeLock) {
389            if (!source.hasRemaining()) {
390                return 0;
391            }
392            int writeCount = 0;
393            try {
394                int pos = source.position();
395                int length = source.remaining();
396                if (isBlocking()) {
397                    begin();
398                }
399                if (source.isDirect()) {
400                    int address = NioUtils.getDirectBufferAddress(source);
401                    writeCount = Platform.NETWORK.writeDirect(fd, address, pos, length);
402                } else if (source.hasArray()) {
403                    pos += source.arrayOffset();
404                    writeCount = Platform.NETWORK.write(fd, source.array(), pos, length);
405                } else {
406                    byte[] array = new byte[length];
407                    source.get(array);
408                    writeCount = Platform.NETWORK.write(fd, array, 0, length);
409                }
410                source.position(pos + writeCount);
411            } finally {
412                if (isBlocking()) {
413                    end(writeCount >= 0);
414                }
415            }
416            return writeCount;
417        }
418    }
419
420    /*
421     * Status check, open and "connected", when read and write.
422     */
423    synchronized private void checkOpenConnected() throws ClosedChannelException {
424        if (!isOpen()) {
425            throw new ClosedChannelException();
426        }
427        if (!isConnected()) {
428            throw new NotYetConnectedException();
429        }
430    }
431
432    /*
433     * Status check, open and "unconnected", before connection.
434     */
435    synchronized private void checkUnconnected() throws IOException {
436        if (!isOpen()) {
437            throw new ClosedChannelException();
438        }
439        if (status == SOCKET_STATUS_CONNECTED) {
440            throw new AlreadyConnectedException();
441        }
442        if (status == SOCKET_STATUS_PENDING) {
443            throw new ConnectionPendingException();
444        }
445    }
446
447    /*
448     * Shared by this class and DatagramChannelImpl, to do the address transfer
449     * and check.
450     */
451    static InetSocketAddress validateAddress(SocketAddress socketAddress) {
452        if (socketAddress == null) {
453            throw new IllegalArgumentException();
454        }
455        if (!(socketAddress instanceof InetSocketAddress)) {
456            throw new UnsupportedAddressTypeException();
457        }
458        InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
459        if (inetSocketAddress.isUnresolved()) {
460            throw new UnresolvedAddressException();
461        }
462        return inetSocketAddress;
463    }
464
465    /*
466     * Get local address.
467     */
468    public InetAddress getLocalAddress() throws UnknownHostException {
469        return isBound ? localAddress : Inet4Address.ANY;
470    }
471
472    /*
473     * Do really closing action here.
474     */
475    @Override
476    protected synchronized void implCloseSelectableChannel() throws IOException {
477        if (status != SOCKET_STATUS_CLOSED) {
478            status = SOCKET_STATUS_CLOSED;
479            if (socket != null && !socket.isClosed()) {
480                socket.close();
481            } else {
482                IoUtils.closeSocket(fd);
483            }
484        }
485    }
486
487    @Override protected void implConfigureBlocking(boolean blocking) throws IOException {
488        synchronized (blockingLock()) {
489            IoUtils.setBlocking(fd, blocking);
490        }
491    }
492
493    /*
494     * Get the fd.
495     */
496    public FileDescriptor getFD() {
497        return fd;
498    }
499
500    /*
501     * Adapter classes for internal socket.
502     */
503    private static class SocketAdapter extends Socket {
504        private final SocketChannelImpl channel;
505        private final PlainSocketImpl socketImpl;
506
507        SocketAdapter(PlainSocketImpl socketImpl, SocketChannelImpl channel) throws SocketException {
508            super(socketImpl);
509            this.socketImpl = socketImpl;
510            this.channel = channel;
511            SocketUtils.setCreated(this);
512        }
513
514        PlainSocketImpl socketImpl() {
515            return socketImpl;
516        }
517
518        @Override
519        public SocketChannel getChannel() {
520            return channel;
521        }
522
523        @Override
524        public boolean isBound() {
525            return channel.isBound;
526        }
527
528        @Override
529        public boolean isConnected() {
530            return channel.isConnected();
531        }
532
533        @Override
534        public InetAddress getLocalAddress() {
535            try {
536                return channel.getLocalAddress();
537            } catch (UnknownHostException e) {
538                return null;
539            }
540        }
541
542        @Override
543        public void connect(SocketAddress remoteAddr, int timeout) throws IOException {
544            if (!channel.isBlocking()) {
545                throw new IllegalBlockingModeException();
546            }
547            if (isConnected()) {
548                throw new AlreadyConnectedException();
549            }
550            super.connect(remoteAddr, timeout);
551            channel.initLocalAddressAndPort();
552            if (super.isConnected()) {
553                channel.setConnected();
554                channel.isBound = super.isBound();
555            }
556        }
557
558        @Override
559        public void bind(SocketAddress localAddr) throws IOException {
560            if (channel.isConnected()) {
561                throw new AlreadyConnectedException();
562            }
563            if (SocketChannelImpl.SOCKET_STATUS_PENDING == channel.status) {
564                throw new ConnectionPendingException();
565            }
566            super.bind(localAddr);
567            // keep here to see if need next version
568            // channel.Address = getLocalSocketAddress();
569            // channel.localport = getLocalPort();
570            channel.isBound = true;
571        }
572
573        @Override
574        public void close() throws IOException {
575            synchronized (channel) {
576                if (channel.isOpen()) {
577                    channel.close();
578                } else {
579                    super.close();
580                }
581                channel.status = SocketChannelImpl.SOCKET_STATUS_CLOSED;
582            }
583        }
584
585        @Override
586        public OutputStream getOutputStream() throws IOException {
587            checkOpenAndConnected();
588            if (isOutputShutdown()) {
589                throw new SocketException("Socket output is shutdown");
590            }
591            return new SocketChannelOutputStream(channel);
592        }
593
594        @Override
595        public InputStream getInputStream() throws IOException {
596            checkOpenAndConnected();
597            if (isInputShutdown()) {
598                throw new SocketException("Socket input is shutdown");
599            }
600            return new SocketChannelInputStream(channel);
601        }
602
603        private void checkOpenAndConnected() throws SocketException {
604            if (!channel.isOpen()) {
605                throw new SocketException("Socket is closed");
606            }
607            if (!channel.isConnected()) {
608                throw new SocketException("Socket is not connected");
609            }
610        }
611    }
612
613    /*
614     * This output stream delegates all operations to the associated channel.
615     * Throws an IllegalBlockingModeException if the channel is in non-blocking
616     * mode when performing write operations.
617     */
618    private static class SocketChannelOutputStream extends OutputStream {
619        private final SocketChannel channel;
620
621        public SocketChannelOutputStream(SocketChannel channel) {
622            this.channel = channel;
623        }
624
625        /*
626         * Closes this stream and channel.
627         *
628         * @exception IOException thrown if an error occurs during the close
629         */
630        @Override
631        public void close() throws IOException {
632            channel.close();
633        }
634
635        @Override
636        public void write(byte[] buffer, int offset, int byteCount) throws IOException {
637            Arrays.checkOffsetAndCount(buffer.length, offset, byteCount);
638            ByteBuffer buf = ByteBuffer.wrap(buffer, offset, byteCount);
639            if (!channel.isBlocking()) {
640                throw new IllegalBlockingModeException();
641            }
642            channel.write(buf);
643        }
644
645        @Override
646        public void write(int oneByte) throws IOException {
647            if (!channel.isBlocking()) {
648                throw new IllegalBlockingModeException();
649            }
650            ByteBuffer buffer = ByteBuffer.allocate(1);
651            buffer.put(0, (byte) (oneByte & 0xFF));
652            channel.write(buffer);
653        }
654    }
655
656    /*
657     * This input stream delegates all operations to the associated channel.
658     * Throws an IllegalBlockingModeException if the channel is in non-blocking
659     * mode when performing read operations.
660     */
661    private static class SocketChannelInputStream extends InputStream {
662        private final SocketChannel channel;
663
664        public SocketChannelInputStream(SocketChannel channel) {
665            this.channel = channel;
666        }
667
668        /*
669         * Closes this stream and channel.
670         */
671        @Override
672        public void close() throws IOException {
673            channel.close();
674        }
675
676        @Override
677        public int read() throws IOException {
678            if (!channel.isBlocking()) {
679                throw new IllegalBlockingModeException();
680            }
681            ByteBuffer buf = ByteBuffer.allocate(1);
682            int result = channel.read(buf);
683            return (result == -1) ? result : (buf.get(0) & 0xff);
684        }
685
686        @Override
687        public int read(byte[] buffer, int offset, int byteCount) throws IOException {
688            Arrays.checkOffsetAndCount(buffer.length, offset, byteCount);
689            if (!channel.isBlocking()) {
690                throw new IllegalBlockingModeException();
691            }
692            ByteBuffer buf = ByteBuffer.wrap(buffer, offset, byteCount);
693            return channel.read(buf);
694        }
695    }
696}
697