/*
 * Copyright (c) 2011 jMonkeyEngine
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 * * Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * * Redistributions in binary form must reproduce the above copyright
 *   notice, this list of conditions and the following disclaimer in the
 *   documentation and/or other materials provided with the distribution.
 *
 * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
 *   may be used to endorse or promote products derived from this software
 *   without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

package com.jme3.network.base;

import com.jme3.network.Filter;
import com.jme3.network.HostedConnection;
import com.jme3.network.Message;
import com.jme3.network.MessageListener;
import com.jme3.network.kernel.Endpoint;
import com.jme3.network.kernel.EndpointEvent;
import com.jme3.network.kernel.Envelope;
import com.jme3.network.kernel.Kernel;
import com.jme3.network.message.ClientRegistrationMessage;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 *  Wraps a single Kernel and forwards new messages
 *  to the supplied message dispatcher and new endpoint
 *  events to the connection dispatcher.  This is used
 *  by DefaultServer to manage its kernel objects.
 *
 *  <p>This adapter assumes a simple protocol where two
 *  bytes define a (short) object size with the object data
 *  to follow.  Note: this limits the size of serialized
 *  objects to 32767 bytes... even though, for example,
 *  datagram packets can hold twice that. :P</p>
 *
 *  @version   $Revision: 8944 $
 *  @author    Paul Speed
 */
public class KernelAdapter extends Thread
{
    static final Logger log = Logger.getLogger(KernelAdapter.class.getName());

    private final DefaultServer server; // this is unfortunate
    private final Kernel kernel;
    private final MessageListener<HostedConnection> messageDispatcher;

    // Loop flag for run(); cleared by close() to request shutdown.
    private final AtomicBoolean go = new AtomicBoolean(true);

    // Keeps track of the in-progress messages that are received
    // on reliable connections
    private final Map<Endpoint, MessageProtocol> messageBuffers = new ConcurrentHashMap<Endpoint,MessageProtocol>();

    // Marks the messages as reliable or not if they came
    // through this connector.
    private final boolean reliable;

    /**
     *  Creates a daemon adapter thread, named after the kernel, that
     *  reads from the specified kernel once started.
     *
     *  @param server the server that is asked for connections and told
     *                about client registrations and closed endpoints
     *  @param kernel the kernel to read envelopes and endpoint events from
     *  @param messageDispatcher the listener that receives every decoded
     *                message other than client registration messages
     *  @param reliable the flag stamped on every message decoded by this
     *                adapter; also selects whether partial data is buffered
     *                per endpoint
     */
    public KernelAdapter( DefaultServer server, Kernel kernel, MessageListener<HostedConnection> messageDispatcher,
                          boolean reliable )
    {
        super( String.valueOf(kernel) );
        this.server = server;
        this.kernel = kernel;
        this.messageDispatcher = messageDispatcher;
        this.reliable = reliable;
        setDaemon(true);
    }

    /**
     *  Returns the kernel wrapped by this adapter.
     */
    public Kernel getKernel()
    {
        return kernel;
    }

    /**
     *  Initializes the wrapped kernel.
     */
    public void initialize()
    {
        kernel.initialize();
    }

