SocketChannelImpl.java revision c6ad01d286af243fd300dd105eb2e4437e0b6b16
1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package java.nio; 19 20import java.io.FileDescriptor; 21import java.io.IOException; 22import java.io.InputStream; 23import java.io.OutputStream; 24import java.net.ConnectException; 25import java.net.Inet4Address; 26import java.net.InetAddress; 27import java.net.InetSocketAddress; 28import java.net.PlainSocketImpl; 29import java.net.Socket; 30import java.net.SocketAddress; 31import java.net.SocketException; 32import java.net.SocketUtils; 33import java.net.UnknownHostException; 34import java.nio.channels.AlreadyConnectedException; 35import java.nio.channels.ClosedChannelException; 36import java.nio.channels.ConnectionPendingException; 37import java.nio.channels.IllegalBlockingModeException; 38import java.nio.channels.NoConnectionPendingException; 39import java.nio.channels.NotYetConnectedException; 40import java.nio.channels.SocketChannel; 41import java.nio.channels.UnresolvedAddressException; 42import java.nio.channels.UnsupportedAddressTypeException; 43import java.nio.channels.spi.SelectorProvider; 44import java.util.Arrays; 45import libcore.io.ErrnoException; 46import libcore.io.Libcore; 47import libcore.io.IoBridge; 48import libcore.io.IoUtils; 49import static libcore.io.OsConstants.*; 50 51/* 52 * The default implementation class of java.nio.channels.SocketChannel. 53 */ 54class SocketChannelImpl extends SocketChannel implements FileDescriptorChannel { 55 private static final int SOCKET_STATUS_UNINITIALIZED = -1; 56 57 // Status before connect. 58 private static final int SOCKET_STATUS_UNCONNECTED = 0; 59 60 // Status connection pending. 61 private static final int SOCKET_STATUS_PENDING = 1; 62 63 // Status after connection success. 64 private static final int SOCKET_STATUS_CONNECTED = 2; 65 66 // Status closed. 67 private static final int SOCKET_STATUS_CLOSED = 3; 68 69 private final FileDescriptor fd; 70 71 // Our internal Socket. 72 private SocketAdapter socket = null; 73 74 // The address to be connected. 75 private InetSocketAddress connectAddress = null; 76 77 private InetAddress localAddress = null; 78 private int localPort; 79 80 private int status = SOCKET_STATUS_UNINITIALIZED; 81 82 // Whether the socket is bound. 83 private volatile boolean isBound = false; 84 85 private final Object readLock = new Object(); 86 87 private final Object writeLock = new Object(); 88 89 /* 90 * Constructor for creating a connected socket channel. 91 */ 92 public SocketChannelImpl(SelectorProvider selectorProvider) throws IOException { 93 this(selectorProvider, true); 94 } 95 96 /* 97 * Constructor for creating an optionally connected socket channel. 98 */ 99 public SocketChannelImpl(SelectorProvider selectorProvider, boolean connect) throws IOException { 100 super(selectorProvider); 101 status = SOCKET_STATUS_UNCONNECTED; 102 fd = (connect ? IoBridge.socket(true) : new FileDescriptor()); 103 } 104 105 /* 106 * Constructor for use by Pipe.SinkChannel and Pipe.SourceChannel. 107 */ 108 public SocketChannelImpl(SelectorProvider selectorProvider, FileDescriptor existingFd) throws IOException { 109 super(selectorProvider); 110 status = SOCKET_STATUS_CONNECTED; 111 fd = existingFd; 112 } 113 114 /* 115 * Getting the internal Socket If we have not the socket, we create a new 116 * one. 117 */ 118 @Override 119 synchronized public Socket socket() { 120 if (socket == null) { 121 try { 122 InetAddress addr = null; 123 int port = 0; 124 if (connectAddress != null) { 125 addr = connectAddress.getAddress(); 126 port = connectAddress.getPort(); 127 } 128 socket = new SocketAdapter(new PlainSocketImpl(fd, localPort, addr, port), this); 129 } catch (SocketException e) { 130 return null; 131 } 132 } 133 return socket; 134 } 135 136 @Override 137 synchronized public boolean isConnected() { 138 return status == SOCKET_STATUS_CONNECTED; 139 } 140 141 /* 142 * Status setting used by other class. 143 */ 144 synchronized void setConnected() { 145 status = SOCKET_STATUS_CONNECTED; 146 } 147 148 void setBound(boolean flag) { 149 isBound = flag; 150 } 151 152 @Override 153 synchronized public boolean isConnectionPending() { 154 return status == SOCKET_STATUS_PENDING; 155 } 156 157 @Override 158 public boolean connect(SocketAddress socketAddress) throws IOException { 159 // status must be open and unconnected 160 checkUnconnected(); 161 162 // check the address 163 InetSocketAddress inetSocketAddress = validateAddress(socketAddress); 164 InetAddress normalAddr = inetSocketAddress.getAddress(); 165 int port = inetSocketAddress.getPort(); 166 167 // When connecting, map ANY address to localhost 168 if (normalAddr.isAnyLocalAddress()) { 169 normalAddr = InetAddress.getLocalHost(); 170 } 171 172 boolean finished = false; 173 try { 174 if (isBlocking()) { 175 begin(); 176 } 177 finished = IoBridge.connect(fd, normalAddr, port); 178 isBound = finished; 179 } catch (IOException e) { 180 if (isEINPROGRESS(e)) { 181 status = SOCKET_STATUS_PENDING; 182 } else { 183 if (isOpen()) { 184 close(); 185 finished = true; 186 } 187 throw e; 188 } 189 } finally { 190 if (isBlocking()) { 191 end(finished); 192 } 193 } 194 195 initLocalAddressAndPort(); 196 connectAddress = inetSocketAddress; 197 if (socket != null) { 198 socket.socketImpl().initRemoteAddressAndPort(connectAddress.getAddress(), 199 connectAddress.getPort()); 200 } 201 202 synchronized (this) { 203 if (isBlocking()) { 204 status = (finished ? SOCKET_STATUS_CONNECTED : SOCKET_STATUS_UNCONNECTED); 205 } else { 206 status = SOCKET_STATUS_PENDING; 207 } 208 } 209 return finished; 210 } 211 212 private boolean isEINPROGRESS(IOException e) { 213 if (isBlocking()) { 214 return false; 215 } 216 if (e instanceof ConnectException) { 217 Throwable cause = e.getCause(); 218 if (cause instanceof ErrnoException) { 219 return ((ErrnoException) cause).errno == EINPROGRESS; 220 } 221 } 222 return false; 223 } 224 225 private void initLocalAddressAndPort() { 226 SocketAddress sa; 227 try { 228 sa = Libcore.os.getsockname(fd); 229 } catch (ErrnoException errnoException) { 230 throw new AssertionError(errnoException); 231 } 232 InetSocketAddress isa = (InetSocketAddress) sa; 233 localAddress = isa.getAddress(); 234 localPort = isa.getPort(); 235 if (socket != null) { 236 socket.socketImpl().initLocalPort(localPort); 237 } 238 } 239 240 @Override 241 public boolean finishConnect() throws IOException { 242 synchronized (this) { 243 if (!isOpen()) { 244 throw new ClosedChannelException(); 245 } 246 if (status == SOCKET_STATUS_CONNECTED) { 247 return true; 248 } 249 if (status != SOCKET_STATUS_PENDING) { 250 throw new NoConnectionPendingException(); 251 } 252 } 253 254 boolean finished = false; 255 try { 256 begin(); 257 InetAddress inetAddress = connectAddress.getAddress(); 258 int port = connectAddress.getPort(); 259 finished = IoBridge.isConnected(fd, inetAddress, port, 0, 0); // Return immediately. 260 isBound = finished; 261 } catch (ConnectException e) { 262 if (isOpen()) { 263 close(); 264 finished = true; 265 } 266 throw e; 267 } finally { 268 end(finished); 269 } 270 271 synchronized (this) { 272 status = (finished ? SOCKET_STATUS_CONNECTED : status); 273 isBound = finished; 274 } 275 return finished; 276 } 277 278 void finishAccept() { 279 initLocalAddressAndPort(); 280 } 281 282 @Override 283 public int read(ByteBuffer dst) throws IOException { 284 dst.checkWritable(); 285 checkOpenConnected(); 286 if (!dst.hasRemaining()) { 287 return 0; 288 } 289 return readImpl(dst); 290 } 291 292 @Override 293 public long read(ByteBuffer[] targets, int offset, int length) throws IOException { 294 Arrays.checkOffsetAndCount(targets.length, offset, length); 295 checkOpenConnected(); 296 int totalCount = FileChannelImpl.calculateTotalRemaining(targets, offset, length, true); 297 if (totalCount == 0) { 298 return 0; 299 } 300 byte[] readArray = new byte[totalCount]; 301 ByteBuffer readBuffer = ByteBuffer.wrap(readArray); 302 int readCount; 303 // read data to readBuffer, and then transfer data from readBuffer to targets. 304 readCount = readImpl(readBuffer); 305 readBuffer.flip(); 306 if (readCount > 0) { 307 int left = readCount; 308 int index = offset; 309 // transfer data from readArray to targets 310 while (left > 0) { 311 int putLength = Math.min(targets[index].remaining(), left); 312 targets[index].put(readArray, readCount - left, putLength); 313 index++; 314 left -= putLength; 315 } 316 } 317 return readCount; 318 } 319 320 private int readImpl(ByteBuffer dst) throws IOException { 321 synchronized (readLock) { 322 int readCount = 0; 323 try { 324 if (isBlocking()) { 325 begin(); 326 } 327 readCount = IoBridge.recvfrom(true, fd, dst, 0, null, false); 328 if (readCount > 0) { 329 dst.position(dst.position() + readCount); 330 } 331 } finally { 332 if (isBlocking()) { 333 end(readCount > 0); 334 } 335 } 336 return readCount; 337 } 338 } 339 340 @Override 341 public int write(ByteBuffer src) throws IOException { 342 if (src == null) { 343 throw new NullPointerException("src == null"); 344 } 345 checkOpenConnected(); 346 if (!src.hasRemaining()) { 347 return 0; 348 } 349 return writeImpl(src); 350 } 351 352 @Override 353 public long write(ByteBuffer[] sources, int offset, int length) throws IOException { 354 Arrays.checkOffsetAndCount(sources.length, offset, length); 355 checkOpenConnected(); 356 int count = FileChannelImpl.calculateTotalRemaining(sources, offset, length, false); 357 if (count == 0) { 358 return 0; 359 } 360 ByteBuffer writeBuf = ByteBuffer.allocate(count); 361 for (int val = offset; val < length + offset; val++) { 362 ByteBuffer source = sources[val]; 363 int oldPosition = source.position(); 364 writeBuf.put(source); 365 source.position(oldPosition); 366 } 367 writeBuf.flip(); 368 int result = writeImpl(writeBuf); 369 int val = offset; 370 int written = result; 371 while (result > 0) { 372 ByteBuffer source = sources[val]; 373 int gap = Math.min(result, source.remaining()); 374 source.position(source.position() + gap); 375 val++; 376 result -= gap; 377 } 378 return written; 379 } 380 381 private int writeImpl(ByteBuffer src) throws IOException { 382 synchronized (writeLock) { 383 if (!src.hasRemaining()) { 384 return 0; 385 } 386 int writeCount = 0; 387 try { 388 if (isBlocking()) { 389 begin(); 390 } 391 writeCount = IoBridge.sendto(fd, src, 0, null, 0); 392 if (writeCount > 0) { 393 src.position(src.position() + writeCount); 394 } 395 } finally { 396 if (isBlocking()) { 397 end(writeCount >= 0); 398 } 399 } 400 return writeCount; 401 } 402 } 403 404 /* 405 * Status check, open and "connected", when read and write. 406 */ 407 synchronized private void checkOpenConnected() throws ClosedChannelException { 408 if (!isOpen()) { 409 throw new ClosedChannelException(); 410 } 411 if (!isConnected()) { 412 throw new NotYetConnectedException(); 413 } 414 } 415 416 /* 417 * Status check, open and "unconnected", before connection. 418 */ 419 synchronized private void checkUnconnected() throws IOException { 420 if (!isOpen()) { 421 throw new ClosedChannelException(); 422 } 423 if (status == SOCKET_STATUS_CONNECTED) { 424 throw new AlreadyConnectedException(); 425 } 426 if (status == SOCKET_STATUS_PENDING) { 427 throw new ConnectionPendingException(); 428 } 429 } 430 431 /* 432 * Shared by this class and DatagramChannelImpl, to do the address transfer 433 * and check. 434 */ 435 static InetSocketAddress validateAddress(SocketAddress socketAddress) { 436 if (socketAddress == null) { 437 throw new IllegalArgumentException("socketAddress == null"); 438 } 439 if (!(socketAddress instanceof InetSocketAddress)) { 440 throw new UnsupportedAddressTypeException(); 441 } 442 InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; 443 if (inetSocketAddress.isUnresolved()) { 444 throw new UnresolvedAddressException(); 445 } 446 return inetSocketAddress; 447 } 448 449 /* 450 * Get local address. 451 */ 452 public InetAddress getLocalAddress() throws UnknownHostException { 453 return isBound ? localAddress : Inet4Address.ANY; 454 } 455 456 /* 457 * Do really closing action here. 458 */ 459 @Override 460 protected synchronized void implCloseSelectableChannel() throws IOException { 461 if (status != SOCKET_STATUS_CLOSED) { 462 status = SOCKET_STATUS_CLOSED; 463 if (socket != null && !socket.isClosed()) { 464 socket.close(); 465 } else { 466 IoBridge.closeSocket(fd); 467 } 468 } 469 } 470 471 @Override protected void implConfigureBlocking(boolean blocking) throws IOException { 472 synchronized (blockingLock()) { 473 IoUtils.setBlocking(fd, blocking); 474 } 475 } 476 477 /* 478 * Get the fd. 479 */ 480 public FileDescriptor getFD() { 481 return fd; 482 } 483 484 /* 485 * Adapter classes for internal socket. 486 */ 487 private static class SocketAdapter extends Socket { 488 private final SocketChannelImpl channel; 489 private final PlainSocketImpl socketImpl; 490 491 SocketAdapter(PlainSocketImpl socketImpl, SocketChannelImpl channel) throws SocketException { 492 super(socketImpl); 493 this.socketImpl = socketImpl; 494 this.channel = channel; 495 SocketUtils.setCreated(this); 496 } 497 498 PlainSocketImpl socketImpl() { 499 return socketImpl; 500 } 501 502 @Override 503 public SocketChannel getChannel() { 504 return channel; 505 } 506 507 @Override 508 public boolean isBound() { 509 return channel.isBound; 510 } 511 512 @Override 513 public boolean isConnected() { 514 return channel.isConnected(); 515 } 516 517 @Override 518 public InetAddress getLocalAddress() { 519 try { 520 return channel.getLocalAddress(); 521 } catch (UnknownHostException e) { 522 return null; 523 } 524 } 525 526 @Override 527 public void connect(SocketAddress remoteAddr, int timeout) throws IOException { 528 if (!channel.isBlocking()) { 529 throw new IllegalBlockingModeException(); 530 } 531 if (isConnected()) { 532 throw new AlreadyConnectedException(); 533 } 534 super.connect(remoteAddr, timeout); 535 channel.initLocalAddressAndPort(); 536 if (super.isConnected()) { 537 channel.setConnected(); 538 channel.isBound = super.isBound(); 539 } 540 } 541 542 @Override 543 public void bind(SocketAddress localAddr) throws IOException { 544 if (channel.isConnected()) { 545 throw new AlreadyConnectedException(); 546 } 547 if (SocketChannelImpl.SOCKET_STATUS_PENDING == channel.status) { 548 throw new ConnectionPendingException(); 549 } 550 super.bind(localAddr); 551 channel.initLocalAddressAndPort(); 552 channel.isBound = true; 553 } 554 555 @Override 556 public void close() throws IOException { 557 synchronized (channel) { 558 if (channel.isOpen()) { 559 channel.close(); 560 } else { 561 super.close(); 562 } 563 channel.status = SocketChannelImpl.SOCKET_STATUS_CLOSED; 564 } 565 } 566 567 @Override 568 public OutputStream getOutputStream() throws IOException { 569 checkOpenAndConnected(); 570 if (isOutputShutdown()) { 571 throw new SocketException("Socket output is shutdown"); 572 } 573 return new SocketChannelOutputStream(channel); 574 } 575 576 @Override 577 public InputStream getInputStream() throws IOException { 578 checkOpenAndConnected(); 579 if (isInputShutdown()) { 580 throw new SocketException("Socket input is shutdown"); 581 } 582 return new SocketChannelInputStream(channel); 583 } 584 585 private void checkOpenAndConnected() throws SocketException { 586 if (!channel.isOpen()) { 587 throw new SocketException("Socket is closed"); 588 } 589 if (!channel.isConnected()) { 590 throw new SocketException("Socket is not connected"); 591 } 592 } 593 594 @Override 595 public FileDescriptor getFileDescriptor$() { 596 return socketImpl.getFD$(); 597 } 598 } 599 600 /* 601 * This output stream delegates all operations to the associated channel. 602 * Throws an IllegalBlockingModeException if the channel is in non-blocking 603 * mode when performing write operations. 604 */ 605 private static class SocketChannelOutputStream extends OutputStream { 606 private final SocketChannel channel; 607 608 public SocketChannelOutputStream(SocketChannel channel) { 609 this.channel = channel; 610 } 611 612 /* 613 * Closes this stream and channel. 614 * 615 * @exception IOException thrown if an error occurs during the close 616 */ 617 @Override 618 public void close() throws IOException { 619 channel.close(); 620 } 621 622 @Override 623 public void write(byte[] buffer, int offset, int byteCount) throws IOException { 624 Arrays.checkOffsetAndCount(buffer.length, offset, byteCount); 625 ByteBuffer buf = ByteBuffer.wrap(buffer, offset, byteCount); 626 if (!channel.isBlocking()) { 627 throw new IllegalBlockingModeException(); 628 } 629 channel.write(buf); 630 } 631 632 @Override 633 public void write(int oneByte) throws IOException { 634 if (!channel.isBlocking()) { 635 throw new IllegalBlockingModeException(); 636 } 637 ByteBuffer buffer = ByteBuffer.allocate(1); 638 buffer.put(0, (byte) (oneByte & 0xFF)); 639 channel.write(buffer); 640 } 641 } 642 643 /* 644 * This input stream delegates all operations to the associated channel. 645 * Throws an IllegalBlockingModeException if the channel is in non-blocking 646 * mode when performing read operations. 647 */ 648 private static class SocketChannelInputStream extends InputStream { 649 private final SocketChannel channel; 650 651 public SocketChannelInputStream(SocketChannel channel) { 652 this.channel = channel; 653 } 654 655 /* 656 * Closes this stream and channel. 657 */ 658 @Override 659 public void close() throws IOException { 660 channel.close(); 661 } 662 663 @Override 664 public int read() throws IOException { 665 if (!channel.isBlocking()) { 666 throw new IllegalBlockingModeException(); 667 } 668 ByteBuffer buf = ByteBuffer.allocate(1); 669 int result = channel.read(buf); 670 return (result == -1) ? result : (buf.get(0) & 0xff); 671 } 672 673 @Override 674 public int read(byte[] buffer, int offset, int byteCount) throws IOException { 675 Arrays.checkOffsetAndCount(buffer.length, offset, byteCount); 676 if (!channel.isBlocking()) { 677 throw new IllegalBlockingModeException(); 678 } 679 ByteBuffer buf = ByteBuffer.wrap(buffer, offset, byteCount); 680 return channel.read(buf); 681 } 682 } 683} 684