SocketChannelImpl.java revision 62e34a21db1cbef5d2cec186ee4f15c5ec39d187
1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package java.nio; 19 20import java.io.FileDescriptor; 21import java.io.IOException; 22import java.io.InputStream; 23import java.io.OutputStream; 24import java.net.ConnectException; 25import java.net.InetAddress; 26import java.net.InetSocketAddress; 27import java.net.Socket; 28import java.net.SocketAddress; 29import java.net.SocketException; 30import java.net.SocketUtils; 31import java.net.UnknownHostException; 32import java.nio.channels.AlreadyConnectedException; 33import java.nio.channels.ClosedChannelException; 34import java.nio.channels.ConnectionPendingException; 35import java.nio.channels.IllegalBlockingModeException; 36import java.nio.channels.NoConnectionPendingException; 37import java.nio.channels.NotYetConnectedException; 38import java.nio.channels.SocketChannel; 39import java.nio.channels.UnresolvedAddressException; 40import java.nio.channels.UnsupportedAddressTypeException; 41import java.nio.channels.spi.SelectorProvider; 42import java.util.Arrays; 43import libcore.io.IoUtils; 44import org.apache.harmony.luni.net.PlainSocketImpl; 45import org.apache.harmony.luni.platform.FileDescriptorHandler; 46import org.apache.harmony.luni.platform.Platform; 47 48/* 49 * The default implementation class of java.nio.channels.SocketChannel. 50 */ 51class SocketChannelImpl extends SocketChannel implements FileDescriptorHandler { 52 53 private static final int EOF = -1; 54 55 // Status un-init, not initialized. 56 static final int SOCKET_STATUS_UNINIT = EOF; 57 58 // Status before connect. 59 static final int SOCKET_STATUS_UNCONNECTED = 0; 60 61 // Status connection pending. 62 static final int SOCKET_STATUS_PENDING = 1; 63 64 // Status after connection success. 65 static final int SOCKET_STATUS_CONNECTED = 2; 66 67 // Status closed. 68 static final int SOCKET_STATUS_CLOSED = 3; 69 70 // The descriptor to interact with native code. 71 final FileDescriptor fd; 72 73 // Our internal Socket. 74 private SocketAdapter socket = null; 75 76 // The address to be connected. 77 InetSocketAddress connectAddress = null; 78 79 // Local address of the this socket (package private for adapter). 80 InetAddress localAddress = null; 81 82 // Local port number. 83 int localPort; 84 85 // At first, uninitialized. 86 int status = SOCKET_STATUS_UNINIT; 87 88 // Whether the socket is bound. 89 volatile boolean isBound = false; 90 91 private final Object readLock = new Object(); 92 93 private final Object writeLock = new Object(); 94 95 /* 96 * Constructor for creating a connected socket channel. 97 */ 98 public SocketChannelImpl(SelectorProvider selectorProvider) throws IOException { 99 this(selectorProvider, true); 100 } 101 102 /* 103 * Constructor for creating an optionally connected socket channel. 104 */ 105 public SocketChannelImpl(SelectorProvider selectorProvider, boolean connect) throws IOException { 106 super(selectorProvider); 107 fd = new FileDescriptor(); 108 status = SOCKET_STATUS_UNCONNECTED; 109 if (connect) { 110 Platform.NETWORK.socket(fd, true); 111 } 112 } 113 114 /* 115 * Getting the internal Socket If we have not the socket, we create a new 116 * one. 117 */ 118 @Override 119 synchronized public Socket socket() { 120 if (socket == null) { 121 try { 122 InetAddress addr = null; 123 int port = 0; 124 if (connectAddress != null) { 125 addr = connectAddress.getAddress(); 126 port = connectAddress.getPort(); 127 } 128 socket = new SocketAdapter(new PlainSocketImpl(fd, localPort, addr, port), this); 129 } catch (SocketException e) { 130 return null; 131 } 132 } 133 return socket; 134 } 135 136 @Override 137 synchronized public boolean isConnected() { 138 return status == SOCKET_STATUS_CONNECTED; 139 } 140 141 /* 142 * Status setting used by other class. 143 */ 144 synchronized void setConnected() { 145 status = SOCKET_STATUS_CONNECTED; 146 } 147 148 void setBound(boolean flag) { 149 isBound = flag; 150 } 151 152 @Override 153 synchronized public boolean isConnectionPending() { 154 return status == SOCKET_STATUS_PENDING; 155 } 156 157 @Override 158 public boolean connect(SocketAddress socketAddress) throws IOException { 159 // status must be open and unconnected 160 checkUnconnected(); 161 162 // check the address 163 InetSocketAddress inetSocketAddress = validateAddress(socketAddress); 164 InetAddress normalAddr = inetSocketAddress.getAddress(); 165 int port = inetSocketAddress.getPort(); 166 167 // When connecting, map ANY address to Localhost 168 if (normalAddr.isAnyLocalAddress()) { 169 normalAddr = InetAddress.getLocalHost(); 170 } 171 172 // connect result 173 int result = EOF; 174 boolean finished = false; 175 176 try { 177 if (isBlocking()) { 178 begin(); 179 Platform.NETWORK.connect(fd, normalAddr, port, 0); 180 finished = true; // Or we'd have thrown an exception. 181 } else { 182 finished = Platform.NETWORK.connectNonBlocking(fd, normalAddr, port); 183 // set back to nonblocking to work around with a bug in portlib 184 if (!isBlocking()) { 185 IoUtils.setBlocking(fd, false); 186 } 187 } 188 isBound = finished; 189 } catch (IOException e) { 190 if (e instanceof ConnectException && !isBlocking()) { 191 status = SOCKET_STATUS_PENDING; 192 } else { 193 if (isOpen()) { 194 close(); 195 finished = true; 196 } 197 throw e; 198 } 199 } finally { 200 if (isBlocking()) { 201 end(finished); 202 } 203 } 204 205 initLocalAddressAndPort(); 206 connectAddress = inetSocketAddress; 207 if (socket != null) { 208 socket.socketImpl().initRemoteAddressAndPort(connectAddress.getAddress(), 209 connectAddress.getPort()); 210 } 211 212 synchronized (this) { 213 if (isBlocking()) { 214 status = (finished ? SOCKET_STATUS_CONNECTED : SOCKET_STATUS_UNCONNECTED); 215 } else { 216 status = SOCKET_STATUS_PENDING; 217 } 218 } 219 return finished; 220 } 221 222 private void initLocalAddressAndPort() { 223 localAddress = Platform.NETWORK.getSocketLocalAddress(fd); 224 localPort = Platform.NETWORK.getSocketLocalPort(fd); 225 if (socket != null) { 226 socket.socketImpl().initLocalPort(localPort); 227 } 228 } 229 230 @Override 231 public boolean finishConnect() throws IOException { 232 // status check 233 synchronized (this) { 234 if (!isOpen()) { 235 throw new ClosedChannelException(); 236 } 237 if (status == SOCKET_STATUS_CONNECTED) { 238 return true; 239 } 240 if (status != SOCKET_STATUS_PENDING) { 241 throw new NoConnectionPendingException(); 242 } 243 } 244 245 boolean finished = false; 246 try { 247 begin(); 248 final int WAIT_FOREVER = -1; 249 final int POLL = 0; 250 finished = Platform.NETWORK.isConnected(fd, isBlocking() ? WAIT_FOREVER : POLL); 251 isBound = finished; 252 initLocalAddressAndPort(); 253 } catch (ConnectException e) { 254 if (isOpen()) { 255 close(); 256 finished = true; 257 } 258 throw e; 259 } finally { 260 end(finished); 261 } 262 263 synchronized (this) { 264 status = (finished ? SOCKET_STATUS_CONNECTED : status); 265 isBound = finished; 266 // TPE: Workaround for bug that turns socket back to blocking 267 if (!isBlocking()) implConfigureBlocking(false); 268 } 269 return finished; 270 } 271 272 void finishAccept() { 273 initLocalAddressAndPort(); 274 } 275 276 @Override 277 public int read(ByteBuffer target) throws IOException { 278 FileChannelImpl.checkWritable(target); 279 checkOpenConnected(); 280 if (!target.hasRemaining()) { 281 return 0; 282 } 283 284 int readCount; 285 if (target.isDirect() || target.hasArray()) { 286 readCount = readImpl(target); 287 if (readCount > 0) { 288 target.position(target.position() + readCount); 289 } 290 } else { 291 ByteBuffer readBuffer = null; 292 byte[] readArray = null; 293 readArray = new byte[target.remaining()]; 294 readBuffer = ByteBuffer.wrap(readArray); 295 readCount = readImpl(readBuffer); 296 if (readCount > 0) { 297 target.put(readArray, 0, readCount); 298 } 299 } 300 return readCount; 301 } 302 303 @Override 304 public long read(ByteBuffer[] targets, int offset, int length) throws IOException { 305 Arrays.checkOffsetAndCount(targets.length, offset, length); 306 checkOpenConnected(); 307 int totalCount = FileChannelImpl.calculateTotalRemaining(targets, offset, length, true); 308 if (totalCount == 0) { 309 return 0; 310 } 311 byte[] readArray = new byte[totalCount]; 312 ByteBuffer readBuffer = ByteBuffer.wrap(readArray); 313 int readCount; 314 // read data to readBuffer, and then transfer data from readBuffer to 315 // targets. 316 readCount = readImpl(readBuffer); 317 if (readCount > 0) { 318 int left = readCount; 319 int index = offset; 320 // transfer data from readArray to targets 321 while (left > 0) { 322 int putLength = Math.min(targets[index].remaining(), left); 323 targets[index].put(readArray, readCount - left, putLength); 324 index++; 325 left -= putLength; 326 } 327 } 328 return readCount; 329 } 330 331 /** 332 * Read from channel, and store the result in the target. 333 * 334 * @param target 335 * output parameter 336 */ 337 private int readImpl(ByteBuffer target) throws IOException { 338 synchronized (readLock) { 339 int readCount = 0; 340 try { 341 if (isBlocking()) { 342 begin(); 343 } 344 int offset = target.position(); 345 int length = target.remaining(); 346 if (target.isDirect()) { 347 int address = NioUtils.getDirectBufferAddress(target); 348 readCount = Platform.NETWORK.readDirect(fd, address + offset, length); 349 } else { 350 // target is assured to have array. 351 byte[] array = target.array(); 352 offset += target.arrayOffset(); 353 readCount = Platform.NETWORK.read(fd, array, offset, length); 354 } 355 return readCount; 356 } finally { 357 if (isBlocking()) { 358 end(readCount > 0); 359 } 360 } 361 } 362 } 363 364 @Override 365 public int write(ByteBuffer source) throws IOException { 366 if (source == null) { 367 throw new NullPointerException(); 368 } 369 checkOpenConnected(); 370 if (!source.hasRemaining()) { 371 return 0; 372 } 373 return writeImpl(source); 374 } 375 376 @Override 377 public long write(ByteBuffer[] sources, int offset, int length) throws IOException { 378 Arrays.checkOffsetAndCount(sources.length, offset, length); 379 checkOpenConnected(); 380 int count = FileChannelImpl.calculateTotalRemaining(sources, offset, length, false); 381 if (count == 0) { 382 return 0; 383 } 384 ByteBuffer writeBuf = ByteBuffer.allocate(count); 385 for (int val = offset; val < length + offset; val++) { 386 ByteBuffer source = sources[val]; 387 int oldPosition = source.position(); 388 writeBuf.put(source); 389 source.position(oldPosition); 390 } 391 writeBuf.flip(); 392 int result = writeImpl(writeBuf); 393 int val = offset; 394 int written = result; 395 while (result > 0) { 396 ByteBuffer source = sources[val]; 397 int gap = Math.min(result, source.remaining()); 398 source.position(source.position() + gap); 399 val++; 400 result -= gap; 401 } 402 return written; 403 } 404 405 /* 406 * Write the source. return the count of bytes written. 407 */ 408 private int writeImpl(ByteBuffer source) throws IOException { 409 synchronized (writeLock) { 410 if (!source.hasRemaining()) { 411 return 0; 412 } 413 int writeCount = 0; 414 try { 415 int pos = source.position(); 416 int length = source.remaining(); 417 if (isBlocking()) { 418 begin(); 419 } 420 if (source.isDirect()) { 421 int address = NioUtils.getDirectBufferAddress(source); 422 writeCount = Platform.NETWORK.writeDirect(fd, address, pos, length); 423 } else if (source.hasArray()) { 424 pos += source.arrayOffset(); 425 writeCount = Platform.NETWORK.write(fd, source.array(), pos, length); 426 } else { 427 byte[] array = new byte[length]; 428 source.get(array); 429 writeCount = Platform.NETWORK.write(fd, array, 0, length); 430 } 431 source.position(pos + writeCount); 432 } finally { 433 if (isBlocking()) { 434 end(writeCount >= 0); 435 } 436 } 437 return writeCount; 438 } 439 } 440 441 /* 442 * Status check, open and "connected", when read and write. 443 */ 444 synchronized private void checkOpenConnected() throws ClosedChannelException { 445 if (!isOpen()) { 446 throw new ClosedChannelException(); 447 } 448 if (!isConnected()) { 449 throw new NotYetConnectedException(); 450 } 451 } 452 453 /* 454 * Status check, open and "unconnected", before connection. 455 */ 456 synchronized private void checkUnconnected() throws IOException { 457 if (!isOpen()) { 458 throw new ClosedChannelException(); 459 } 460 if (status == SOCKET_STATUS_CONNECTED) { 461 throw new AlreadyConnectedException(); 462 } 463 if (status == SOCKET_STATUS_PENDING) { 464 throw new ConnectionPendingException(); 465 } 466 } 467 468 /* 469 * Shared by this class and DatagramChannelImpl, to do the address transfer 470 * and check. 471 */ 472 static InetSocketAddress validateAddress(SocketAddress socketAddress) { 473 if (socketAddress == null) { 474 throw new IllegalArgumentException(); 475 } 476 if (!(socketAddress instanceof InetSocketAddress)) { 477 throw new UnsupportedAddressTypeException(); 478 } 479 InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; 480 if (inetSocketAddress.isUnresolved()) { 481 throw new UnresolvedAddressException(); 482 } 483 return inetSocketAddress; 484 } 485 486 /* 487 * Get local address. 488 */ 489 public InetAddress getLocalAddress() throws UnknownHostException { 490 if (!isBound) { 491 byte[] any_bytes = { 0, 0, 0, 0 }; 492 return InetAddress.getByAddress(any_bytes); 493 } 494 return localAddress; 495 } 496 497 /* 498 * Do really closing action here. 499 */ 500 @Override 501 protected synchronized void implCloseSelectableChannel() throws IOException { 502 if (status != SOCKET_STATUS_CLOSED) { 503 status = SOCKET_STATUS_CLOSED; 504 if (socket != null && !socket.isClosed()) { 505 socket.close(); 506 } else { 507 Platform.NETWORK.close(fd); 508 } 509 } 510 } 511 512 @Override 513 protected void implConfigureBlocking(boolean blockMode) throws IOException { 514 synchronized (blockingLock()) { 515 IoUtils.setBlocking(fd, blockMode); 516 } 517 } 518 519 /* 520 * Get the fd. 521 */ 522 public FileDescriptor getFD() { 523 return fd; 524 } 525 526 /* 527 * Adapter classes for internal socket. 528 */ 529 private static class SocketAdapter extends Socket { 530 private final SocketChannelImpl channel; 531 private final PlainSocketImpl socketImpl; 532 533 SocketAdapter(PlainSocketImpl socketImpl, SocketChannelImpl channel) throws SocketException { 534 super(socketImpl); 535 this.socketImpl = socketImpl; 536 this.channel = channel; 537 SocketUtils.setCreated(this); 538 } 539 540 PlainSocketImpl socketImpl() { 541 return socketImpl; 542 } 543 544 @Override 545 public SocketChannel getChannel() { 546 return channel; 547 } 548 549 @Override 550 public boolean isBound() { 551 return channel.isBound; 552 } 553 554 @Override 555 public boolean isConnected() { 556 return channel.isConnected(); 557 } 558 559 @Override 560 public InetAddress getLocalAddress() { 561 try { 562 return channel.getLocalAddress(); 563 } catch (UnknownHostException e) { 564 return null; 565 } 566 } 567 568 @Override 569 public void connect(SocketAddress remoteAddr, int timeout) throws IOException { 570 if (!channel.isBlocking()) { 571 throw new IllegalBlockingModeException(); 572 } 573 if (isConnected()) { 574 throw new AlreadyConnectedException(); 575 } 576 super.connect(remoteAddr, timeout); 577 channel.initLocalAddressAndPort(); 578 if (super.isConnected()) { 579 channel.setConnected(); 580 channel.isBound = super.isBound(); 581 } 582 } 583 584 @Override 585 public void bind(SocketAddress localAddr) throws IOException { 586 if (channel.isConnected()) { 587 throw new AlreadyConnectedException(); 588 } 589 if (SocketChannelImpl.SOCKET_STATUS_PENDING == channel.status) { 590 throw new ConnectionPendingException(); 591 } 592 super.bind(localAddr); 593 // keep here to see if need next version 594 // channel.Address = getLocalSocketAddress(); 595 // channel.localport = getLocalPort(); 596 channel.isBound = true; 597 } 598 599 @Override 600 public void close() throws IOException { 601 synchronized (channel) { 602 if (channel.isOpen()) { 603 channel.close(); 604 } else { 605 super.close(); 606 } 607 channel.status = SocketChannelImpl.SOCKET_STATUS_CLOSED; 608 } 609 } 610 611 @Override 612 public OutputStream getOutputStream() throws IOException { 613 checkOpenAndConnected(); 614 if (isOutputShutdown()) { 615 throw new SocketException("Socket output is shutdown"); 616 } 617 return new SocketChannelOutputStream(channel); 618 } 619 620 @Override 621 public InputStream getInputStream() throws IOException { 622 checkOpenAndConnected(); 623 if (isInputShutdown()) { 624 throw new SocketException("Socket input is shutdown"); 625 } 626 return new SocketChannelInputStream(channel); 627 } 628 629 private void checkOpenAndConnected() throws SocketException { 630 if (!channel.isOpen()) { 631 throw new SocketException("Socket is closed"); 632 } 633 if (!channel.isConnected()) { 634 throw new SocketException("Socket is not connected"); 635 } 636 } 637 } 638 639 /* 640 * This output stream delegates all operations to the associated channel. 641 * Throws an IllegalBlockingModeException if the channel is in non-blocking 642 * mode when performing write operations. 643 */ 644 private static class SocketChannelOutputStream extends OutputStream { 645 private final SocketChannel channel; 646 647 public SocketChannelOutputStream(SocketChannel channel) { 648 this.channel = channel; 649 } 650 651 /* 652 * Closes this stream and channel. 653 * 654 * @exception IOException thrown if an error occurs during the close 655 */ 656 @Override 657 public void close() throws IOException { 658 channel.close(); 659 } 660 661 @Override 662 public void write(byte[] buffer, int offset, int byteCount) throws IOException { 663 Arrays.checkOffsetAndCount(buffer.length, offset, byteCount); 664 ByteBuffer buf = ByteBuffer.wrap(buffer, offset, byteCount); 665 if (!channel.isBlocking()) { 666 throw new IllegalBlockingModeException(); 667 } 668 channel.write(buf); 669 } 670 671 @Override 672 public void write(int oneByte) throws IOException { 673 if (!channel.isBlocking()) { 674 throw new IllegalBlockingModeException(); 675 } 676 ByteBuffer buffer = ByteBuffer.allocate(1); 677 buffer.put(0, (byte) (oneByte & 0xFF)); 678 channel.write(buffer); 679 } 680 } 681 682 /* 683 * This input stream delegates all operations to the associated channel. 684 * Throws an IllegalBlockingModeException if the channel is in non-blocking 685 * mode when performing read operations. 686 */ 687 private static class SocketChannelInputStream extends InputStream { 688 private final SocketChannel channel; 689 690 public SocketChannelInputStream(SocketChannel channel) { 691 this.channel = channel; 692 } 693 694 /* 695 * Closes this stream and channel. 696 */ 697 @Override 698 public void close() throws IOException { 699 channel.close(); 700 } 701 702 @Override 703 public int read() throws IOException { 704 if (!channel.isBlocking()) { 705 throw new IllegalBlockingModeException(); 706 } 707 ByteBuffer buf = ByteBuffer.allocate(1); 708 int result = channel.read(buf); 709 return (result == -1) ? result : (buf.get(0) & 0xff); 710 } 711 712 @Override 713 public int read(byte[] buffer, int offset, int byteCount) throws IOException { 714 Arrays.checkOffsetAndCount(buffer.length, offset, byteCount); 715 if (!channel.isBlocking()) { 716 throw new IllegalBlockingModeException(); 717 } 718 ByteBuffer buf = ByteBuffer.wrap(buffer, offset, byteCount); 719 return channel.read(buf); 720 } 721 } 722} 723