SelectorImpl.java revision fa542091e45db699a937c5ac0191194405107827
1/* Licensed to the Apache Software Foundation (ASF) under one or more 2 * contributor license agreements. See the NOTICE file distributed with 3 * this work for additional information regarding copyright ownership. 4 * The ASF licenses this file to You under the Apache License, Version 2.0 5 * (the "License"); you may not use this file except in compliance with 6 * the License. You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16package java.nio; 17 18import android.system.ErrnoException; 19import android.system.StructPollfd; 20import java.io.FileDescriptor; 21import java.io.InterruptedIOException; 22import java.io.IOException; 23import java.nio.channels.ClosedSelectorException; 24import java.nio.channels.IllegalSelectorException; 25import java.nio.channels.SelectionKey; 26import java.nio.channels.Selector; 27import java.nio.channels.spi.AbstractSelectableChannel; 28import java.nio.channels.spi.AbstractSelectionKey; 29import java.nio.channels.spi.AbstractSelector; 30import java.nio.channels.spi.SelectorProvider; 31import java.util.Collection; 32import java.util.Collections; 33import java.util.HashSet; 34import java.util.Iterator; 35import java.util.Set; 36import java.util.UnsafeArrayList; 37import libcore.io.IoBridge; 38import libcore.io.IoUtils; 39import libcore.io.Libcore; 40 41import static android.system.OsConstants.POLLERR; 42import static android.system.OsConstants.POLLHUP; 43import static android.system.OsConstants.POLLIN; 44import static android.system.OsConstants.POLLOUT; 45import static java.nio.channels.SelectionKey.OP_ACCEPT; 46import static java.nio.channels.SelectionKey.OP_CONNECT; 47import static java.nio.channels.SelectionKey.OP_READ; 48import static java.nio.channels.SelectionKey.OP_WRITE; 49 50/* 51 * Default implementation of java.nio.channels.Selector 52 */ 53final class SelectorImpl extends AbstractSelector { 54 55 /** 56 * Used to synchronize when a key's interest ops change. 57 */ 58 final Object keysLock = new Object(); 59 60 private final Set<SelectionKeyImpl> mutableKeys = new HashSet<SelectionKeyImpl>(); 61 62 /** 63 * The unmodifiable set of keys as exposed to the user. This object is used 64 * for synchronization. 65 */ 66 private final Set<SelectionKey> unmodifiableKeys = Collections 67 .<SelectionKey>unmodifiableSet(mutableKeys); 68 69 private final Set<SelectionKey> mutableSelectedKeys = new HashSet<SelectionKey>(); 70 71 /** 72 * The unmodifiable set of selectable keys as seen by the user. This object 73 * is used for synchronization. 74 */ 75 private final Set<SelectionKey> selectedKeys 76 = new UnaddableSet<SelectionKey>(mutableSelectedKeys); 77 78 /** 79 * The wakeup pipe. To trigger a wakeup, write a byte to wakeupOut. Each 80 * time select returns, wakeupIn is drained. 81 */ 82 private final FileDescriptor wakeupIn; 83 private final FileDescriptor wakeupOut; 84 85 private final UnsafeArrayList<StructPollfd> pollFds = new UnsafeArrayList<StructPollfd>(StructPollfd.class, 8); 86 87 public SelectorImpl(SelectorProvider selectorProvider) throws IOException { 88 super(selectorProvider); 89 90 /* 91 * Create a pipe to trigger wakeup. We can't use a NIO pipe because it 92 * would be closed if the selecting thread is interrupted. Also 93 * configure the pipe so we can fully drain it without blocking. 94 */ 95 try { 96 FileDescriptor[] pipeFds = Libcore.os.pipe2(0); 97 wakeupIn = pipeFds[0]; 98 wakeupOut = pipeFds[1]; 99 IoUtils.setBlocking(wakeupIn, false); 100 pollFds.add(new StructPollfd()); 101 setPollFd(0, wakeupIn, POLLIN, null); 102 } catch (ErrnoException errnoException) { 103 throw errnoException.rethrowAsIOException(); 104 } 105 } 106 107 @Override protected void implCloseSelector() throws IOException { 108 wakeup(); 109 synchronized (this) { 110 synchronized (unmodifiableKeys) { 111 synchronized (selectedKeys) { 112 IoUtils.close(wakeupIn); 113 IoUtils.close(wakeupOut); 114 doCancel(); 115 for (SelectionKey sk : mutableKeys) { 116 deregister((AbstractSelectionKey) sk); 117 } 118 } 119 } 120 } 121 } 122 123 @Override protected SelectionKey register(AbstractSelectableChannel channel, 124 int operations, Object attachment) { 125 if (!provider().equals(channel.provider())) { 126 throw new IllegalSelectorException(); 127 } 128 synchronized (this) { 129 synchronized (unmodifiableKeys) { 130 SelectionKeyImpl selectionKey = new SelectionKeyImpl(channel, operations, 131 attachment, this); 132 mutableKeys.add(selectionKey); 133 ensurePollFdsCapacity(); 134 return selectionKey; 135 } 136 } 137 } 138 139 @Override public synchronized Set<SelectionKey> keys() { 140 checkClosed(); 141 return unmodifiableKeys; 142 } 143 144 private void checkClosed() { 145 if (!isOpen()) { 146 throw new ClosedSelectorException(); 147 } 148 } 149 150 @Override public int select() throws IOException { 151 // Blocks until some fd is ready. 152 return selectInternal(-1); 153 } 154 155 @Override public int select(long timeout) throws IOException { 156 if (timeout < 0) { 157 throw new IllegalArgumentException("timeout < 0: " + timeout); 158 } 159 // Our timeout is interpreted differently to Unix's --- 0 means block. See selectNow. 160 return selectInternal((timeout == 0) ? -1 : timeout); 161 } 162 163 @Override public int selectNow() throws IOException { 164 return selectInternal(0); 165 } 166 167 private int selectInternal(long timeout) throws IOException { 168 checkClosed(); 169 synchronized (this) { 170 synchronized (unmodifiableKeys) { 171 synchronized (selectedKeys) { 172 doCancel(); 173 boolean isBlocking = (timeout != 0); 174 synchronized (keysLock) { 175 preparePollFds(); 176 } 177 int rc = -1; 178 try { 179 if (isBlocking) { 180 begin(); 181 } 182 try { 183 rc = Libcore.os.poll(pollFds.array(), (int) timeout); 184 } catch (ErrnoException errnoException) { 185 throw errnoException.rethrowAsIOException(); 186 } 187 } finally { 188 if (isBlocking) { 189 end(); 190 } 191 } 192 193 int readyCount = (rc > 0) ? processPollFds() : 0; 194 readyCount -= doCancel(); 195 return readyCount; 196 } 197 } 198 } 199 } 200 201 private void setPollFd(int i, FileDescriptor fd, int events, Object object) { 202 StructPollfd pollFd = pollFds.get(i); 203 pollFd.fd = fd; 204 pollFd.events = (short) events; 205 pollFd.userData = object; 206 } 207 208 private void preparePollFds() { 209 int i = 1; // Our wakeup pipe comes before all the user's fds. 210 for (SelectionKeyImpl key : mutableKeys) { 211 int interestOps = key.interestOpsNoCheck(); 212 short eventMask = 0; 213 if (((OP_ACCEPT | OP_READ) & interestOps) != 0) { 214 eventMask |= POLLIN; 215 } 216 if (((OP_CONNECT | OP_WRITE) & interestOps) != 0) { 217 eventMask |= POLLOUT; 218 } 219 if (eventMask != 0) { 220 setPollFd(i++, ((FileDescriptorChannel) key.channel()).getFD(), eventMask, key); 221 } 222 } 223 } 224 225 private void ensurePollFdsCapacity() { 226 // We need one slot for each element of mutableKeys, plus one for the wakeup pipe. 227 while (pollFds.size() < mutableKeys.size() + 1) { 228 pollFds.add(new StructPollfd()); 229 } 230 } 231 232 /** 233 * Updates the key ready ops and selected key set. 234 */ 235 private int processPollFds() throws IOException { 236 if (pollFds.get(0).revents == POLLIN) { 237 // Read bytes from the wakeup pipe until the pipe is empty. 238 byte[] buffer = new byte[8]; 239 while (IoBridge.read(wakeupIn, buffer, 0, 1) > 0) { 240 } 241 } 242 243 int readyKeyCount = 0; 244 for (int i = 1; i < pollFds.size(); ++i) { 245 StructPollfd pollFd = pollFds.get(i); 246 if (pollFd.revents == 0) { 247 continue; 248 } 249 if (pollFd.fd == null) { 250 break; 251 } 252 253 SelectionKeyImpl key = (SelectionKeyImpl) pollFd.userData; 254 255 pollFd.fd = null; 256 pollFd.userData = null; 257 258 int ops = key.interestOpsNoCheck(); 259 int selectedOps = 0; 260 if ((pollFd.revents & POLLHUP) != 0 || (pollFd.revents & POLLERR) != 0) { 261 // If there was an error condition, we definitely want to wake listeners, 262 // regardless of what they're waiting for. Failure is always interesting. 263 selectedOps |= ops; 264 } 265 if ((pollFd.revents & POLLIN) != 0) { 266 selectedOps |= ops & (OP_ACCEPT | OP_READ); 267 } 268 if ((pollFd.revents & POLLOUT) != 0) { 269 if (key.isConnected()) { 270 selectedOps |= ops & OP_WRITE; 271 } else { 272 selectedOps |= ops & OP_CONNECT; 273 } 274 } 275 276 if (selectedOps != 0) { 277 boolean wasSelected = mutableSelectedKeys.contains(key); 278 if (wasSelected && key.readyOps() != selectedOps) { 279 key.setReadyOps(key.readyOps() | selectedOps); 280 ++readyKeyCount; 281 } else if (!wasSelected) { 282 key.setReadyOps(selectedOps); 283 mutableSelectedKeys.add(key); 284 ++readyKeyCount; 285 } 286 } 287 } 288 289 return readyKeyCount; 290 } 291 292 @Override public synchronized Set<SelectionKey> selectedKeys() { 293 checkClosed(); 294 return selectedKeys; 295 } 296 297 /** 298 * Removes cancelled keys from the key set and selected key set, and 299 * unregisters the corresponding channels. Returns the number of keys 300 * removed from the selected key set. 301 */ 302 private int doCancel() { 303 int deselected = 0; 304 305 Set<SelectionKey> cancelledKeys = cancelledKeys(); 306 synchronized (cancelledKeys) { 307 if (cancelledKeys.size() > 0) { 308 for (SelectionKey currentKey : cancelledKeys) { 309 mutableKeys.remove(currentKey); 310 deregister((AbstractSelectionKey) currentKey); 311 if (mutableSelectedKeys.remove(currentKey)) { 312 deselected++; 313 } 314 } 315 cancelledKeys.clear(); 316 } 317 } 318 319 return deselected; 320 } 321 322 @Override public Selector wakeup() { 323 try { 324 Libcore.os.write(wakeupOut, new byte[] { 1 }, 0, 1); 325 } catch (ErrnoException ignored) { 326 } catch (InterruptedIOException ignored) { 327 } 328 return this; 329 } 330 331 private static class UnaddableSet<E> implements Set<E> { 332 333 private final Set<E> set; 334 335 UnaddableSet(Set<E> set) { 336 this.set = set; 337 } 338 339 @Override 340 public boolean equals(Object object) { 341 return set.equals(object); 342 } 343 344 @Override 345 public int hashCode() { 346 return set.hashCode(); 347 } 348 349 public boolean add(E object) { 350 throw new UnsupportedOperationException(); 351 } 352 353 public boolean addAll(Collection<? extends E> c) { 354 throw new UnsupportedOperationException(); 355 } 356 357 public void clear() { 358 set.clear(); 359 } 360 361 public boolean contains(Object object) { 362 return set.contains(object); 363 } 364 365 public boolean containsAll(Collection<?> c) { 366 return set.containsAll(c); 367 } 368 369 public boolean isEmpty() { 370 return set.isEmpty(); 371 } 372 373 public Iterator<E> iterator() { 374 return set.iterator(); 375 } 376 377 public boolean remove(Object object) { 378 return set.remove(object); 379 } 380 381 public boolean removeAll(Collection<?> c) { 382 return set.removeAll(c); 383 } 384 385 public boolean retainAll(Collection<?> c) { 386 return set.retainAll(c); 387 } 388 389 public int size() { 390 return set.size(); 391 } 392 393 public Object[] toArray() { 394 return set.toArray(); 395 } 396 397 public <T> T[] toArray(T[] a) { 398 return set.toArray(a); 399 } 400 } 401} 402