SocketChannelImpl.java revision 47ae0b5a1d96c8030e0963ccc5b44c3ee66aaec3
1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package java.nio; 19 20import java.io.FileDescriptor; 21import java.io.FilterInputStream; 22import java.io.FilterOutputStream; 23import java.io.IOException; 24import java.io.InputStream; 25import java.io.OutputStream; 26import java.net.ConnectException; 27import java.net.Inet4Address; 28import java.net.InetAddress; 29import java.net.InetSocketAddress; 30import java.net.PlainSocketImpl; 31import java.net.Socket; 32import java.net.SocketAddress; 33import java.net.SocketException; 34import java.net.SocketUtils; 35import java.nio.channels.AlreadyBoundException; 36import java.nio.channels.AlreadyConnectedException; 37import java.nio.channels.ClosedChannelException; 38import java.nio.channels.ConnectionPendingException; 39import java.nio.channels.IllegalBlockingModeException; 40import java.nio.channels.NoConnectionPendingException; 41import java.nio.channels.NotYetConnectedException; 42import java.nio.channels.SocketChannel; 43import java.nio.channels.UnresolvedAddressException; 44import java.nio.channels.UnsupportedAddressTypeException; 45import java.nio.channels.spi.SelectorProvider; 46import java.util.Arrays; 47import libcore.io.ErrnoException; 48import libcore.io.Libcore; 49import libcore.io.IoBridge; 50import libcore.io.IoUtils; 51import static libcore.io.OsConstants.*; 52 53/* 54 * The default implementation class of java.nio.channels.SocketChannel. 55 */ 56class SocketChannelImpl extends SocketChannel implements FileDescriptorChannel { 57 private static final int SOCKET_STATUS_UNINITIALIZED = -1; 58 59 // Status before connect. 60 private static final int SOCKET_STATUS_UNCONNECTED = 0; 61 62 // Status connection pending. 63 private static final int SOCKET_STATUS_PENDING = 1; 64 65 // Status after connection success. 66 private static final int SOCKET_STATUS_CONNECTED = 2; 67 68 // Status closed. 69 private static final int SOCKET_STATUS_CLOSED = 3; 70 71 private final FileDescriptor fd; 72 73 // Our internal Socket. 74 private SocketAdapter socket = null; 75 76 // The address to be connected. 77 private InetSocketAddress connectAddress = null; 78 79 // The local address the socket is bound to. 80 private InetAddress localAddress = null; 81 private int localPort; 82 83 private int status = SOCKET_STATUS_UNINITIALIZED; 84 85 // Whether the socket is bound. 86 private volatile boolean isBound = false; 87 88 private final Object readLock = new Object(); 89 90 private final Object writeLock = new Object(); 91 92 /* 93 * Constructor for creating a connected socket channel. 94 */ 95 public SocketChannelImpl(SelectorProvider selectorProvider) throws IOException { 96 this(selectorProvider, true); 97 } 98 99 /* 100 * Constructor for creating an optionally connected socket channel. 101 */ 102 public SocketChannelImpl(SelectorProvider selectorProvider, boolean connect) throws IOException { 103 super(selectorProvider); 104 status = SOCKET_STATUS_UNCONNECTED; 105 fd = (connect ? IoBridge.socket(true) : new FileDescriptor()); 106 } 107 108 /* 109 * Constructor for use by Pipe.SinkChannel and Pipe.SourceChannel. 110 */ 111 public SocketChannelImpl(SelectorProvider selectorProvider, FileDescriptor existingFd) throws IOException { 112 super(selectorProvider); 113 status = SOCKET_STATUS_CONNECTED; 114 fd = existingFd; 115 } 116 117 /* 118 * Getting the internal Socket If we have not the socket, we create a new 119 * one. 120 */ 121 @Override 122 synchronized public Socket socket() { 123 if (socket == null) { 124 try { 125 InetAddress addr = null; 126 int port = 0; 127 if (connectAddress != null) { 128 addr = connectAddress.getAddress(); 129 port = connectAddress.getPort(); 130 } 131 socket = new SocketAdapter(new PlainSocketImpl(fd, localPort, addr, port), this); 132 } catch (SocketException e) { 133 return null; 134 } 135 } 136 return socket; 137 } 138 139 /** @hide Until ready for a public API change */ 140 @Override 141 synchronized public final SocketChannel bind(SocketAddress local) throws IOException { 142 if (!isOpen()) { 143 throw new ClosedChannelException(); 144 } 145 if (isBound) { 146 throw new AlreadyBoundException(); 147 } 148 149 if (local == null) { 150 local = new InetSocketAddress(Inet4Address.ANY, 0); 151 } else if (!(local instanceof InetSocketAddress)) { 152 throw new UnsupportedAddressTypeException(); 153 } 154 155 InetSocketAddress localAddress = (InetSocketAddress) local; 156 IoBridge.bind(fd, localAddress.getAddress(), localAddress.getPort()); 157 onBind(true /* updateSocketState */); 158 return this; 159 } 160 161 /** 162 * Initialise the isBound, localAddress and localPort state from the file descriptor. Used when 163 * some or all of the bound state has been left to the OS to decide, or when the Socket handled 164 * bind() or connect(). 165 * 166 * @param updateSocketState 167 * if the associated socket (if present) needs to be updated 168 * @hide package visible for other nio classes 169 */ 170 void onBind(boolean updateSocketState) { 171 SocketAddress sa; 172 try { 173 sa = Libcore.os.getsockname(fd); 174 } catch (ErrnoException errnoException) { 175 throw new AssertionError(errnoException); 176 } 177 isBound = true; 178 InetSocketAddress localSocketAddress = (InetSocketAddress) sa; 179 localAddress = localSocketAddress.getAddress(); 180 localPort = localSocketAddress.getPort(); 181 if (updateSocketState && socket != null) { 182 socket.onBind(localAddress, localPort); 183 } 184 } 185 186 /** @hide Until ready for a public API change */ 187 @Override 188 synchronized public SocketAddress getLocalAddress() throws IOException { 189 if (!isOpen()) { 190 throw new ClosedChannelException(); 191 } 192 return isBound ? new InetSocketAddress(localAddress, localPort) : null; 193 } 194 195 @Override 196 synchronized public boolean isConnected() { 197 return status == SOCKET_STATUS_CONNECTED; 198 } 199 200 @Override 201 synchronized public boolean isConnectionPending() { 202 return status == SOCKET_STATUS_PENDING; 203 } 204 205 @Override 206 public boolean connect(SocketAddress socketAddress) throws IOException { 207 // status must be open and unconnected 208 checkUnconnected(); 209 210 // check the address 211 InetSocketAddress inetSocketAddress = validateAddress(socketAddress); 212 InetAddress normalAddr = inetSocketAddress.getAddress(); 213 int port = inetSocketAddress.getPort(); 214 215 // When connecting, map ANY address to localhost 216 if (normalAddr.isAnyLocalAddress()) { 217 normalAddr = InetAddress.getLocalHost(); 218 } 219 220 boolean isBlocking = isBlocking(); 221 boolean finished = false; 222 int newStatus; 223 try { 224 if (isBlocking) { 225 begin(); 226 } 227 // When in blocking mode, IoBridge.connect() will return without an exception when the 228 // socket is connected. When in non-blocking mode it will return without an exception 229 // without knowing the result of the connection attempt, which could still be going on. 230 IoBridge.connect(fd, normalAddr, port); 231 newStatus = isBlocking ? SOCKET_STATUS_CONNECTED : SOCKET_STATUS_PENDING; 232 finished = true; 233 } catch (IOException e) { 234 if (isEINPROGRESS(e)) { 235 newStatus = SOCKET_STATUS_PENDING; 236 } else { 237 if (isOpen()) { 238 close(); 239 finished = true; 240 } 241 throw e; 242 } 243 } finally { 244 if (isBlocking) { 245 end(finished); 246 } 247 } 248 249 // If the channel was not bound, a connection attempt will have caused an implicit bind() to 250 // take place. Keep the local address state held by the channel and the socket up to date. 251 if (!isBound) { 252 onBind(true /* updateSocketState */); 253 } 254 255 // Keep the connected state held by the channel and the socket up to date. 256 onConnectStatusChanged(inetSocketAddress, newStatus, true /* updateSocketState */); 257 258 return status == SOCKET_STATUS_CONNECTED; 259 } 260 261 /** 262 * Initialise the connect() state with the supplied information. 263 * 264 * @param updateSocketState 265 * if the associated socket (if present) needs to be updated 266 * @hide package visible for other nio classes 267 */ 268 void onConnectStatusChanged(InetSocketAddress address, int status, boolean updateSocketState) { 269 this.status = status; 270 connectAddress = address; 271 if (status == SOCKET_STATUS_CONNECTED && updateSocketState && socket != null) { 272 socket.onConnect(connectAddress.getAddress(), connectAddress.getPort()); 273 } 274 } 275 276 private boolean isEINPROGRESS(IOException e) { 277 if (isBlocking()) { 278 return false; 279 } 280 if (e instanceof ConnectException) { 281 Throwable cause = e.getCause(); 282 if (cause instanceof ErrnoException) { 283 return ((ErrnoException) cause).errno == EINPROGRESS; 284 } 285 } 286 return false; 287 } 288 289 @Override 290 public boolean finishConnect() throws IOException { 291 synchronized (this) { 292 if (!isOpen()) { 293 throw new ClosedChannelException(); 294 } 295 if (status == SOCKET_STATUS_CONNECTED) { 296 return true; 297 } 298 if (status != SOCKET_STATUS_PENDING) { 299 throw new NoConnectionPendingException(); 300 } 301 } 302 303 boolean finished = false; 304 try { 305 begin(); 306 InetAddress inetAddress = connectAddress.getAddress(); 307 int port = connectAddress.getPort(); 308 finished = IoBridge.isConnected(fd, inetAddress, port, 0, 0); // Return immediately. 309 } catch (ConnectException e) { 310 if (isOpen()) { 311 close(); 312 finished = true; 313 } 314 throw e; 315 } finally { 316 end(finished); 317 } 318 319 synchronized (this) { 320 status = (finished ? SOCKET_STATUS_CONNECTED : status); 321 if (finished && socket != null) { 322 socket.onConnect(connectAddress.getAddress(), connectAddress.getPort()); 323 } 324 } 325 return finished; 326 } 327 328 @Override 329 public int read(ByteBuffer dst) throws IOException { 330 dst.checkWritable(); 331 checkOpenConnected(); 332 if (!dst.hasRemaining()) { 333 return 0; 334 } 335 return readImpl(dst); 336 } 337 338 @Override 339 public long read(ByteBuffer[] targets, int offset, int length) throws IOException { 340 Arrays.checkOffsetAndCount(targets.length, offset, length); 341 checkOpenConnected(); 342 int totalCount = FileChannelImpl.calculateTotalRemaining(targets, offset, length, true); 343 if (totalCount == 0) { 344 return 0; 345 } 346 byte[] readArray = new byte[totalCount]; 347 ByteBuffer readBuffer = ByteBuffer.wrap(readArray); 348 int readCount; 349 // read data to readBuffer, and then transfer data from readBuffer to targets. 350 readCount = readImpl(readBuffer); 351 readBuffer.flip(); 352 if (readCount > 0) { 353 int left = readCount; 354 int index = offset; 355 // transfer data from readArray to targets 356 while (left > 0) { 357 int putLength = Math.min(targets[index].remaining(), left); 358 targets[index].put(readArray, readCount - left, putLength); 359 index++; 360 left -= putLength; 361 } 362 } 363 return readCount; 364 } 365 366 private int readImpl(ByteBuffer dst) throws IOException { 367 synchronized (readLock) { 368 int readCount = 0; 369 try { 370 if (isBlocking()) { 371 begin(); 372 } 373 readCount = IoBridge.recvfrom(true, fd, dst, 0, null, false); 374 if (readCount > 0) { 375 dst.position(dst.position() + readCount); 376 } 377 } finally { 378 if (isBlocking()) { 379 end(readCount > 0); 380 } 381 } 382 return readCount; 383 } 384 } 385 386 @Override 387 public int write(ByteBuffer src) throws IOException { 388 if (src == null) { 389 throw new NullPointerException("src == null"); 390 } 391 checkOpenConnected(); 392 if (!src.hasRemaining()) { 393 return 0; 394 } 395 return writeImpl(src); 396 } 397 398 @Override 399 public long write(ByteBuffer[] sources, int offset, int length) throws IOException { 400 Arrays.checkOffsetAndCount(sources.length, offset, length); 401 checkOpenConnected(); 402 int count = FileChannelImpl.calculateTotalRemaining(sources, offset, length, false); 403 if (count == 0) { 404 return 0; 405 } 406 ByteBuffer writeBuf = ByteBuffer.allocate(count); 407 for (int val = offset; val < length + offset; val++) { 408 ByteBuffer source = sources[val]; 409 int oldPosition = source.position(); 410 writeBuf.put(source); 411 source.position(oldPosition); 412 } 413 writeBuf.flip(); 414 int result = writeImpl(writeBuf); 415 int val = offset; 416 int written = result; 417 while (result > 0) { 418 ByteBuffer source = sources[val]; 419 int gap = Math.min(result, source.remaining()); 420 source.position(source.position() + gap); 421 val++; 422 result -= gap; 423 } 424 return written; 425 } 426 427 private int writeImpl(ByteBuffer src) throws IOException { 428 synchronized (writeLock) { 429 if (!src.hasRemaining()) { 430 return 0; 431 } 432 int writeCount = 0; 433 try { 434 if (isBlocking()) { 435 begin(); 436 } 437 writeCount = IoBridge.sendto(fd, src, 0, null, 0); 438 if (writeCount > 0) { 439 src.position(src.position() + writeCount); 440 } 441 } finally { 442 if (isBlocking()) { 443 end(writeCount >= 0); 444 } 445 } 446 return writeCount; 447 } 448 } 449 450 /* 451 * Status check, open and "connected", when read and write. 452 */ 453 synchronized private void checkOpenConnected() throws ClosedChannelException { 454 if (!isOpen()) { 455 throw new ClosedChannelException(); 456 } 457 if (!isConnected()) { 458 throw new NotYetConnectedException(); 459 } 460 } 461 462 /* 463 * Status check, open and "unconnected", before connection. 464 */ 465 synchronized private void checkUnconnected() throws IOException { 466 if (!isOpen()) { 467 throw new ClosedChannelException(); 468 } 469 if (status == SOCKET_STATUS_CONNECTED) { 470 throw new AlreadyConnectedException(); 471 } 472 if (status == SOCKET_STATUS_PENDING) { 473 throw new ConnectionPendingException(); 474 } 475 } 476 477 /* 478 * Shared by this class and DatagramChannelImpl, to do the address transfer 479 * and check. 480 */ 481 static InetSocketAddress validateAddress(SocketAddress socketAddress) { 482 if (socketAddress == null) { 483 throw new IllegalArgumentException("socketAddress == null"); 484 } 485 if (!(socketAddress instanceof InetSocketAddress)) { 486 throw new UnsupportedAddressTypeException(); 487 } 488 InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; 489 if (inetSocketAddress.isUnresolved()) { 490 throw new UnresolvedAddressException(); 491 } 492 return inetSocketAddress; 493 } 494 495 /* 496 * Do really closing action here. 497 */ 498 @Override 499 protected synchronized void implCloseSelectableChannel() throws IOException { 500 if (status != SOCKET_STATUS_CLOSED) { 501 status = SOCKET_STATUS_CLOSED; 502 // IoBridge.closeSocket(fd) is idempotent: It is safe to call on an already-closed file 503 // descriptor. 504 IoBridge.closeSocket(fd); 505 if (socket != null && !socket.isClosed()) { 506 socket.onClose(); 507 } 508 } 509 } 510 511 @Override protected void implConfigureBlocking(boolean blocking) throws IOException { 512 IoUtils.setBlocking(fd, blocking); 513 } 514 515 /* 516 * Get the fd. 517 */ 518 public FileDescriptor getFD() { 519 return fd; 520 } 521 522 /* @hide used by ServerSocketChannelImpl to sync channel state during accept() */ 523 public void onAccept(InetSocketAddress remoteAddress, boolean updateSocketState) { 524 onBind(updateSocketState); 525 onConnectStatusChanged(remoteAddress, SOCKET_STATUS_CONNECTED, updateSocketState); 526 } 527 528 /* 529 * Adapter classes for internal socket. 530 */ 531 private static class SocketAdapter extends Socket { 532 private final SocketChannelImpl channel; 533 private final PlainSocketImpl socketImpl; 534 535 SocketAdapter(PlainSocketImpl socketImpl, SocketChannelImpl channel) 536 throws SocketException { 537 super(socketImpl); 538 this.socketImpl = socketImpl; 539 this.channel = channel; 540 SocketUtils.setCreated(this); 541 542 // Sync state socket state with the channel it is being created from 543 if (channel.isBound) { 544 onBind(channel.localAddress, channel.localPort); 545 } 546 if (channel.isConnected()) { 547 onConnect(channel.connectAddress.getAddress(), channel.connectAddress.getPort()); 548 } 549 if (!channel.isOpen()) { 550 onClose(); 551 } 552 553 } 554 555 @Override 556 public SocketChannel getChannel() { 557 return channel; 558 } 559 560 @Override 561 public void connect(SocketAddress remoteAddr, int timeout) throws IOException { 562 if (!channel.isBlocking()) { 563 throw new IllegalBlockingModeException(); 564 } 565 if (isConnected()) { 566 throw new AlreadyConnectedException(); 567 } 568 super.connect(remoteAddr, timeout); 569 channel.onBind(false); 570 if (super.isConnected()) { 571 InetSocketAddress remoteInetAddress = (InetSocketAddress) remoteAddr; 572 channel.onConnectStatusChanged( 573 remoteInetAddress, SOCKET_STATUS_CONNECTED, false /* updateSocketState */); 574 } 575 } 576 577 @Override 578 public void bind(SocketAddress localAddr) throws IOException { 579 if (channel.isConnected()) { 580 throw new AlreadyConnectedException(); 581 } 582 if (SocketChannelImpl.SOCKET_STATUS_PENDING == channel.status) { 583 throw new ConnectionPendingException(); 584 } 585 super.bind(localAddr); 586 channel.onBind(false); 587 } 588 589 @Override 590 public void close() throws IOException { 591 synchronized (channel) { 592 super.close(); 593 if (channel.isOpen()) { 594 // channel.close() recognizes the socket is closed and avoids recursion. There 595 // is no channel.onClose() because the "closed" field is private. 596 channel.close(); 597 } 598 } 599 } 600 601 @Override 602 public OutputStream getOutputStream() throws IOException { 603 return new BlockingCheckOutputStream(super.getOutputStream(), channel); 604 } 605 606 @Override 607 public InputStream getInputStream() throws IOException { 608 return new BlockingCheckInputStream(super.getInputStream(), channel); 609 } 610 611 @Override 612 public FileDescriptor getFileDescriptor$() { 613 return socketImpl.getFD$(); 614 } 615 } 616 617 /* 618 * Throws an IllegalBlockingModeException if the channel is in non-blocking 619 * mode when performing write operations. 620 */ 621 private static class BlockingCheckOutputStream extends FilterOutputStream { 622 private final SocketChannel channel; 623 624 public BlockingCheckOutputStream(OutputStream out, SocketChannel channel) { 625 super(out); 626 this.channel = channel; 627 } 628 629 @Override 630 public void write(byte[] buffer, int offset, int byteCount) throws IOException { 631 checkBlocking(); 632 out.write(buffer, offset, byteCount); 633 } 634 635 @Override 636 public void write(int oneByte) throws IOException { 637 checkBlocking(); 638 out.write(oneByte); 639 } 640 641 @Override 642 public void write(byte[] buffer) throws IOException { 643 checkBlocking(); 644 out.write(buffer); 645 } 646 647 @Override 648 public void close() throws IOException { 649 super.close(); 650 // channel.close() recognizes the socket is closed and avoids recursion. There is no 651 // channel.onClose() because the "closed" field is private. 652 channel.close(); 653 } 654 655 private void checkBlocking() { 656 if (!channel.isBlocking()) { 657 throw new IllegalBlockingModeException(); 658 } 659 } 660 } 661 662 /* 663 * Throws an IllegalBlockingModeException if the channel is in non-blocking 664 * mode when performing read operations. 665 */ 666 private static class BlockingCheckInputStream extends FilterInputStream { 667 private final SocketChannel channel; 668 669 public BlockingCheckInputStream(InputStream in, SocketChannel channel) { 670 super(in); 671 this.channel = channel; 672 } 673 674 @Override 675 public int read() throws IOException { 676 checkBlocking(); 677 return in.read(); 678 } 679 680 @Override 681 public int read(byte[] buffer, int byteOffset, int byteCount) throws IOException { 682 checkBlocking(); 683 return in.read(buffer, byteOffset, byteCount); 684 } 685 686 @Override 687 public int read(byte[] buffer) throws IOException { 688 checkBlocking(); 689 return in.read(buffer); 690 } 691 692 @Override 693 public void close() throws IOException { 694 super.close(); 695 // channel.close() recognizes the socket is closed and avoids recursion. There is no 696 // channel.onClose() because the "closed" field is private. 697 channel.close(); 698 } 699 700 private void checkBlocking() { 701 if (!channel.isBlocking()) { 702 throw new IllegalBlockingModeException(); 703 } 704 } 705 } 706} 707