SelectorImpl.java revision ae704b984c10a63883cc366e823d53902d6ac7a9
1/* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements.  See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License.  You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package java.nio;
17
18import java.io.FileDescriptor;
19import java.io.IOException;
20import java.nio.channels.ClosedSelectorException;
21import java.nio.channels.IllegalSelectorException;
22import java.nio.channels.Pipe;
23import java.nio.channels.SelectableChannel;
24import java.nio.channels.SelectionKey;
25import static java.nio.channels.SelectionKey.*;
26import java.nio.channels.Selector;
27import java.nio.channels.SocketChannel;
28import java.nio.channels.spi.AbstractSelectableChannel;
29import java.nio.channels.spi.AbstractSelectionKey;
30import java.nio.channels.spi.AbstractSelector;
31import java.nio.channels.spi.SelectorProvider;
32import java.util.Arrays;
33import java.util.Collection;
34import java.util.Collections;
35import java.util.HashSet;
36import java.util.Iterator;
37import java.util.Set;
38import org.apache.harmony.luni.platform.FileDescriptorHandler;
39import org.apache.harmony.luni.platform.Platform;
40
41/*
42 * Default implementation of java.nio.channels.Selector
43 */
44final class SelectorImpl extends AbstractSelector {
45
46    private static final int[] EMPTY_INT_ARRAY = new int[0];
47
48    private static final FileDescriptor[] EMPTY_FILE_DESCRIPTORS_ARRAY = new FileDescriptor[0];
49    private static final SelectionKeyImpl[] EMPTY_SELECTION_KEY_IMPLS_ARRAY
50            = new SelectionKeyImpl[0];
51
52    private static final int CONNECT_OR_WRITE = OP_CONNECT | OP_WRITE;
53
54    private static final int ACCEPT_OR_READ = OP_ACCEPT | OP_READ;
55
56    private static final int WAKEUP_WRITE_SIZE = 1;
57
58    private static final int WAKEUP_READ_SIZE = 8;
59
60    private static final int NA = 0;
61
62    private static final int READABLE = 1;
63
64    private static final int WRITABLE = 2;
65
66    private static final int SELECT_BLOCK = -1;
67
68    private static final int SELECT_NOW = 0;
69
70    /**
71     * Used to synchronize when a key's interest ops change.
72     */
73    private static class KeysLock {}
74    final Object keysLock = new KeysLock();
75
76    private final Set<SelectionKeyImpl> mutableKeys = new HashSet<SelectionKeyImpl>();
77
78    /**
79     * The unmodifiable set of keys as exposed to the user. This object is used
80     * for synchronization.
81     */
82    private final Set<SelectionKey> unmodifiableKeys = Collections
83            .<SelectionKey>unmodifiableSet(mutableKeys);
84
85    private final Set<SelectionKey> mutableSelectedKeys = new HashSet<SelectionKey>();
86
87    /**
88     * The unmodifiable set of selectable keys as seen by the user. This object
89     * is used for synchronization.
90     */
91    private final Set<SelectionKey> selectedKeys
92            = new UnaddableSet<SelectionKey>(mutableSelectedKeys);
93
94    /**
95     * The pipe used to implement wakeup.
96     */
97    private final Pipe wakeupPipe;
98
99    /**
100     * File descriptors we're interested in reading from. When actively
101     * selecting, the first element is always the wakeup channel's file
102     * descriptor, and the other elements are user-specified file descriptors.
103     * Otherwise, all elements are null.
104     */
105    private FileDescriptor[] readableFDs = EMPTY_FILE_DESCRIPTORS_ARRAY;
106
107    /**
108     * File descriptors we're interested in writing from. May be empty. When not
109     * actively selecting, all elements are null.
110     */
111    private FileDescriptor[] writableFDs = EMPTY_FILE_DESCRIPTORS_ARRAY;
112
113    /**
114     * Selection keys that correspond to the concatenation of readableFDs and
115     * writableFDs. This is used to interpret the results returned by select().
116     * When not actively selecting, all elements are null.
117     */
118    private SelectionKeyImpl[] readyKeys = EMPTY_SELECTION_KEY_IMPLS_ARRAY;
119
120    /**
121     * Selection flags that define the ready ops on the ready keys. When not
122     * actively selecting, all elements are 0. Corresponds to the ready keys
123     * set.
124     */
125    private int[] flags = EMPTY_INT_ARRAY;
126
127    public SelectorImpl(SelectorProvider selectorProvider) throws IOException {
128        super(selectorProvider);
129        wakeupPipe = selectorProvider.openPipe();
130        wakeupPipe.source().configureBlocking(false);
131    }
132
133    @Override protected void implCloseSelector() throws IOException {
134        wakeup();
135        synchronized (this) {
136            synchronized (unmodifiableKeys) {
137                synchronized (selectedKeys) {
138                    wakeupPipe.sink().close();
139                    wakeupPipe.source().close();
140                    doCancel();
141                    for (SelectionKey sk : mutableKeys) {
142                        deregister((AbstractSelectionKey) sk);
143                    }
144                }
145            }
146        }
147    }
148
149    @Override protected SelectionKey register(AbstractSelectableChannel channel,
150            int operations, Object attachment) {
151        if (!provider().equals(channel.provider())) {
152            throw new IllegalSelectorException();
153        }
154        synchronized (this) {
155            synchronized (unmodifiableKeys) {
156                SelectionKeyImpl selectionKey = new SelectionKeyImpl(
157                        channel, operations, attachment, this);
158                mutableKeys.add(selectionKey);
159                return selectionKey;
160            }
161        }
162    }
163
164    @Override public synchronized Set<SelectionKey> keys() {
165        closeCheck();
166        return unmodifiableKeys;
167    }
168
169    /*
170     * Checks that the receiver is not closed. If it is throws an exception.
171     */
172    private void closeCheck() {
173        if (!isOpen()) {
174            throw new ClosedSelectorException();
175        }
176    }
177
178    @Override public int select() throws IOException {
179        return selectInternal(SELECT_BLOCK);
180    }
181
182    @Override public int select(long timeout) throws IOException {
183        if (timeout < 0) {
184            throw new IllegalArgumentException();
185        }
186        return selectInternal((0 == timeout) ? SELECT_BLOCK : timeout);
187    }
188
189    @Override public int selectNow() throws IOException {
190        return selectInternal(SELECT_NOW);
191    }
192
193    private int selectInternal(long timeout) throws IOException {
194        closeCheck();
195        synchronized (this) {
196            synchronized (unmodifiableKeys) {
197                synchronized (selectedKeys) {
198                    doCancel();
199                    boolean isBlock = (SELECT_NOW != timeout);
200                    int readableKeysCount = 1; // first is always the wakeup channel
201                    int writableKeysCount = 0;
202                    synchronized (keysLock) {
203                        for (SelectionKeyImpl key : mutableKeys) {
204                            int ops = key.interestOpsNoCheck();
205                            if ((ACCEPT_OR_READ & ops) != 0) {
206                                readableKeysCount++;
207                            }
208                            if ((CONNECT_OR_WRITE & ops) != 0) {
209                                writableKeysCount++;
210                            }
211                        }
212                        prepareChannels(readableKeysCount, writableKeysCount);
213                    }
214                    boolean success;
215                    try {
216                        if (isBlock) {
217                            begin();
218                        }
219                        success = Platform.NETWORK.select(readableFDs, writableFDs,
220                                readableKeysCount, writableKeysCount, timeout, flags);
221                    } finally {
222                        if (isBlock) {
223                            end();
224                        }
225                    }
226
227                    int selected = success ? processSelectResult() : 0;
228
229                    Arrays.fill(readableFDs, null);
230                    Arrays.fill(writableFDs, null);
231                    Arrays.fill(readyKeys, null);
232                    Arrays.fill(flags, 0);
233
234                    selected -= doCancel();
235
236                    return selected;
237                }
238            }
239        }
240    }
241
242    private int getReadyOps(SelectionKeyImpl key) {
243        SelectableChannel channel = key.channel();
244        return ((channel instanceof SocketChannel) && !((SocketChannel) channel).isConnectionPending()) ?
245                OP_WRITE : CONNECT_OR_WRITE;
246    }
247
248    /**
249     * Prepare the readableFDs, writableFDs, readyKeys and flags arrays in
250     * preparation for a call to {@code INetworkSystem#select()}. After they're
251     * used, the array elements must be cleared.
252     */
253    private void prepareChannels(int numReadable, int numWritable) {
254        // grow each array to sufficient capacity. Always grow to at least 1.5x
255        // to avoid growing too frequently
256        if (readableFDs.length < numReadable) {
257            int newSize = Math.max((int) (readableFDs.length * 1.5f), numReadable);
258            readableFDs = new FileDescriptor[newSize];
259        }
260        if (writableFDs.length < numWritable) {
261            int newSize = Math.max((int) (writableFDs.length * 1.5f), numWritable);
262            writableFDs = new FileDescriptor[newSize];
263        }
264        int total = numReadable + numWritable;
265        if (readyKeys.length < total) {
266            int newSize = Math.max((int) (readyKeys.length * 1.5f), total);
267            readyKeys = new SelectionKeyImpl[newSize];
268            flags = new int[newSize];
269        }
270
271        // populate the FDs, including the wakeup channel
272        readableFDs[0] = ((FileDescriptorHandler) wakeupPipe.source()).getFD();
273        int r = 1;
274        int w = 0;
275        for (SelectionKeyImpl key : mutableKeys) {
276            int interestOps = key.interestOpsNoCheck();
277            if ((ACCEPT_OR_READ & interestOps) != 0) {
278                readableFDs[r] = ((FileDescriptorHandler) key.channel()).getFD();
279                readyKeys[r] = key;
280                r++;
281            }
282            if ((getReadyOps(key) & interestOps) != 0) {
283                writableFDs[w] = ((FileDescriptorHandler) key.channel()).getFD();
284                readyKeys[w + numReadable] = key;
285                w++;
286            }
287        }
288    }
289
290    /**
291     * Updates the key ready ops and selected key set with data from the flags
292     * array.
293     */
294    private int processSelectResult() throws IOException {
295        // If there's something in the wakeup pipe, read it all --- the definition of the various
296        // select methods says that one select swallows all outstanding wakeups. We made this
297        // channel non-blocking in our constructor so that we can just loop until read returns 0.
298        if (flags[0] == READABLE) {
299            ByteBuffer buf = ByteBuffer.allocate(WAKEUP_READ_SIZE);
300            while (wakeupPipe.source().read(buf) > 0) {
301                buf.flip();
302            }
303        }
304
305        int selected = 0;
306        for (int i = 1; i < flags.length; i++) {
307            if (flags[i] == NA) {
308                continue;
309            }
310
311            SelectionKeyImpl key = readyKeys[i];
312            int ops = key.interestOpsNoCheck();
313            int selectedOp = 0;
314
315            switch (flags[i]) {
316                case READABLE:
317                    selectedOp = ACCEPT_OR_READ & ops;
318                    break;
319                case WRITABLE:
320                    if (key.isConnected()) {
321                        selectedOp = OP_WRITE & ops;
322                    } else {
323                        selectedOp = OP_CONNECT & ops;
324                    }
325                    break;
326            }
327
328            if (selectedOp != 0) {
329                boolean wasSelected = mutableSelectedKeys.contains(key);
330                if (wasSelected && key.readyOps() != selectedOp) {
331                    key.setReadyOps(key.readyOps() | selectedOp);
332                    selected++;
333                } else if (!wasSelected) {
334                    key.setReadyOps(selectedOp);
335                    mutableSelectedKeys.add(key);
336                    selected++;
337                }
338            }
339        }
340
341        return selected;
342    }
343
344    @Override public synchronized Set<SelectionKey> selectedKeys() {
345        closeCheck();
346        return selectedKeys;
347    }
348
349    /**
350     * Removes cancelled keys from the key set and selected key set, and
351     * deregisters the corresponding channels. Returns the number of keys
352     * removed from the selected key set.
353     */
354    private int doCancel() {
355        int deselected = 0;
356
357        Set<SelectionKey> cancelledKeys = cancelledKeys();
358        synchronized (cancelledKeys) {
359            if (cancelledKeys.size() > 0) {
360                for (SelectionKey currentKey : cancelledKeys) {
361                    mutableKeys.remove(currentKey);
362                    deregister((AbstractSelectionKey) currentKey);
363                    if (mutableSelectedKeys.remove(currentKey)) {
364                        deselected++;
365                    }
366                }
367                cancelledKeys.clear();
368            }
369        }
370
371        return deselected;
372    }
373
374    @Override public Selector wakeup() {
375        try {
376            wakeupPipe.sink().write(ByteBuffer.allocate(WAKEUP_WRITE_SIZE));
377        } catch (IOException ignored) {
378        }
379        return this;
380    }
381
382    private static class UnaddableSet<E> implements Set<E> {
383
384        private final Set<E> set;
385
386        UnaddableSet(Set<E> set) {
387            this.set = set;
388        }
389
390        @Override
391        public boolean equals(Object object) {
392            return set.equals(object);
393        }
394
395        @Override
396        public int hashCode() {
397            return set.hashCode();
398        }
399
400        public boolean add(E object) {
401            throw new UnsupportedOperationException();
402        }
403
404        public boolean addAll(Collection<? extends E> c) {
405            throw new UnsupportedOperationException();
406        }
407
408        public void clear() {
409            set.clear();
410        }
411
412        public boolean contains(Object object) {
413            return set.contains(object);
414        }
415
416        public boolean containsAll(Collection<?> c) {
417            return set.containsAll(c);
418        }
419
420        public boolean isEmpty() {
421            return set.isEmpty();
422        }
423
424        public Iterator<E> iterator() {
425            return set.iterator();
426        }
427
428        public boolean remove(Object object) {
429            return set.remove(object);
430        }
431
432        public boolean removeAll(Collection<?> c) {
433            return set.removeAll(c);
434        }
435
436        public boolean retainAll(Collection<?> c) {
437            return set.retainAll(c);
438        }
439
440        public int size() {
441            return set.size();
442        }
443
444        public Object[] toArray() {
445            return set.toArray();
446        }
447
448        public <T> T[] toArray(T[] a) {
449            return set.toArray(a);
450        }
451    }
452}
453