1/* 2 * Copyright (c) 2011 Oracle and/or its affiliates. All rights reserved. 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions 6 * are met: 7 * 8 * - Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 11 * - Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 15 * - Neither the name of Oracle nor the names of its 16 * contributors may be used to endorse or promote products derived 17 * from this software without specific prior written permission. 18 * 19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 20 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 21 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 22 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR 23 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 24 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 25 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 26 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 27 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 28 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 29 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 30 */ 31 32/* 33 * This source code is provided to illustrate the usage of a given feature 34 * or technique and has been deliberately simplified. Additional steps 35 * required for a production-quality application, such as security checks, 36 * input validation and proper error handling, might not be present in 37 * this sample code. 38 */ 39 40 41import java.io.IOException; 42import java.nio.ByteBuffer; 43import java.nio.channels.AsynchronousSocketChannel; 44import java.nio.channels.CompletionHandler; 45import java.util.LinkedList; 46import java.util.Queue; 47import java.util.concurrent.atomic.AtomicReference; 48 49/** 50 * Client represents a remote connection to the chat server. 51 * It contains methods for reading and writing messages from the 52 * channel. 53 * Messages are considered to be separated by newline, so incomplete 54 * messages are buffered in the {@code Client}. 55 * 56 * All reads and writes are asynchronous and uses the nio2 asynchronous 57 * elements. 58 */ 59class Client { 60 private final AsynchronousSocketChannel channel; 61 private AtomicReference<ClientReader> reader; 62 private String userName; 63 private final StringBuilder messageBuffer = new StringBuilder(); 64 65 private final Queue<ByteBuffer> queue = new LinkedList<ByteBuffer>(); 66 private boolean writing = false; 67 68 public Client(AsynchronousSocketChannel channel, ClientReader reader) { 69 this.channel = channel; 70 this.reader = new AtomicReference<ClientReader>(reader); 71 } 72 73 /** 74 * Enqueues a write of the buffer to the channel. 75 * The call is asynchronous so the buffer is not safe to modify after 76 * passing the buffer here. 77 * 78 * @param buffer the buffer to send to the channel 79 */ 80 private void writeMessage(final ByteBuffer buffer) { 81 boolean threadShouldWrite = false; 82 83 synchronized(queue) { 84 queue.add(buffer); 85 // Currently no thread writing, make this thread dispatch a write 86 if (!writing) { 87 writing = true; 88 threadShouldWrite = true; 89 } 90 } 91 92 if (threadShouldWrite) { 93 writeFromQueue(); 94 } 95 } 96 97 private void writeFromQueue() { 98 ByteBuffer buffer; 99 100 synchronized (queue) { 101 buffer = queue.poll(); 102 if (buffer == null) { 103 writing = false; 104 } 105 } 106 107 // No new data in buffer to write 108 if (writing) { 109 writeBuffer(buffer); 110 } 111 } 112 113 private void writeBuffer(ByteBuffer buffer) { 114 channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() { 115 @Override 116 public void completed(Integer result, ByteBuffer buffer) { 117 if (buffer.hasRemaining()) { 118 channel.write(buffer, buffer, this); 119 } else { 120 // Go back and check if there is new data to write 121 writeFromQueue(); 122 } 123 } 124 125 @Override 126 public void failed(Throwable exc, ByteBuffer attachment) { 127 } 128 }); 129 } 130 131 /** 132 * Sends a message 133 * @param string the message 134 */ 135 public void writeStringMessage(String string) { 136 writeMessage(ByteBuffer.wrap(string.getBytes())); 137 } 138 139 /** 140 * Send a message from a specific client 141 * @param client the message is sent from 142 * @param message to send 143 */ 144 public void writeMessageFrom(Client client, String message) { 145 if (reader.get().acceptsMessages()) { 146 writeStringMessage(client.getUserName() + ": " + message); 147 } 148 } 149 150 /** 151 * Enqueue a read 152 * @param completionHandler callback on completed read 153 */ 154 public void read(CompletionHandler<Integer, ? super ByteBuffer> completionHandler) { 155 ByteBuffer input = ByteBuffer.allocate(256); 156 if (!channel.isOpen()) { 157 return; 158 } 159 channel.read(input, input, completionHandler); 160 } 161 162 /** 163 * Closes the channel 164 */ 165 public void close() { 166 try { 167 channel.close(); 168 } catch (IOException e) { 169 e.printStackTrace(); 170 } 171 } 172 173 /** 174 * Run the current states actions. 175 */ 176 public void run() { 177 reader.get().run(this); 178 } 179 180 public void setUserName(String userName) { 181 this.userName = userName; 182 } 183 184 public void setReader(ClientReader reader) { 185 this.reader.set(reader); 186 } 187 188 public String getUserName() { 189 return userName; 190 } 191 192 public void appendMessage(String message) { 193 synchronized (messageBuffer) { 194 messageBuffer.append(message); 195 } 196 } 197 198 /** 199 * @return the next newline separated message in the buffer. null is returned if the buffer 200 * doesn't contain any newline. 201 */ 202 public String nextMessage() { 203 synchronized(messageBuffer) { 204 int nextNewline = messageBuffer.indexOf("\n"); 205 if (nextNewline == -1) { 206 return null; 207 } 208 String message = messageBuffer.substring(0, nextNewline + 1); 209 messageBuffer.delete(0, nextNewline + 1); 210 return message; 211 } 212 } 213} 214