1/*
2 * Copyright (c) 2011 jMonkeyEngine
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are
7 * met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 *   notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 *   notice, this list of conditions and the following disclaimer in the
14 *   documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
17 *   may be used to endorse or promote products derived from this software
18 *   without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
22 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
27 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
28 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
29 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
30 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32
33package com.jme3.network.base;
34
35import com.jme3.network.Filter;
36import com.jme3.network.HostedConnection;
37import com.jme3.network.Message;
38import com.jme3.network.MessageListener;
39import com.jme3.network.kernel.Endpoint;
40import com.jme3.network.kernel.EndpointEvent;
41import com.jme3.network.kernel.Envelope;
42import com.jme3.network.kernel.Kernel;
43import com.jme3.network.message.ClientRegistrationMessage;
44import java.nio.ByteBuffer;
45import java.util.Map;
46import java.util.concurrent.ConcurrentHashMap;
47import java.util.concurrent.atomic.AtomicBoolean;
48import java.util.logging.Level;
49import java.util.logging.Logger;
50
51/**
52 *  Wraps a single Kernel and forwards new messages
53 *  to the supplied message dispatcher and new endpoint
54 *  events to the connection dispatcher.  This is used
55 *  by DefaultServer to manage its kernel objects.
56 *
57 *  <p>This adapter assumes a simple protocol where two
58 *  bytes define a (short) object size with the object data
59 *  to follow.  Note: this limits the size of serialized
60 *  objects to 32676 bytes... even though, for example,
61 *  datagram packets can hold twice that. :P</p>
62 *
63 *  @version   $Revision: 8944 $
64 *  @author    Paul Speed
65 */
66public class KernelAdapter extends Thread
67{
68    static Logger log = Logger.getLogger(KernelAdapter.class.getName());
69
70    private DefaultServer server; // this is unfortunate
71    private Kernel kernel;
72    private MessageListener<HostedConnection> messageDispatcher;
73    private AtomicBoolean go = new AtomicBoolean(true);
74
75    // Keeps track of the in-progress messages that are received
76    // on reliable connections
77    private Map<Endpoint, MessageProtocol> messageBuffers = new ConcurrentHashMap<Endpoint,MessageProtocol>();
78
79    // Marks the messages as reliable or not if they came
80    // through this connector.
81    private boolean reliable;
82
83    public KernelAdapter( DefaultServer server, Kernel kernel, MessageListener<HostedConnection> messageDispatcher,
84                          boolean reliable )
85    {
86        super( String.valueOf(kernel) );
87        this.server = server;
88        this.kernel = kernel;
89        this.messageDispatcher = messageDispatcher;
90        this.reliable = reliable;
91        setDaemon(true);
92    }
93
94    public Kernel getKernel()
95    {
96        return kernel;
97    }
98
99    public void initialize()
100    {
101        kernel.initialize();
102    }
103
104    public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable,
105                           boolean copy )
106    {
107        kernel.broadcast( filter, data, reliable, copy );
108    }
109
110    public void close() throws InterruptedException
111    {
112        go.set(false);
113
114        // Kill the kernel
115        kernel.terminate();
116    }
117
118    protected void reportError( Endpoint p, Object context, Exception e )
119    {
120        // Should really be queued up so the outer thread can
121        // retrieve them.  For now we'll just log it.  FIXME
122        log.log( Level.SEVERE, "Unhandled error, endpoint:" + p + ", context:" + context, e );
123
124        // In lieu of other options, at least close the endpoint
125        p.close();
126    }
127
128    protected HostedConnection getConnection( Endpoint p )
129    {
130        return server.getConnection(p);
131    }
132
133    protected void connectionClosed( Endpoint p )
134    {
135        // Remove any message buffer we've been accumulating
136        // on behalf of this endpoing
137        messageBuffers.remove(p);
138
139        log.log( Level.FINE, "Buffers size:{0}", messageBuffers.size() );
140
141        server.connectionClosed(p);
142    }
143
144    /**
145     *  Note on threading for those writing their own server
146     *  or adapter implementations.  The rule that a single connection be
147     *  processed by only one thread at a time is more about ensuring that
148     *  the messages are delivered in the order that they are received
149     *  than for any user-code safety.  99% of the time the user code should
150     *  be writing for multithreaded access anyway.
151     *
152     *  <p>The issue with the messages is that if a an implementation is
153     *  using a general thread pool then it would be possible for a
154     *  naive implementation to have one thread grab an Envelope from
155     *  connection 1's and another grab the next Envelope.  Since an Envelope
156     *  may contain several messages, delivering the second thread's messages
157     *  before or during the first's would be really confusing and hard
158     *  to code for in user code.</p>
159     *
160     *  <p>And that's why this note is here.  DefaultServer does a rudimentary
161     *  per-connection locking but it couldn't possibly guard against
162     *  out of order Envelope processing.</p>
163     */
164    protected void dispatch( Endpoint p, Message m )
165    {
166        // Because this class is the only one with the information
167        // to do it... we need to pull of the registration message
168        // here.
169        if( m instanceof ClientRegistrationMessage ) {
170            server.registerClient( this, p, (ClientRegistrationMessage)m );
171            return;
172        }
173
174        try {
175            HostedConnection source = getConnection(p);
176            if( source == null ) {
177                if( reliable ) {
178                    // If it's a reliable connection then it's slightly more
179                    // concerning but this can happen all the time for a UDP endpoint.
180                    log.log( Level.WARNING, "Recieved message from unconnected endpoint:" + p + "  message:" + m );
181                }
182                return;
183            }
184            messageDispatcher.messageReceived( source, m );
185        } catch( Exception e ) {
186            reportError(p, m, e);
187        }
188    }
189
190    protected MessageProtocol getMessageBuffer( Endpoint p )
191    {
192        if( !reliable ) {
193            // Since UDP comes in packets and they aren't split
194            // up, there is no reason to buffer.  In fact, there would
195            // be a down side because there is no way for us to reliably
196            // clean these up later since we'd create another one for
197            // any random UDP packet that comes to the port.
198            return new MessageProtocol();
199        } else {
200            // See if we already have one
201            MessageProtocol result = messageBuffers.get(p);
202            if( result == null ) {
203                result = new MessageProtocol();
204                messageBuffers.put(p, result);
205            }
206            return result;
207        }
208    }
209
210    protected void createAndDispatch( Envelope env )
211    {
212        MessageProtocol protocol = getMessageBuffer(env.getSource());
213
214        byte[] data = env.getData();
215        ByteBuffer buffer = ByteBuffer.wrap(data);
216
217        int count = protocol.addBuffer( buffer );
218        if( count == 0 ) {
219            // This can happen if there was only a partial message
220            // received.  However, this should never happen for unreliable
221            // connections.
222            if( !reliable ) {
223                // Log some additional information about the packet.
224                int len = Math.min( 10, data.length );
225                StringBuilder sb = new StringBuilder();
226                for( int i = 0; i < len; i++ ) {
227                    sb.append( "[" + Integer.toHexString(data[i]) + "]" );
228                }
229                log.log( Level.INFO, "First 10 bytes of incomplete nessage:" + sb );
230                throw new RuntimeException( "Envelope contained incomplete data:" + env );
231            }
232        }
233
234        // Should be complete... and maybe we should check but we don't
235        Message m = null;
236        while( (m = protocol.getMessage()) != null ) {
237            m.setReliable(reliable);
238            dispatch( env.getSource(), m );
239        }
240    }
241
242    protected void createAndDispatch( EndpointEvent event )
243    {
244        // Only need to tell the server about disconnects
245        if( event.getType() == EndpointEvent.Type.REMOVE ) {
246            connectionClosed( event.getEndpoint() );
247        }
248    }
249
250    protected void flushEvents()
251    {
252        EndpointEvent event;
253        while( (event = kernel.nextEvent()) != null ) {
254            try {
255                createAndDispatch( event );
256            } catch( Exception e ) {
257                reportError(event.getEndpoint(), event, e);
258            }
259        }
260    }
261
262    public void run()
263    {
264        while( go.get() ) {
265
266            try {
267                // Check for pending events
268                flushEvents();
269
270                // Grab the next envelope
271                Envelope e = kernel.read();
272                if( e == Kernel.EVENTS_PENDING )
273                    continue; // We'll catch it up above
274
275                // Check for pending events that might have
276                // come in while we were blocking.  This is usually
277                // when the connection add events come through
278                flushEvents();
279
280                try {
281                    createAndDispatch( e );
282                } catch( Exception ex ) {
283                    reportError(e.getSource(), e, ex);
284                }
285
286            } catch( InterruptedException ex ) {
287                if( !go.get() )
288                    return;
289                throw new RuntimeException( "Unexpected interruption", ex );
290            }
291        }
292    }
293
294}
295
296
297