SelectorImpl.java revision fa542091e45db699a937c5ac0191194405107827
1/* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements.  See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License.  You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package java.nio;
17
18import android.system.ErrnoException;
19import android.system.StructPollfd;
20import java.io.FileDescriptor;
21import java.io.InterruptedIOException;
22import java.io.IOException;
23import java.nio.channels.ClosedSelectorException;
24import java.nio.channels.IllegalSelectorException;
25import java.nio.channels.SelectionKey;
26import java.nio.channels.Selector;
27import java.nio.channels.spi.AbstractSelectableChannel;
28import java.nio.channels.spi.AbstractSelectionKey;
29import java.nio.channels.spi.AbstractSelector;
30import java.nio.channels.spi.SelectorProvider;
31import java.util.Collection;
32import java.util.Collections;
33import java.util.HashSet;
34import java.util.Iterator;
35import java.util.Set;
36import java.util.UnsafeArrayList;
37import libcore.io.IoBridge;
38import libcore.io.IoUtils;
39import libcore.io.Libcore;
40
41import static android.system.OsConstants.POLLERR;
42import static android.system.OsConstants.POLLHUP;
43import static android.system.OsConstants.POLLIN;
44import static android.system.OsConstants.POLLOUT;
45import static java.nio.channels.SelectionKey.OP_ACCEPT;
46import static java.nio.channels.SelectionKey.OP_CONNECT;
47import static java.nio.channels.SelectionKey.OP_READ;
48import static java.nio.channels.SelectionKey.OP_WRITE;
49
50/*
51 * Default implementation of java.nio.channels.Selector
52 */
53final class SelectorImpl extends AbstractSelector {
54
    /**
     * Used to synchronize when a key's interest ops change. selectInternal
     * holds this lock while preparePollFds reads each key's interest ops.
     */
    final Object keysLock = new Object();

    /**
     * The backing set of registered keys. register adds to it and doCancel
     * removes cancelled keys from it.
     */
    private final Set<SelectionKeyImpl> mutableKeys = new HashSet<SelectionKeyImpl>();

    /**
     * The unmodifiable set of keys as exposed to the user. This object is used
     * for synchronization.
     */
    private final Set<SelectionKey> unmodifiableKeys = Collections
            .<SelectionKey>unmodifiableSet(mutableKeys);

    /**
     * The backing set of selected keys. processPollFds adds keys that became
     * ready; doCancel removes cancelled keys.
     */
    private final Set<SelectionKey> mutableSelectedKeys = new HashSet<SelectionKey>();

    /**
     * The set of selected keys as seen by the user. It is a view of
     * mutableSelectedKeys from which keys may be removed but to which keys
     * cannot be added (see UnaddableSet). This object is used for
     * synchronization.
     */
    private final Set<SelectionKey> selectedKeys
            = new UnaddableSet<SelectionKey>(mutableSelectedKeys);

    /**
     * The wakeup pipe. To trigger a wakeup, write a byte to wakeupOut. When a
     * select finds wakeupIn readable, wakeupIn is drained.
     */
    private final FileDescriptor wakeupIn;
    private final FileDescriptor wakeupOut;

    /**
     * The descriptors handed to poll. Slot 0 is the wakeup pipe (set up in the
     * constructor); slots 1 and up are refilled from the registered keys by
     * preparePollFds before each poll.
     */
    private final UnsafeArrayList<StructPollfd> pollFds = new UnsafeArrayList<StructPollfd>(StructPollfd.class, 8);
86
87    public SelectorImpl(SelectorProvider selectorProvider) throws IOException {
88        super(selectorProvider);
89
90        /*
91         * Create a pipe to trigger wakeup. We can't use a NIO pipe because it
92         * would be closed if the selecting thread is interrupted. Also
93         * configure the pipe so we can fully drain it without blocking.
94         */
95        try {
96            FileDescriptor[] pipeFds = Libcore.os.pipe2(0);
97            wakeupIn = pipeFds[0];
98            wakeupOut = pipeFds[1];
99            IoUtils.setBlocking(wakeupIn, false);
100            pollFds.add(new StructPollfd());
101            setPollFd(0, wakeupIn, POLLIN, null);
102        } catch (ErrnoException errnoException) {
103            throw errnoException.rethrowAsIOException();
104        }
105    }
106
107    @Override protected void implCloseSelector() throws IOException {
108        wakeup();
109        synchronized (this) {
110            synchronized (unmodifiableKeys) {
111                synchronized (selectedKeys) {
112                    IoUtils.close(wakeupIn);
113                    IoUtils.close(wakeupOut);
114                    doCancel();
115                    for (SelectionKey sk : mutableKeys) {
116                        deregister((AbstractSelectionKey) sk);
117                    }
118                }
119            }
120        }
121    }
122
123    @Override protected SelectionKey register(AbstractSelectableChannel channel,
124            int operations, Object attachment) {
125        if (!provider().equals(channel.provider())) {
126            throw new IllegalSelectorException();
127        }
128        synchronized (this) {
129            synchronized (unmodifiableKeys) {
130                SelectionKeyImpl selectionKey = new SelectionKeyImpl(channel, operations,
131                        attachment, this);
132                mutableKeys.add(selectionKey);
133                ensurePollFdsCapacity();
134                return selectionKey;
135            }
136        }
137    }
138
139    @Override public synchronized Set<SelectionKey> keys() {
140        checkClosed();
141        return unmodifiableKeys;
142    }
143
144    private void checkClosed() {
145        if (!isOpen()) {
146            throw new ClosedSelectorException();
147        }
148    }
149
150    @Override public int select() throws IOException {
151        // Blocks until some fd is ready.
152        return selectInternal(-1);
153    }
154
155    @Override public int select(long timeout) throws IOException {
156        if (timeout < 0) {
157            throw new IllegalArgumentException("timeout < 0: " + timeout);
158        }
159        // Our timeout is interpreted differently to Unix's --- 0 means block. See selectNow.
160        return selectInternal((timeout == 0) ? -1 : timeout);
161    }
162
163    @Override public int selectNow() throws IOException {
164        return selectInternal(0);
165    }
166
167    private int selectInternal(long timeout) throws IOException {
168        checkClosed();
169        synchronized (this) {
170            synchronized (unmodifiableKeys) {
171                synchronized (selectedKeys) {
172                    doCancel();
173                    boolean isBlocking = (timeout != 0);
174                    synchronized (keysLock) {
175                        preparePollFds();
176                    }
177                    int rc = -1;
178                    try {
179                        if (isBlocking) {
180                            begin();
181                        }
182                        try {
183                            rc = Libcore.os.poll(pollFds.array(), (int) timeout);
184                        } catch (ErrnoException errnoException) {
185                            throw errnoException.rethrowAsIOException();
186                        }
187                    } finally {
188                        if (isBlocking) {
189                            end();
190                        }
191                    }
192
193                    int readyCount = (rc > 0) ? processPollFds() : 0;
194                    readyCount -= doCancel();
195                    return readyCount;
196                }
197            }
198        }
199    }
200
201    private void setPollFd(int i, FileDescriptor fd, int events, Object object) {
202        StructPollfd pollFd = pollFds.get(i);
203        pollFd.fd = fd;
204        pollFd.events = (short) events;
205        pollFd.userData = object;
206    }
207
208    private void preparePollFds() {
209        int i = 1; // Our wakeup pipe comes before all the user's fds.
210        for (SelectionKeyImpl key : mutableKeys) {
211            int interestOps = key.interestOpsNoCheck();
212            short eventMask = 0;
213            if (((OP_ACCEPT | OP_READ) & interestOps) != 0) {
214                eventMask |= POLLIN;
215            }
216            if (((OP_CONNECT | OP_WRITE) & interestOps) != 0) {
217                eventMask |= POLLOUT;
218            }
219            if (eventMask != 0) {
220                setPollFd(i++, ((FileDescriptorChannel) key.channel()).getFD(), eventMask, key);
221            }
222        }
223    }
224
225    private void ensurePollFdsCapacity() {
226        // We need one slot for each element of mutableKeys, plus one for the wakeup pipe.
227        while (pollFds.size() < mutableKeys.size() + 1) {
228            pollFds.add(new StructPollfd());
229        }
230    }
231
232    /**
233     * Updates the key ready ops and selected key set.
234     */
235    private int processPollFds() throws IOException {
236        if (pollFds.get(0).revents == POLLIN) {
237            // Read bytes from the wakeup pipe until the pipe is empty.
238            byte[] buffer = new byte[8];
239            while (IoBridge.read(wakeupIn, buffer, 0, 1) > 0) {
240            }
241        }
242
243        int readyKeyCount = 0;
244        for (int i = 1; i < pollFds.size(); ++i) {
245            StructPollfd pollFd = pollFds.get(i);
246            if (pollFd.revents == 0) {
247                continue;
248            }
249            if (pollFd.fd == null) {
250                break;
251            }
252
253            SelectionKeyImpl key = (SelectionKeyImpl) pollFd.userData;
254
255            pollFd.fd = null;
256            pollFd.userData = null;
257
258            int ops = key.interestOpsNoCheck();
259            int selectedOps = 0;
260            if ((pollFd.revents & POLLHUP) != 0 || (pollFd.revents & POLLERR) != 0) {
261                // If there was an error condition, we definitely want to wake listeners,
262                // regardless of what they're waiting for. Failure is always interesting.
263                selectedOps |= ops;
264            }
265            if ((pollFd.revents & POLLIN) != 0) {
266                selectedOps |= ops & (OP_ACCEPT | OP_READ);
267            }
268            if ((pollFd.revents & POLLOUT) != 0) {
269                if (key.isConnected()) {
270                    selectedOps |= ops & OP_WRITE;
271                } else {
272                    selectedOps |= ops & OP_CONNECT;
273                }
274            }
275
276            if (selectedOps != 0) {
277                boolean wasSelected = mutableSelectedKeys.contains(key);
278                if (wasSelected && key.readyOps() != selectedOps) {
279                    key.setReadyOps(key.readyOps() | selectedOps);
280                    ++readyKeyCount;
281                } else if (!wasSelected) {
282                    key.setReadyOps(selectedOps);
283                    mutableSelectedKeys.add(key);
284                    ++readyKeyCount;
285                }
286            }
287        }
288
289        return readyKeyCount;
290    }
291
292    @Override public synchronized Set<SelectionKey> selectedKeys() {
293        checkClosed();
294        return selectedKeys;
295    }
296
297    /**
298     * Removes cancelled keys from the key set and selected key set, and
299     * unregisters the corresponding channels. Returns the number of keys
300     * removed from the selected key set.
301     */
302    private int doCancel() {
303        int deselected = 0;
304
305        Set<SelectionKey> cancelledKeys = cancelledKeys();
306        synchronized (cancelledKeys) {
307            if (cancelledKeys.size() > 0) {
308                for (SelectionKey currentKey : cancelledKeys) {
309                    mutableKeys.remove(currentKey);
310                    deregister((AbstractSelectionKey) currentKey);
311                    if (mutableSelectedKeys.remove(currentKey)) {
312                        deselected++;
313                    }
314                }
315                cancelledKeys.clear();
316            }
317        }
318
319        return deselected;
320    }
321
322    @Override public Selector wakeup() {
323        try {
324            Libcore.os.write(wakeupOut, new byte[] { 1 }, 0, 1);
325        } catch (ErrnoException ignored) {
326        } catch (InterruptedIOException ignored) {
327        }
328        return this;
329    }
330
331    private static class UnaddableSet<E> implements Set<E> {
332
333        private final Set<E> set;
334
335        UnaddableSet(Set<E> set) {
336            this.set = set;
337        }
338
339        @Override
340        public boolean equals(Object object) {
341            return set.equals(object);
342        }
343
344        @Override
345        public int hashCode() {
346            return set.hashCode();
347        }
348
349        public boolean add(E object) {
350            throw new UnsupportedOperationException();
351        }
352
353        public boolean addAll(Collection<? extends E> c) {
354            throw new UnsupportedOperationException();
355        }
356
357        public void clear() {
358            set.clear();
359        }
360
361        public boolean contains(Object object) {
362            return set.contains(object);
363        }
364
365        public boolean containsAll(Collection<?> c) {
366            return set.containsAll(c);
367        }
368
369        public boolean isEmpty() {
370            return set.isEmpty();
371        }
372
373        public Iterator<E> iterator() {
374            return set.iterator();
375        }
376
377        public boolean remove(Object object) {
378            return set.remove(object);
379        }
380
381        public boolean removeAll(Collection<?> c) {
382            return set.removeAll(c);
383        }
384
385        public boolean retainAll(Collection<?> c) {
386            return set.retainAll(c);
387        }
388
389        public int size() {
390            return set.size();
391        }
392
393        public Object[] toArray() {
394            return set.toArray();
395        }
396
397        public <T> T[] toArray(T[] a) {
398            return set.toArray(a);
399        }
400    }
401}
402