1/*
2 *  Licensed to the Apache Software Foundation (ASF) under one or more
3 *  contributor license agreements.  See the NOTICE file distributed with
4 *  this work for additional information regarding copyright ownership.
5 *  The ASF licenses this file to You under the Apache License, Version 2.0
6 *  (the "License"); you may not use this file except in compliance with
7 *  the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *  Unless required by applicable law or agreed to in writing, software
12 *  distributed under the License is distributed on an "AS IS" BASIS,
13 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  See the License for the specific language governing permissions and
15 *  limitations under the License.
16 */
17
18package java.nio;
19
20import android.system.ErrnoException;
21import java.io.FileDescriptor;
22import java.io.InterruptedIOException;
23import java.io.IOException;
24import java.net.ConnectException;
25import java.net.DatagramPacket;
26import java.net.DatagramSocket;
27import java.net.DatagramSocketImpl;
28import java.net.Inet4Address;
29import java.net.InetAddress;
30import java.net.InetSocketAddress;
31import java.net.NetworkInterface;
32import java.net.PlainDatagramSocketImpl;
33import java.net.SocketAddress;
34import java.net.SocketException;
35import java.nio.channels.AlreadyConnectedException;
36import java.nio.channels.ClosedChannelException;
37import java.nio.channels.DatagramChannel;
38import java.nio.channels.IllegalBlockingModeException;
39import java.nio.channels.NotYetConnectedException;
40import java.nio.channels.spi.SelectorProvider;
41import java.nio.channels.UnresolvedAddressException;
42import java.nio.channels.UnsupportedAddressTypeException;
43import java.util.Arrays;
44import java.util.Set;
45import libcore.io.IoBridge;
46import libcore.io.IoUtils;
47import libcore.io.Libcore;
48import libcore.util.EmptyArray;
49
50/*
51 * The default implementation class of java.nio.channels.DatagramChannel.
52 */
53class DatagramChannelImpl extends DatagramChannel implements FileDescriptorChannel {
    // File descriptor of the underlying socket; handed to IoBridge/Libcore for every OS call.
    private final FileDescriptor fd;

    // Lazily created DatagramSocket view of this channel; stays null until socket() is called.
    private DatagramSocket socket;

    // Remote address this channel is connected to. Assigned by onConnect() and reset to null
    // by onDisconnect().
    InetSocketAddress connectAddress;

    // Local address, filled in from getsockname() by onBind().
    InetAddress localAddress;

    // Local port, filled in from getsockname() by onBind().
    private int localPort;

    // Connected flag; a new channel starts out unconnected.
    boolean connected = false;

    // Set to true by onBind() once the local address state has been captured.
    boolean isBound = false;

    // Monitor held while receiving/reading from the descriptor.
    private final Object readLock = new Object();
    // Monitor held while sending/writing to the descriptor.
    private final Object writeLock = new Object();
77
78    /*
79     * Constructor
80     */
81    protected DatagramChannelImpl(SelectorProvider selectorProvider) throws IOException {
82        super(selectorProvider);
83        fd = IoBridge.socket(false);
84    }
85
86    /*
87     * for native call
88     */
89    @SuppressWarnings("unused")
90    private DatagramChannelImpl() {
91        super(SelectorProvider.provider());
92        fd = new FileDescriptor();
93        connectAddress = new InetSocketAddress(0);
94    }
95
96    /*
97     * Getting the internal DatagramSocket If we have not the socket, we create
98     * a new one.
99     */
100    @Override
101    synchronized public DatagramSocket socket() {
102        if (socket == null) {
103            socket = new DatagramSocketAdapter(new PlainDatagramSocketImpl(fd, localPort), this);
104        }
105        return socket;
106    }
107
108    /**
109     * Initialise the isBound, localAddress and localPort state from the file descriptor. Used when
110     * some or all of the bound state has been left to the OS to decide, or when the Socket handled
111     * bind() or connect().
112     *
113     * @param updateSocketState
114     *        if the associated socket (if present) needs to be updated
115     * @hide used to sync state, non-private to avoid synthetic method
116     */
117    void onBind(boolean updateSocketState) {
118        SocketAddress sa;
119        try {
120            sa = Libcore.os.getsockname(fd);
121        } catch (ErrnoException errnoException) {
122            throw new AssertionError(errnoException);
123        }
124        isBound = true;
125        InetSocketAddress localSocketAddress = (InetSocketAddress) sa;
126        localAddress = localSocketAddress.getAddress();
127        localPort = localSocketAddress.getPort();
128        if (updateSocketState && socket != null) {
129            socket.onBind(localAddress, localPort);
130        }
131    }
132
133    @Override
134    synchronized public boolean isConnected() {
135        return connected;
136    }
137
138    @Override
139    synchronized public DatagramChannel connect(SocketAddress address) throws IOException {
140        // must be open
141        checkOpen();
142        // status must be un-connected.
143        if (connected) {
144            throw new IllegalStateException();
145        }
146
147        // check the address
148        InetSocketAddress inetSocketAddress = SocketChannelImpl.validateAddress(address);
149        InetAddress remoteAddress = inetSocketAddress.getAddress();
150        int remotePort = inetSocketAddress.getPort();
151        try {
152            begin();
153            IoBridge.connect(fd, remoteAddress, remotePort);
154        } catch (ConnectException e) {
155            // ConnectException means connect fail, not exception
156        } finally {
157            end(true);
158        }
159
160        // connect() performs a bind() if an explicit bind() was not performed. Keep the local
161        // address state held by the channel and the socket up to date.
162        if (!isBound) {
163            onBind(true /* updateSocketState */);
164        }
165
166        // Keep the connected state held by the channel and the socket up to date.
167        onConnect(remoteAddress, remotePort, true /* updateSocketState */);
168        return this;
169    }
170
171    /**
172     * Initialize the state associated with being connected, optionally syncing the socket if there
173     * is one.
174     * @hide used to sync state, non-private to avoid synthetic method
175     */
176    void onConnect(InetAddress remoteAddress, int remotePort, boolean updateSocketState) {
177        connected = true;
178        connectAddress = new InetSocketAddress(remoteAddress, remotePort);
179        if (updateSocketState && socket != null) {
180            socket.onConnect(remoteAddress, remotePort);
181        }
182    }
183
184    @Override
185    synchronized public DatagramChannel disconnect() throws IOException {
186        if (!isConnected() || !isOpen()) {
187            return this;
188        }
189
190        // Keep the disconnected state held by the channel and the socket up to date.
191        onDisconnect(true /* updateSocketState */);
192
193        try {
194            Libcore.os.connect(fd, InetAddress.UNSPECIFIED, 0);
195        } catch (ErrnoException errnoException) {
196            throw errnoException.rethrowAsIOException();
197        }
198        return this;
199    }
200
201    /**
202     * Initialize the state associated with being disconnected, optionally syncing the socket if
203     * there is one.
204     * @hide used to sync state, non-private to avoid synthetic method
205     */
206    void onDisconnect(boolean updateSocketState) {
207        connected = false;
208        connectAddress = null;
209        if (updateSocketState && socket != null && socket.isConnected()) {
210            socket.onDisconnect();
211        }
212    }
213
214    @Override
215    public SocketAddress receive(ByteBuffer target) throws IOException {
216        target.checkWritable();
217        checkOpen();
218
219        if (!isBound) {
220            return null;
221        }
222
223        SocketAddress retAddr = null;
224        try {
225            begin();
226
227            // receive real data packet, (not peek)
228            synchronized (readLock) {
229                boolean loop = isBlocking();
230                if (!target.isDirect()) {
231                    retAddr = receiveImpl(target, loop);
232                } else {
233                    retAddr = receiveDirectImpl(target, loop);
234                }
235            }
236        } catch (InterruptedIOException e) {
237            // this line used in Linux
238            return null;
239        } finally {
240            end(retAddr != null);
241        }
242        return retAddr;
243    }
244
245    private SocketAddress receiveImpl(ByteBuffer target, boolean loop) throws IOException {
246        SocketAddress retAddr = null;
247        DatagramPacket receivePacket;
248        int oldposition = target.position();
249        int received;
250        // TODO: disallow mapped buffers and lose this conditional?
251        if (target.hasArray()) {
252            receivePacket = new DatagramPacket(target.array(), target.position() + target.arrayOffset(), target.remaining());
253        } else {
254            receivePacket = new DatagramPacket(new byte[target.remaining()], target.remaining());
255        }
256        do {
257            received = IoBridge.recvfrom(false, fd, receivePacket.getData(), receivePacket.getOffset(), receivePacket.getLength(), 0, receivePacket, isConnected());
258            if (receivePacket.getAddress() != null) {
259                if (received > 0) {
260                    if (target.hasArray()) {
261                        target.position(oldposition + received);
262                    } else {
263                        // copy the data of received packet
264                        target.put(receivePacket.getData(), 0, received);
265                    }
266                }
267                retAddr = receivePacket.getSocketAddress();
268                break;
269            }
270        } while (loop);
271        return retAddr;
272    }
273
274    private SocketAddress receiveDirectImpl(ByteBuffer target, boolean loop) throws IOException {
275        SocketAddress retAddr = null;
276        DatagramPacket receivePacket = new DatagramPacket(EmptyArray.BYTE, 0);
277        int oldposition = target.position();
278        int received;
279        do {
280            received = IoBridge.recvfrom(false, fd, target, 0, receivePacket, isConnected());
281            if (receivePacket.getAddress() != null) {
282                // copy the data of received packet
283                if (received > 0) {
284                    target.position(oldposition + received);
285                }
286                retAddr = receivePacket.getSocketAddress();
287                break;
288            }
289        } while (loop);
290        return retAddr;
291    }
292
293    @Override
294    public int send(ByteBuffer source, SocketAddress socketAddress) throws IOException {
295        checkNotNull(source);
296        checkOpen();
297
298        InetSocketAddress isa = (InetSocketAddress) socketAddress;
299        if (isa.getAddress() == null) {
300            throw new IOException();
301        }
302
303        if (isConnected() && !connectAddress.equals(isa)) {
304            throw new IllegalArgumentException("Connected to " + connectAddress +
305                                               ", not " + socketAddress);
306        }
307
308        synchronized (writeLock) {
309            int sendCount = 0;
310            try {
311                begin();
312                int oldPosition = source.position();
313                sendCount = IoBridge.sendto(fd, source, 0, isa.getAddress(), isa.getPort());
314                if (sendCount > 0) {
315                    source.position(oldPosition + sendCount);
316                }
317                if (!isBound) {
318                    onBind(true /* updateSocketState */);
319                }
320            } finally {
321                end(sendCount >= 0);
322            }
323            return sendCount;
324        }
325    }
326
327    @Override
328    public int read(ByteBuffer target) throws IOException {
329        target.checkWritable();
330        checkOpenConnected();
331
332        if (!target.hasRemaining()) {
333            return 0;
334        }
335
336        int readCount;
337        if (target.isDirect() || target.hasArray()) {
338            readCount = readImpl(target);
339            if (readCount > 0) {
340                target.position(target.position() + readCount);
341            }
342
343        } else {
344            byte[] readArray = new byte[target.remaining()];
345            ByteBuffer readBuffer = ByteBuffer.wrap(readArray);
346            readCount = readImpl(readBuffer);
347            if (readCount > 0) {
348                target.put(readArray, 0, readCount);
349            }
350        }
351        return readCount;
352    }
353
354    @Override
355    public long read(ByteBuffer[] targets, int offset, int length) throws IOException {
356        Arrays.checkOffsetAndCount(targets.length, offset, length);
357
358        // status must be open and connected
359        checkOpenConnected();
360        int totalCount = FileChannelImpl.calculateTotalRemaining(targets, offset, length, true);
361        if (totalCount == 0) {
362            return 0;
363        }
364
365        // read data to readBuffer, and then transfer data from readBuffer to
366        // targets.
367        ByteBuffer readBuffer = ByteBuffer.allocate(totalCount);
368        int readCount;
369        readCount = readImpl(readBuffer);
370        int left = readCount;
371        int index = offset;
372        // transfer data from readBuffer to targets
373        byte[] readArray = readBuffer.array();
374        while (left > 0) {
375            int putLength = Math.min(targets[index].remaining(), left);
376            targets[index].put(readArray, readCount - left, putLength);
377            index++;
378            left -= putLength;
379        }
380        return readCount;
381    }
382
383    /*
384     * read from channel, and store the result in the target.
385     */
386    private int readImpl(ByteBuffer dst) throws IOException {
387        synchronized (readLock) {
388            int readCount = 0;
389            try {
390                begin();
391                readCount = IoBridge.recvfrom(false, fd, dst, 0, null, isConnected());
392            } catch (InterruptedIOException e) {
393                // InterruptedIOException will be thrown when timeout.
394                return 0;
395            } finally {
396                end(readCount > 0);
397            }
398            return readCount;
399        }
400    }
401
402    @Override public int write(ByteBuffer src) throws IOException {
403        checkNotNull(src);
404        checkOpenConnected();
405        if (!src.hasRemaining()) {
406            return 0;
407        }
408
409        int writeCount = writeImpl(src);
410        if (writeCount > 0) {
411            src.position(src.position() + writeCount);
412        }
413        return writeCount;
414    }
415
416    /**
417     * @see java.nio.channels.DatagramChannel#write(java.nio.ByteBuffer[], int,
418     *      int)
419     */
420    @Override
421    public long write(ByteBuffer[] sources, int offset, int length) throws IOException {
422        Arrays.checkOffsetAndCount(sources.length, offset, length);
423
424        // status must be open and connected
425        checkOpenConnected();
426        int count = FileChannelImpl.calculateTotalRemaining(sources, offset, length, false);
427        if (count == 0) {
428            return 0;
429        }
430        ByteBuffer writeBuf = ByteBuffer.allocate(count);
431        for (int val = offset; val < length + offset; val++) {
432            ByteBuffer source = sources[val];
433            int oldPosition = source.position();
434            writeBuf.put(source);
435            source.position(oldPosition);
436        }
437        writeBuf.flip();
438        int result = writeImpl(writeBuf);
439        int val = offset;
440        int written = result;
441        while (result > 0) {
442            ByteBuffer source = sources[val];
443            int gap = Math.min(result, source.remaining());
444            source.position(source.position() + gap);
445            val++;
446            result -= gap;
447        }
448        return written;
449    }
450
451    private int writeImpl(ByteBuffer buf) throws IOException {
452        synchronized (writeLock) {
453            int result = 0;
454            try {
455                begin();
456                result = IoBridge.sendto(fd, buf, 0, null, 0);
457            } finally {
458                end(result > 0);
459            }
460            return result;
461        }
462    }
463
464    @Override protected synchronized void implCloseSelectableChannel() throws IOException {
465        // A closed channel is not connected.
466        onDisconnect(true /* updateSocketState */);
467        IoBridge.closeAndSignalBlockedThreads(fd);
468
469        if (socket != null && !socket.isClosed()) {
470            socket.onClose();
471        }
472    }
473
474    @Override protected void implConfigureBlocking(boolean blocking) throws IOException {
475        IoUtils.setBlocking(fd, blocking);
476    }
477
478    /*
479     * Status check, must be open.
480     */
481    private void checkOpen() throws ClosedChannelException {
482        if (!isOpen()) {
483            throw new ClosedChannelException();
484        }
485    }
486
487    /*
488     * Status check, must be open and connected, for read and write.
489     */
490    private void checkOpenConnected() throws IOException {
491        checkOpen();
492        if (!isConnected()) {
493            throw new NotYetConnectedException();
494        }
495    }
496
497    /*
498     * Buffer check, must not null
499     */
500    private void checkNotNull(ByteBuffer source) {
501        if (source == null) {
502            throw new NullPointerException("source == null");
503        }
504    }
505
506    /*
507     * Get the fd for internal use.
508     */
509    public FileDescriptor getFD() {
510        return fd;
511    }
512
513    /*
514     * The adapter class of DatagramSocket
515     */
516    private static class DatagramSocketAdapter extends DatagramSocket {
517
518        /*
519         * The internal datagramChannelImpl.
520         */
521        private final DatagramChannelImpl channelImpl;
522
523        /*
524         * Constructor initialize the datagramSocketImpl and datagramChannelImpl
525         */
526        DatagramSocketAdapter(DatagramSocketImpl socketimpl, DatagramChannelImpl channelImpl) {
527            super(socketimpl);
528            this.channelImpl = channelImpl;
529
530            // Sync state socket state with the channel it is being created from
531            if (channelImpl.isBound) {
532                onBind(channelImpl.localAddress, channelImpl.localPort);
533            }
534            if (channelImpl.connected) {
535                onConnect(
536                        channelImpl.connectAddress.getAddress(),
537                        channelImpl.connectAddress.getPort());
538            } else {
539                onDisconnect();
540            }
541            if (!channelImpl.isOpen()) {
542                onClose();
543            }
544        }
545
546        /*
547         * Get the internal datagramChannelImpl
548         */
549        @Override
550        public DatagramChannel getChannel() {
551            return channelImpl;
552        }
553
554        @Override
555        public void bind(SocketAddress localAddr) throws SocketException {
556            if (channelImpl.isConnected()) {
557                throw new AlreadyConnectedException();
558            }
559            super.bind(localAddr);
560            channelImpl.onBind(false /* updateSocketState */);
561        }
562
563        @Override
564        public void connect(SocketAddress peer) throws SocketException {
565            if (isConnected()) {
566                // RI compatibility: If the socket is already connected this fails.
567                throw new IllegalStateException("Socket is already connected.");
568            }
569            super.connect(peer);
570            // Connect may have performed an implicit bind(). Sync up here.
571            channelImpl.onBind(false /* updateSocketState */);
572
573            InetSocketAddress inetSocketAddress = (InetSocketAddress) peer;
574            channelImpl.onConnect(
575                    inetSocketAddress.getAddress(), inetSocketAddress.getPort(),
576                    false /* updateSocketState */);
577        }
578
579        @Override
580        public void connect(InetAddress address, int port) {
581            // To avoid implementing connect() twice call this.connect(SocketAddress) in preference
582            // to super.connect().
583            try {
584                connect(new InetSocketAddress(address, port));
585            } catch (SocketException e) {
586                // Ignored - there is nothing we can report here.
587            }
588        }
589
590        @Override
591        public void receive(DatagramPacket packet) throws IOException {
592            if (!channelImpl.isBlocking()) {
593                throw new IllegalBlockingModeException();
594            }
595
596            boolean wasBound = isBound();
597            super.receive(packet);
598            if (!wasBound) {
599                // DatagramSocket.receive() will implicitly bind if it hasn't been done explicitly.
600                // Sync the channel state with the socket.
601                channelImpl.onBind(false /* updateSocketState */);
602            }
603        }
604
605        @Override
606        public void send(DatagramPacket packet) throws IOException {
607            if (!channelImpl.isBlocking()) {
608                throw new IllegalBlockingModeException();
609            }
610
611            // DatagramSocket.send() will implicitly bind if it hasn't been done explicitly. Force
612            // bind() here so that the channel state stays in sync with the socket.
613            boolean wasBound = isBound();
614            super.send(packet);
615            if (!wasBound) {
616                // DatagramSocket.send() will implicitly bind if it hasn't been done explicitly.
617                // Sync the channel state with the socket.
618                channelImpl.onBind(false /* updateSocketState */);
619            }
620        }
621
622        @Override
623        public void close() {
624            synchronized (channelImpl) {
625                super.close();
626                if (channelImpl.isOpen()) {
627                    try {
628                        channelImpl.close();
629                    } catch (IOException e) {
630                        // Ignore
631                    }
632                }
633            }
634        }
635
636        @Override
637        public void disconnect() {
638            super.disconnect();
639            channelImpl.onDisconnect(false /* updateSocketState */);
640        }
641    }
642}
643