SocketChannelImpl.java revision 3218082325b6b8713a8ac15731482e3da86a7df9
1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package java.nio; 19 20import java.io.FileDescriptor; 21import java.io.IOException; 22import java.io.InputStream; 23import java.io.OutputStream; 24import java.net.ConnectException; 25import java.net.Inet4Address; 26import java.net.InetAddress; 27import java.net.InetSocketAddress; 28import java.net.PlainSocketImpl; 29import java.net.Socket; 30import java.net.SocketAddress; 31import java.net.SocketException; 32import java.net.SocketUtils; 33import java.net.UnknownHostException; 34import java.nio.channels.AlreadyConnectedException; 35import java.nio.channels.ClosedChannelException; 36import java.nio.channels.ConnectionPendingException; 37import java.nio.channels.IllegalBlockingModeException; 38import java.nio.channels.NoConnectionPendingException; 39import java.nio.channels.NotYetConnectedException; 40import java.nio.channels.SocketChannel; 41import java.nio.channels.UnresolvedAddressException; 42import java.nio.channels.UnsupportedAddressTypeException; 43import java.nio.channels.spi.SelectorProvider; 44import java.util.Arrays; 45import libcore.io.ErrnoException; 46import libcore.io.Libcore; 47import libcore.io.IoBridge; 48import libcore.io.IoUtils; 49import static libcore.io.OsConstants.*; 50 51/* 52 * The default implementation class of java.nio.channels.SocketChannel. 53 */ 54class SocketChannelImpl extends SocketChannel implements FileDescriptorChannel { 55 private static final int SOCKET_STATUS_UNINITIALIZED = -1; 56 57 // Status before connect. 58 private static final int SOCKET_STATUS_UNCONNECTED = 0; 59 60 // Status connection pending. 61 private static final int SOCKET_STATUS_PENDING = 1; 62 63 // Status after connection success. 64 private static final int SOCKET_STATUS_CONNECTED = 2; 65 66 // Status closed. 67 private static final int SOCKET_STATUS_CLOSED = 3; 68 69 private final FileDescriptor fd; 70 71 // Our internal Socket. 72 private SocketAdapter socket = null; 73 74 // The address to be connected. 75 private InetSocketAddress connectAddress = null; 76 77 private InetAddress localAddress = null; 78 private int localPort; 79 80 private int status = SOCKET_STATUS_UNINITIALIZED; 81 82 // Whether the socket is bound. 83 private volatile boolean isBound = false; 84 85 private final Object readLock = new Object(); 86 87 private final Object writeLock = new Object(); 88 89 /* 90 * Constructor for creating a connected socket channel. 91 */ 92 public SocketChannelImpl(SelectorProvider selectorProvider) throws IOException { 93 this(selectorProvider, true); 94 } 95 96 /* 97 * Constructor for creating an optionally connected socket channel. 98 */ 99 public SocketChannelImpl(SelectorProvider selectorProvider, boolean connect) throws IOException { 100 super(selectorProvider); 101 status = SOCKET_STATUS_UNCONNECTED; 102 fd = (connect ? IoBridge.socket(true) : new FileDescriptor()); 103 } 104 105 /* 106 * Constructor for use by Pipe.SinkChannel and Pipe.SourceChannel. 107 */ 108 public SocketChannelImpl(SelectorProvider selectorProvider, FileDescriptor existingFd) throws IOException { 109 super(selectorProvider); 110 status = SOCKET_STATUS_CONNECTED; 111 fd = existingFd; 112 } 113 114 /* 115 * Getting the internal Socket If we have not the socket, we create a new 116 * one. 117 */ 118 @Override 119 synchronized public Socket socket() { 120 if (socket == null) { 121 try { 122 InetAddress addr = null; 123 int port = 0; 124 if (connectAddress != null) { 125 addr = connectAddress.getAddress(); 126 port = connectAddress.getPort(); 127 } 128 socket = new SocketAdapter(new PlainSocketImpl(fd, localPort, addr, port), this); 129 } catch (SocketException e) { 130 return null; 131 } 132 } 133 return socket; 134 } 135 136 @Override 137 synchronized public boolean isConnected() { 138 return status == SOCKET_STATUS_CONNECTED; 139 } 140 141 /* 142 * Status setting used by other class. 143 */ 144 synchronized void setConnected() { 145 status = SOCKET_STATUS_CONNECTED; 146 } 147 148 void setBound(boolean flag) { 149 isBound = flag; 150 } 151 152 @Override 153 synchronized public boolean isConnectionPending() { 154 return status == SOCKET_STATUS_PENDING; 155 } 156 157 @Override 158 public boolean connect(SocketAddress socketAddress) throws IOException { 159 // status must be open and unconnected 160 checkUnconnected(); 161 162 // check the address 163 InetSocketAddress inetSocketAddress = validateAddress(socketAddress); 164 InetAddress normalAddr = inetSocketAddress.getAddress(); 165 int port = inetSocketAddress.getPort(); 166 167 // When connecting, map ANY address to localhost 168 if (normalAddr.isAnyLocalAddress()) { 169 normalAddr = InetAddress.getLocalHost(); 170 } 171 172 boolean finished = false; 173 try { 174 if (isBlocking()) { 175 begin(); 176 } 177 finished = IoBridge.connect(fd, normalAddr, port); 178 isBound = finished; 179 } catch (IOException e) { 180 if (e instanceof ConnectException && !isBlocking()) { 181 status = SOCKET_STATUS_PENDING; 182 } else { 183 if (isOpen()) { 184 close(); 185 finished = true; 186 } 187 throw e; 188 } 189 } finally { 190 if (isBlocking()) { 191 end(finished); 192 } 193 } 194 195 initLocalAddressAndPort(); 196 connectAddress = inetSocketAddress; 197 if (socket != null) { 198 socket.socketImpl().initRemoteAddressAndPort(connectAddress.getAddress(), 199 connectAddress.getPort()); 200 } 201 202 synchronized (this) { 203 if (isBlocking()) { 204 status = (finished ? SOCKET_STATUS_CONNECTED : SOCKET_STATUS_UNCONNECTED); 205 } else { 206 status = SOCKET_STATUS_PENDING; 207 } 208 } 209 return finished; 210 } 211 212 private void initLocalAddressAndPort() { 213 SocketAddress sa; 214 try { 215 sa = Libcore.os.getsockname(fd); 216 } catch (ErrnoException errnoException) { 217 throw new AssertionError(errnoException); 218 } 219 InetSocketAddress isa = (InetSocketAddress) sa; 220 localAddress = isa.getAddress(); 221 localPort = isa.getPort(); 222 if (socket != null) { 223 socket.socketImpl().initLocalPort(localPort); 224 } 225 } 226 227 @Override 228 public boolean finishConnect() throws IOException { 229 synchronized (this) { 230 if (!isOpen()) { 231 throw new ClosedChannelException(); 232 } 233 if (status == SOCKET_STATUS_CONNECTED) { 234 return true; 235 } 236 if (status != SOCKET_STATUS_PENDING) { 237 throw new NoConnectionPendingException(); 238 } 239 } 240 241 boolean finished = false; 242 try { 243 begin(); 244 InetAddress inetAddress = connectAddress.getAddress(); 245 int port = connectAddress.getPort(); 246 finished = IoBridge.isConnected(fd, inetAddress, port, 0, 0); // Return immediately. 247 isBound = finished; 248 } catch (ConnectException e) { 249 if (isOpen()) { 250 close(); 251 finished = true; 252 } 253 throw e; 254 } finally { 255 end(finished); 256 } 257 258 synchronized (this) { 259 status = (finished ? SOCKET_STATUS_CONNECTED : status); 260 isBound = finished; 261 } 262 return finished; 263 } 264 265 void finishAccept() { 266 initLocalAddressAndPort(); 267 } 268 269 @Override 270 public int read(ByteBuffer dst) throws IOException { 271 dst.checkWritable(); 272 checkOpenConnected(); 273 if (!dst.hasRemaining()) { 274 return 0; 275 } 276 return readImpl(dst); 277 } 278 279 @Override 280 public long read(ByteBuffer[] targets, int offset, int length) throws IOException { 281 Arrays.checkOffsetAndCount(targets.length, offset, length); 282 checkOpenConnected(); 283 int totalCount = FileChannelImpl.calculateTotalRemaining(targets, offset, length, true); 284 if (totalCount == 0) { 285 return 0; 286 } 287 byte[] readArray = new byte[totalCount]; 288 ByteBuffer readBuffer = ByteBuffer.wrap(readArray); 289 int readCount; 290 // read data to readBuffer, and then transfer data from readBuffer to targets. 291 readCount = readImpl(readBuffer); 292 readBuffer.flip(); 293 if (readCount > 0) { 294 int left = readCount; 295 int index = offset; 296 // transfer data from readArray to targets 297 while (left > 0) { 298 int putLength = Math.min(targets[index].remaining(), left); 299 targets[index].put(readArray, readCount - left, putLength); 300 index++; 301 left -= putLength; 302 } 303 } 304 return readCount; 305 } 306 307 private int readImpl(ByteBuffer dst) throws IOException { 308 synchronized (readLock) { 309 int readCount = 0; 310 try { 311 if (isBlocking()) { 312 begin(); 313 } 314 readCount = IoBridge.recvfrom(true, fd, dst, 0, null, false); 315 if (readCount > 0) { 316 dst.position(dst.position() + readCount); 317 } 318 } finally { 319 if (isBlocking()) { 320 end(readCount > 0); 321 } 322 } 323 return readCount; 324 } 325 } 326 327 @Override 328 public int write(ByteBuffer src) throws IOException { 329 if (src == null) { 330 throw new NullPointerException("src == null"); 331 } 332 checkOpenConnected(); 333 if (!src.hasRemaining()) { 334 return 0; 335 } 336 return writeImpl(src); 337 } 338 339 @Override 340 public long write(ByteBuffer[] sources, int offset, int length) throws IOException { 341 Arrays.checkOffsetAndCount(sources.length, offset, length); 342 checkOpenConnected(); 343 int count = FileChannelImpl.calculateTotalRemaining(sources, offset, length, false); 344 if (count == 0) { 345 return 0; 346 } 347 ByteBuffer writeBuf = ByteBuffer.allocate(count); 348 for (int val = offset; val < length + offset; val++) { 349 ByteBuffer source = sources[val]; 350 int oldPosition = source.position(); 351 writeBuf.put(source); 352 source.position(oldPosition); 353 } 354 writeBuf.flip(); 355 int result = writeImpl(writeBuf); 356 int val = offset; 357 int written = result; 358 while (result > 0) { 359 ByteBuffer source = sources[val]; 360 int gap = Math.min(result, source.remaining()); 361 source.position(source.position() + gap); 362 val++; 363 result -= gap; 364 } 365 return written; 366 } 367 368 private int writeImpl(ByteBuffer src) throws IOException { 369 synchronized (writeLock) { 370 if (!src.hasRemaining()) { 371 return 0; 372 } 373 int writeCount = 0; 374 try { 375 if (isBlocking()) { 376 begin(); 377 } 378 writeCount = IoBridge.sendto(fd, src, 0, null, 0); 379 if (writeCount > 0) { 380 src.position(src.position() + writeCount); 381 } 382 } finally { 383 if (isBlocking()) { 384 end(writeCount >= 0); 385 } 386 } 387 return writeCount; 388 } 389 } 390 391 /* 392 * Status check, open and "connected", when read and write. 393 */ 394 synchronized private void checkOpenConnected() throws ClosedChannelException { 395 if (!isOpen()) { 396 throw new ClosedChannelException(); 397 } 398 if (!isConnected()) { 399 throw new NotYetConnectedException(); 400 } 401 } 402 403 /* 404 * Status check, open and "unconnected", before connection. 405 */ 406 synchronized private void checkUnconnected() throws IOException { 407 if (!isOpen()) { 408 throw new ClosedChannelException(); 409 } 410 if (status == SOCKET_STATUS_CONNECTED) { 411 throw new AlreadyConnectedException(); 412 } 413 if (status == SOCKET_STATUS_PENDING) { 414 throw new ConnectionPendingException(); 415 } 416 } 417 418 /* 419 * Shared by this class and DatagramChannelImpl, to do the address transfer 420 * and check. 421 */ 422 static InetSocketAddress validateAddress(SocketAddress socketAddress) { 423 if (socketAddress == null) { 424 throw new IllegalArgumentException("socketAddress == null"); 425 } 426 if (!(socketAddress instanceof InetSocketAddress)) { 427 throw new UnsupportedAddressTypeException(); 428 } 429 InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; 430 if (inetSocketAddress.isUnresolved()) { 431 throw new UnresolvedAddressException(); 432 } 433 return inetSocketAddress; 434 } 435 436 /* 437 * Get local address. 438 */ 439 public InetAddress getLocalAddress() throws UnknownHostException { 440 return isBound ? localAddress : Inet4Address.ANY; 441 } 442 443 /* 444 * Do really closing action here. 445 */ 446 @Override 447 protected synchronized void implCloseSelectableChannel() throws IOException { 448 if (status != SOCKET_STATUS_CLOSED) { 449 status = SOCKET_STATUS_CLOSED; 450 if (socket != null && !socket.isClosed()) { 451 socket.close(); 452 } else { 453 IoBridge.closeSocket(fd); 454 } 455 } 456 } 457 458 @Override protected void implConfigureBlocking(boolean blocking) throws IOException { 459 synchronized (blockingLock()) { 460 IoUtils.setBlocking(fd, blocking); 461 } 462 } 463 464 /* 465 * Get the fd. 466 */ 467 public FileDescriptor getFD() { 468 return fd; 469 } 470 471 /* 472 * Adapter classes for internal socket. 473 */ 474 private static class SocketAdapter extends Socket { 475 private final SocketChannelImpl channel; 476 private final PlainSocketImpl socketImpl; 477 478 SocketAdapter(PlainSocketImpl socketImpl, SocketChannelImpl channel) throws SocketException { 479 super(socketImpl); 480 this.socketImpl = socketImpl; 481 this.channel = channel; 482 SocketUtils.setCreated(this); 483 } 484 485 PlainSocketImpl socketImpl() { 486 return socketImpl; 487 } 488 489 @Override 490 public SocketChannel getChannel() { 491 return channel; 492 } 493 494 @Override 495 public boolean isBound() { 496 return channel.isBound; 497 } 498 499 @Override 500 public boolean isConnected() { 501 return channel.isConnected(); 502 } 503 504 @Override 505 public InetAddress getLocalAddress() { 506 try { 507 return channel.getLocalAddress(); 508 } catch (UnknownHostException e) { 509 return null; 510 } 511 } 512 513 @Override 514 public void connect(SocketAddress remoteAddr, int timeout) throws IOException { 515 if (!channel.isBlocking()) { 516 throw new IllegalBlockingModeException(); 517 } 518 if (isConnected()) { 519 throw new AlreadyConnectedException(); 520 } 521 super.connect(remoteAddr, timeout); 522 channel.initLocalAddressAndPort(); 523 if (super.isConnected()) { 524 channel.setConnected(); 525 channel.isBound = super.isBound(); 526 } 527 } 528 529 @Override 530 public void bind(SocketAddress localAddr) throws IOException { 531 if (channel.isConnected()) { 532 throw new AlreadyConnectedException(); 533 } 534 if (SocketChannelImpl.SOCKET_STATUS_PENDING == channel.status) { 535 throw new ConnectionPendingException(); 536 } 537 super.bind(localAddr); 538 channel.initLocalAddressAndPort(); 539 channel.isBound = true; 540 } 541 542 @Override 543 public void close() throws IOException { 544 synchronized (channel) { 545 if (channel.isOpen()) { 546 channel.close(); 547 } else { 548 super.close(); 549 } 550 channel.status = SocketChannelImpl.SOCKET_STATUS_CLOSED; 551 } 552 } 553 554 @Override 555 public OutputStream getOutputStream() throws IOException { 556 checkOpenAndConnected(); 557 if (isOutputShutdown()) { 558 throw new SocketException("Socket output is shutdown"); 559 } 560 return new SocketChannelOutputStream(channel); 561 } 562 563 @Override 564 public InputStream getInputStream() throws IOException { 565 checkOpenAndConnected(); 566 if (isInputShutdown()) { 567 throw new SocketException("Socket input is shutdown"); 568 } 569 return new SocketChannelInputStream(channel); 570 } 571 572 private void checkOpenAndConnected() throws SocketException { 573 if (!channel.isOpen()) { 574 throw new SocketException("Socket is closed"); 575 } 576 if (!channel.isConnected()) { 577 throw new SocketException("Socket is not connected"); 578 } 579 } 580 581 @Override 582 public FileDescriptor getFileDescriptor$() { 583 return socketImpl.getFD$(); 584 } 585 } 586 587 /* 588 * This output stream delegates all operations to the associated channel. 589 * Throws an IllegalBlockingModeException if the channel is in non-blocking 590 * mode when performing write operations. 591 */ 592 private static class SocketChannelOutputStream extends OutputStream { 593 private final SocketChannel channel; 594 595 public SocketChannelOutputStream(SocketChannel channel) { 596 this.channel = channel; 597 } 598 599 /* 600 * Closes this stream and channel. 601 * 602 * @exception IOException thrown if an error occurs during the close 603 */ 604 @Override 605 public void close() throws IOException { 606 channel.close(); 607 } 608 609 @Override 610 public void write(byte[] buffer, int offset, int byteCount) throws IOException { 611 Arrays.checkOffsetAndCount(buffer.length, offset, byteCount); 612 ByteBuffer buf = ByteBuffer.wrap(buffer, offset, byteCount); 613 if (!channel.isBlocking()) { 614 throw new IllegalBlockingModeException(); 615 } 616 channel.write(buf); 617 } 618 619 @Override 620 public void write(int oneByte) throws IOException { 621 if (!channel.isBlocking()) { 622 throw new IllegalBlockingModeException(); 623 } 624 ByteBuffer buffer = ByteBuffer.allocate(1); 625 buffer.put(0, (byte) (oneByte & 0xFF)); 626 channel.write(buffer); 627 } 628 } 629 630 /* 631 * This input stream delegates all operations to the associated channel. 632 * Throws an IllegalBlockingModeException if the channel is in non-blocking 633 * mode when performing read operations. 634 */ 635 private static class SocketChannelInputStream extends InputStream { 636 private final SocketChannel channel; 637 638 public SocketChannelInputStream(SocketChannel channel) { 639 this.channel = channel; 640 } 641 642 /* 643 * Closes this stream and channel. 644 */ 645 @Override 646 public void close() throws IOException { 647 channel.close(); 648 } 649 650 @Override 651 public int read() throws IOException { 652 if (!channel.isBlocking()) { 653 throw new IllegalBlockingModeException(); 654 } 655 ByteBuffer buf = ByteBuffer.allocate(1); 656 int result = channel.read(buf); 657 return (result == -1) ? result : (buf.get(0) & 0xff); 658 } 659 660 @Override 661 public int read(byte[] buffer, int offset, int byteCount) throws IOException { 662 Arrays.checkOffsetAndCount(buffer.length, offset, byteCount); 663 if (!channel.isBlocking()) { 664 throw new IllegalBlockingModeException(); 665 } 666 ByteBuffer buf = ByteBuffer.wrap(buffer, offset, byteCount); 667 return channel.read(buf); 668 } 669 } 670} 671