    /**
     *  Passes the broadcast request straight through to the wrapped kernel.
     */
    public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable,
                           boolean copy )
    {
        kernel.broadcast( filter, data, reliable, copy );
    }

    /**
     *  Clears the run flag so that the read loop exits and then
     *  terminates the wrapped kernel.
     */
    public void close() throws InterruptedException
    {
        go.set(false);

        // Kill the kernel
        kernel.terminate();
    }

    /**
     *  Logs the error at SEVERE level and closes the offending endpoint.
     *
     *  @param p the endpoint the error is associated with; may be null,
     *           in which case the error is only logged
     *  @param context the object (message, envelope, or event) that was
     *           being processed when the error occurred
     *  @param e the error itself
     */
    protected void reportError( Endpoint p, Object context, Exception e )
    {
        // Should really be queued up so the outer thread can
        // retrieve them.  For now we'll just log it.  FIXME
        log.log( Level.SEVERE, "Unhandled error, endpoint:" + p + ", context:" + context, e );

        // In lieu of other options, at least close the endpoint.
        // This method is called from catch blocks, so it must not throw
        // itself: a NullPointerException here would escape run() and
        // kill the adapter thread.
        if( p != null ) {
            p.close();
        }
    }

    /**
     *  Asks the server for the connection associated with the
     *  specified endpoint.
     */
    protected HostedConnection getConnection( Endpoint p )
    {
        return server.getConnection(p);
    }

    /**
     *  Drops any partially accumulated message data for the endpoint
     *  and notifies the server that the endpoint's connection closed.
     */
    protected void connectionClosed( Endpoint p )
    {
        // Remove any message buffer we've been accumulating
        // on behalf of this endpoint
        messageBuffers.remove(p);

        log.log( Level.FINE, "Buffers size:{0}", messageBuffers.size() );

        server.connectionClosed(p);
    }

    /**
     *  Delivers a single decoded message.  Client registration messages
     *  are handed to the server; everything else goes to the message
     *  dispatcher on behalf of the endpoint's connection.  Messages from
     *  endpoints that have no connection are dropped (with a warning if
     *  this adapter is reliable).
     *
     *  <p>Note on threading for those writing their own server
     *  or adapter implementations.  The rule that a single connection be
     *  processed by only one thread at a time is more about ensuring that
     *  the messages are delivered in the order that they are received
     *  than for any user-code safety.  99% of the time the user code should
     *  be writing for multithreaded access anyway.</p>
     *
     *  <p>The issue with the messages is that if an implementation is
     *  using a general thread pool then it would be possible for a
     *  naive implementation to have one thread grab an Envelope from
     *  connection 1's and another grab the next Envelope.  Since an Envelope
     *  may contain several messages, delivering the second thread's messages
     *  before or during the first's would be really confusing and hard
     *  to code for in user code.</p>
     *
     *  <p>And that's why this note is here.  DefaultServer does a rudimentary
     *  per-connection locking but it couldn't possibly guard against
     *  out of order Envelope processing.</p>
     */
    protected void dispatch( Endpoint p, Message m )
    {
        // Because this class is the only one with the information
        // to do it... we need to pull off the registration message
        // here.
        if( m instanceof ClientRegistrationMessage ) {
            server.registerClient( this, p, (ClientRegistrationMessage)m );
            return;
        }

        try {
            HostedConnection source = getConnection(p);
            if( source == null ) {
                if( reliable ) {
                    // If it's a reliable connection then it's slightly more
                    // concerning but this can happen all the time for a UDP endpoint.
                    log.log( Level.WARNING, "Recieved message from unconnected endpoint:" + p + "  message:" + m );
                }
                return;
            }
            messageDispatcher.messageReceived( source, m );
        } catch( Exception e ) {
            reportError(p, m, e);
        }
    }

    /**
     *  Returns the protocol buffer used to decode data from the
     *  specified endpoint.  Reliable adapters keep one buffer per endpoint
     *  so that partial messages carry over between envelopes; unreliable
     *  adapters get a fresh, unshared buffer on every call.
     */
    protected MessageProtocol getMessageBuffer( Endpoint p )
    {
        if( !reliable ) {
            // Since UDP comes in packets and they aren't split
            // up, there is no reason to buffer.  In fact, there would
            // be a down side because there is no way for us to reliably
            // clean these up later since we'd create another one for
            // any random UDP packet that comes to the port.
            return new MessageProtocol();
        } else {
            // See if we already have one
            MessageProtocol result = messageBuffers.get(p);
            if( result == null ) {
                result = new MessageProtocol();
                messageBuffers.put(p, result);
            }
            return result;
        }
    }

    /**
     *  Feeds the envelope's data into the source endpoint's protocol
     *  buffer and dispatches every message that is complete as a result.
     *
     *  @throws RuntimeException if this adapter is unreliable and the
     *          envelope did not yield at least one message
     */
    protected void createAndDispatch( Envelope env )
    {
        MessageProtocol protocol = getMessageBuffer(env.getSource());

        byte[] data = env.getData();
        ByteBuffer buffer = ByteBuffer.wrap(data);

        int count = protocol.addBuffer( buffer );
        if( count == 0 ) {
            // This can happen if there was only a partial message
            // received.  However, this should never happen for unreliable
            // connections.
            if( !reliable ) {
                // Log some additional information about the packet.
                int len = Math.min( 10, data.length );
                StringBuilder sb = new StringBuilder();
                for( int i = 0; i < len; i++ ) {
                    // Mask to the unsigned byte value; otherwise bytes
                    // >= 0x80 are sign-extended and print as ffffffXX.
                    sb.append( '[' ).append( Integer.toHexString(data[i] & 0xff) ).append( ']' );
                }
                log.log( Level.INFO, "First 10 bytes of incomplete nessage:" + sb );
                throw new RuntimeException( "Envelope contained incomplete data:" + env );
            }
        }

        // Should be complete... and maybe we should check but we don't
        Message m = null;
        while( (m = protocol.getMessage()) != null ) {
            m.setReliable(reliable);
            dispatch( env.getSource(), m );
        }
    }

    /**
     *  Handles a single endpoint event.  Only REMOVE events are acted
     *  upon; they are reported as closed connections.
     */
    protected void createAndDispatch( EndpointEvent event )
    {
        // Only need to tell the server about disconnects
        if( event.getType() == EndpointEvent.Type.REMOVE ) {
            connectionClosed( event.getEndpoint() );
        }
    }

    /**
     *  Drains and processes all endpoint events currently queued
     *  in the kernel.  A failure while processing one event is reported
     *  and does not stop the remaining events from being processed.
     */
    protected void flushEvents()
    {
        EndpointEvent event;
        while( (event = kernel.nextEvent()) != null ) {
            try {
                createAndDispatch( event );
            } catch( Exception e ) {
                reportError(event.getEndpoint(), event, e);
            }
        }
    }

    /**
     *  Read loop: alternates between flushing pending endpoint events
     *  and reading/dispatching envelopes until close() clears the run flag.
     */
    @Override
    public void run()
    {
        while( go.get() ) {

            try {
                // Check for pending events
                flushEvents();

                // Grab the next envelope
                Envelope e = kernel.read();
                if( e == Kernel.EVENTS_PENDING )
                    continue; // We'll catch it up above

                // Check for pending events that might have
                // come in while we were blocking.  This is usually
                // when the connection add events come through
                flushEvents();

                try {
                    createAndDispatch( e );
                } catch( Exception ex ) {
                    reportError(e.getSource(), e, ex);
                }

            } catch( InterruptedException ex ) {
                if( !go.get() )
                    return;
                // Not a requested shutdown: restore the interrupt status
                // that was cleared when the exception was thrown, then fail.
                Thread.currentThread().interrupt();
                throw new RuntimeException( "Unexpected interruption", ex );
            }
        }
    }

}