SelectorImpl.java revision 41f0605d2c809bd9bc1c0fb68d86b49a0f59b6c5
1/* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements.  See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License.  You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package java.nio;
17
18import java.io.FileDescriptor;
19import java.io.IOException;
20import java.nio.channels.ClosedSelectorException;
21import java.nio.channels.IllegalSelectorException;
22import java.nio.channels.SelectableChannel;
23import java.nio.channels.SelectionKey;
24import static java.nio.channels.SelectionKey.*;
25import java.nio.channels.Selector;
26import java.nio.channels.SocketChannel;
27import java.nio.channels.spi.AbstractSelectableChannel;
28import java.nio.channels.spi.AbstractSelectionKey;
29import java.nio.channels.spi.AbstractSelector;
30import java.nio.channels.spi.SelectorProvider;
31import java.util.Arrays;
32import java.util.Collection;
33import java.util.Collections;
34import java.util.HashSet;
35import java.util.Iterator;
36import java.util.Set;
37import libcore.io.ErrnoException;
38import libcore.io.IoUtils;
39import libcore.io.Libcore;
40import libcore.util.EmptyArray;
41import org.apache.harmony.luni.platform.FileDescriptorHandler;
42import org.apache.harmony.luni.platform.Platform;
43
44/*
45 * Default implementation of java.nio.channels.Selector
46 */
47final class SelectorImpl extends AbstractSelector {
48
49    static final FileDescriptor[] EMPTY_FILE_DESCRIPTORS_ARRAY = new FileDescriptor[0];
50
51    private static final SelectionKeyImpl[] EMPTY_SELECTION_KEY_IMPLS_ARRAY
52            = new SelectionKeyImpl[0];
53
54    private static final int CONNECT_OR_WRITE = OP_CONNECT | OP_WRITE;
55
56    private static final int ACCEPT_OR_READ = OP_ACCEPT | OP_READ;
57
58    private static final int NA = 0;
59
60    private static final int READABLE = 1;
61
62    private static final int WRITABLE = 2;
63
64    private static final int SELECT_BLOCK = -1;
65
66    private static final int SELECT_NOW = 0;
67
68    /**
69     * Used to synchronize when a key's interest ops change.
70     */
71    final Object keysLock = new Object();
72
73    private final Set<SelectionKeyImpl> mutableKeys = new HashSet<SelectionKeyImpl>();
74
75    /**
76     * The unmodifiable set of keys as exposed to the user. This object is used
77     * for synchronization.
78     */
79    private final Set<SelectionKey> unmodifiableKeys = Collections
80            .<SelectionKey>unmodifiableSet(mutableKeys);
81
82    private final Set<SelectionKey> mutableSelectedKeys = new HashSet<SelectionKey>();
83
84    /**
85     * The unmodifiable set of selectable keys as seen by the user. This object
86     * is used for synchronization.
87     */
88    private final Set<SelectionKey> selectedKeys
89            = new UnaddableSet<SelectionKey>(mutableSelectedKeys);
90
91    /**
92     * The wakeup pipe. To trigger a wakeup, write a byte to wakeupOut. Each
93     * time select returns, wakeupIn is drained.
94     */
95    private final FileDescriptor wakeupIn;
96    private final FileDescriptor wakeupOut;
97
98    /**
99     * File descriptors we're interested in reading from. When actively
100     * selecting, the first element is always the wakeup channel's file
101     * descriptor, and the other elements are user-specified file descriptors.
102     * Otherwise, all elements are null.
103     */
104    private FileDescriptor[] readableFDs = EMPTY_FILE_DESCRIPTORS_ARRAY;
105
106    /**
107     * File descriptors we're interested in writing from. May be empty. When not
108     * actively selecting, all elements are null.
109     */
110    private FileDescriptor[] writableFDs = EMPTY_FILE_DESCRIPTORS_ARRAY;
111
112    /**
113     * Selection keys that correspond to the concatenation of readableFDs and
114     * writableFDs. This is used to interpret the results returned by select().
115     * When not actively selecting, all elements are null.
116     */
117    private SelectionKeyImpl[] readyKeys = EMPTY_SELECTION_KEY_IMPLS_ARRAY;
118
119    /**
120     * Selection flags that define the ready ops on the ready keys. When not
121     * actively selecting, all elements are 0. Corresponds to the ready keys
122     * set.
123     */
124    private int[] flags = EmptyArray.INT;
125
126    public SelectorImpl(SelectorProvider selectorProvider) throws IOException {
127        super(selectorProvider);
128
129        /*
130         * Create a pipes to trigger wakeup. We can't use a NIO pipe because it
131         * would be closed if the selecting thread is interrupted. Also
132         * configure the pipe so we can fully drain it without blocking.
133         */
134        try {
135            FileDescriptor[] pipeFds = Libcore.os.pipe();
136            wakeupIn = pipeFds[0];
137            wakeupOut = pipeFds[1];
138            IoUtils.setBlocking(wakeupIn, false);
139        } catch (ErrnoException errnoException) {
140            throw errnoException.rethrowAsIOException();
141        }
142    }
143
144    @Override protected void implCloseSelector() throws IOException {
145        wakeup();
146        synchronized (this) {
147            synchronized (unmodifiableKeys) {
148                synchronized (selectedKeys) {
149                    IoUtils.close(wakeupIn);
150                    IoUtils.close(wakeupOut);
151                    doCancel();
152                    for (SelectionKey sk : mutableKeys) {
153                        deregister((AbstractSelectionKey) sk);
154                    }
155                }
156            }
157        }
158    }
159
160    @Override protected SelectionKey register(AbstractSelectableChannel channel,
161            int operations, Object attachment) {
162        if (!provider().equals(channel.provider())) {
163            throw new IllegalSelectorException();
164        }
165        synchronized (this) {
166            synchronized (unmodifiableKeys) {
167                SelectionKeyImpl selectionKey = new SelectionKeyImpl(
168                        channel, operations, attachment, this);
169                mutableKeys.add(selectionKey);
170                return selectionKey;
171            }
172        }
173    }
174
175    @Override public synchronized Set<SelectionKey> keys() {
176        checkClosed();
177        return unmodifiableKeys;
178    }
179
180    private void checkClosed() {
181        if (!isOpen()) {
182            throw new ClosedSelectorException();
183        }
184    }
185
186    @Override public int select() throws IOException {
187        return selectInternal(SELECT_BLOCK);
188    }
189
190    @Override public int select(long timeout) throws IOException {
191        if (timeout < 0) {
192            throw new IllegalArgumentException();
193        }
194        return selectInternal((timeout == 0) ? SELECT_BLOCK : timeout);
195    }
196
197    @Override public int selectNow() throws IOException {
198        return selectInternal(SELECT_NOW);
199    }
200
201    private int selectInternal(long timeout) throws IOException {
202        checkClosed();
203        synchronized (this) {
204            synchronized (unmodifiableKeys) {
205                synchronized (selectedKeys) {
206                    doCancel();
207                    boolean isBlock = (SELECT_NOW != timeout);
208                    int readableKeysCount = 1; // first is always the wakeup channel
209                    int writableKeysCount = 0;
210                    synchronized (keysLock) {
211                        for (SelectionKeyImpl key : mutableKeys) {
212                            int ops = key.interestOpsNoCheck();
213                            if ((ACCEPT_OR_READ & ops) != 0) {
214                                readableKeysCount++;
215                            }
216                            if ((CONNECT_OR_WRITE & ops) != 0) {
217                                writableKeysCount++;
218                            }
219                        }
220                        prepareChannels(readableKeysCount, writableKeysCount);
221                    }
222                    boolean success;
223                    try {
224                        if (isBlock) {
225                            begin();
226                        }
227                        success = Platform.NETWORK.select(readableFDs, writableFDs,
228                                readableKeysCount, writableKeysCount, timeout, flags);
229                    } finally {
230                        if (isBlock) {
231                            end();
232                        }
233                    }
234
235                    int selected = success ? processSelectResult() : 0;
236
237                    Arrays.fill(readableFDs, null);
238                    Arrays.fill(writableFDs, null);
239                    Arrays.fill(readyKeys, null);
240                    Arrays.fill(flags, 0);
241
242                    selected -= doCancel();
243
244                    return selected;
245                }
246            }
247        }
248    }
249
250    private int getReadyOps(SelectionKeyImpl key) {
251        SelectableChannel channel = key.channel();
252        return ((channel instanceof SocketChannel) && !((SocketChannel) channel).isConnectionPending()) ?
253                OP_WRITE : CONNECT_OR_WRITE;
254    }
255
256    /**
257     * Prepare the readableFDs, writableFDs, readyKeys and flags arrays in
258     * preparation for a call to {@code INetworkSystem#select()}. After they're
259     * used, the array elements must be cleared.
260     */
261    private void prepareChannels(int numReadable, int numWritable) {
262        // grow each array to sufficient capacity. Always grow to at least 1.5x
263        // to avoid growing too frequently
264        if (readableFDs.length < numReadable) {
265            int newSize = Math.max((int) (readableFDs.length * 1.5f), numReadable);
266            readableFDs = new FileDescriptor[newSize];
267        }
268        if (writableFDs.length < numWritable) {
269            int newSize = Math.max((int) (writableFDs.length * 1.5f), numWritable);
270            writableFDs = new FileDescriptor[newSize];
271        }
272        int total = numReadable + numWritable;
273        if (readyKeys.length < total) {
274            int newSize = Math.max((int) (readyKeys.length * 1.5f), total);
275            readyKeys = new SelectionKeyImpl[newSize];
276            flags = new int[newSize];
277        }
278
279        // populate the FDs, including the wakeup channel
280        readableFDs[0] = wakeupIn;
281        int r = 1;
282        int w = 0;
283        for (SelectionKeyImpl key : mutableKeys) {
284            int interestOps = key.interestOpsNoCheck();
285            if ((ACCEPT_OR_READ & interestOps) != 0) {
286                readableFDs[r] = ((FileDescriptorHandler) key.channel()).getFD();
287                readyKeys[r] = key;
288                r++;
289            }
290            if ((getReadyOps(key) & interestOps) != 0) {
291                writableFDs[w] = ((FileDescriptorHandler) key.channel()).getFD();
292                readyKeys[w + numReadable] = key;
293                w++;
294            }
295        }
296    }
297
298    /**
299     * Updates the key ready ops and selected key set with data from the flags
300     * array.
301     */
302    private int processSelectResult() throws IOException {
303        if (flags[0] == READABLE) {
304            /*
305             * Read bytes from the wakeup pipe until the pipe is empty. We made
306             * the FD non-blocking so we can just loop until read returns 0.
307             */
308            byte[] buffer = new byte[8];
309            while (Platform.FILE_SYSTEM.read(IoUtils.getFd(wakeupIn), buffer, 0, 1) != 0) {
310            }
311        }
312
313        int selected = 0;
314        for (int i = 1; i < flags.length; i++) {
315            if (flags[i] == NA) {
316                continue;
317            }
318
319            SelectionKeyImpl key = readyKeys[i];
320            int ops = key.interestOpsNoCheck();
321            int selectedOp = 0;
322
323            switch (flags[i]) {
324                case READABLE:
325                    selectedOp = ACCEPT_OR_READ & ops;
326                    break;
327                case WRITABLE:
328                    if (key.isConnected()) {
329                        selectedOp = OP_WRITE & ops;
330                    } else {
331                        selectedOp = OP_CONNECT & ops;
332                    }
333                    break;
334            }
335
336            if (selectedOp != 0) {
337                boolean wasSelected = mutableSelectedKeys.contains(key);
338                if (wasSelected && key.readyOps() != selectedOp) {
339                    key.setReadyOps(key.readyOps() | selectedOp);
340                    selected++;
341                } else if (!wasSelected) {
342                    key.setReadyOps(selectedOp);
343                    mutableSelectedKeys.add(key);
344                    selected++;
345                }
346            }
347        }
348
349        return selected;
350    }
351
352    @Override public synchronized Set<SelectionKey> selectedKeys() {
353        checkClosed();
354        return selectedKeys;
355    }
356
357    /**
358     * Removes cancelled keys from the key set and selected key set, and
359     * deregisters the corresponding channels. Returns the number of keys
360     * removed from the selected key set.
361     */
362    private int doCancel() {
363        int deselected = 0;
364
365        Set<SelectionKey> cancelledKeys = cancelledKeys();
366        synchronized (cancelledKeys) {
367            if (cancelledKeys.size() > 0) {
368                for (SelectionKey currentKey : cancelledKeys) {
369                    mutableKeys.remove(currentKey);
370                    deregister((AbstractSelectionKey) currentKey);
371                    if (mutableSelectedKeys.remove(currentKey)) {
372                        deselected++;
373                    }
374                }
375                cancelledKeys.clear();
376            }
377        }
378
379        return deselected;
380    }
381
382    @Override public Selector wakeup() {
383        try {
384            Platform.FILE_SYSTEM.write(IoUtils.getFd(wakeupOut), new byte[]{1}, 0, 1);
385        } catch (IOException ignored) {
386        }
387        return this;
388    }
389
390    private static class UnaddableSet<E> implements Set<E> {
391
392        private final Set<E> set;
393
394        UnaddableSet(Set<E> set) {
395            this.set = set;
396        }
397
398        @Override
399        public boolean equals(Object object) {
400            return set.equals(object);
401        }
402
403        @Override
404        public int hashCode() {
405            return set.hashCode();
406        }
407
408        public boolean add(E object) {
409            throw new UnsupportedOperationException();
410        }
411
412        public boolean addAll(Collection<? extends E> c) {
413            throw new UnsupportedOperationException();
414        }
415
416        public void clear() {
417            set.clear();
418        }
419
420        public boolean contains(Object object) {
421            return set.contains(object);
422        }
423
424        public boolean containsAll(Collection<?> c) {
425            return set.containsAll(c);
426        }
427
428        public boolean isEmpty() {
429            return set.isEmpty();
430        }
431
432        public Iterator<E> iterator() {
433            return set.iterator();
434        }
435
436        public boolean remove(Object object) {
437            return set.remove(object);
438        }
439
440        public boolean removeAll(Collection<?> c) {
441            return set.removeAll(c);
442        }
443
444        public boolean retainAll(Collection<?> c) {
445            return set.retainAll(c);
446        }
447
448        public int size() {
449            return set.size();
450        }
451
452        public Object[] toArray() {
453            return set.toArray();
454        }
455
456        public <T> T[] toArray(T[] a) {
457            return set.toArray(a);
458        }
459    }
460}
461