1/*
2 * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package sun.nio.ch;
27
28import java.io.FileDescriptor;
29import java.io.IOException;
30import java.net.InetSocketAddress;
31import java.net.SocketAddress;
32import java.nio.ByteBuffer;
33import java.nio.channels.AlreadyConnectedException;
34import java.nio.channels.AsynchronousChannel;
35import java.nio.channels.AsynchronousCloseException;
36import java.nio.channels.ClosedChannelException;
37import java.nio.channels.CompletionHandler;
38import java.nio.channels.ConnectionPendingException;
39import java.nio.channels.InterruptedByTimeoutException;
40import java.nio.channels.ShutdownChannelGroupException;
41import java.security.AccessController;
42import java.util.concurrent.Future;
43import java.util.concurrent.TimeUnit;
44
45import dalvik.system.CloseGuard;
46import sun.net.NetHooks;
47import sun.security.action.GetPropertyAction;
48
49/**
50 * Unix implementation of AsynchronousSocketChannel
51 */
52
53class UnixAsynchronousSocketChannelImpl
54    extends AsynchronousSocketChannelImpl implements Port.PollableChannel
55{
56    private final static NativeDispatcher nd = new SocketDispatcher();
57    private static enum OpType { CONNECT, READ, WRITE };
58
59    private static final boolean disableSynchronousRead;
60    static {
61        String propValue = AccessController.doPrivileged(
62            new GetPropertyAction("sun.nio.ch.disableSynchronousRead", "false"));
63        disableSynchronousRead = (propValue.length() == 0) ?
64            true : Boolean.valueOf(propValue);
65    }
66
67    private final Port port;
68    private final int fdVal;
69
70    // used to ensure that the context for I/O operations that complete
71    // ascynrhonously is visible to the pooled threads handling I/O events.
72    private final Object updateLock = new Object();
73
74    // pending connect (updateLock)
75    private boolean connectPending;
76    private CompletionHandler<Void,Object> connectHandler;
77    private Object connectAttachment;
78    private PendingFuture<Void,Object> connectFuture;
79
80    // pending remote address (stateLock)
81    private SocketAddress pendingRemote;
82
83    // pending read (updateLock)
84    private boolean readPending;
85    private boolean isScatteringRead;
86    private ByteBuffer readBuffer;
87    private ByteBuffer[] readBuffers;
88    private CompletionHandler<Number,Object> readHandler;
89    private Object readAttachment;
90    private PendingFuture<Number,Object> readFuture;
91    private Future<?> readTimer;
92
93    // pending write (updateLock)
94    private boolean writePending;
95    private boolean isGatheringWrite;
96    private ByteBuffer writeBuffer;
97    private ByteBuffer[] writeBuffers;
98    private CompletionHandler<Number,Object> writeHandler;
99    private Object writeAttachment;
100    private PendingFuture<Number,Object> writeFuture;
101    private Future<?> writeTimer;
102
103    // Android-changed: Add CloseGuard support.
104    private final CloseGuard guard = CloseGuard.get();
105
106    UnixAsynchronousSocketChannelImpl(Port port)
107        throws IOException
108    {
109        super(port);
110
111        // set non-blocking
112        try {
113            IOUtil.configureBlocking(fd, false);
114        } catch (IOException x) {
115            nd.close(fd);
116            throw x;
117        }
118
119        this.port = port;
120        this.fdVal = IOUtil.fdVal(fd);
121
122        // add mapping from file descriptor to this channel
123        port.register(fdVal, this);
124        // Android-changed: Add CloseGuard support.
125        guard.open("close");
126    }
127
128    // Constructor for sockets created by UnixAsynchronousServerSocketChannelImpl
129    UnixAsynchronousSocketChannelImpl(Port port,
130                                      FileDescriptor fd,
131                                      InetSocketAddress remote)
132        throws IOException
133    {
134        super(port, fd, remote);
135
136        this.fdVal = IOUtil.fdVal(fd);
137        IOUtil.configureBlocking(fd, false);
138
139        try {
140            port.register(fdVal, this);
141        } catch (ShutdownChannelGroupException x) {
142            // ShutdownChannelGroupException thrown if we attempt to register a
143            // new channel after the group is shutdown
144            throw new IOException(x);
145        }
146
147        this.port = port;
148        guard.open("close");
149    }
150
151    @Override
152    public AsynchronousChannelGroupImpl group() {
153        return port;
154    }
155
156    // register events for outstanding I/O operations, caller already owns updateLock
157    private void updateEvents() {
158        assert Thread.holdsLock(updateLock);
159        int events = 0;
160        if (readPending)
161            events |= Net.POLLIN;
162        if (connectPending || writePending)
163            events |= Net.POLLOUT;
164        if (events != 0)
165            port.startPoll(fdVal, events);
166    }
167
168    // register events for outstanding I/O operations
169    private void lockAndUpdateEvents() {
170        synchronized (updateLock) {
171            updateEvents();
172        }
173    }
174
175    // invoke to finish read and/or write operations
176    private void finish(boolean mayInvokeDirect,
177                        boolean readable,
178                        boolean writable)
179    {
180        boolean finishRead = false;
181        boolean finishWrite = false;
182        boolean finishConnect = false;
183
184        // map event to pending result
185        synchronized (updateLock) {
186            if (readable && this.readPending) {
187                this.readPending = false;
188                finishRead = true;
189            }
190            if (writable) {
191                if (this.writePending) {
192                    this.writePending = false;
193                    finishWrite = true;
194                } else if (this.connectPending) {
195                    this.connectPending = false;
196                    finishConnect = true;
197                }
198            }
199        }
200
201        // complete the I/O operation. Special case for when channel is
202        // ready for both reading and writing. In that case, submit task to
203        // complete write if write operation has a completion handler.
204        if (finishRead) {
205            if (finishWrite)
206                finishWrite(false);
207            finishRead(mayInvokeDirect);
208            return;
209        }
210        if (finishWrite) {
211            finishWrite(mayInvokeDirect);
212        }
213        if (finishConnect) {
214            finishConnect(mayInvokeDirect);
215        }
216    }
217
218    /**
219     * Invoked by event handler thread when file descriptor is polled
220     */
221    @Override
222    public void onEvent(int events, boolean mayInvokeDirect) {
223        boolean readable = (events & Net.POLLIN) > 0;
224        boolean writable = (events & Net.POLLOUT) > 0;
225        if ((events & (Net.POLLERR | Net.POLLHUP)) > 0) {
226            readable = true;
227            writable = true;
228        }
229        finish(mayInvokeDirect, readable, writable);
230    }
231
232    @Override
233    void implClose() throws IOException {
234        // Android-changed: Add CloseGuard support.
235        guard.close();
236        // remove the mapping
237        port.unregister(fdVal);
238
239        // close file descriptor
240        nd.close(fd);
241
242        // All outstanding I/O operations are required to fail
243        finish(false, true, true);
244    }
245
246    protected void finalize() throws Throwable {
247        try {
248            if (guard != null) {
249                guard.warnIfOpen();
250            }
251            close();
252        } finally {
253            super.finalize();
254        }
255    }
256
257    @Override
258    public void onCancel(PendingFuture<?,?> task) {
259        if (task.getContext() == OpType.CONNECT)
260            killConnect();
261        if (task.getContext() == OpType.READ)
262            killReading();
263        if (task.getContext() == OpType.WRITE)
264            killWriting();
265    }
266
267    // -- connect --
268
269    private void setConnected() throws IOException {
270        synchronized (stateLock) {
271            state = ST_CONNECTED;
272            localAddress = Net.localAddress(fd);
273            remoteAddress = (InetSocketAddress)pendingRemote;
274        }
275    }
276
277    private void finishConnect(boolean mayInvokeDirect) {
278        Throwable e = null;
279        try {
280            begin();
281            checkConnect(fdVal);
282            setConnected();
283        } catch (Throwable x) {
284            if (x instanceof ClosedChannelException)
285                x = new AsynchronousCloseException();
286            e = x;
287        } finally {
288            end();
289        }
290        if (e != null) {
291            // close channel if connection cannot be established
292            try {
293                close();
294            } catch (Throwable suppressed) {
295                e.addSuppressed(suppressed);
296            }
297        }
298
299        // invoke handler and set result
300        CompletionHandler<Void,Object> handler = connectHandler;
301        Object att = connectAttachment;
302        PendingFuture<Void,Object> future = connectFuture;
303        if (handler == null) {
304            future.setResult(null, e);
305        } else {
306            if (mayInvokeDirect) {
307                Invoker.invokeUnchecked(handler, att, null, e);
308            } else {
309                Invoker.invokeIndirectly(this, handler, att, null, e);
310            }
311        }
312    }
313
314    @Override
315    @SuppressWarnings("unchecked")
316    <A> Future<Void> implConnect(SocketAddress remote,
317                                 A attachment,
318                                 CompletionHandler<Void,? super A> handler)
319    {
320        if (!isOpen()) {
321            Throwable e = new ClosedChannelException();
322            if (handler == null) {
323                return CompletedFuture.withFailure(e);
324            } else {
325                Invoker.invoke(this, handler, attachment, null, e);
326                return null;
327            }
328        }
329
330        InetSocketAddress isa = Net.checkAddress(remote);
331
332        // permission check
333        SecurityManager sm = System.getSecurityManager();
334        if (sm != null)
335            sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
336
337        // check and set state
338        boolean notifyBeforeTcpConnect;
339        synchronized (stateLock) {
340            if (state == ST_CONNECTED)
341                throw new AlreadyConnectedException();
342            if (state == ST_PENDING)
343                throw new ConnectionPendingException();
344            state = ST_PENDING;
345            pendingRemote = remote;
346            notifyBeforeTcpConnect = (localAddress == null);
347        }
348
349        Throwable e = null;
350        try {
351            begin();
352            // notify hook if unbound
353            if (notifyBeforeTcpConnect)
354                NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
355            int n = Net.connect(fd, isa.getAddress(), isa.getPort());
356            if (n == IOStatus.UNAVAILABLE) {
357                // connection could not be established immediately
358                PendingFuture<Void,A> result = null;
359                synchronized (updateLock) {
360                    if (handler == null) {
361                        result = new PendingFuture<Void,A>(this, OpType.CONNECT);
362                        this.connectFuture = (PendingFuture<Void,Object>)result;
363                    } else {
364                        this.connectHandler = (CompletionHandler<Void,Object>)handler;
365                        this.connectAttachment = attachment;
366                    }
367                    this.connectPending = true;
368                    updateEvents();
369                }
370                return result;
371            }
372            setConnected();
373        } catch (Throwable x) {
374            if (x instanceof ClosedChannelException)
375                x = new AsynchronousCloseException();
376            e = x;
377        } finally {
378            end();
379        }
380
381        // close channel if connect fails
382        if (e != null) {
383            try {
384                close();
385            } catch (Throwable suppressed) {
386                e.addSuppressed(suppressed);
387            }
388        }
389        if (handler == null) {
390            return CompletedFuture.withResult(null, e);
391        } else {
392            Invoker.invoke(this, handler, attachment, null, e);
393            return null;
394        }
395    }
396
397    // -- read --
398
399    private void finishRead(boolean mayInvokeDirect) {
400        int n = -1;
401        Throwable exc = null;
402
403        // copy fields as we can't access them after reading is re-enabled.
404        boolean scattering = isScatteringRead;
405        CompletionHandler<Number,Object> handler = readHandler;
406        Object att = readAttachment;
407        PendingFuture<Number,Object> future = readFuture;
408        Future<?> timeout = readTimer;
409
410        try {
411            begin();
412
413            if (scattering) {
414                n = (int)IOUtil.read(fd, readBuffers, nd);
415            } else {
416                n = IOUtil.read(fd, readBuffer, -1, nd);
417            }
418            if (n == IOStatus.UNAVAILABLE) {
419                // spurious wakeup, is this possible?
420                synchronized (updateLock) {
421                    readPending = true;
422                }
423                return;
424            }
425
426            // allow objects to be GC'ed.
427            this.readBuffer = null;
428            this.readBuffers = null;
429            this.readAttachment = null;
430
431            // allow another read to be initiated
432            enableReading();
433
434        } catch (Throwable x) {
435            enableReading();
436            if (x instanceof ClosedChannelException)
437                x = new AsynchronousCloseException();
438            exc = x;
439        } finally {
440            // restart poll in case of concurrent write
441            if (!(exc instanceof AsynchronousCloseException))
442                lockAndUpdateEvents();
443            end();
444        }
445
446        // cancel the associated timer
447        if (timeout != null)
448            timeout.cancel(false);
449
450        // create result
451        Number result = (exc != null) ? null : (scattering) ?
452            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);
453
454        // invoke handler or set result
455        if (handler == null) {
456            future.setResult(result, exc);
457        } else {
458            if (mayInvokeDirect) {
459                Invoker.invokeUnchecked(handler, att, result, exc);
460            } else {
461                Invoker.invokeIndirectly(this, handler, att, result, exc);
462            }
463        }
464    }
465
466    private Runnable readTimeoutTask = new Runnable() {
467        public void run() {
468            CompletionHandler<Number,Object> handler = null;
469            Object att = null;
470            PendingFuture<Number,Object> future = null;
471
472            synchronized (updateLock) {
473                if (!readPending)
474                    return;
475                readPending = false;
476                handler = readHandler;
477                att = readAttachment;
478                future = readFuture;
479            }
480
481            // kill further reading before releasing waiters
482            enableReading(true);
483
484            // invoke handler or set result
485            Exception exc = new InterruptedByTimeoutException();
486            if (handler == null) {
487                future.setFailure(exc);
488            } else {
489                AsynchronousChannel ch = UnixAsynchronousSocketChannelImpl.this;
490                Invoker.invokeIndirectly(ch, handler, att, null, exc);
491            }
492        }
493    };
494
495    /**
496     * Initiates a read or scattering read operation
497     */
498    @Override
499    @SuppressWarnings("unchecked")
500    <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
501                                            ByteBuffer dst,
502                                            ByteBuffer[] dsts,
503                                            long timeout,
504                                            TimeUnit unit,
505                                            A attachment,
506                                            CompletionHandler<V,? super A> handler)
507    {
508        // A synchronous read is not attempted if disallowed by system property
509        // or, we are using a fixed thread pool and the completion handler may
510        // not be invoked directly (because the thread is not a pooled thread or
511        // there are too many handlers on the stack).
512        Invoker.GroupAndInvokeCount myGroupAndInvokeCount = null;
513        boolean invokeDirect = false;
514        boolean attemptRead = false;
515        if (!disableSynchronousRead) {
516            if (handler == null) {
517                attemptRead = true;
518            } else {
519                myGroupAndInvokeCount = Invoker.getGroupAndInvokeCount();
520                invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port);
521                // okay to attempt read with user thread pool
522                attemptRead = invokeDirect || !port.isFixedThreadPool();
523            }
524        }
525
526        int n = IOStatus.UNAVAILABLE;
527        Throwable exc = null;
528        boolean pending = false;
529
530        try {
531            begin();
532
533            if (attemptRead) {
534                if (isScatteringRead) {
535                    n = (int)IOUtil.read(fd, dsts, nd);
536                } else {
537                    n = IOUtil.read(fd, dst, -1, nd);
538                }
539            }
540
541            if (n == IOStatus.UNAVAILABLE) {
542                PendingFuture<V,A> result = null;
543                synchronized (updateLock) {
544                    this.isScatteringRead = isScatteringRead;
545                    this.readBuffer = dst;
546                    this.readBuffers = dsts;
547                    if (handler == null) {
548                        this.readHandler = null;
549                        result = new PendingFuture<V,A>(this, OpType.READ);
550                        this.readFuture = (PendingFuture<Number,Object>)result;
551                        this.readAttachment = null;
552                    } else {
553                        this.readHandler = (CompletionHandler<Number,Object>)handler;
554                        this.readAttachment = attachment;
555                        this.readFuture = null;
556                    }
557                    if (timeout > 0L) {
558                        this.readTimer = port.schedule(readTimeoutTask, timeout, unit);
559                    }
560                    this.readPending = true;
561                    updateEvents();
562                }
563                pending = true;
564                return result;
565            }
566        } catch (Throwable x) {
567            if (x instanceof ClosedChannelException)
568                x = new AsynchronousCloseException();
569            exc = x;
570        } finally {
571            if (!pending)
572                enableReading();
573            end();
574        }
575
576        Number result = (exc != null) ? null : (isScatteringRead) ?
577            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);
578
579        // read completed immediately
580        if (handler != null) {
581            if (invokeDirect) {
582                Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc);
583            } else {
584                Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc);
585            }
586            return null;
587        } else {
588            return CompletedFuture.withResult((V)result, exc);
589        }
590    }
591
592    // -- write --
593
594    private void finishWrite(boolean mayInvokeDirect) {
595        int n = -1;
596        Throwable exc = null;
597
598        // copy fields as we can't access them after reading is re-enabled.
599        boolean gathering = this.isGatheringWrite;
600        CompletionHandler<Number,Object> handler = this.writeHandler;
601        Object att = this.writeAttachment;
602        PendingFuture<Number,Object> future = this.writeFuture;
603        Future<?> timer = this.writeTimer;
604
605        try {
606            begin();
607
608            if (gathering) {
609                n = (int)IOUtil.write(fd, writeBuffers, nd);
610            } else {
611                n = IOUtil.write(fd, writeBuffer, -1, nd);
612            }
613            if (n == IOStatus.UNAVAILABLE) {
614                // spurious wakeup, is this possible?
615                synchronized (updateLock) {
616                    writePending = true;
617                }
618                return;
619            }
620
621            // allow objects to be GC'ed.
622            this.writeBuffer = null;
623            this.writeBuffers = null;
624            this.writeAttachment = null;
625
626            // allow another write to be initiated
627            enableWriting();
628
629        } catch (Throwable x) {
630            enableWriting();
631            if (x instanceof ClosedChannelException)
632                x = new AsynchronousCloseException();
633            exc = x;
634        } finally {
635            // restart poll in case of concurrent write
636            if (!(exc instanceof AsynchronousCloseException))
637                lockAndUpdateEvents();
638            end();
639        }
640
641        // cancel the associated timer
642        if (timer != null)
643            timer.cancel(false);
644
645        // create result
646        Number result = (exc != null) ? null : (gathering) ?
647            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);
648
649        // invoke handler or set result
650        if (handler == null) {
651            future.setResult(result, exc);
652        } else {
653            if (mayInvokeDirect) {
654                Invoker.invokeUnchecked(handler, att, result, exc);
655            } else {
656                Invoker.invokeIndirectly(this, handler, att, result, exc);
657            }
658        }
659    }
660
661    private Runnable writeTimeoutTask = new Runnable() {
662        public void run() {
663            CompletionHandler<Number,Object> handler = null;
664            Object att = null;
665            PendingFuture<Number,Object> future = null;
666
667            synchronized (updateLock) {
668                if (!writePending)
669                    return;
670                writePending = false;
671                handler = writeHandler;
672                att = writeAttachment;
673                future = writeFuture;
674            }
675
676            // kill further writing before releasing waiters
677            enableWriting(true);
678
679            // invoke handler or set result
680            Exception exc = new InterruptedByTimeoutException();
681            if (handler != null) {
682                Invoker.invokeIndirectly(UnixAsynchronousSocketChannelImpl.this,
683                    handler, att, null, exc);
684            } else {
685                future.setFailure(exc);
686            }
687        }
688    };
689
690    /**
691     * Initiates a read or scattering read operation
692     */
693    @Override
694    @SuppressWarnings("unchecked")
695    <V extends Number,A> Future<V> implWrite(boolean isGatheringWrite,
696                                             ByteBuffer src,
697                                             ByteBuffer[] srcs,
698                                             long timeout,
699                                             TimeUnit unit,
700                                             A attachment,
701                                             CompletionHandler<V,? super A> handler)
702    {
703        Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
704            Invoker.getGroupAndInvokeCount();
705        boolean invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port);
706        boolean attemptWrite = (handler == null) || invokeDirect ||
707            !port.isFixedThreadPool();  // okay to attempt write with user thread pool
708
709        int n = IOStatus.UNAVAILABLE;
710        Throwable exc = null;
711        boolean pending = false;
712
713        try {
714            begin();
715
716            if (attemptWrite) {
717                if (isGatheringWrite) {
718                    n = (int)IOUtil.write(fd, srcs, nd);
719                } else {
720                    n = IOUtil.write(fd, src, -1, nd);
721                }
722            }
723
724            if (n == IOStatus.UNAVAILABLE) {
725                PendingFuture<V,A> result = null;
726                synchronized (updateLock) {
727                    this.isGatheringWrite = isGatheringWrite;
728                    this.writeBuffer = src;
729                    this.writeBuffers = srcs;
730                    if (handler == null) {
731                        this.writeHandler = null;
732                        result = new PendingFuture<V,A>(this, OpType.WRITE);
733                        this.writeFuture = (PendingFuture<Number,Object>)result;
734                        this.writeAttachment = null;
735                    } else {
736                        this.writeHandler = (CompletionHandler<Number,Object>)handler;
737                        this.writeAttachment = attachment;
738                        this.writeFuture = null;
739                    }
740                    if (timeout > 0L) {
741                        this.writeTimer = port.schedule(writeTimeoutTask, timeout, unit);
742                    }
743                    this.writePending = true;
744                    updateEvents();
745                }
746                pending = true;
747                return result;
748            }
749        } catch (Throwable x) {
750            if (x instanceof ClosedChannelException)
751                x = new AsynchronousCloseException();
752            exc = x;
753        } finally {
754            if (!pending)
755                enableWriting();
756            end();
757        }
758
759        Number result = (exc != null) ? null : (isGatheringWrite) ?
760            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);
761
762        // write completed immediately
763        if (handler != null) {
764            if (invokeDirect) {
765                Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc);
766            } else {
767                Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc);
768            }
769            return null;
770        } else {
771            return CompletedFuture.withResult((V)result, exc);
772        }
773    }
774
775    // -- Native methods --
776
777    private static native void checkConnect(int fdVal) throws IOException;
778}
779