1/*
2 * Copyright (c) 2011 jMonkeyEngine
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are
7 * met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 *   notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 *   notice, this list of conditions and the following disclaimer in the
14 *   documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
17 *   may be used to endorse or promote products derived from this software
18 *   without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
22 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
27 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
28 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
29 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
30 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32
33package com.jme3.network.base;
34
35import com.jme3.network.ErrorListener;
36import com.jme3.network.Message;
37import com.jme3.network.MessageListener;
38import com.jme3.network.kernel.Connector;
39import com.jme3.network.kernel.ConnectorException;
40import java.nio.ByteBuffer;
41import java.util.concurrent.ArrayBlockingQueue;
42import java.util.concurrent.BlockingQueue;
43import java.util.concurrent.atomic.AtomicBoolean;
44
45/**
46 *  Wraps a single Connector and forwards new messages
47 *  to the supplied message dispatcher.  This is used
48 *  by DefaultClient to manage its connector objects.
 *  Inbound messages are read and dispatched on this thread;
 *  outbound buffers are queued and written to the connector
 *  by a background writer thread.
51 *
52 *  <p>This adapter assumes a simple protocol where two
53 *  bytes define a (short) object size with the object data
 *  to follow.  Note: this limits the size of serialized
 *  objects to 32767 bytes (the largest value a signed short
 *  can hold)... even though, for example, datagram packets
 *  can hold twice that. :P</p>
57 *
58 *  @version   $Revision: 8944 $
59 *  @author    Paul Speed
60 */
61public class ConnectorAdapter extends Thread
62{
63    private static final int OUTBOUND_BACKLOG = 16000;
64
65    private Connector connector;
66    private MessageListener<Object> dispatcher;
67    private ErrorListener<Object> errorHandler;
68    private AtomicBoolean go = new AtomicBoolean(true);
69
70    private BlockingQueue<ByteBuffer> outbound;
71
72    // Writes messages out on a background thread
73    private WriterThread writer;
74
75    // Marks the messages as reliable or not if they came
76    // through this connector.
77    private boolean reliable;
78
79    public ConnectorAdapter( Connector connector, MessageListener<Object> dispatcher,
80                             ErrorListener<Object> errorHandler, boolean reliable )
81    {
82        super( String.valueOf(connector) );
83        this.connector = connector;
84        this.dispatcher = dispatcher;
85        this.errorHandler = errorHandler;
86        this.reliable = reliable;
87        setDaemon(true);
88
89        // The backlog makes sure that the outbound channel blocks once
90        // a certain backlog level is reached.  It is set high so that it
91        // is only reached in the worst cases... which are usually things like
92        // raw throughput tests.  Technically, a saturated TCP channel could
93        // back up quite a bit if the buffers are full and the socket has
94        // stalled but 16,000 messages is still a big backlog.
95        outbound = new ArrayBlockingQueue<ByteBuffer>(OUTBOUND_BACKLOG);
96
97        // Note: this technically adds a potential deadlock case
98        // with the above code where there wasn't one before.  For example,
99        // if a TCP outbound queue fills to capacity and a client sends
100        // in such a way that they block TCP message handling then if the HostedConnection
101        // on the server is similarly blocked then the TCP network buffers may
102        // all get full and no outbound messages move and we forever block
103        // on the queue.
104        // However, in practice this can't really happen... or at least it's
105        // the sign of other really bad things.
106        // First, currently the server-side outbound queues are all unbounded and
107        // so won't ever block the handling of messages if the outbound channel is full.
108        // Second, there would have to be a huge amount of data backlog for this
109        // to ever occur anyway.
110        // Third, it's a sign of a really poor architecture if 16,000 messages
111        // can go out in a way that blocks reads.
112
113        writer = new WriterThread();
114        writer.start();
115    }
116
117    public void close()
118    {
119        go.set(false);
120
121        // Kill the writer service
122        writer.shutdown();
123
124        if( connector.isConnected() )
125            {
126            // Kill the connector
127            connector.close();
128            }
129    }
130
131    protected void dispatch( Message m )
132    {
133        dispatcher.messageReceived( null, m );
134    }
135
136    public void write( ByteBuffer data )
137    {
138        try {
139            outbound.put( data );
140        } catch( InterruptedException e ) {
141            throw new RuntimeException( "Interrupted while waiting for queue to drain", e );
142        }
143    }
144
145    protected void handleError( Exception e )
146    {
147        if( !go.get() )
148            return;
149
150        errorHandler.handleError( this, e );
151    }
152
153    public void run()
154    {
155        MessageProtocol protocol = new MessageProtocol();
156
157        try {
158            while( go.get() ) {
159                ByteBuffer buffer = connector.read();
160                if( buffer == null ) {
161                    if( go.get() ) {
162                        throw new ConnectorException( "Connector closed." );
163                    } else {
164                        // Just dump out because a null buffer is expected
165                        // from a closed/closing connector
166                        break;
167                    }
168                }
169
170                protocol.addBuffer( buffer );
171
172                Message m = null;
173                while( (m = protocol.getMessage()) != null ) {
174                    m.setReliable( reliable );
175                    dispatch( m );
176                }
177            }
178        } catch( Exception e ) {
179            handleError( e );
180        }
181    }
182
183    protected class WriterThread extends Thread
184    {
185        public WriterThread()
186        {
187            super( String.valueOf(connector) + "-writer" );
188        }
189
190        public void shutdown()
191        {
192            interrupt();
193        }
194
195        private void write( ByteBuffer data )
196        {
197            try {
198                connector.write(data);
199            } catch( Exception e ) {
200                handleError( e );
201            }
202        }
203
204        public void run()
205        {
206            while( go.get() ) {
207                try {
208                    ByteBuffer data = outbound.take();
209                    write(data);
210                } catch( InterruptedException e ) {
211                    if( !go.get() )
212                        return;
213                    throw new RuntimeException( "Interrupted waiting for data", e );
214                }
215            }
216        }
217    }
218}
219