1/*
2 * Copyright (c) 2011 Oracle and/or its affiliates. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 *
8 *   - Redistributions of source code must retain the above copyright
9 *     notice, this list of conditions and the following disclaimer.
10 *
11 *   - Redistributions in binary form must reproduce the above copyright
12 *     notice, this list of conditions and the following disclaimer in the
13 *     documentation and/or other materials provided with the distribution.
14 *
15 *   - Neither the name of Oracle nor the names of its
16 *     contributors may be used to endorse or promote products derived
17 *     from this software without specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
20 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR
23 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
24 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
25 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
26 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
27 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
28 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
29 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 */
31
32/*
33 * This source code is provided to illustrate the usage of a given feature
34 * or technique and has been deliberately simplified. Additional steps
35 * required for a production-quality application, such as security checks,
36 * input validation and proper error handling, might not be present in
37 * this sample code.
38 */
39
40
41import java.io.IOException;
42import java.nio.ByteBuffer;
43import java.nio.channels.AsynchronousSocketChannel;
44import java.nio.channels.CompletionHandler;
45import java.util.LinkedList;
46import java.util.Queue;
47import java.util.concurrent.atomic.AtomicReference;
48
49/**
50 * Client represents a remote connection to the chat server.
51 * It contains methods for reading and writing messages from the
52 * channel.
53 * Messages are considered to be separated by newline, so incomplete
54 * messages are buffered in the {@code Client}.
55 *
56 * All reads and writes are asynchronous and uses the nio2 asynchronous
57 * elements.
58 */
59class Client {
60    private final AsynchronousSocketChannel channel;
61    private AtomicReference<ClientReader> reader;
62    private String userName;
63    private final StringBuilder messageBuffer = new StringBuilder();
64
65    private final Queue<ByteBuffer> queue = new LinkedList<ByteBuffer>();
66    private boolean writing = false;
67
68    public Client(AsynchronousSocketChannel channel, ClientReader reader) {
69        this.channel = channel;
70        this.reader = new AtomicReference<ClientReader>(reader);
71    }
72
73    /**
74     * Enqueues a write of the buffer to the channel.
75     * The call is asynchronous so the buffer is not safe to modify after
76     * passing the buffer here.
77     *
78     * @param buffer the buffer to send to the channel
79     */
80    private void writeMessage(final ByteBuffer buffer) {
81        boolean threadShouldWrite = false;
82
83        synchronized(queue) {
84            queue.add(buffer);
85            // Currently no thread writing, make this thread dispatch a write
86            if (!writing) {
87                writing = true;
88                threadShouldWrite = true;
89            }
90        }
91
92        if (threadShouldWrite) {
93            writeFromQueue();
94        }
95    }
96
97    private void writeFromQueue() {
98        ByteBuffer buffer;
99
100        synchronized (queue) {
101            buffer = queue.poll();
102            if (buffer == null) {
103                writing = false;
104            }
105        }
106
107        // No new data in buffer to write
108        if (writing) {
109            writeBuffer(buffer);
110        }
111    }
112
113    private void writeBuffer(ByteBuffer buffer) {
114        channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
115            @Override
116            public void completed(Integer result, ByteBuffer buffer) {
117                if (buffer.hasRemaining()) {
118                    channel.write(buffer, buffer, this);
119                } else {
120                    // Go back and check if there is new data to write
121                    writeFromQueue();
122                }
123            }
124
125            @Override
126            public void failed(Throwable exc, ByteBuffer attachment) {
127            }
128        });
129    }
130
131    /**
132     * Sends a message
133     * @param string the message
134     */
135    public void writeStringMessage(String string) {
136        writeMessage(ByteBuffer.wrap(string.getBytes()));
137    }
138
139    /**
140     * Send a message from a specific client
141     * @param client the message is sent from
142     * @param message to send
143     */
144    public void writeMessageFrom(Client client, String message) {
145        if (reader.get().acceptsMessages()) {
146            writeStringMessage(client.getUserName() + ": " + message);
147        }
148    }
149
150    /**
151     * Enqueue a read
152     * @param completionHandler callback on completed read
153     */
154    public void read(CompletionHandler<Integer, ? super ByteBuffer> completionHandler) {
155        ByteBuffer input = ByteBuffer.allocate(256);
156        if (!channel.isOpen()) {
157            return;
158        }
159        channel.read(input, input, completionHandler);
160    }
161
162    /**
163     * Closes the channel
164     */
165    public void close() {
166        try {
167            channel.close();
168        } catch (IOException e) {
169            e.printStackTrace();
170        }
171    }
172
173    /**
174     * Run the current states actions.
175     */
176    public void run() {
177        reader.get().run(this);
178    }
179
180    public void setUserName(String userName) {
181        this.userName = userName;
182    }
183
184    public void setReader(ClientReader reader) {
185        this.reader.set(reader);
186    }
187
188    public String getUserName() {
189        return userName;
190    }
191
192    public void appendMessage(String message) {
193        synchronized (messageBuffer) {
194            messageBuffer.append(message);
195        }
196    }
197
198    /**
199     * @return the next newline separated message in the buffer. null is returned if the buffer
200     * doesn't contain any newline.
201     */
202    public String nextMessage() {
203        synchronized(messageBuffer) {
204            int nextNewline = messageBuffer.indexOf("\n");
205            if (nextNewline == -1) {
206                return null;
207            }
208            String message = messageBuffer.substring(0, nextNewline + 1);
209            messageBuffer.delete(0, nextNewline + 1);
210            return message;
211        }
212    }
213}
214