SelectorImpl.java revision f7bd2a99f6f4024e9034300b30a13a2ea871aa97
1/* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements.  See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License.  You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package java.nio;
17
18import java.io.FileDescriptor;
19import java.io.IOException;
20import java.nio.channels.ClosedSelectorException;
21import java.nio.channels.IllegalSelectorException;
22import java.nio.channels.SelectableChannel;
23import java.nio.channels.SelectionKey;
24import static java.nio.channels.SelectionKey.*;
25import java.nio.channels.Selector;
26import java.nio.channels.SocketChannel;
27import java.nio.channels.spi.AbstractSelectableChannel;
28import java.nio.channels.spi.AbstractSelectionKey;
29import java.nio.channels.spi.AbstractSelector;
30import java.nio.channels.spi.SelectorProvider;
31import java.util.Arrays;
32import java.util.Collection;
33import java.util.Collections;
34import java.util.HashSet;
35import java.util.Iterator;
36import java.util.Set;
37import java.util.UnsafeArrayList;
38import libcore.io.ErrnoException;
39import libcore.io.IoBridge;
40import libcore.io.IoUtils;
41import libcore.io.Libcore;
42import libcore.io.StructPollfd;
43import libcore.util.EmptyArray;
44import static libcore.io.OsConstants.*;
45
46/*
47 * Default implementation of java.nio.channels.Selector
48 */
49final class SelectorImpl extends AbstractSelector {
50
    /**
     * Used to synchronize when a key's interest ops change. Within this class
     * it is held while {@code preparePollFds()} reads every key's interest ops.
     * NOTE(review): presumably the key implementation takes the same lock when
     * interest ops are modified -- confirm against SelectionKeyImpl.
     */
    final Object keysLock = new Object();

    /**
     * The registered keys. Entries are added by {@code register} and removed
     * by {@code doCancel}.
     */
    private final Set<SelectionKeyImpl> mutableKeys = new HashSet<SelectionKeyImpl>();

    /**
     * The unmodifiable view of {@code mutableKeys} that {@code keys()} returns
     * to the user. This object is also used for synchronization by
     * {@code register}, {@code selectInternal} and {@code implCloseSelector}.
     */
    private final Set<SelectionKey> unmodifiableKeys = Collections
            .<SelectionKey>unmodifiableSet(mutableKeys);

    /**
     * The keys currently selected. Entries are added by
     * {@code processPollFds} and removed by {@code doCancel} or by the user
     * through {@code selectedKeys}.
     */
    private final Set<SelectionKey> mutableSelectedKeys = new HashSet<SelectionKey>();

    /**
     * The view of {@code mutableSelectedKeys} that {@code selectedKeys()}
     * returns to the user. Elements can be removed through it but not added.
     * This object is also used for synchronization by {@code selectInternal}
     * and {@code implCloseSelector}.
     */
    private final Set<SelectionKey> selectedKeys
            = new UnaddableSet<SelectionKey>(mutableSelectedKeys);

    /**
     * The wakeup pipe. To trigger a wakeup, write a byte to wakeupOut.
     * wakeupIn occupies slot 0 of {@code pollFds} and is drained by
     * {@code processPollFds} when poll reports it readable.
     */
    private final FileDescriptor wakeupIn;
    private final FileDescriptor wakeupOut;

    /**
     * The descriptors handed to poll. Slot 0 is the wakeup pipe; the following
     * slots are filled from the registered keys by {@code preparePollFds}.
     */
    private final UnsafeArrayList<StructPollfd> pollFds = new UnsafeArrayList<StructPollfd>(StructPollfd.class, 8);
82
83    public SelectorImpl(SelectorProvider selectorProvider) throws IOException {
84        super(selectorProvider);
85
86        /*
87         * Create a pipes to trigger wakeup. We can't use a NIO pipe because it
88         * would be closed if the selecting thread is interrupted. Also
89         * configure the pipe so we can fully drain it without blocking.
90         */
91        try {
92            FileDescriptor[] pipeFds = Libcore.os.pipe();
93            wakeupIn = pipeFds[0];
94            wakeupOut = pipeFds[1];
95            IoUtils.setBlocking(wakeupIn, false);
96            pollFds.add(new StructPollfd());
97            setPollFd(0, wakeupIn, POLLIN, null);
98        } catch (ErrnoException errnoException) {
99            throw errnoException.rethrowAsIOException();
100        }
101    }
102
103    @Override protected void implCloseSelector() throws IOException {
104        wakeup();
105        synchronized (this) {
106            synchronized (unmodifiableKeys) {
107                synchronized (selectedKeys) {
108                    IoUtils.close(wakeupIn);
109                    IoUtils.close(wakeupOut);
110                    doCancel();
111                    for (SelectionKey sk : mutableKeys) {
112                        deregister((AbstractSelectionKey) sk);
113                    }
114                }
115            }
116        }
117    }
118
119    @Override protected SelectionKey register(AbstractSelectableChannel channel,
120            int operations, Object attachment) {
121        if (!provider().equals(channel.provider())) {
122            throw new IllegalSelectorException();
123        }
124        synchronized (this) {
125            synchronized (unmodifiableKeys) {
126                SelectionKeyImpl selectionKey = new SelectionKeyImpl(channel, operations,
127                        attachment, this);
128                mutableKeys.add(selectionKey);
129                ensurePollFdsCapacity();
130                return selectionKey;
131            }
132        }
133    }
134
135    @Override public synchronized Set<SelectionKey> keys() {
136        checkClosed();
137        return unmodifiableKeys;
138    }
139
140    private void checkClosed() {
141        if (!isOpen()) {
142            throw new ClosedSelectorException();
143        }
144    }
145
146    @Override public int select() throws IOException {
147        // Blocks until some fd is ready.
148        return selectInternal(-1);
149    }
150
151    @Override public int select(long timeout) throws IOException {
152        if (timeout < 0) {
153            throw new IllegalArgumentException();
154        }
155        // Our timeout is interpreted differently to Unix's --- 0 means block. See selectNow.
156        return selectInternal((timeout == 0) ? -1 : timeout);
157    }
158
159    @Override public int selectNow() throws IOException {
160        return selectInternal(0);
161    }
162
163    private int selectInternal(long timeout) throws IOException {
164        checkClosed();
165        synchronized (this) {
166            synchronized (unmodifiableKeys) {
167                synchronized (selectedKeys) {
168                    doCancel();
169                    boolean isBlock = (timeout != 0);
170                    synchronized (keysLock) {
171                        preparePollFds();
172                    }
173                    int rc = -1;
174                    try {
175                        if (isBlock) {
176                            begin();
177                        }
178                        try {
179                            rc = Libcore.os.poll(pollFds.array(), (int) timeout);
180                        } catch (ErrnoException errnoException) {
181                            if (errnoException.errno != EINTR) {
182                                throw errnoException.rethrowAsIOException();
183                            }
184                        }
185                    } finally {
186                        if (isBlock) {
187                            end();
188                        }
189                    }
190
191                    int readyCount = (rc > 0) ? processPollFds() : 0;
192                    readyCount -= doCancel();
193                    return readyCount;
194                }
195            }
196        }
197    }
198
199    private void setPollFd(int i, FileDescriptor fd, int events, Object object) {
200        StructPollfd pollFd = pollFds.get(i);
201        pollFd.fd = fd;
202        pollFd.events = (short) events;
203        pollFd.userData = object;
204    }
205
206    private void preparePollFds() {
207        int i = 1; // Our wakeup pipe comes before all the user's fds.
208        for (SelectionKeyImpl key : mutableKeys) {
209            int interestOps = key.interestOpsNoCheck();
210            short eventMask = 0;
211            if (((OP_ACCEPT | OP_READ) & interestOps) != 0) {
212                eventMask |= POLLIN;
213            }
214            if (((OP_CONNECT | OP_WRITE) & interestOps) != 0) {
215                eventMask |= POLLOUT;
216            }
217            if (eventMask != 0) {
218                setPollFd(i++, ((FileDescriptorChannel) key.channel()).getFD(), eventMask, key);
219            }
220        }
221    }
222
223    private void ensurePollFdsCapacity() {
224        // We need one slot for each element of mutableKeys, plus one for the wakeup pipe.
225        while (pollFds.size() < mutableKeys.size() + 1) {
226            pollFds.add(new StructPollfd());
227        }
228    }
229
230    /**
231     * Updates the key ready ops and selected key set.
232     */
233    private int processPollFds() throws IOException {
234        if (pollFds.get(0).revents == POLLIN) {
235            // Read bytes from the wakeup pipe until the pipe is empty.
236            byte[] buffer = new byte[8];
237            while (IoBridge.read(wakeupIn, buffer, 0, 1) > 0) {
238            }
239        }
240
241        int readyKeyCount = 0;
242        for (int i = 1; i < pollFds.size(); ++i) {
243            StructPollfd pollFd = pollFds.get(i);
244            if (pollFd.revents == 0) {
245                continue;
246            }
247            if (pollFd.fd == null) {
248                break;
249            }
250
251            SelectionKeyImpl key = (SelectionKeyImpl) pollFd.userData;
252
253            pollFd.fd = null;
254            pollFd.userData = null;
255
256            int ops = key.interestOpsNoCheck();
257            int selectedOp = 0;
258            if ((pollFd.revents & POLLIN) != 0) {
259                selectedOp = ops & (OP_ACCEPT | OP_READ);
260            } else if ((pollFd.revents & POLLOUT) != 0) {
261                if (key.isConnected()) {
262                    selectedOp = ops & OP_WRITE;
263                } else {
264                    selectedOp = ops & OP_CONNECT;
265                }
266            }
267
268            if (selectedOp != 0) {
269                boolean wasSelected = mutableSelectedKeys.contains(key);
270                if (wasSelected && key.readyOps() != selectedOp) {
271                    key.setReadyOps(key.readyOps() | selectedOp);
272                    ++readyKeyCount;
273                } else if (!wasSelected) {
274                    key.setReadyOps(selectedOp);
275                    mutableSelectedKeys.add(key);
276                    ++readyKeyCount;
277                }
278            }
279        }
280
281        return readyKeyCount;
282    }
283
284    @Override public synchronized Set<SelectionKey> selectedKeys() {
285        checkClosed();
286        return selectedKeys;
287    }
288
289    /**
290     * Removes cancelled keys from the key set and selected key set, and
291     * deregisters the corresponding channels. Returns the number of keys
292     * removed from the selected key set.
293     */
294    private int doCancel() {
295        int deselected = 0;
296
297        Set<SelectionKey> cancelledKeys = cancelledKeys();
298        synchronized (cancelledKeys) {
299            if (cancelledKeys.size() > 0) {
300                for (SelectionKey currentKey : cancelledKeys) {
301                    mutableKeys.remove(currentKey);
302                    deregister((AbstractSelectionKey) currentKey);
303                    if (mutableSelectedKeys.remove(currentKey)) {
304                        deselected++;
305                    }
306                }
307                cancelledKeys.clear();
308            }
309        }
310
311        return deselected;
312    }
313
314    @Override public Selector wakeup() {
315        try {
316            Libcore.os.write(wakeupOut, new byte[] { 1 }, 0, 1);
317        } catch (ErrnoException ignored) {
318        }
319        return this;
320    }
321
322    private static class UnaddableSet<E> implements Set<E> {
323
324        private final Set<E> set;
325
326        UnaddableSet(Set<E> set) {
327            this.set = set;
328        }
329
330        @Override
331        public boolean equals(Object object) {
332            return set.equals(object);
333        }
334
335        @Override
336        public int hashCode() {
337            return set.hashCode();
338        }
339
340        public boolean add(E object) {
341            throw new UnsupportedOperationException();
342        }
343
344        public boolean addAll(Collection<? extends E> c) {
345            throw new UnsupportedOperationException();
346        }
347
348        public void clear() {
349            set.clear();
350        }
351
352        public boolean contains(Object object) {
353            return set.contains(object);
354        }
355
356        public boolean containsAll(Collection<?> c) {
357            return set.containsAll(c);
358        }
359
360        public boolean isEmpty() {
361            return set.isEmpty();
362        }
363
364        public Iterator<E> iterator() {
365            return set.iterator();
366        }
367
368        public boolean remove(Object object) {
369            return set.remove(object);
370        }
371
372        public boolean removeAll(Collection<?> c) {
373            return set.removeAll(c);
374        }
375
376        public boolean retainAll(Collection<?> c) {
377            return set.retainAll(c);
378        }
379
380        public int size() {
381            return set.size();
382        }
383
384        public Object[] toArray() {
385            return set.toArray();
386        }
387
388        public <T> T[] toArray(T[] a) {
389            return set.toArray(a);
390        }
391    }
392}
393