SelectorImpl.java revision f7bd2a99f6f4024e9034300b30a13a2ea871aa97
1/* Licensed to the Apache Software Foundation (ASF) under one or more 2 * contributor license agreements. See the NOTICE file distributed with 3 * this work for additional information regarding copyright ownership. 4 * The ASF licenses this file to You under the Apache License, Version 2.0 5 * (the "License"); you may not use this file except in compliance with 6 * the License. You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16package java.nio; 17 18import java.io.FileDescriptor; 19import java.io.IOException; 20import java.nio.channels.ClosedSelectorException; 21import java.nio.channels.IllegalSelectorException; 22import java.nio.channels.SelectableChannel; 23import java.nio.channels.SelectionKey; 24import static java.nio.channels.SelectionKey.*; 25import java.nio.channels.Selector; 26import java.nio.channels.SocketChannel; 27import java.nio.channels.spi.AbstractSelectableChannel; 28import java.nio.channels.spi.AbstractSelectionKey; 29import java.nio.channels.spi.AbstractSelector; 30import java.nio.channels.spi.SelectorProvider; 31import java.util.Arrays; 32import java.util.Collection; 33import java.util.Collections; 34import java.util.HashSet; 35import java.util.Iterator; 36import java.util.Set; 37import java.util.UnsafeArrayList; 38import libcore.io.ErrnoException; 39import libcore.io.IoBridge; 40import libcore.io.IoUtils; 41import libcore.io.Libcore; 42import libcore.io.StructPollfd; 43import libcore.util.EmptyArray; 44import static libcore.io.OsConstants.*; 45 46/* 47 * Default implementation of java.nio.channels.Selector 48 */ 49final class SelectorImpl extends AbstractSelector { 50 51 /** 52 * Used to synchronize when a key's interest ops change. 53 */ 54 final Object keysLock = new Object(); 55 56 private final Set<SelectionKeyImpl> mutableKeys = new HashSet<SelectionKeyImpl>(); 57 58 /** 59 * The unmodifiable set of keys as exposed to the user. This object is used 60 * for synchronization. 61 */ 62 private final Set<SelectionKey> unmodifiableKeys = Collections 63 .<SelectionKey>unmodifiableSet(mutableKeys); 64 65 private final Set<SelectionKey> mutableSelectedKeys = new HashSet<SelectionKey>(); 66 67 /** 68 * The unmodifiable set of selectable keys as seen by the user. This object 69 * is used for synchronization. 70 */ 71 private final Set<SelectionKey> selectedKeys 72 = new UnaddableSet<SelectionKey>(mutableSelectedKeys); 73 74 /** 75 * The wakeup pipe. To trigger a wakeup, write a byte to wakeupOut. Each 76 * time select returns, wakeupIn is drained. 77 */ 78 private final FileDescriptor wakeupIn; 79 private final FileDescriptor wakeupOut; 80 81 private final UnsafeArrayList<StructPollfd> pollFds = new UnsafeArrayList<StructPollfd>(StructPollfd.class, 8); 82 83 public SelectorImpl(SelectorProvider selectorProvider) throws IOException { 84 super(selectorProvider); 85 86 /* 87 * Create a pipes to trigger wakeup. We can't use a NIO pipe because it 88 * would be closed if the selecting thread is interrupted. Also 89 * configure the pipe so we can fully drain it without blocking. 90 */ 91 try { 92 FileDescriptor[] pipeFds = Libcore.os.pipe(); 93 wakeupIn = pipeFds[0]; 94 wakeupOut = pipeFds[1]; 95 IoUtils.setBlocking(wakeupIn, false); 96 pollFds.add(new StructPollfd()); 97 setPollFd(0, wakeupIn, POLLIN, null); 98 } catch (ErrnoException errnoException) { 99 throw errnoException.rethrowAsIOException(); 100 } 101 } 102 103 @Override protected void implCloseSelector() throws IOException { 104 wakeup(); 105 synchronized (this) { 106 synchronized (unmodifiableKeys) { 107 synchronized (selectedKeys) { 108 IoUtils.close(wakeupIn); 109 IoUtils.close(wakeupOut); 110 doCancel(); 111 for (SelectionKey sk : mutableKeys) { 112 deregister((AbstractSelectionKey) sk); 113 } 114 } 115 } 116 } 117 } 118 119 @Override protected SelectionKey register(AbstractSelectableChannel channel, 120 int operations, Object attachment) { 121 if (!provider().equals(channel.provider())) { 122 throw new IllegalSelectorException(); 123 } 124 synchronized (this) { 125 synchronized (unmodifiableKeys) { 126 SelectionKeyImpl selectionKey = new SelectionKeyImpl(channel, operations, 127 attachment, this); 128 mutableKeys.add(selectionKey); 129 ensurePollFdsCapacity(); 130 return selectionKey; 131 } 132 } 133 } 134 135 @Override public synchronized Set<SelectionKey> keys() { 136 checkClosed(); 137 return unmodifiableKeys; 138 } 139 140 private void checkClosed() { 141 if (!isOpen()) { 142 throw new ClosedSelectorException(); 143 } 144 } 145 146 @Override public int select() throws IOException { 147 // Blocks until some fd is ready. 148 return selectInternal(-1); 149 } 150 151 @Override public int select(long timeout) throws IOException { 152 if (timeout < 0) { 153 throw new IllegalArgumentException(); 154 } 155 // Our timeout is interpreted differently to Unix's --- 0 means block. See selectNow. 156 return selectInternal((timeout == 0) ? -1 : timeout); 157 } 158 159 @Override public int selectNow() throws IOException { 160 return selectInternal(0); 161 } 162 163 private int selectInternal(long timeout) throws IOException { 164 checkClosed(); 165 synchronized (this) { 166 synchronized (unmodifiableKeys) { 167 synchronized (selectedKeys) { 168 doCancel(); 169 boolean isBlock = (timeout != 0); 170 synchronized (keysLock) { 171 preparePollFds(); 172 } 173 int rc = -1; 174 try { 175 if (isBlock) { 176 begin(); 177 } 178 try { 179 rc = Libcore.os.poll(pollFds.array(), (int) timeout); 180 } catch (ErrnoException errnoException) { 181 if (errnoException.errno != EINTR) { 182 throw errnoException.rethrowAsIOException(); 183 } 184 } 185 } finally { 186 if (isBlock) { 187 end(); 188 } 189 } 190 191 int readyCount = (rc > 0) ? processPollFds() : 0; 192 readyCount -= doCancel(); 193 return readyCount; 194 } 195 } 196 } 197 } 198 199 private void setPollFd(int i, FileDescriptor fd, int events, Object object) { 200 StructPollfd pollFd = pollFds.get(i); 201 pollFd.fd = fd; 202 pollFd.events = (short) events; 203 pollFd.userData = object; 204 } 205 206 private void preparePollFds() { 207 int i = 1; // Our wakeup pipe comes before all the user's fds. 208 for (SelectionKeyImpl key : mutableKeys) { 209 int interestOps = key.interestOpsNoCheck(); 210 short eventMask = 0; 211 if (((OP_ACCEPT | OP_READ) & interestOps) != 0) { 212 eventMask |= POLLIN; 213 } 214 if (((OP_CONNECT | OP_WRITE) & interestOps) != 0) { 215 eventMask |= POLLOUT; 216 } 217 if (eventMask != 0) { 218 setPollFd(i++, ((FileDescriptorChannel) key.channel()).getFD(), eventMask, key); 219 } 220 } 221 } 222 223 private void ensurePollFdsCapacity() { 224 // We need one slot for each element of mutableKeys, plus one for the wakeup pipe. 225 while (pollFds.size() < mutableKeys.size() + 1) { 226 pollFds.add(new StructPollfd()); 227 } 228 } 229 230 /** 231 * Updates the key ready ops and selected key set. 232 */ 233 private int processPollFds() throws IOException { 234 if (pollFds.get(0).revents == POLLIN) { 235 // Read bytes from the wakeup pipe until the pipe is empty. 236 byte[] buffer = new byte[8]; 237 while (IoBridge.read(wakeupIn, buffer, 0, 1) > 0) { 238 } 239 } 240 241 int readyKeyCount = 0; 242 for (int i = 1; i < pollFds.size(); ++i) { 243 StructPollfd pollFd = pollFds.get(i); 244 if (pollFd.revents == 0) { 245 continue; 246 } 247 if (pollFd.fd == null) { 248 break; 249 } 250 251 SelectionKeyImpl key = (SelectionKeyImpl) pollFd.userData; 252 253 pollFd.fd = null; 254 pollFd.userData = null; 255 256 int ops = key.interestOpsNoCheck(); 257 int selectedOp = 0; 258 if ((pollFd.revents & POLLIN) != 0) { 259 selectedOp = ops & (OP_ACCEPT | OP_READ); 260 } else if ((pollFd.revents & POLLOUT) != 0) { 261 if (key.isConnected()) { 262 selectedOp = ops & OP_WRITE; 263 } else { 264 selectedOp = ops & OP_CONNECT; 265 } 266 } 267 268 if (selectedOp != 0) { 269 boolean wasSelected = mutableSelectedKeys.contains(key); 270 if (wasSelected && key.readyOps() != selectedOp) { 271 key.setReadyOps(key.readyOps() | selectedOp); 272 ++readyKeyCount; 273 } else if (!wasSelected) { 274 key.setReadyOps(selectedOp); 275 mutableSelectedKeys.add(key); 276 ++readyKeyCount; 277 } 278 } 279 } 280 281 return readyKeyCount; 282 } 283 284 @Override public synchronized Set<SelectionKey> selectedKeys() { 285 checkClosed(); 286 return selectedKeys; 287 } 288 289 /** 290 * Removes cancelled keys from the key set and selected key set, and 291 * deregisters the corresponding channels. Returns the number of keys 292 * removed from the selected key set. 293 */ 294 private int doCancel() { 295 int deselected = 0; 296 297 Set<SelectionKey> cancelledKeys = cancelledKeys(); 298 synchronized (cancelledKeys) { 299 if (cancelledKeys.size() > 0) { 300 for (SelectionKey currentKey : cancelledKeys) { 301 mutableKeys.remove(currentKey); 302 deregister((AbstractSelectionKey) currentKey); 303 if (mutableSelectedKeys.remove(currentKey)) { 304 deselected++; 305 } 306 } 307 cancelledKeys.clear(); 308 } 309 } 310 311 return deselected; 312 } 313 314 @Override public Selector wakeup() { 315 try { 316 Libcore.os.write(wakeupOut, new byte[] { 1 }, 0, 1); 317 } catch (ErrnoException ignored) { 318 } 319 return this; 320 } 321 322 private static class UnaddableSet<E> implements Set<E> { 323 324 private final Set<E> set; 325 326 UnaddableSet(Set<E> set) { 327 this.set = set; 328 } 329 330 @Override 331 public boolean equals(Object object) { 332 return set.equals(object); 333 } 334 335 @Override 336 public int hashCode() { 337 return set.hashCode(); 338 } 339 340 public boolean add(E object) { 341 throw new UnsupportedOperationException(); 342 } 343 344 public boolean addAll(Collection<? extends E> c) { 345 throw new UnsupportedOperationException(); 346 } 347 348 public void clear() { 349 set.clear(); 350 } 351 352 public boolean contains(Object object) { 353 return set.contains(object); 354 } 355 356 public boolean containsAll(Collection<?> c) { 357 return set.containsAll(c); 358 } 359 360 public boolean isEmpty() { 361 return set.isEmpty(); 362 } 363 364 public Iterator<E> iterator() { 365 return set.iterator(); 366 } 367 368 public boolean remove(Object object) { 369 return set.remove(object); 370 } 371 372 public boolean removeAll(Collection<?> c) { 373 return set.removeAll(c); 374 } 375 376 public boolean retainAll(Collection<?> c) { 377 return set.retainAll(c); 378 } 379 380 public int size() { 381 return set.size(); 382 } 383 384 public Object[] toArray() { 385 return set.toArray(); 386 } 387 388 public <T> T[] toArray(T[] a) { 389 return set.toArray(a); 390 } 391 } 392} 393