SocketChannelImpl.java revision e27d02044ec399884bfd4343d513bf270b9b9b11
1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package java.nio; 19 20import java.io.FileDescriptor; 21import java.io.IOException; 22import java.io.InputStream; 23import java.io.OutputStream; 24import java.net.ConnectException; 25import java.net.Inet4Address; 26import java.net.InetAddress; 27import java.net.InetSocketAddress; 28import java.net.PlainSocketImpl; 29import java.net.Socket; 30import java.net.SocketAddress; 31import java.net.SocketException; 32import java.net.SocketUtils; 33import java.net.UnknownHostException; 34import java.nio.channels.AlreadyConnectedException; 35import java.nio.channels.ClosedChannelException; 36import java.nio.channels.ConnectionPendingException; 37import java.nio.channels.IllegalBlockingModeException; 38import java.nio.channels.NoConnectionPendingException; 39import java.nio.channels.NotYetConnectedException; 40import java.nio.channels.SocketChannel; 41import java.nio.channels.UnresolvedAddressException; 42import java.nio.channels.UnsupportedAddressTypeException; 43import java.nio.channels.spi.SelectorProvider; 44import java.util.Arrays; 45import libcore.io.ErrnoException; 46import libcore.io.Libcore; 47import libcore.io.IoUtils; 48import org.apache.harmony.luni.platform.Platform; 49import static libcore.io.OsConstants.*; 50 51/* 52 * The default implementation class of java.nio.channels.SocketChannel. 53 */ 54class SocketChannelImpl extends SocketChannel implements FileDescriptorChannel { 55 private static final int SOCKET_STATUS_UNINITIALIZED = -1; 56 57 // Status before connect. 58 private static final int SOCKET_STATUS_UNCONNECTED = 0; 59 60 // Status connection pending. 61 private static final int SOCKET_STATUS_PENDING = 1; 62 63 // Status after connection success. 64 private static final int SOCKET_STATUS_CONNECTED = 2; 65 66 // Status closed. 67 private static final int SOCKET_STATUS_CLOSED = 3; 68 69 private final FileDescriptor fd; 70 71 // Our internal Socket. 72 private SocketAdapter socket = null; 73 74 // The address to be connected. 75 private InetSocketAddress connectAddress = null; 76 77 private InetAddress localAddress = null; 78 private int localPort; 79 80 private int status = SOCKET_STATUS_UNINITIALIZED; 81 82 // Whether the socket is bound. 83 private volatile boolean isBound = false; 84 85 private final Object readLock = new Object(); 86 87 private final Object writeLock = new Object(); 88 89 /* 90 * Constructor for creating a connected socket channel. 91 */ 92 public SocketChannelImpl(SelectorProvider selectorProvider) throws IOException { 93 this(selectorProvider, true); 94 } 95 96 /* 97 * Constructor for creating an optionally connected socket channel. 98 */ 99 public SocketChannelImpl(SelectorProvider selectorProvider, boolean connect) throws IOException { 100 super(selectorProvider); 101 status = SOCKET_STATUS_UNCONNECTED; 102 fd = (connect ? IoUtils.socket(true) : new FileDescriptor()); 103 } 104 105 /* 106 * Getting the internal Socket If we have not the socket, we create a new 107 * one. 108 */ 109 @Override 110 synchronized public Socket socket() { 111 if (socket == null) { 112 try { 113 InetAddress addr = null; 114 int port = 0; 115 if (connectAddress != null) { 116 addr = connectAddress.getAddress(); 117 port = connectAddress.getPort(); 118 } 119 socket = new SocketAdapter(new PlainSocketImpl(fd, localPort, addr, port), this); 120 } catch (SocketException e) { 121 return null; 122 } 123 } 124 return socket; 125 } 126 127 @Override 128 synchronized public boolean isConnected() { 129 return status == SOCKET_STATUS_CONNECTED; 130 } 131 132 /* 133 * Status setting used by other class. 134 */ 135 synchronized void setConnected() { 136 status = SOCKET_STATUS_CONNECTED; 137 } 138 139 void setBound(boolean flag) { 140 isBound = flag; 141 } 142 143 @Override 144 synchronized public boolean isConnectionPending() { 145 return status == SOCKET_STATUS_PENDING; 146 } 147 148 @Override 149 public boolean connect(SocketAddress socketAddress) throws IOException { 150 // status must be open and unconnected 151 checkUnconnected(); 152 153 // check the address 154 InetSocketAddress inetSocketAddress = validateAddress(socketAddress); 155 InetAddress normalAddr = inetSocketAddress.getAddress(); 156 int port = inetSocketAddress.getPort(); 157 158 // When connecting, map ANY address to localhost 159 if (normalAddr.isAnyLocalAddress()) { 160 normalAddr = InetAddress.getLocalHost(); 161 } 162 163 boolean finished = false; 164 try { 165 if (isBlocking()) { 166 begin(); 167 } 168 finished = IoUtils.connect(fd, normalAddr, port); 169 isBound = finished; 170 } catch (IOException e) { 171 if (e instanceof ConnectException && !isBlocking()) { 172 status = SOCKET_STATUS_PENDING; 173 } else { 174 if (isOpen()) { 175 close(); 176 finished = true; 177 } 178 throw e; 179 } 180 } finally { 181 if (isBlocking()) { 182 end(finished); 183 } 184 } 185 186 initLocalAddressAndPort(); 187 connectAddress = inetSocketAddress; 188 if (socket != null) { 189 socket.socketImpl().initRemoteAddressAndPort(connectAddress.getAddress(), 190 connectAddress.getPort()); 191 } 192 193 synchronized (this) { 194 if (isBlocking()) { 195 status = (finished ? SOCKET_STATUS_CONNECTED : SOCKET_STATUS_UNCONNECTED); 196 } else { 197 status = SOCKET_STATUS_PENDING; 198 } 199 } 200 return finished; 201 } 202 203 private void initLocalAddressAndPort() { 204 SocketAddress sa = Libcore.os.getsockname(fd); 205 InetSocketAddress isa = (InetSocketAddress) sa; 206 localAddress = isa.getAddress(); 207 localPort = isa.getPort(); 208 if (socket != null) { 209 socket.socketImpl().initLocalPort(localPort); 210 } 211 } 212 213 @Override 214 public boolean finishConnect() throws IOException { 215 synchronized (this) { 216 if (!isOpen()) { 217 throw new ClosedChannelException(); 218 } 219 if (status == SOCKET_STATUS_CONNECTED) { 220 return true; 221 } 222 if (status != SOCKET_STATUS_PENDING) { 223 throw new NoConnectionPendingException(); 224 } 225 } 226 227 boolean finished = false; 228 try { 229 begin(); 230 InetAddress inetAddress = connectAddress.getAddress(); 231 int port = connectAddress.getPort(); 232 finished = IoUtils.isConnected(fd, inetAddress, port, 0, 0); // Return immediately. 233 isBound = finished; 234 } catch (ConnectException e) { 235 if (isOpen()) { 236 close(); 237 finished = true; 238 } 239 throw e; 240 } finally { 241 end(finished); 242 } 243 244 synchronized (this) { 245 status = (finished ? SOCKET_STATUS_CONNECTED : status); 246 isBound = finished; 247 } 248 return finished; 249 } 250 251 void finishAccept() { 252 initLocalAddressAndPort(); 253 } 254 255 @Override 256 public int read(ByteBuffer target) throws IOException { 257 FileChannelImpl.checkWritable(target); 258 checkOpenConnected(); 259 if (!target.hasRemaining()) { 260 return 0; 261 } 262 263 int readCount; 264 if (target.isDirect() || target.hasArray()) { 265 readCount = readImpl(target); 266 if (readCount > 0) { 267 target.position(target.position() + readCount); 268 } 269 } else { 270 ByteBuffer readBuffer = null; 271 byte[] readArray = null; 272 readArray = new byte[target.remaining()]; 273 readBuffer = ByteBuffer.wrap(readArray); 274 readCount = readImpl(readBuffer); 275 if (readCount > 0) { 276 target.put(readArray, 0, readCount); 277 } 278 } 279 return readCount; 280 } 281 282 @Override 283 public long read(ByteBuffer[] targets, int offset, int length) throws IOException { 284 Arrays.checkOffsetAndCount(targets.length, offset, length); 285 checkOpenConnected(); 286 int totalCount = FileChannelImpl.calculateTotalRemaining(targets, offset, length, true); 287 if (totalCount == 0) { 288 return 0; 289 } 290 byte[] readArray = new byte[totalCount]; 291 ByteBuffer readBuffer = ByteBuffer.wrap(readArray); 292 int readCount; 293 // read data to readBuffer, and then transfer data from readBuffer to 294 // targets. 295 readCount = readImpl(readBuffer); 296 if (readCount > 0) { 297 int left = readCount; 298 int index = offset; 299 // transfer data from readArray to targets 300 while (left > 0) { 301 int putLength = Math.min(targets[index].remaining(), left); 302 targets[index].put(readArray, readCount - left, putLength); 303 index++; 304 left -= putLength; 305 } 306 } 307 return readCount; 308 } 309 310 /** 311 * Read from channel, and store the result in the target. 312 * 313 * @param target 314 * output parameter 315 */ 316 private int readImpl(ByteBuffer target) throws IOException { 317 synchronized (readLock) { 318 int readCount = 0; 319 try { 320 if (isBlocking()) { 321 begin(); 322 } 323 int offset = target.position(); 324 int length = target.remaining(); 325 if (target.isDirect()) { 326 int address = NioUtils.getDirectBufferAddress(target); 327 readCount = Platform.NETWORK.readDirect(fd, address + offset, length); 328 } else { 329 // target is assured to have array. 330 byte[] array = target.array(); 331 offset += target.arrayOffset(); 332 readCount = Platform.NETWORK.read(fd, array, offset, length); 333 } 334 return readCount; 335 } finally { 336 if (isBlocking()) { 337 end(readCount > 0); 338 } 339 } 340 } 341 } 342 343 @Override 344 public int write(ByteBuffer source) throws IOException { 345 if (source == null) { 346 throw new NullPointerException(); 347 } 348 checkOpenConnected(); 349 if (!source.hasRemaining()) { 350 return 0; 351 } 352 return writeImpl(source); 353 } 354 355 @Override 356 public long write(ByteBuffer[] sources, int offset, int length) throws IOException { 357 Arrays.checkOffsetAndCount(sources.length, offset, length); 358 checkOpenConnected(); 359 int count = FileChannelImpl.calculateTotalRemaining(sources, offset, length, false); 360 if (count == 0) { 361 return 0; 362 } 363 ByteBuffer writeBuf = ByteBuffer.allocate(count); 364 for (int val = offset; val < length + offset; val++) { 365 ByteBuffer source = sources[val]; 366 int oldPosition = source.position(); 367 writeBuf.put(source); 368 source.position(oldPosition); 369 } 370 writeBuf.flip(); 371 int result = writeImpl(writeBuf); 372 int val = offset; 373 int written = result; 374 while (result > 0) { 375 ByteBuffer source = sources[val]; 376 int gap = Math.min(result, source.remaining()); 377 source.position(source.position() + gap); 378 val++; 379 result -= gap; 380 } 381 return written; 382 } 383 384 /* 385 * Write the source. return the count of bytes written. 386 */ 387 private int writeImpl(ByteBuffer source) throws IOException { 388 synchronized (writeLock) { 389 if (!source.hasRemaining()) { 390 return 0; 391 } 392 int writeCount = 0; 393 try { 394 int pos = source.position(); 395 int length = source.remaining(); 396 if (isBlocking()) { 397 begin(); 398 } 399 if (source.isDirect()) { 400 int address = NioUtils.getDirectBufferAddress(source); 401 writeCount = Platform.NETWORK.writeDirect(fd, address, pos, length); 402 } else if (source.hasArray()) { 403 pos += source.arrayOffset(); 404 writeCount = Platform.NETWORK.write(fd, source.array(), pos, length); 405 } else { 406 byte[] array = new byte[length]; 407 source.get(array); 408 writeCount = Platform.NETWORK.write(fd, array, 0, length); 409 } 410 source.position(pos + writeCount); 411 } finally { 412 if (isBlocking()) { 413 end(writeCount >= 0); 414 } 415 } 416 return writeCount; 417 } 418 } 419 420 /* 421 * Status check, open and "connected", when read and write. 422 */ 423 synchronized private void checkOpenConnected() throws ClosedChannelException { 424 if (!isOpen()) { 425 throw new ClosedChannelException(); 426 } 427 if (!isConnected()) { 428 throw new NotYetConnectedException(); 429 } 430 } 431 432 /* 433 * Status check, open and "unconnected", before connection. 434 */ 435 synchronized private void checkUnconnected() throws IOException { 436 if (!isOpen()) { 437 throw new ClosedChannelException(); 438 } 439 if (status == SOCKET_STATUS_CONNECTED) { 440 throw new AlreadyConnectedException(); 441 } 442 if (status == SOCKET_STATUS_PENDING) { 443 throw new ConnectionPendingException(); 444 } 445 } 446 447 /* 448 * Shared by this class and DatagramChannelImpl, to do the address transfer 449 * and check. 450 */ 451 static InetSocketAddress validateAddress(SocketAddress socketAddress) { 452 if (socketAddress == null) { 453 throw new IllegalArgumentException(); 454 } 455 if (!(socketAddress instanceof InetSocketAddress)) { 456 throw new UnsupportedAddressTypeException(); 457 } 458 InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; 459 if (inetSocketAddress.isUnresolved()) { 460 throw new UnresolvedAddressException(); 461 } 462 return inetSocketAddress; 463 } 464 465 /* 466 * Get local address. 467 */ 468 public InetAddress getLocalAddress() throws UnknownHostException { 469 return isBound ? localAddress : Inet4Address.ANY; 470 } 471 472 /* 473 * Do really closing action here. 474 */ 475 @Override 476 protected synchronized void implCloseSelectableChannel() throws IOException { 477 if (status != SOCKET_STATUS_CLOSED) { 478 status = SOCKET_STATUS_CLOSED; 479 if (socket != null && !socket.isClosed()) { 480 socket.close(); 481 } else { 482 IoUtils.closeSocket(fd); 483 } 484 } 485 } 486 487 @Override protected void implConfigureBlocking(boolean blocking) throws IOException { 488 synchronized (blockingLock()) { 489 IoUtils.setBlocking(fd, blocking); 490 } 491 } 492 493 /* 494 * Get the fd. 495 */ 496 public FileDescriptor getFD() { 497 return fd; 498 } 499 500 /* 501 * Adapter classes for internal socket. 502 */ 503 private static class SocketAdapter extends Socket { 504 private final SocketChannelImpl channel; 505 private final PlainSocketImpl socketImpl; 506 507 SocketAdapter(PlainSocketImpl socketImpl, SocketChannelImpl channel) throws SocketException { 508 super(socketImpl); 509 this.socketImpl = socketImpl; 510 this.channel = channel; 511 SocketUtils.setCreated(this); 512 } 513 514 PlainSocketImpl socketImpl() { 515 return socketImpl; 516 } 517 518 @Override 519 public SocketChannel getChannel() { 520 return channel; 521 } 522 523 @Override 524 public boolean isBound() { 525 return channel.isBound; 526 } 527 528 @Override 529 public boolean isConnected() { 530 return channel.isConnected(); 531 } 532 533 @Override 534 public InetAddress getLocalAddress() { 535 try { 536 return channel.getLocalAddress(); 537 } catch (UnknownHostException e) { 538 return null; 539 } 540 } 541 542 @Override 543 public void connect(SocketAddress remoteAddr, int timeout) throws IOException { 544 if (!channel.isBlocking()) { 545 throw new IllegalBlockingModeException(); 546 } 547 if (isConnected()) { 548 throw new AlreadyConnectedException(); 549 } 550 super.connect(remoteAddr, timeout); 551 channel.initLocalAddressAndPort(); 552 if (super.isConnected()) { 553 channel.setConnected(); 554 channel.isBound = super.isBound(); 555 } 556 } 557 558 @Override 559 public void bind(SocketAddress localAddr) throws IOException { 560 if (channel.isConnected()) { 561 throw new AlreadyConnectedException(); 562 } 563 if (SocketChannelImpl.SOCKET_STATUS_PENDING == channel.status) { 564 throw new ConnectionPendingException(); 565 } 566 super.bind(localAddr); 567 // keep here to see if need next version 568 // channel.Address = getLocalSocketAddress(); 569 // channel.localport = getLocalPort(); 570 channel.isBound = true; 571 } 572 573 @Override 574 public void close() throws IOException { 575 synchronized (channel) { 576 if (channel.isOpen()) { 577 channel.close(); 578 } else { 579 super.close(); 580 } 581 channel.status = SocketChannelImpl.SOCKET_STATUS_CLOSED; 582 } 583 } 584 585 @Override 586 public OutputStream getOutputStream() throws IOException { 587 checkOpenAndConnected(); 588 if (isOutputShutdown()) { 589 throw new SocketException("Socket output is shutdown"); 590 } 591 return new SocketChannelOutputStream(channel); 592 } 593 594 @Override 595 public InputStream getInputStream() throws IOException { 596 checkOpenAndConnected(); 597 if (isInputShutdown()) { 598 throw new SocketException("Socket input is shutdown"); 599 } 600 return new SocketChannelInputStream(channel); 601 } 602 603 private void checkOpenAndConnected() throws SocketException { 604 if (!channel.isOpen()) { 605 throw new SocketException("Socket is closed"); 606 } 607 if (!channel.isConnected()) { 608 throw new SocketException("Socket is not connected"); 609 } 610 } 611 } 612 613 /* 614 * This output stream delegates all operations to the associated channel. 615 * Throws an IllegalBlockingModeException if the channel is in non-blocking 616 * mode when performing write operations. 617 */ 618 private static class SocketChannelOutputStream extends OutputStream { 619 private final SocketChannel channel; 620 621 public SocketChannelOutputStream(SocketChannel channel) { 622 this.channel = channel; 623 } 624 625 /* 626 * Closes this stream and channel. 627 * 628 * @exception IOException thrown if an error occurs during the close 629 */ 630 @Override 631 public void close() throws IOException { 632 channel.close(); 633 } 634 635 @Override 636 public void write(byte[] buffer, int offset, int byteCount) throws IOException { 637 Arrays.checkOffsetAndCount(buffer.length, offset, byteCount); 638 ByteBuffer buf = ByteBuffer.wrap(buffer, offset, byteCount); 639 if (!channel.isBlocking()) { 640 throw new IllegalBlockingModeException(); 641 } 642 channel.write(buf); 643 } 644 645 @Override 646 public void write(int oneByte) throws IOException { 647 if (!channel.isBlocking()) { 648 throw new IllegalBlockingModeException(); 649 } 650 ByteBuffer buffer = ByteBuffer.allocate(1); 651 buffer.put(0, (byte) (oneByte & 0xFF)); 652 channel.write(buffer); 653 } 654 } 655 656 /* 657 * This input stream delegates all operations to the associated channel. 658 * Throws an IllegalBlockingModeException if the channel is in non-blocking 659 * mode when performing read operations. 660 */ 661 private static class SocketChannelInputStream extends InputStream { 662 private final SocketChannel channel; 663 664 public SocketChannelInputStream(SocketChannel channel) { 665 this.channel = channel; 666 } 667 668 /* 669 * Closes this stream and channel. 670 */ 671 @Override 672 public void close() throws IOException { 673 channel.close(); 674 } 675 676 @Override 677 public int read() throws IOException { 678 if (!channel.isBlocking()) { 679 throw new IllegalBlockingModeException(); 680 } 681 ByteBuffer buf = ByteBuffer.allocate(1); 682 int result = channel.read(buf); 683 return (result == -1) ? result : (buf.get(0) & 0xff); 684 } 685 686 @Override 687 public int read(byte[] buffer, int offset, int byteCount) throws IOException { 688 Arrays.checkOffsetAndCount(buffer.length, offset, byteCount); 689 if (!channel.isBlocking()) { 690 throw new IllegalBlockingModeException(); 691 } 692 ByteBuffer buf = ByteBuffer.wrap(buffer, offset, byteCount); 693 return channel.read(buf); 694 } 695 } 696} 697