1/*
2 * Copyright (c) 2011 jMonkeyEngine
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are
7 * met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 *   notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 *   notice, this list of conditions and the following disclaimer in the
14 *   documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
17 *   may be used to endorse or promote products derived from this software
18 *   without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
22 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
27 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
28 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
29 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
30 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32
33package com.jme3.network.base;
34
35import com.jme3.network.*;
36import com.jme3.network.kernel.Endpoint;
37import com.jme3.network.kernel.Kernel;
38import com.jme3.network.message.ChannelInfoMessage;
39import com.jme3.network.message.ClientRegistrationMessage;
40import com.jme3.network.message.DisconnectMessage;
41import java.io.IOException;
42import java.nio.ByteBuffer;
43import java.util.*;
44import java.util.concurrent.ConcurrentHashMap;
45import java.util.concurrent.CopyOnWriteArrayList;
46import java.util.concurrent.atomic.AtomicInteger;
47import java.util.logging.Level;
48import java.util.logging.Logger;
49
50/**
51 *  A default implementation of the Server interface that delegates
52 *  its network connectivity to kernel.Kernel.
53 *
54 *  @version   $Revision: 9114 $
55 *  @author    Paul Speed
56 */
57public class DefaultServer implements Server
58{
59    static Logger log = Logger.getLogger(DefaultServer.class.getName());
60
61    // First two channels are reserved for reliable and
62    // unreliable
63    private static final int CH_RELIABLE = 0;
64    private static final int CH_UNRELIABLE = 1;
65    private static final int CH_FIRST = 2;
66
67    private boolean isRunning = false;
68    private AtomicInteger nextId = new AtomicInteger(0);
69    private String gameName;
70    private int version;
71    private KernelFactory kernelFactory = KernelFactory.DEFAULT;
72    private KernelAdapter reliableAdapter;
73    private KernelAdapter fastAdapter;
74    private List<KernelAdapter> channels = new ArrayList<KernelAdapter>();
75    private List<Integer> alternatePorts = new ArrayList<Integer>();
76    private Redispatch dispatcher = new Redispatch();
77    private Map<Integer,HostedConnection> connections = new ConcurrentHashMap<Integer,HostedConnection>();
78    private Map<Endpoint,HostedConnection> endpointConnections
79                            = new ConcurrentHashMap<Endpoint,HostedConnection>();
80
81    // Keeps track of clients for whom we've only received the UDP
82    // registration message
83    private Map<Long,Connection> connecting = new ConcurrentHashMap<Long,Connection>();
84
85    private MessageListenerRegistry<HostedConnection> messageListeners
86                            = new MessageListenerRegistry<HostedConnection>();
87    private List<ConnectionListener> connectionListeners = new CopyOnWriteArrayList<ConnectionListener>();
88
89    public DefaultServer( String gameName, int version, Kernel reliable, Kernel fast )
90    {
91        if( reliable == null )
92            throw new IllegalArgumentException( "Default server reqiures a reliable kernel instance." );
93
94        this.gameName = gameName;
95        this.version = version;
96
97        reliableAdapter = new KernelAdapter( this, reliable, dispatcher, true );
98        channels.add( reliableAdapter );
99        if( fast != null ) {
100            fastAdapter = new KernelAdapter( this, fast, dispatcher, false );
101            channels.add( fastAdapter );
102        }
103    }
104
105    public String getGameName()
106    {
107        return gameName;
108    }
109
110    public int getVersion()
111    {
112        return version;
113    }
114
115    public int addChannel( int port )
116    {
117        if( isRunning )
118            throw new IllegalStateException( "Channels cannot be added once server is started." );
119
120        // Note: it does bug me that channels aren't 100% universal and
121        // setup externally but it requires a more invasive set of changes
122        // for "connection types" and some kind of registry of kernel and
123        // connector factories.  This really would be the best approach and
124        // would allow all kinds of channel customization maybe... but for
125        // now, we hard-code the standard connections and treat the +2 extras
126        // differently.
127
128        // Check for consistency with the channels list
129        if( channels.size() - CH_FIRST != alternatePorts.size() )
130            throw new IllegalStateException( "Channel and port lists do not match." );
131
132        try {
133            int result = alternatePorts.size();
134            alternatePorts.add(port);
135
136            Kernel kernel = kernelFactory.createKernel(result, port);
137            channels.add( new KernelAdapter(this, kernel, dispatcher, true) );
138
139            return result;
140        } catch( IOException e ) {
141            throw new RuntimeException( "Error adding channel for port:" + port, e );
142        }
143    }
144
145    protected void checkChannel( int channel )
146    {
147        if( channel < 0 || channel >= alternatePorts.size() )
148            throw new IllegalArgumentException( "Channel is undefined:" + channel );
149    }
150
151    public void start()
152    {
153        if( isRunning )
154            throw new IllegalStateException( "Server is already started." );
155
156        // Initialize the kernels
157        for( KernelAdapter ka : channels ) {
158            ka.initialize();
159        }
160
161        // Start em up
162        for( KernelAdapter ka : channels ) {
163            ka.start();
164        }
165
166        isRunning = true;
167    }
168
169    public boolean isRunning()
170    {
171        return isRunning;
172    }
173
174    public void close()
175    {
176        if( !isRunning )
177            throw new IllegalStateException( "Server is not started." );
178
179        try {
180            // Kill the adpaters, they will kill the kernels
181            for( KernelAdapter ka : channels ) {
182                ka.close();
183            }
184
185            isRunning = false;
186        } catch( InterruptedException e ) {
187            throw new RuntimeException( "Interrupted while closing", e );
188        }
189    }
190
191    public void broadcast( Message message )
192    {
193        broadcast( null, message );
194    }
195
196    public void broadcast( Filter<? super HostedConnection> filter, Message message )
197    {
198        if( connections.isEmpty() )
199            return;
200
201        ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
202
203        FilterAdapter adapter = filter == null ? null : new FilterAdapter(filter);
204
205        if( message.isReliable() || fastAdapter == null ) {
206            // Don't need to copy the data because message protocol is already
207            // giving us a fresh buffer
208            reliableAdapter.broadcast( adapter, buffer, true, false );
209        } else {
210            fastAdapter.broadcast( adapter, buffer, false, false );
211        }
212    }
213
214    public void broadcast( int channel, Filter<? super HostedConnection> filter, Message message )
215    {
216        if( connections.isEmpty() )
217            return;
218
219        checkChannel(channel);
220
221        ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
222
223        FilterAdapter adapter = filter == null ? null : new FilterAdapter(filter);
224
225        channels.get(channel+CH_FIRST).broadcast( adapter, buffer, true, false );
226    }
227
228    public HostedConnection getConnection( int id )
229    {
230        return connections.get(id);
231    }
232
233    public boolean hasConnections()
234    {
235        return !connections.isEmpty();
236    }
237
238    public Collection<HostedConnection> getConnections()
239    {
240        return Collections.unmodifiableCollection((Collection<HostedConnection>)connections.values());
241    }
242
243    public void addConnectionListener( ConnectionListener listener )
244    {
245        connectionListeners.add(listener);
246    }
247
248    public void removeConnectionListener( ConnectionListener listener )
249    {
250        connectionListeners.remove(listener);
251    }
252
253    public void addMessageListener( MessageListener<? super HostedConnection> listener )
254    {
255        messageListeners.addMessageListener( listener );
256    }
257
258    public void addMessageListener( MessageListener<? super HostedConnection> listener, Class... classes )
259    {
260        messageListeners.addMessageListener( listener, classes );
261    }
262
263    public void removeMessageListener( MessageListener<? super HostedConnection> listener )
264    {
265        messageListeners.removeMessageListener( listener );
266    }
267
268    public void removeMessageListener( MessageListener<? super HostedConnection> listener, Class... classes )
269    {
270        messageListeners.removeMessageListener( listener, classes );
271    }
272
273    protected void dispatch( HostedConnection source, Message m )
274    {
275        if( source == null ) {
276            messageListeners.messageReceived( source, m );
277        } else {
278
279            // A semi-heavy handed way to make sure the listener
280            // doesn't get called at the same time from two different
281            // threads for the same hosted connection.
282            synchronized( source ) {
283                messageListeners.messageReceived( source, m );
284            }
285        }
286    }
287
288    protected void fireConnectionAdded( HostedConnection conn )
289    {
290        for( ConnectionListener l : connectionListeners ) {
291            l.connectionAdded( this, conn );
292        }
293    }
294
295    protected void fireConnectionRemoved( HostedConnection conn )
296    {
297        for( ConnectionListener l : connectionListeners ) {
298            l.connectionRemoved( this, conn );
299        }
300    }
301
302    protected int getChannel( KernelAdapter ka )
303    {
304        return channels.indexOf(ka);
305    }
306
307    protected void registerClient( KernelAdapter ka, Endpoint p, ClientRegistrationMessage m )
308    {
309        Connection addedConnection = null;
310
311        // generally this will only be called by one thread but it's
312        // important enough I won't take chances
313        synchronized( this ) {
314            // Grab the random ID that the client created when creating
315            // its two registration messages
316            long tempId = m.getId();
317
318            // See if we already have one
319            Connection c = connecting.remove(tempId);
320            if( c == null ) {
321                c = new Connection(channels.size());
322                log.log( Level.FINE, "Registering client for endpoint, pass 1:{0}.", p );
323            } else {
324                log.log( Level.FINE, "Refining client registration for endpoint:{0}.", p );
325            }
326
327            // Fill in what we now know
328            int channel = getChannel(ka);
329            c.setChannel(channel, p);
330            log.log( Level.FINE, "Setting up channel:{0}", channel );
331
332            // If it's channel 0 then this is the initial connection
333            // and we will send the connection information
334            if( channel == CH_RELIABLE ) {
335                // Validate the name and version which is only sent
336                // over the reliable connection at this point.
337                if( !getGameName().equals(m.getGameName())
338                    || getVersion() != m.getVersion() ) {
339
340                    log.log( Level.INFO, "Kicking client due to name/version mismatch:{0}.", c );
341
342                    // Need to kick them off... I may regret doing this from within
343                    // the sync block but the alternative is more code
344                    c.close( "Server client mismatch, server:" + getGameName() + " v" + getVersion()
345                             + "  client:" + m.getGameName() + " v" + m.getVersion() );
346                    return;
347                }
348
349                // Else send the extra channel information to the client
350                if( !alternatePorts.isEmpty() ) {
351                    ChannelInfoMessage cim = new ChannelInfoMessage( m.getId(), alternatePorts );
352                    c.send(cim);
353                }
354            }
355
356            if( c.isComplete() ) {
357                // Then we are fully connected
358                if( connections.put( c.getId(), c ) == null ) {
359
360                    for( Endpoint cp : c.channels ) {
361                        if( cp == null )
362                            continue;
363                        endpointConnections.put( cp, c );
364                    }
365
366                    addedConnection = c;
367                }
368            } else {
369                // Need to keep getting channels so we'll keep it in
370                // the map
371                connecting.put(tempId, c);
372            }
373        }
374
375        // Best to do this outside of the synch block to avoid
376        // over synchronizing which is the path to deadlocks
377        if( addedConnection != null ) {
378            log.log( Level.INFO, "Client registered:{0}.", addedConnection );
379
380            // Send the ID back to the client letting it know it's
381            // fully connected.
382            m = new ClientRegistrationMessage();
383            m.setId( addedConnection.getId() );
384            m.setReliable(true);
385            addedConnection.send(m);
386
387            // Now we can notify the listeners about the
388            // new connection.
389            fireConnectionAdded( addedConnection );
390        }
391    }
392
393    protected HostedConnection getConnection( Endpoint endpoint )
394    {
395        return endpointConnections.get(endpoint);
396    }
397
398    protected void connectionClosed( Endpoint p )
399    {
400        if( p.isConnected() ) {
401            log.log( Level.INFO, "Connection closed:{0}.", p );
402        } else {
403            log.log( Level.FINE, "Connection closed:{0}.", p );
404        }
405
406        // Try to find the endpoint in all ways that it might
407        // exist.  Note: by this point the raw network channel is
408        // closed already.
409
410        // Also note: this method will be called multiple times per
411        // HostedConnection if it has multiple endpoints.
412
413        Connection removed = null;
414        synchronized( this ) {
415            // Just in case the endpoint was still connecting
416            connecting.values().remove(p);
417
418            // And the regular management
419            removed = (Connection)endpointConnections.remove(p);
420            if( removed != null ) {
421                connections.remove( removed.getId() );
422            }
423
424            log.log( Level.FINE, "Connections size:{0}", connections.size() );
425            log.log( Level.FINE, "Endpoint mappings size:{0}", endpointConnections.size() );
426        }
427
428        // Better not to fire events while we hold a lock
429        // so always do this outside the synch block.
430        // Note: checking removed.closed just to avoid spurious log messages
431        //       since in general we are called back for every endpoint closing.
432        if( removed != null && !removed.closed ) {
433
434            log.log( Level.INFO, "Client closed:{0}.", removed );
435
436            removed.closeConnection();
437        }
438    }
439
440    protected class Connection implements HostedConnection
441    {
442        private int id;
443        private boolean closed;
444        private Endpoint[] channels;
445        private int setChannelCount = 0;
446
447        private Map<String,Object> sessionData = new ConcurrentHashMap<String,Object>();
448
449        public Connection( int channelCount )
450        {
451            id = nextId.getAndIncrement();
452            channels = new Endpoint[channelCount];
453        }
454
455        void setChannel( int channel, Endpoint p )
456        {
457            if( channels[channel] != null && channels[channel] != p ) {
458                throw new RuntimeException( "Channel has already been set:" + channel
459                                            + " = " + channels[channel] + ", cannot be set to:" + p );
460            }
461            channels[channel] = p;
462            if( p != null )
463                setChannelCount++;
464        }
465
466        boolean isComplete()
467        {
468            return setChannelCount == channels.length;
469        }
470
471        public Server getServer()
472        {
473            return DefaultServer.this;
474        }
475
476        public int getId()
477        {
478            return id;
479        }
480
481        public String getAddress()
482        {
483            return channels[CH_RELIABLE] == null ? null : channels[CH_RELIABLE].getAddress();
484        }
485
486        public void send( Message message )
487        {
488            ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
489            if( message.isReliable() || channels[CH_UNRELIABLE] == null ) {
490                channels[CH_RELIABLE].send( buffer );
491            } else {
492                channels[CH_UNRELIABLE].send( buffer );
493            }
494        }
495
496        public void send( int channel, Message message )
497        {
498            checkChannel(channel);
499            ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
500            channels[channel+CH_FIRST].send(buffer);
501        }
502
503        protected void closeConnection()
504        {
505            if( closed )
506                return;
507            closed = true;
508
509            // Make sure all endpoints are closed.  Note: reliable
510            // should always already be closed through all paths that I
511            // can conceive... but it doesn't hurt to be sure.
512            for( Endpoint p : channels ) {
513                if( p == null )
514                    continue;
515                p.close();
516            }
517
518            fireConnectionRemoved( this );
519        }
520
521        public void close( String reason )
522        {
523            // Send a reason
524            DisconnectMessage m = new DisconnectMessage();
525            m.setType( DisconnectMessage.KICK );
526            m.setReason( reason );
527            m.setReliable( true );
528            send( m );
529
530            // Just close the reliable endpoint
531            // fast will be cleaned up as a side-effect
532            // when closeConnection() is called by the
533            // connectionClosed() endpoint callback.
534            if( channels[CH_RELIABLE] != null ) {
535                // Close with flush so we make sure our
536                // message gets out
537                channels[CH_RELIABLE].close(true);
538            }
539        }
540
541        public Object setAttribute( String name, Object value )
542        {
543            if( value == null )
544                return sessionData.remove(name);
545            return sessionData.put(name, value);
546        }
547
548        @SuppressWarnings("unchecked")
549        public <T> T getAttribute( String name )
550        {
551            return (T)sessionData.get(name);
552        }
553
554        public Set<String> attributeNames()
555        {
556            return Collections.unmodifiableSet(sessionData.keySet());
557        }
558
559        public String toString()
560        {
561            return "Connection[ id=" + id + ", reliable=" + channels[CH_RELIABLE]
562                                     + ", fast=" + channels[CH_UNRELIABLE] + " ]";
563        }
564    }
565
566    protected class Redispatch implements MessageListener<HostedConnection>
567    {
568        public void messageReceived( HostedConnection source, Message m )
569        {
570            dispatch( source, m );
571        }
572    }
573
574    protected class FilterAdapter implements Filter<Endpoint>
575    {
576        private Filter<? super HostedConnection> delegate;
577
578        public FilterAdapter( Filter<? super HostedConnection> delegate )
579        {
580            this.delegate = delegate;
581        }
582
583        public boolean apply( Endpoint input )
584        {
585            HostedConnection conn = getConnection( input );
586            if( conn == null )
587                return false;
588            return delegate.apply(conn);
589        }
590    }
591}
592