SelectorImpl.java revision 41f0605d2c809bd9bc1c0fb68d86b49a0f59b6c5
1/* Licensed to the Apache Software Foundation (ASF) under one or more 2 * contributor license agreements. See the NOTICE file distributed with 3 * this work for additional information regarding copyright ownership. 4 * The ASF licenses this file to You under the Apache License, Version 2.0 5 * (the "License"); you may not use this file except in compliance with 6 * the License. You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16package java.nio; 17 18import java.io.FileDescriptor; 19import java.io.IOException; 20import java.nio.channels.ClosedSelectorException; 21import java.nio.channels.IllegalSelectorException; 22import java.nio.channels.SelectableChannel; 23import java.nio.channels.SelectionKey; 24import static java.nio.channels.SelectionKey.*; 25import java.nio.channels.Selector; 26import java.nio.channels.SocketChannel; 27import java.nio.channels.spi.AbstractSelectableChannel; 28import java.nio.channels.spi.AbstractSelectionKey; 29import java.nio.channels.spi.AbstractSelector; 30import java.nio.channels.spi.SelectorProvider; 31import java.util.Arrays; 32import java.util.Collection; 33import java.util.Collections; 34import java.util.HashSet; 35import java.util.Iterator; 36import java.util.Set; 37import libcore.io.ErrnoException; 38import libcore.io.IoUtils; 39import libcore.io.Libcore; 40import libcore.util.EmptyArray; 41import org.apache.harmony.luni.platform.FileDescriptorHandler; 42import org.apache.harmony.luni.platform.Platform; 43 44/* 45 * Default implementation of java.nio.channels.Selector 46 */ 47final class SelectorImpl extends AbstractSelector { 48 49 static final FileDescriptor[] EMPTY_FILE_DESCRIPTORS_ARRAY = new FileDescriptor[0]; 50 51 private static final SelectionKeyImpl[] EMPTY_SELECTION_KEY_IMPLS_ARRAY 52 = new SelectionKeyImpl[0]; 53 54 private static final int CONNECT_OR_WRITE = OP_CONNECT | OP_WRITE; 55 56 private static final int ACCEPT_OR_READ = OP_ACCEPT | OP_READ; 57 58 private static final int NA = 0; 59 60 private static final int READABLE = 1; 61 62 private static final int WRITABLE = 2; 63 64 private static final int SELECT_BLOCK = -1; 65 66 private static final int SELECT_NOW = 0; 67 68 /** 69 * Used to synchronize when a key's interest ops change. 70 */ 71 final Object keysLock = new Object(); 72 73 private final Set<SelectionKeyImpl> mutableKeys = new HashSet<SelectionKeyImpl>(); 74 75 /** 76 * The unmodifiable set of keys as exposed to the user. This object is used 77 * for synchronization. 78 */ 79 private final Set<SelectionKey> unmodifiableKeys = Collections 80 .<SelectionKey>unmodifiableSet(mutableKeys); 81 82 private final Set<SelectionKey> mutableSelectedKeys = new HashSet<SelectionKey>(); 83 84 /** 85 * The unmodifiable set of selectable keys as seen by the user. This object 86 * is used for synchronization. 87 */ 88 private final Set<SelectionKey> selectedKeys 89 = new UnaddableSet<SelectionKey>(mutableSelectedKeys); 90 91 /** 92 * The wakeup pipe. To trigger a wakeup, write a byte to wakeupOut. Each 93 * time select returns, wakeupIn is drained. 94 */ 95 private final FileDescriptor wakeupIn; 96 private final FileDescriptor wakeupOut; 97 98 /** 99 * File descriptors we're interested in reading from. When actively 100 * selecting, the first element is always the wakeup channel's file 101 * descriptor, and the other elements are user-specified file descriptors. 102 * Otherwise, all elements are null. 103 */ 104 private FileDescriptor[] readableFDs = EMPTY_FILE_DESCRIPTORS_ARRAY; 105 106 /** 107 * File descriptors we're interested in writing from. May be empty. When not 108 * actively selecting, all elements are null. 109 */ 110 private FileDescriptor[] writableFDs = EMPTY_FILE_DESCRIPTORS_ARRAY; 111 112 /** 113 * Selection keys that correspond to the concatenation of readableFDs and 114 * writableFDs. This is used to interpret the results returned by select(). 115 * When not actively selecting, all elements are null. 116 */ 117 private SelectionKeyImpl[] readyKeys = EMPTY_SELECTION_KEY_IMPLS_ARRAY; 118 119 /** 120 * Selection flags that define the ready ops on the ready keys. When not 121 * actively selecting, all elements are 0. Corresponds to the ready keys 122 * set. 123 */ 124 private int[] flags = EmptyArray.INT; 125 126 public SelectorImpl(SelectorProvider selectorProvider) throws IOException { 127 super(selectorProvider); 128 129 /* 130 * Create a pipes to trigger wakeup. We can't use a NIO pipe because it 131 * would be closed if the selecting thread is interrupted. Also 132 * configure the pipe so we can fully drain it without blocking. 133 */ 134 try { 135 FileDescriptor[] pipeFds = Libcore.os.pipe(); 136 wakeupIn = pipeFds[0]; 137 wakeupOut = pipeFds[1]; 138 IoUtils.setBlocking(wakeupIn, false); 139 } catch (ErrnoException errnoException) { 140 throw errnoException.rethrowAsIOException(); 141 } 142 } 143 144 @Override protected void implCloseSelector() throws IOException { 145 wakeup(); 146 synchronized (this) { 147 synchronized (unmodifiableKeys) { 148 synchronized (selectedKeys) { 149 IoUtils.close(wakeupIn); 150 IoUtils.close(wakeupOut); 151 doCancel(); 152 for (SelectionKey sk : mutableKeys) { 153 deregister((AbstractSelectionKey) sk); 154 } 155 } 156 } 157 } 158 } 159 160 @Override protected SelectionKey register(AbstractSelectableChannel channel, 161 int operations, Object attachment) { 162 if (!provider().equals(channel.provider())) { 163 throw new IllegalSelectorException(); 164 } 165 synchronized (this) { 166 synchronized (unmodifiableKeys) { 167 SelectionKeyImpl selectionKey = new SelectionKeyImpl( 168 channel, operations, attachment, this); 169 mutableKeys.add(selectionKey); 170 return selectionKey; 171 } 172 } 173 } 174 175 @Override public synchronized Set<SelectionKey> keys() { 176 checkClosed(); 177 return unmodifiableKeys; 178 } 179 180 private void checkClosed() { 181 if (!isOpen()) { 182 throw new ClosedSelectorException(); 183 } 184 } 185 186 @Override public int select() throws IOException { 187 return selectInternal(SELECT_BLOCK); 188 } 189 190 @Override public int select(long timeout) throws IOException { 191 if (timeout < 0) { 192 throw new IllegalArgumentException(); 193 } 194 return selectInternal((timeout == 0) ? SELECT_BLOCK : timeout); 195 } 196 197 @Override public int selectNow() throws IOException { 198 return selectInternal(SELECT_NOW); 199 } 200 201 private int selectInternal(long timeout) throws IOException { 202 checkClosed(); 203 synchronized (this) { 204 synchronized (unmodifiableKeys) { 205 synchronized (selectedKeys) { 206 doCancel(); 207 boolean isBlock = (SELECT_NOW != timeout); 208 int readableKeysCount = 1; // first is always the wakeup channel 209 int writableKeysCount = 0; 210 synchronized (keysLock) { 211 for (SelectionKeyImpl key : mutableKeys) { 212 int ops = key.interestOpsNoCheck(); 213 if ((ACCEPT_OR_READ & ops) != 0) { 214 readableKeysCount++; 215 } 216 if ((CONNECT_OR_WRITE & ops) != 0) { 217 writableKeysCount++; 218 } 219 } 220 prepareChannels(readableKeysCount, writableKeysCount); 221 } 222 boolean success; 223 try { 224 if (isBlock) { 225 begin(); 226 } 227 success = Platform.NETWORK.select(readableFDs, writableFDs, 228 readableKeysCount, writableKeysCount, timeout, flags); 229 } finally { 230 if (isBlock) { 231 end(); 232 } 233 } 234 235 int selected = success ? processSelectResult() : 0; 236 237 Arrays.fill(readableFDs, null); 238 Arrays.fill(writableFDs, null); 239 Arrays.fill(readyKeys, null); 240 Arrays.fill(flags, 0); 241 242 selected -= doCancel(); 243 244 return selected; 245 } 246 } 247 } 248 } 249 250 private int getReadyOps(SelectionKeyImpl key) { 251 SelectableChannel channel = key.channel(); 252 return ((channel instanceof SocketChannel) && !((SocketChannel) channel).isConnectionPending()) ? 253 OP_WRITE : CONNECT_OR_WRITE; 254 } 255 256 /** 257 * Prepare the readableFDs, writableFDs, readyKeys and flags arrays in 258 * preparation for a call to {@code INetworkSystem#select()}. After they're 259 * used, the array elements must be cleared. 260 */ 261 private void prepareChannels(int numReadable, int numWritable) { 262 // grow each array to sufficient capacity. Always grow to at least 1.5x 263 // to avoid growing too frequently 264 if (readableFDs.length < numReadable) { 265 int newSize = Math.max((int) (readableFDs.length * 1.5f), numReadable); 266 readableFDs = new FileDescriptor[newSize]; 267 } 268 if (writableFDs.length < numWritable) { 269 int newSize = Math.max((int) (writableFDs.length * 1.5f), numWritable); 270 writableFDs = new FileDescriptor[newSize]; 271 } 272 int total = numReadable + numWritable; 273 if (readyKeys.length < total) { 274 int newSize = Math.max((int) (readyKeys.length * 1.5f), total); 275 readyKeys = new SelectionKeyImpl[newSize]; 276 flags = new int[newSize]; 277 } 278 279 // populate the FDs, including the wakeup channel 280 readableFDs[0] = wakeupIn; 281 int r = 1; 282 int w = 0; 283 for (SelectionKeyImpl key : mutableKeys) { 284 int interestOps = key.interestOpsNoCheck(); 285 if ((ACCEPT_OR_READ & interestOps) != 0) { 286 readableFDs[r] = ((FileDescriptorHandler) key.channel()).getFD(); 287 readyKeys[r] = key; 288 r++; 289 } 290 if ((getReadyOps(key) & interestOps) != 0) { 291 writableFDs[w] = ((FileDescriptorHandler) key.channel()).getFD(); 292 readyKeys[w + numReadable] = key; 293 w++; 294 } 295 } 296 } 297 298 /** 299 * Updates the key ready ops and selected key set with data from the flags 300 * array. 301 */ 302 private int processSelectResult() throws IOException { 303 if (flags[0] == READABLE) { 304 /* 305 * Read bytes from the wakeup pipe until the pipe is empty. We made 306 * the FD non-blocking so we can just loop until read returns 0. 307 */ 308 byte[] buffer = new byte[8]; 309 while (Platform.FILE_SYSTEM.read(IoUtils.getFd(wakeupIn), buffer, 0, 1) != 0) { 310 } 311 } 312 313 int selected = 0; 314 for (int i = 1; i < flags.length; i++) { 315 if (flags[i] == NA) { 316 continue; 317 } 318 319 SelectionKeyImpl key = readyKeys[i]; 320 int ops = key.interestOpsNoCheck(); 321 int selectedOp = 0; 322 323 switch (flags[i]) { 324 case READABLE: 325 selectedOp = ACCEPT_OR_READ & ops; 326 break; 327 case WRITABLE: 328 if (key.isConnected()) { 329 selectedOp = OP_WRITE & ops; 330 } else { 331 selectedOp = OP_CONNECT & ops; 332 } 333 break; 334 } 335 336 if (selectedOp != 0) { 337 boolean wasSelected = mutableSelectedKeys.contains(key); 338 if (wasSelected && key.readyOps() != selectedOp) { 339 key.setReadyOps(key.readyOps() | selectedOp); 340 selected++; 341 } else if (!wasSelected) { 342 key.setReadyOps(selectedOp); 343 mutableSelectedKeys.add(key); 344 selected++; 345 } 346 } 347 } 348 349 return selected; 350 } 351 352 @Override public synchronized Set<SelectionKey> selectedKeys() { 353 checkClosed(); 354 return selectedKeys; 355 } 356 357 /** 358 * Removes cancelled keys from the key set and selected key set, and 359 * deregisters the corresponding channels. Returns the number of keys 360 * removed from the selected key set. 361 */ 362 private int doCancel() { 363 int deselected = 0; 364 365 Set<SelectionKey> cancelledKeys = cancelledKeys(); 366 synchronized (cancelledKeys) { 367 if (cancelledKeys.size() > 0) { 368 for (SelectionKey currentKey : cancelledKeys) { 369 mutableKeys.remove(currentKey); 370 deregister((AbstractSelectionKey) currentKey); 371 if (mutableSelectedKeys.remove(currentKey)) { 372 deselected++; 373 } 374 } 375 cancelledKeys.clear(); 376 } 377 } 378 379 return deselected; 380 } 381 382 @Override public Selector wakeup() { 383 try { 384 Platform.FILE_SYSTEM.write(IoUtils.getFd(wakeupOut), new byte[]{1}, 0, 1); 385 } catch (IOException ignored) { 386 } 387 return this; 388 } 389 390 private static class UnaddableSet<E> implements Set<E> { 391 392 private final Set<E> set; 393 394 UnaddableSet(Set<E> set) { 395 this.set = set; 396 } 397 398 @Override 399 public boolean equals(Object object) { 400 return set.equals(object); 401 } 402 403 @Override 404 public int hashCode() { 405 return set.hashCode(); 406 } 407 408 public boolean add(E object) { 409 throw new UnsupportedOperationException(); 410 } 411 412 public boolean addAll(Collection<? extends E> c) { 413 throw new UnsupportedOperationException(); 414 } 415 416 public void clear() { 417 set.clear(); 418 } 419 420 public boolean contains(Object object) { 421 return set.contains(object); 422 } 423 424 public boolean containsAll(Collection<?> c) { 425 return set.containsAll(c); 426 } 427 428 public boolean isEmpty() { 429 return set.isEmpty(); 430 } 431 432 public Iterator<E> iterator() { 433 return set.iterator(); 434 } 435 436 public boolean remove(Object object) { 437 return set.remove(object); 438 } 439 440 public boolean removeAll(Collection<?> c) { 441 return set.removeAll(c); 442 } 443 444 public boolean retainAll(Collection<?> c) { 445 return set.retainAll(c); 446 } 447 448 public int size() { 449 return set.size(); 450 } 451 452 public Object[] toArray() { 453 return set.toArray(); 454 } 455 456 public <T> T[] toArray(T[] a) { 457 return set.toArray(a); 458 } 459 } 460} 461