159b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta/*
259b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta * Copyright (c) 2011 jMonkeyEngine
359b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta * All rights reserved.
459b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *
559b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta * Redistribution and use in source and binary forms, with or without
659b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta * modification, are permitted provided that the following conditions are
759b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta * met:
859b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *
959b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta * * Redistributions of source code must retain the above copyright
1059b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *   notice, this list of conditions and the following disclaimer.
1159b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *
1259b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta * * Redistributions in binary form must reproduce the above copyright
1359b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *   notice, this list of conditions and the following disclaimer in the
1459b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *   documentation and/or other materials provided with the distribution.
1559b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *
1659b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
1759b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *   may be used to endorse or promote products derived from this software
1859b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *   without specific prior written permission.
1959b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *
2059b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
2159b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
2259b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
2359b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
2459b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
2559b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
2659b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
2759b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
2859b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
2959b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
3059b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
3159b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta */
3259b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
3359b2e6871c65f58fdad78cd7229c292f6a177578Scott Bartapackage com.jme3.network.base;
3459b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
3559b2e6871c65f58fdad78cd7229c292f6a177578Scott Bartaimport com.jme3.network.ErrorListener;
3659b2e6871c65f58fdad78cd7229c292f6a177578Scott Bartaimport com.jme3.network.Message;
3759b2e6871c65f58fdad78cd7229c292f6a177578Scott Bartaimport com.jme3.network.MessageListener;
3859b2e6871c65f58fdad78cd7229c292f6a177578Scott Bartaimport com.jme3.network.kernel.Connector;
3959b2e6871c65f58fdad78cd7229c292f6a177578Scott Bartaimport com.jme3.network.kernel.ConnectorException;
4059b2e6871c65f58fdad78cd7229c292f6a177578Scott Bartaimport java.nio.ByteBuffer;
4159b2e6871c65f58fdad78cd7229c292f6a177578Scott Bartaimport java.util.concurrent.ArrayBlockingQueue;
4259b2e6871c65f58fdad78cd7229c292f6a177578Scott Bartaimport java.util.concurrent.BlockingQueue;
4359b2e6871c65f58fdad78cd7229c292f6a177578Scott Bartaimport java.util.concurrent.atomic.AtomicBoolean;
4459b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
4559b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta/**
4659b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *  Wraps a single Connector and forwards new messages
4759b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *  to the supplied message dispatcher.  This is used
4859b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *  by DefaultClient to manage its connector objects.
4959b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *  This is only responsible for message reading and provides
5059b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *  no support for buffering writes.
5159b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *
5259b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *  <p>This adapter assumes a simple protocol where two
5359b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *  bytes define a (short) object size with the object data
5459b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *  to follow.  Note: this limits the size of serialized
5559b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *  objects to 32676 bytes... even though, for example,
5659b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *  datagram packets can hold twice that. :P</p>
5759b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *
5859b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *  @version   $Revision: 8944 $
5959b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta *  @author    Paul Speed
6059b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta */
6159b2e6871c65f58fdad78cd7229c292f6a177578Scott Bartapublic class ConnectorAdapter extends Thread
6259b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta{
6359b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    private static final int OUTBOUND_BACKLOG = 16000;
6459b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
6559b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    private Connector connector;
6659b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    private MessageListener<Object> dispatcher;
6759b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    private ErrorListener<Object> errorHandler;
6859b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    private AtomicBoolean go = new AtomicBoolean(true);
6959b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
7059b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    private BlockingQueue<ByteBuffer> outbound;
7159b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
7259b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    // Writes messages out on a background thread
7359b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    private WriterThread writer;
7459b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
7559b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    // Marks the messages as reliable or not if they came
7659b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    // through this connector.
7759b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    private boolean reliable;
7859b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
7959b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    public ConnectorAdapter( Connector connector, MessageListener<Object> dispatcher,
8059b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                             ErrorListener<Object> errorHandler, boolean reliable )
8159b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    {
8259b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        super( String.valueOf(connector) );
8359b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        this.connector = connector;
8459b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        this.dispatcher = dispatcher;
8559b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        this.errorHandler = errorHandler;
8659b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        this.reliable = reliable;
8759b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        setDaemon(true);
8859b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
8959b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        // The backlog makes sure that the outbound channel blocks once
9059b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        // a certain backlog level is reached.  It is set high so that it
9159b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        // is only reached in the worst cases... which are usually things like
9259b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        // raw throughput tests.  Technically, a saturated TCP channel could
9359b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        // back up quite a bit if the buffers are full and the socket has
9459b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        // stalled but 16,000 messages is still a big backlog.
9559b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        outbound = new ArrayBlockingQueue<ByteBuffer>(OUTBOUND_BACKLOG);
9659b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
9759b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        // Note: this technically adds a potential deadlock case
9859b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        // with the above code where there wasn't one before.  For example,
9959b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        // if a TCP outbound queue fills to capacity and a client sends
10059b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        // in such a way that they block TCP message handling then if the HostedConnection
10159b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        // on the server is similarly blocked then the TCP network buffers may
10259b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        // all get full and no outbound messages move and we forever block
10359b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        // on the queue.
10459b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        // However, in practice this can't really happen... or at least it's
10559b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        // the sign of other really bad things.
10659b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        // First, currently the server-side outbound queues are all unbounded and
10759b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        // so won't ever block the handling of messages if the outbound channel is full.
10859b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        // Second, there would have to be a huge amount of data backlog for this
10959b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        // to ever occur anyway.
11059b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        // Third, it's a sign of a really poor architecture if 16,000 messages
11159b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        // can go out in a way that blocks reads.
11259b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
11359b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        writer = new WriterThread();
11459b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        writer.start();
11559b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    }
11659b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
11759b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    public void close()
11859b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    {
11959b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        go.set(false);
12059b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
12159b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        // Kill the writer service
12259b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        writer.shutdown();
12359b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
12459b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        if( connector.isConnected() )
12559b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta            {
12659b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta            // Kill the connector
12759b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta            connector.close();
12859b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta            }
12959b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    }
13059b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
13159b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    protected void dispatch( Message m )
13259b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    {
13359b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        dispatcher.messageReceived( null, m );
13459b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    }
13559b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
13659b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    public void write( ByteBuffer data )
13759b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    {
13859b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        try {
13959b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta            outbound.put( data );
14059b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        } catch( InterruptedException e ) {
14159b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta            throw new RuntimeException( "Interrupted while waiting for queue to drain", e );
14259b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        }
14359b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    }
14459b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
14559b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    protected void handleError( Exception e )
14659b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    {
14759b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        if( !go.get() )
14859b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta            return;
14959b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
15059b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        errorHandler.handleError( this, e );
15159b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    }
15259b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
15359b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    public void run()
15459b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    {
15559b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        MessageProtocol protocol = new MessageProtocol();
15659b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
15759b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        try {
15859b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta            while( go.get() ) {
15959b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                ByteBuffer buffer = connector.read();
16059b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                if( buffer == null ) {
16159b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                    if( go.get() ) {
16259b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                        throw new ConnectorException( "Connector closed." );
16359b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                    } else {
16459b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                        // Just dump out because a null buffer is expected
16559b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                        // from a closed/closing connector
16659b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                        break;
16759b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                    }
16859b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                }
16959b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
17059b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                protocol.addBuffer( buffer );
17159b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
17259b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                Message m = null;
17359b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                while( (m = protocol.getMessage()) != null ) {
17459b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                    m.setReliable( reliable );
17559b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                    dispatch( m );
17659b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                }
17759b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta            }
17859b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        } catch( Exception e ) {
17959b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta            handleError( e );
18059b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        }
18159b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    }
18259b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
18359b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    protected class WriterThread extends Thread
18459b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    {
18559b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        public WriterThread()
18659b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        {
18759b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta            super( String.valueOf(connector) + "-writer" );
18859b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        }
18959b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
19059b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        public void shutdown()
19159b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        {
19259b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta            interrupt();
19359b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        }
19459b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
19559b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        private void write( ByteBuffer data )
19659b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        {
19759b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta            try {
19859b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                connector.write(data);
19959b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta            } catch( Exception e ) {
20059b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                handleError( e );
20159b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta            }
20259b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        }
20359b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta
20459b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        public void run()
20559b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        {
20659b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta            while( go.get() ) {
20759b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                try {
20859b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                    ByteBuffer data = outbound.take();
20959b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                    write(data);
21059b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                } catch( InterruptedException e ) {
21159b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                    if( !go.get() )
21259b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                        return;
21359b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                    throw new RuntimeException( "Interrupted waiting for data", e );
21459b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta                }
21559b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta            }
21659b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta        }
21759b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta    }
21859b2e6871c65f58fdad78cd7229c292f6a177578Scott Barta}
219