1/*
2 * Copyright (c) 2011 jMonkeyEngine
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are
7 * met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 *   notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 *   notice, this list of conditions and the following disclaimer in the
14 *   documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
17 *   may be used to endorse or promote products derived from this software
18 *   without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
22 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
27 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
28 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
29 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
30 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32
33package com.jme3.network.kernel.tcp;
34
35import com.jme3.network.kernel.Endpoint;
36import com.jme3.network.kernel.Kernel;
37import com.jme3.network.kernel.KernelException;
38import java.io.IOException;
39import java.nio.ByteBuffer;
40import java.nio.channels.SocketChannel;
41import java.util.concurrent.ConcurrentLinkedQueue;
42
43
44/**
45 *  Endpoint implementation that encapsulates the
46 *  channel IO based connection information and keeps
47 *  track of the outbound data queue for the channel.
48 *
49 *  @version   $Revision: 8944 $
50 *  @author    Paul Speed
51 */
52public class NioEndpoint implements Endpoint
53{
54    protected static final ByteBuffer CLOSE_MARKER = ByteBuffer.allocate(0);
55
56    private long id;
57    private SocketChannel socket;
58    private SelectorKernel kernel;
59    private ConcurrentLinkedQueue<ByteBuffer> outbound = new ConcurrentLinkedQueue<ByteBuffer>();
60    private boolean closing = false;
61
62    public NioEndpoint( SelectorKernel kernel, long id, SocketChannel socket )
63    {
64        this.id = id;
65        this.socket = socket;
66        this.kernel = kernel;
67    }
68
69    public Kernel getKernel()
70    {
71        return kernel;
72    }
73
74    public void close()
75    {
76        close(false);
77    }
78
79    public void close( boolean flushData )
80    {
81        if( flushData ) {
82            closing = true;
83
84            // Enqueue a close marker message to let the server
85            // know we should close
86            send( CLOSE_MARKER, false, true );
87
88            return;
89        }
90
91        try {
92            // Note: even though we may be disconnected from the socket.isConnected()
93            // standpoint, it's still safest to tell the kernel so that it can be sure
94            // to stop managing us gracefully.
95            kernel.closeEndpoint(this);
96        } catch( IOException e ) {
97            throw new KernelException( "Error closing endpoint for socket:" + socket, e );
98        }
99    }
100
101    public long getId()
102    {
103        return id;
104    }
105
106    public String getAddress()
107    {
108        return String.valueOf(socket.socket().getRemoteSocketAddress());
109    }
110
111    public boolean isConnected()
112    {
113        return socket.isConnected();
114    }
115
116    /**
117     *  The wakeup option is used internally when the kernel is
118     *  broadcasting out to a bunch of endpoints and doesn't want to
119     *  necessarily wakeup right away.
120     */
121    protected void send( ByteBuffer data, boolean copy, boolean wakeup )
122    {
123        // We create a ByteBuffer per endpoint since we
124        // use it to track the data sent to each endpoint
125        // separately.
126        ByteBuffer buffer;
127        if( !copy ) {
128            buffer = data;
129        } else {
130            // Copy the buffer
131            buffer = ByteBuffer.allocate(data.remaining());
132            buffer.put(data);
133            buffer.flip();
134        }
135
136        // Queue it up
137        outbound.add(buffer);
138
139        if( wakeup )
140            kernel.wakeupSelector();
141    }
142
143    /**
144     *  Called by the SelectorKernel to get the current top
145     *  buffer for writing.
146     */
147    protected ByteBuffer peekPending()
148    {
149        return outbound.peek();
150    }
151
152    /**
153     *  Called by the SelectorKernel when the top buffer
154     *  has been exhausted.
155     */
156    protected ByteBuffer removePending()
157    {
158        return outbound.poll();
159    }
160
161    protected boolean hasPending()
162    {
163        return !outbound.isEmpty();
164    }
165
166    public void send( ByteBuffer data )
167    {
168        if( data == null ) {
169            throw new IllegalArgumentException( "Data cannot be null." );
170        }
171        if( closing ) {
172            throw new KernelException( "Endpoint has been closed:" + socket );
173        }
174        send( data, true, true );
175    }
176
177    public String toString()
178    {
179        return "NioEndpoint[" + id + ", " + socket + "]";
180    }
181}
182