SelectorImpl.java revision ae704b984c10a63883cc366e823d53902d6ac7a9
1/* Licensed to the Apache Software Foundation (ASF) under one or more 2 * contributor license agreements. See the NOTICE file distributed with 3 * this work for additional information regarding copyright ownership. 4 * The ASF licenses this file to You under the Apache License, Version 2.0 5 * (the "License"); you may not use this file except in compliance with 6 * the License. You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16package java.nio; 17 18import java.io.FileDescriptor; 19import java.io.IOException; 20import java.nio.channels.ClosedSelectorException; 21import java.nio.channels.IllegalSelectorException; 22import java.nio.channels.Pipe; 23import java.nio.channels.SelectableChannel; 24import java.nio.channels.SelectionKey; 25import static java.nio.channels.SelectionKey.*; 26import java.nio.channels.Selector; 27import java.nio.channels.SocketChannel; 28import java.nio.channels.spi.AbstractSelectableChannel; 29import java.nio.channels.spi.AbstractSelectionKey; 30import java.nio.channels.spi.AbstractSelector; 31import java.nio.channels.spi.SelectorProvider; 32import java.util.Arrays; 33import java.util.Collection; 34import java.util.Collections; 35import java.util.HashSet; 36import java.util.Iterator; 37import java.util.Set; 38import org.apache.harmony.luni.platform.FileDescriptorHandler; 39import org.apache.harmony.luni.platform.Platform; 40 41/* 42 * Default implementation of java.nio.channels.Selector 43 */ 44final class SelectorImpl extends AbstractSelector { 45 46 private static final int[] EMPTY_INT_ARRAY = new int[0]; 47 48 private static final FileDescriptor[] EMPTY_FILE_DESCRIPTORS_ARRAY = new FileDescriptor[0]; 49 private static final SelectionKeyImpl[] EMPTY_SELECTION_KEY_IMPLS_ARRAY 50 = new SelectionKeyImpl[0]; 51 52 private static final int CONNECT_OR_WRITE = OP_CONNECT | OP_WRITE; 53 54 private static final int ACCEPT_OR_READ = OP_ACCEPT | OP_READ; 55 56 private static final int WAKEUP_WRITE_SIZE = 1; 57 58 private static final int WAKEUP_READ_SIZE = 8; 59 60 private static final int NA = 0; 61 62 private static final int READABLE = 1; 63 64 private static final int WRITABLE = 2; 65 66 private static final int SELECT_BLOCK = -1; 67 68 private static final int SELECT_NOW = 0; 69 70 /** 71 * Used to synchronize when a key's interest ops change. 72 */ 73 private static class KeysLock {} 74 final Object keysLock = new KeysLock(); 75 76 private final Set<SelectionKeyImpl> mutableKeys = new HashSet<SelectionKeyImpl>(); 77 78 /** 79 * The unmodifiable set of keys as exposed to the user. This object is used 80 * for synchronization. 81 */ 82 private final Set<SelectionKey> unmodifiableKeys = Collections 83 .<SelectionKey>unmodifiableSet(mutableKeys); 84 85 private final Set<SelectionKey> mutableSelectedKeys = new HashSet<SelectionKey>(); 86 87 /** 88 * The unmodifiable set of selectable keys as seen by the user. This object 89 * is used for synchronization. 90 */ 91 private final Set<SelectionKey> selectedKeys 92 = new UnaddableSet<SelectionKey>(mutableSelectedKeys); 93 94 /** 95 * The pipe used to implement wakeup. 96 */ 97 private final Pipe wakeupPipe; 98 99 /** 100 * File descriptors we're interested in reading from. When actively 101 * selecting, the first element is always the wakeup channel's file 102 * descriptor, and the other elements are user-specified file descriptors. 103 * Otherwise, all elements are null. 104 */ 105 private FileDescriptor[] readableFDs = EMPTY_FILE_DESCRIPTORS_ARRAY; 106 107 /** 108 * File descriptors we're interested in writing from. May be empty. When not 109 * actively selecting, all elements are null. 110 */ 111 private FileDescriptor[] writableFDs = EMPTY_FILE_DESCRIPTORS_ARRAY; 112 113 /** 114 * Selection keys that correspond to the concatenation of readableFDs and 115 * writableFDs. This is used to interpret the results returned by select(). 116 * When not actively selecting, all elements are null. 117 */ 118 private SelectionKeyImpl[] readyKeys = EMPTY_SELECTION_KEY_IMPLS_ARRAY; 119 120 /** 121 * Selection flags that define the ready ops on the ready keys. When not 122 * actively selecting, all elements are 0. Corresponds to the ready keys 123 * set. 124 */ 125 private int[] flags = EMPTY_INT_ARRAY; 126 127 public SelectorImpl(SelectorProvider selectorProvider) throws IOException { 128 super(selectorProvider); 129 wakeupPipe = selectorProvider.openPipe(); 130 wakeupPipe.source().configureBlocking(false); 131 } 132 133 @Override protected void implCloseSelector() throws IOException { 134 wakeup(); 135 synchronized (this) { 136 synchronized (unmodifiableKeys) { 137 synchronized (selectedKeys) { 138 wakeupPipe.sink().close(); 139 wakeupPipe.source().close(); 140 doCancel(); 141 for (SelectionKey sk : mutableKeys) { 142 deregister((AbstractSelectionKey) sk); 143 } 144 } 145 } 146 } 147 } 148 149 @Override protected SelectionKey register(AbstractSelectableChannel channel, 150 int operations, Object attachment) { 151 if (!provider().equals(channel.provider())) { 152 throw new IllegalSelectorException(); 153 } 154 synchronized (this) { 155 synchronized (unmodifiableKeys) { 156 SelectionKeyImpl selectionKey = new SelectionKeyImpl( 157 channel, operations, attachment, this); 158 mutableKeys.add(selectionKey); 159 return selectionKey; 160 } 161 } 162 } 163 164 @Override public synchronized Set<SelectionKey> keys() { 165 closeCheck(); 166 return unmodifiableKeys; 167 } 168 169 /* 170 * Checks that the receiver is not closed. If it is throws an exception. 171 */ 172 private void closeCheck() { 173 if (!isOpen()) { 174 throw new ClosedSelectorException(); 175 } 176 } 177 178 @Override public int select() throws IOException { 179 return selectInternal(SELECT_BLOCK); 180 } 181 182 @Override public int select(long timeout) throws IOException { 183 if (timeout < 0) { 184 throw new IllegalArgumentException(); 185 } 186 return selectInternal((0 == timeout) ? SELECT_BLOCK : timeout); 187 } 188 189 @Override public int selectNow() throws IOException { 190 return selectInternal(SELECT_NOW); 191 } 192 193 private int selectInternal(long timeout) throws IOException { 194 closeCheck(); 195 synchronized (this) { 196 synchronized (unmodifiableKeys) { 197 synchronized (selectedKeys) { 198 doCancel(); 199 boolean isBlock = (SELECT_NOW != timeout); 200 int readableKeysCount = 1; // first is always the wakeup channel 201 int writableKeysCount = 0; 202 synchronized (keysLock) { 203 for (SelectionKeyImpl key : mutableKeys) { 204 int ops = key.interestOpsNoCheck(); 205 if ((ACCEPT_OR_READ & ops) != 0) { 206 readableKeysCount++; 207 } 208 if ((CONNECT_OR_WRITE & ops) != 0) { 209 writableKeysCount++; 210 } 211 } 212 prepareChannels(readableKeysCount, writableKeysCount); 213 } 214 boolean success; 215 try { 216 if (isBlock) { 217 begin(); 218 } 219 success = Platform.NETWORK.select(readableFDs, writableFDs, 220 readableKeysCount, writableKeysCount, timeout, flags); 221 } finally { 222 if (isBlock) { 223 end(); 224 } 225 } 226 227 int selected = success ? processSelectResult() : 0; 228 229 Arrays.fill(readableFDs, null); 230 Arrays.fill(writableFDs, null); 231 Arrays.fill(readyKeys, null); 232 Arrays.fill(flags, 0); 233 234 selected -= doCancel(); 235 236 return selected; 237 } 238 } 239 } 240 } 241 242 private int getReadyOps(SelectionKeyImpl key) { 243 SelectableChannel channel = key.channel(); 244 return ((channel instanceof SocketChannel) && !((SocketChannel) channel).isConnectionPending()) ? 245 OP_WRITE : CONNECT_OR_WRITE; 246 } 247 248 /** 249 * Prepare the readableFDs, writableFDs, readyKeys and flags arrays in 250 * preparation for a call to {@code INetworkSystem#select()}. After they're 251 * used, the array elements must be cleared. 252 */ 253 private void prepareChannels(int numReadable, int numWritable) { 254 // grow each array to sufficient capacity. Always grow to at least 1.5x 255 // to avoid growing too frequently 256 if (readableFDs.length < numReadable) { 257 int newSize = Math.max((int) (readableFDs.length * 1.5f), numReadable); 258 readableFDs = new FileDescriptor[newSize]; 259 } 260 if (writableFDs.length < numWritable) { 261 int newSize = Math.max((int) (writableFDs.length * 1.5f), numWritable); 262 writableFDs = new FileDescriptor[newSize]; 263 } 264 int total = numReadable + numWritable; 265 if (readyKeys.length < total) { 266 int newSize = Math.max((int) (readyKeys.length * 1.5f), total); 267 readyKeys = new SelectionKeyImpl[newSize]; 268 flags = new int[newSize]; 269 } 270 271 // populate the FDs, including the wakeup channel 272 readableFDs[0] = ((FileDescriptorHandler) wakeupPipe.source()).getFD(); 273 int r = 1; 274 int w = 0; 275 for (SelectionKeyImpl key : mutableKeys) { 276 int interestOps = key.interestOpsNoCheck(); 277 if ((ACCEPT_OR_READ & interestOps) != 0) { 278 readableFDs[r] = ((FileDescriptorHandler) key.channel()).getFD(); 279 readyKeys[r] = key; 280 r++; 281 } 282 if ((getReadyOps(key) & interestOps) != 0) { 283 writableFDs[w] = ((FileDescriptorHandler) key.channel()).getFD(); 284 readyKeys[w + numReadable] = key; 285 w++; 286 } 287 } 288 } 289 290 /** 291 * Updates the key ready ops and selected key set with data from the flags 292 * array. 293 */ 294 private int processSelectResult() throws IOException { 295 // If there's something in the wakeup pipe, read it all --- the definition of the various 296 // select methods says that one select swallows all outstanding wakeups. We made this 297 // channel non-blocking in our constructor so that we can just loop until read returns 0. 298 if (flags[0] == READABLE) { 299 ByteBuffer buf = ByteBuffer.allocate(WAKEUP_READ_SIZE); 300 while (wakeupPipe.source().read(buf) > 0) { 301 buf.flip(); 302 } 303 } 304 305 int selected = 0; 306 for (int i = 1; i < flags.length; i++) { 307 if (flags[i] == NA) { 308 continue; 309 } 310 311 SelectionKeyImpl key = readyKeys[i]; 312 int ops = key.interestOpsNoCheck(); 313 int selectedOp = 0; 314 315 switch (flags[i]) { 316 case READABLE: 317 selectedOp = ACCEPT_OR_READ & ops; 318 break; 319 case WRITABLE: 320 if (key.isConnected()) { 321 selectedOp = OP_WRITE & ops; 322 } else { 323 selectedOp = OP_CONNECT & ops; 324 } 325 break; 326 } 327 328 if (selectedOp != 0) { 329 boolean wasSelected = mutableSelectedKeys.contains(key); 330 if (wasSelected && key.readyOps() != selectedOp) { 331 key.setReadyOps(key.readyOps() | selectedOp); 332 selected++; 333 } else if (!wasSelected) { 334 key.setReadyOps(selectedOp); 335 mutableSelectedKeys.add(key); 336 selected++; 337 } 338 } 339 } 340 341 return selected; 342 } 343 344 @Override public synchronized Set<SelectionKey> selectedKeys() { 345 closeCheck(); 346 return selectedKeys; 347 } 348 349 /** 350 * Removes cancelled keys from the key set and selected key set, and 351 * deregisters the corresponding channels. Returns the number of keys 352 * removed from the selected key set. 353 */ 354 private int doCancel() { 355 int deselected = 0; 356 357 Set<SelectionKey> cancelledKeys = cancelledKeys(); 358 synchronized (cancelledKeys) { 359 if (cancelledKeys.size() > 0) { 360 for (SelectionKey currentKey : cancelledKeys) { 361 mutableKeys.remove(currentKey); 362 deregister((AbstractSelectionKey) currentKey); 363 if (mutableSelectedKeys.remove(currentKey)) { 364 deselected++; 365 } 366 } 367 cancelledKeys.clear(); 368 } 369 } 370 371 return deselected; 372 } 373 374 @Override public Selector wakeup() { 375 try { 376 wakeupPipe.sink().write(ByteBuffer.allocate(WAKEUP_WRITE_SIZE)); 377 } catch (IOException ignored) { 378 } 379 return this; 380 } 381 382 private static class UnaddableSet<E> implements Set<E> { 383 384 private final Set<E> set; 385 386 UnaddableSet(Set<E> set) { 387 this.set = set; 388 } 389 390 @Override 391 public boolean equals(Object object) { 392 return set.equals(object); 393 } 394 395 @Override 396 public int hashCode() { 397 return set.hashCode(); 398 } 399 400 public boolean add(E object) { 401 throw new UnsupportedOperationException(); 402 } 403 404 public boolean addAll(Collection<? extends E> c) { 405 throw new UnsupportedOperationException(); 406 } 407 408 public void clear() { 409 set.clear(); 410 } 411 412 public boolean contains(Object object) { 413 return set.contains(object); 414 } 415 416 public boolean containsAll(Collection<?> c) { 417 return set.containsAll(c); 418 } 419 420 public boolean isEmpty() { 421 return set.isEmpty(); 422 } 423 424 public Iterator<E> iterator() { 425 return set.iterator(); 426 } 427 428 public boolean remove(Object object) { 429 return set.remove(object); 430 } 431 432 public boolean removeAll(Collection<?> c) { 433 return set.removeAll(c); 434 } 435 436 public boolean retainAll(Collection<?> c) { 437 return set.retainAll(c); 438 } 439 440 public int size() { 441 return set.size(); 442 } 443 444 public Object[] toArray() { 445 return set.toArray(); 446 } 447 448 public <T> T[] toArray(T[] a) { 449 return set.toArray(a); 450 } 451 } 452} 453