1/*
2 * Copyright (c) 2011 jMonkeyEngine
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are
7 * met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 *   notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 *   notice, this list of conditions and the following disclaimer in the
14 *   documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
17 *   may be used to endorse or promote products derived from this software
18 *   without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
22 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
27 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
28 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
29 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
30 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32
33package com.jme3.network.kernel.udp;
34
35import com.jme3.network.Filter;
36import com.jme3.network.kernel.*;
37import java.io.IOException;
38import java.net.*;
39import java.nio.ByteBuffer;
40import java.util.Map;
41import java.util.concurrent.ConcurrentHashMap;
42import java.util.concurrent.ExecutorService;
43import java.util.concurrent.Executors;
44import java.util.concurrent.atomic.AtomicBoolean;
45import java.util.logging.Level;
46import java.util.logging.Logger;
47
48/**
49 *  A Kernel implementation using UDP packets.
50 *
51 *  @version   $Revision: 8944 $
52 *  @author    Paul Speed
53 */
54public class UdpKernel extends AbstractKernel
55{
56    static Logger log = Logger.getLogger(UdpKernel.class.getName());
57
58    private InetSocketAddress address;
59    private HostThread thread;
60
61    private ExecutorService writer;
62
63    // The nature of UDP means that even through a firewall,
64    // a user would have to have a unique address+port since UDP
65    // can't really be NAT'ed.
66    private Map<SocketAddress,UdpEndpoint> socketEndpoints = new ConcurrentHashMap<SocketAddress,UdpEndpoint>();
67
68    public UdpKernel( InetAddress host, int port )
69    {
70        this( new InetSocketAddress(host, port) );
71    }
72
73    public UdpKernel( int port ) throws IOException
74    {
75        this( new InetSocketAddress(port) );
76    }
77
78    public UdpKernel( InetSocketAddress address )
79    {
80        this.address = address;
81    }
82
83    protected HostThread createHostThread()
84    {
85        return new HostThread();
86    }
87
88    public void initialize()
89    {
90        if( thread != null )
91            throw new IllegalStateException( "Kernel already initialized." );
92
93        writer = Executors.newFixedThreadPool(2, new NamedThreadFactory(toString() + "-writer"));
94
95        thread = createHostThread();
96
97        try {
98            thread.connect();
99            thread.start();
100        } catch( IOException e ) {
101            throw new KernelException( "Error hosting:" + address, e );
102        }
103    }
104
105    public void terminate() throws InterruptedException
106    {
107        if( thread == null )
108            throw new IllegalStateException( "Kernel not initialized." );
109
110        try {
111            thread.close();
112            writer.shutdown();
113            thread = null;
114        } catch( IOException e ) {
115            throw new KernelException( "Error closing host connection:" + address, e );
116        }
117    }
118
119    /**
120     *  Dispatches the data to all endpoints managed by the
121     *  kernel.  'routing' is currently ignored.
122     */
123    public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable,
124                           boolean copy )
125    {
126        if( reliable )
127            throw new UnsupportedOperationException( "Reliable send not supported by this kernel." );
128
129        if( copy ) {
130            // Copy the data just once
131            byte[] temp = new byte[data.remaining()];
132            System.arraycopy(data.array(), data.position(), temp, 0, data.remaining());
133            data = ByteBuffer.wrap(temp);
134        }
135
136        // Hand it to all of the endpoints that match our routing
137        for( UdpEndpoint p : socketEndpoints.values() ) {
138            // Does it match the filter?
139            if( filter != null && !filter.apply(p) )
140                continue;
141
142            // Send the data
143            p.send( data );
144        }
145    }
146
147    protected Endpoint getEndpoint( SocketAddress address, boolean create )
148    {
149        UdpEndpoint p = socketEndpoints.get(address);
150        if( p == null && create ) {
151            p = new UdpEndpoint( this, nextEndpointId(), address, thread.getSocket() );
152            socketEndpoints.put( address, p );
153
154            // Add an event for it.
155            addEvent( EndpointEvent.createAdd( this, p ) );
156        }
157        return p;
158    }
159
160    /**
161     *  Called by the endpoints when they need to be closed.
162     */
163    protected void closeEndpoint( UdpEndpoint p ) throws IOException
164    {
165        // Just book-keeping to do here.
166        if( socketEndpoints.remove( p.getRemoteAddress() ) == null )
167            return;
168
169        log.log( Level.INFO, "Closing endpoint:{0}.", p );
170        log.log( Level.FINE, "Socket endpoints size:{0}", socketEndpoints.size() );
171
172        addEvent( EndpointEvent.createRemove( this, p ) );
173
174        // If there are no pending messages then add one so that the
175        // kernel-user knows to wake up if it is only listening for
176        // envelopes.
177        if( !hasEnvelopes() ) {
178            // Note: this is not really a race condition.  At worst, our
179            // event has already been handled by now and it does no harm
180            // to check again.
181            addEnvelope( EVENTS_PENDING );
182        }
183    }
184
185    protected void newData( DatagramPacket packet )
186    {
187        // So the tricky part here is figuring out the endpoint and
188        // whether it's new or not.  In these UDP schemes, firewalls have
189        // to be ported back to a specific machine so we will consider
190        // the address + port (ie: SocketAddress) the defacto unique
191        // ID.
192        Endpoint p = getEndpoint( packet.getSocketAddress(), true );
193
194        // We'll copy the data to trim it.
195        byte[] data = new byte[packet.getLength()];
196        System.arraycopy(packet.getData(), 0, data, 0, data.length);
197
198        Envelope env = new Envelope( p, data, false );
199        addEnvelope( env );
200    }
201
202    protected void enqueueWrite( Endpoint endpoint, DatagramPacket packet )
203    {
204        writer.execute( new MessageWriter(endpoint, packet) );
205    }
206
207    protected class MessageWriter implements Runnable
208    {
209        private Endpoint endpoint;
210        private DatagramPacket packet;
211
212        public MessageWriter( Endpoint endpoint, DatagramPacket packet )
213        {
214            this.endpoint = endpoint;
215            this.packet = packet;
216        }
217
218        public void run()
219        {
220            // Not guaranteed to always work but an extra datagram
221            // to a dead connection isn't so big of a deal.
222            if( !endpoint.isConnected() ) {
223                return;
224            }
225
226            try {
227                thread.getSocket().send(packet);
228            } catch( Exception e ) {
229                KernelException exc = new KernelException( "Error sending datagram to:" + address, e );
230                exc.fillInStackTrace();
231                reportError(exc);
232            }
233        }
234    }
235
236    protected class HostThread extends Thread
237    {
238        private DatagramSocket socket;
239        private AtomicBoolean go = new AtomicBoolean(true);
240
241        private byte[] buffer = new byte[65535]; // slightly bigger than needed.
242
243        public HostThread()
244        {
245            setName( "UDP Host@" + address );
246            setDaemon(true);
247        }
248
249        protected DatagramSocket getSocket()
250        {
251            return socket;
252        }
253
254        public void connect() throws IOException
255        {
256            socket = new DatagramSocket( address );
257            log.log( Level.INFO, "Hosting UDP connection:{0}.", address );
258        }
259
260        public void close() throws IOException, InterruptedException
261        {
262            // Set the thread to stop
263            go.set(false);
264
265            // Make sure the channel is closed
266            socket.close();
267
268            // And wait for it
269            join();
270        }
271
272        public void run()
273        {
274            log.log( Level.INFO, "Kernel started for connection:{0}.", address );
275
276            // An atomic is safest and costs almost nothing
277            while( go.get() ) {
278                try {
279                    // Could reuse the packet but I don't see the
280                    // point and it may lead to subtle bugs if not properly
281                    // reset.
282                    DatagramPacket packet = new DatagramPacket( buffer, buffer.length );
283                    socket.receive(packet);
284
285                    newData( packet );
286                } catch( IOException e ) {
287                    if( !go.get() )
288                        return;
289                    reportError( e );
290                }
291            }
292        }
293    }
294}
295