/* * Copyright (c) 2011 jMonkeyEngine * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are * met: * * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * * Neither the name of 'jMonkeyEngine' nor the names of its contributors * may be used to endorse or promote products derived from this software * without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
*/ package com.jme3.network.base; import com.jme3.network.Filter; import com.jme3.network.HostedConnection; import com.jme3.network.Message; import com.jme3.network.MessageListener; import com.jme3.network.kernel.Endpoint; import com.jme3.network.kernel.EndpointEvent; import com.jme3.network.kernel.Envelope; import com.jme3.network.kernel.Kernel; import com.jme3.network.message.ClientRegistrationMessage; import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; /** * Wraps a single Kernel and forwards new messages * to the supplied message dispatcher and new endpoint * events to the connection dispatcher. This is used * by DefaultServer to manage its kernel objects. * *
<p>This adapter assumes a simple protocol where two * bytes define a (short) object size with the object data * to follow. Note: this limits the size of serialized * objects to 32767 bytes... even though, for example, * datagram packets can hold twice that. :P</p>
* * @version $Revision: 8944 $ * @author Paul Speed */ public class KernelAdapter extends Thread { static Logger log = Logger.getLogger(KernelAdapter.class.getName()); private DefaultServer server; // this is unfortunate private Kernel kernel; private MessageListenerThe issue with the messages is that if a an implementation is * using a general thread pool then it would be possible for a * naive implementation to have one thread grab an Envelope from * connection 1's and another grab the next Envelope. Since an Envelope * may contain several messages, delivering the second thread's messages * before or during the first's would be really confusing and hard * to code for in user code.
* * <p>And that's why this note is here. DefaultServer does rudimentary * per-connection locking, but it cannot guard against * out-of-order Envelope processing.</p>
*/
    protected void dispatch( Endpoint p, Message m )
    {
        // Because this class is the only one with the information
        // to do it... we need to pull off the registration message
        // here.
        if( m instanceof ClientRegistrationMessage ) {
            server.registerClient( this, p, (ClientRegistrationMessage)m );
            return;
        }

        try {
            HostedConnection source = getConnection(p);
            if( source == null ) {
                if( reliable ) {
                    // If it's a reliable connection then it's slightly more
                    // concerning but this can happen all the time for a UDP endpoint.
                    log.log( Level.WARNING, "Recieved message from unconnected endpoint:" + p + " message:" + m );
                }
                // No connection is associated with the endpoint, so there
                // is nobody to deliver the message on behalf of.
                return;
            }
            messageDispatcher.messageReceived( source, m );
        } catch( Exception e ) {
            // Failures are routed to reportError() instead of propagating
            // to the caller.
            reportError(p, m, e);
        }
    }

    /**
     *  Returns the MessageProtocol used to accumulate incoming data
     *  for the specified endpoint.  Unreliable adapters get a fresh,
     *  unshared instance on every call; reliable adapters get the
     *  instance cached in messageBuffers, which is created on first use.
     *
     *  @param p the endpoint the data came from
     *  @return the protocol buffer to add the endpoint's data to
     */
    protected MessageProtocol getMessageBuffer( Endpoint p )
    {
        if( !reliable ) {
            // Since UDP comes in packets and they aren't split
            // up, there is no reason to buffer.  In fact, there would
            // be a down side because there is no way for us to reliably
            // clean these up later since we'd create another one for
            // any random UDP packet that comes to the port.
            return new MessageProtocol();
        } else {
            // See if we already have one
            MessageProtocol result = messageBuffers.get(p);
            if( result == null ) {
                result = new MessageProtocol();
                messageBuffers.put(p, result);
            }
            return result;
        }
    }

    /**
     *  Feeds the envelope's data into the message buffer of its source
     *  endpoint and dispatches every message that the buffer then
     *  yields, each flagged with this adapter's reliable setting.
     *
     *  @param env the envelope read from the kernel
     *  @throws RuntimeException if this adapter is unreliable and
     *          addBuffer() reported a count of zero for the envelope
     */
    protected void createAndDispatch( Envelope env )
    {
        MessageProtocol protocol = getMessageBuffer(env.getSource());

        byte[] data = env.getData();
        ByteBuffer buffer = ByteBuffer.wrap(data);

        // NOTE(review): count is presumably the number of complete messages
        // now available in the protocol -- confirm against MessageProtocol.
        int count = protocol.addBuffer( buffer );
        if( count == 0 ) {
            // This can happen if there was only a partial message
            // received.  However, this should never happen for unreliable
            // connections.
            if( !reliable ) {
                // Log some additional information about the packet.
                int len = Math.min( 10, data.length );
                StringBuilder sb = new StringBuilder();
                for( int i = 0; i < len; i++ ) {
                    // Mask to the unsigned byte value; without it a byte
                    // >= 0x80 is sign-extended on widening to int and
                    // would be printed as "ffffffXX".
                    sb.append( "[" ).append( Integer.toHexString(data[i] & 0xff) ).append( "]" );
                }
                log.log( Level.INFO, "First 10 bytes of incomplete nessage:" + sb );
                throw new RuntimeException( "Envelope contained incomplete data:" + env );
            }
        }

        // Should be complete... and maybe we should check but we don't
        Message m = null;
        while( (m = protocol.getMessage()) != null ) {
            m.setReliable(reliable);
            dispatch( env.getSource(), m );
        }
    }

    /**
     *  Handles a kernel endpoint event.  Only REMOVE events are acted
     *  upon, by passing the endpoint to connectionClosed(); every other
     *  event type is ignored.
     *
     *  @param event the endpoint event taken from the kernel
     */
    protected void createAndDispatch( EndpointEvent event )
    {
        // Only need to tell the server about disconnects
        if( event.getType() == EndpointEvent.Type.REMOVE ) {
            connectionClosed( event.getEndpoint() );
        }
    }

    /**
     *  Drains the kernel's endpoint events until nextEvent() returns
     *  null.  An exception raised while handling one event is passed
     *  to reportError() and the loop moves on to the next event.
     */
    protected void flushEvents()
    {
        EndpointEvent event;
        while( (event = kernel.nextEvent()) != null ) {
            try {
                createAndDispatch( event );
            } catch( Exception e ) {
                reportError(event.getEndpoint(), event, e);
            }
        }
    }

    /**
     *  Thread body: loops for as long as the go flag is set, flushing
     *  endpoint events and dispatching each envelope read from the kernel.
     *
     *  @throws RuntimeException if the read is interrupted while the
     *          go flag is still set
     */
    public void run()
    {
        while( go.get() ) {
            try {
                // Check for pending events
                flushEvents();

                // Grab the next envelope
                Envelope e = kernel.read();
                if( e == Kernel.EVENTS_PENDING )
                    continue; // We'll catch it up above

                // Check for pending events that might have
                // come in while we were blocking.  This is usually
                // when the connection add events come through
                flushEvents();

                try {
                    createAndDispatch( e );
                } catch( Exception ex ) {
                    reportError(e.getSource(), e, ex);
                }
            } catch( InterruptedException ex ) {
                // An interrupt with the go flag cleared just ends the
                // thread quietly; any other interrupt is treated as an error.
                if( !go.get() )
                    return;
                throw new RuntimeException( "Unexpected interruption", ex );
            }
        }
    }
}