1/*
2 * Copyright (c) 2011 jMonkeyEngine
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are
7 * met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 *   notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 *   notice, this list of conditions and the following disclaimer in the
14 *   documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
17 *   may be used to endorse or promote products derived from this software
18 *   without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
22 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
27 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
28 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
29 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
30 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32
33package com.jme3.network.kernel.tcp;
34
35import com.jme3.network.Filter;
36import com.jme3.network.kernel.*;
37import java.io.IOException;
38import java.net.InetAddress;
39import java.net.InetSocketAddress;
40import java.net.Socket;
41import java.nio.ByteBuffer;
42import java.nio.channels.*;
43import java.nio.channels.spi.SelectorProvider;
44import java.util.Iterator;
45import java.util.Map;
46import java.util.concurrent.ConcurrentHashMap;
47import java.util.concurrent.atomic.AtomicBoolean;
48import java.util.logging.Level;
49import java.util.logging.Logger;
50
51
52/**
53 *  A Kernel implementation based on NIO selectors.
54 *
55 *  @version   $Revision: 8944 $
56 *  @author    Paul Speed
57 */
58public class SelectorKernel extends AbstractKernel
59{
60    static Logger log = Logger.getLogger(SelectorKernel.class.getName());
61
62    private InetSocketAddress address;
63    private SelectorThread thread;
64
65    private Map<Long,NioEndpoint> endpoints = new ConcurrentHashMap<Long,NioEndpoint>();
66
67    public SelectorKernel( InetAddress host, int port )
68    {
69        this( new InetSocketAddress(host, port) );
70    }
71
72    public SelectorKernel( int port ) throws IOException
73    {
74        this( new InetSocketAddress(port) );
75    }
76
77    public SelectorKernel( InetSocketAddress address )
78    {
79        this.address = address;
80    }
81
82    protected SelectorThread createSelectorThread()
83    {
84        return new SelectorThread();
85    }
86
87    public void initialize()
88    {
89        if( thread != null )
90            throw new IllegalStateException( "Kernel already initialized." );
91
92        thread = createSelectorThread();
93
94        try {
95            thread.connect();
96            thread.start();
97        } catch( IOException e ) {
98            throw new KernelException( "Error hosting:" + address, e );
99        }
100    }
101
102    public void terminate() throws InterruptedException
103    {
104        if( thread == null )
105            throw new IllegalStateException( "Kernel not initialized." );
106
107        try {
108            thread.close();
109            thread = null;
110        } catch( IOException e ) {
111            throw new KernelException( "Error closing host connection:" + address, e );
112        }
113    }
114
115    public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable,
116                           boolean copy )
117    {
118        if( !reliable )
119            throw new UnsupportedOperationException( "Unreliable send not supported by this kernel." );
120
121        if( copy ) {
122            // Copy the data just once
123            byte[] temp = new byte[data.remaining()];
124            System.arraycopy(data.array(), data.position(), temp, 0, data.remaining());
125            data = ByteBuffer.wrap(temp);
126        }
127
128        // Hand it to all of the endpoints that match our routing
129        for( NioEndpoint p : endpoints.values() ) {
130            // Does it match the filter?
131            if( filter != null && !filter.apply(p) )
132                continue;
133
134            // Give it the data... but let each endpoint track their
135            // own completion over the shared array of bytes by
136            // duplicating it
137            p.send( data.duplicate(), false, false );
138        }
139
140        // Wake up the selector so it can reinitialize its
141        // state accordingly.
142        wakeupSelector();
143    }
144
145    protected NioEndpoint addEndpoint( SocketChannel c )
146    {
147        // Note: we purposely do NOT put the key in the endpoint.
148        //       SelectionKeys are dangerous outside the selector thread
149        //       and this is safer.
150        NioEndpoint p = new NioEndpoint( this, nextEndpointId(), c );
151
152        endpoints.put( p.getId(), p );
153
154        // Enqueue an endpoint event for the listeners
155        addEvent( EndpointEvent.createAdd( this, p ) );
156
157        return p;
158    }
159
160    protected void removeEndpoint( NioEndpoint p, SocketChannel c )
161    {
162        endpoints.remove( p.getId() );
163        log.log( Level.FINE, "Endpoints size:{0}", endpoints.size() );
164
165        // Enqueue an endpoint event for the listeners
166        addEvent( EndpointEvent.createRemove( this, p ) );
167
168        // If there are no pending messages then add one so that the
169        // kernel-user knows to wake up if it is only listening for
170        // envelopes.
171        if( !hasEnvelopes() ) {
172            // Note: this is not really a race condition.  At worst, our
173            // event has already been handled by now and it does no harm
174            // to check again.
175            addEnvelope( EVENTS_PENDING );
176        }
177    }
178
179    /**
180     *  Called by the endpoints when they need to be closed.
181     */
182    protected void closeEndpoint( NioEndpoint p ) throws IOException
183    {
184        //log.log( Level.INFO, "Closing endpoint:{0}.", p );
185
186        thread.cancel(p);
187    }
188
189    /**
190     *  Used internally by the endpoints to wakeup the selector
191     *  when they have data to send.
192     */
193    protected void wakeupSelector()
194    {
195        thread.wakeupSelector();
196    }
197
198    protected void newData( NioEndpoint p, SocketChannel c, ByteBuffer shared, int size )
199    {
200        // Note: if ever desirable, it would be possible to accumulate
201        //       data per source channel and only 'finalize' it when
202        //       asked for more envelopes then were ready.  I just don't
203        //       think it will be an issue in practice.  The busier the
204        //       server, the more the buffers will fill before we get to them.
205        //       And if the server isn't busy, who cares if we chop things up
206        //       smaller... the network is still likely to deliver things in
207        //       bulk anyway.
208
209        // Must copy the shared data before we use it
210        byte[] dataCopy = new byte[size];
211		System.arraycopy(shared.array(), 0, dataCopy, 0, size);
212
213        Envelope env = new Envelope( p, dataCopy, true );
214        addEnvelope( env );
215    }
216
217    /**
218     *  This class is purposely tucked neatly away because
219     *  messing with the selector from other threads for any
220     *  reason is very bad.  This is the safest architecture.
221     */
222    protected class SelectorThread extends Thread
223    {
224        private ServerSocketChannel serverChannel;
225        private Selector selector;
226        private AtomicBoolean go = new AtomicBoolean(true);
227        private ByteBuffer working = ByteBuffer.allocate( 8192 );
228
229        /**
230         *  Because we want to keep the keys to ourselves, we'll do
231         *  the endpoint -> key mapping internally.
232         */
233        private Map<NioEndpoint,SelectionKey> endpointKeys = new ConcurrentHashMap<NioEndpoint,SelectionKey>();
234
235        public SelectorThread()
236        {
237            setName( "Selector@" + address );
238            setDaemon(true);
239        }
240
241        public void connect() throws IOException
242        {
243            // Create a new selector
244            this.selector = SelectorProvider.provider().openSelector();
245
246            // Create a new non-blocking server socket channel
247            this.serverChannel = ServerSocketChannel.open();
248            serverChannel.configureBlocking(false);
249
250            // Bind the server socket to the specified address and port
251            serverChannel.socket().bind(address);
252
253            // Register the server socket channel, indicating an interest in
254            // accepting new connections
255            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
256
257            log.log( Level.INFO, "Hosting TCP connection:{0}.", address );
258        }
259
260        public void close() throws IOException, InterruptedException
261        {
262            // Set the thread to stop
263            go.set(false);
264
265            // Make sure the channel is closed
266            serverChannel.close();
267
268            // Force the selector to stop blocking
269            wakeupSelector();
270
271            // And wait for it
272            join();
273        }
274
275        protected void wakeupSelector()
276        {
277            selector.wakeup();
278        }
279
280        protected void setupSelectorOptions()
281        {
282            // For now, selection keys will either be in OP_READ
283            // or OP_WRITE.  So while we are writing a buffer, we
284            // will not be reading.  This is way simpler and less
285            // error prone... it can always be changed when everything
286            // else works if we are looking to micro-optimize.
287
288            // Setup options based on the current state of
289            // the endpoints.  This could potentially be more
290            // efficiently done as change requests... or simply
291            // keeping a thread-safe set of endpoints with pending
292            // writes.  For most cases, it shouldn't matter.
293            for( Map.Entry<NioEndpoint,SelectionKey> e : endpointKeys.entrySet() ) {
294                if( e.getKey().hasPending() ) {
295                    e.getValue().interestOps(SelectionKey.OP_WRITE);
296                }
297            }
298        }
299
300        protected void accept( SelectionKey key ) throws IOException
301        {
302            // Would only get accepts on a server channel
303            ServerSocketChannel serverChan = (ServerSocketChannel)key.channel();
304
305            // Setup the connection to be non-blocking
306            SocketChannel remoteChan = serverChan.accept();
307            remoteChan.configureBlocking(false);
308
309            // And disable Nagle's buffering algorithm... we want
310            // data to go when we put it there.
311            Socket sock = remoteChan.socket();
312            sock.setTcpNoDelay(true);
313
314            // Let the selector know we're interested in reading
315            // data from the channel
316            SelectionKey endKey = remoteChan.register( selector, SelectionKey.OP_READ );
317
318            // And now create a new endpoint
319            NioEndpoint p = addEndpoint( remoteChan );
320            endKey.attach(p);
321            endpointKeys.put(p, endKey);
322        }
323
324        protected void cancel( NioEndpoint p ) throws IOException
325        {
326            SelectionKey key = endpointKeys.remove(p);
327            if( key == null ) {
328                //log.log( Level.INFO, "Endpoint already closed:{0}.", p );
329                return;  // already closed it
330            }
331            log.log( Level.FINE, "Endpoint keys size:{0}", endpointKeys.size() );
332
333            log.log( Level.INFO, "Closing endpoint:{0}.", p );
334            SocketChannel c = (SocketChannel)key.channel();
335
336            // Note: key.cancel() is specifically thread safe.  One of
337            //       the few things one can do with a key from another
338            //       thread.
339            key.cancel();
340            c.close();
341            removeEndpoint( p, c );
342        }
343
344        protected void cancel( SelectionKey key, SocketChannel c ) throws IOException
345        {
346            NioEndpoint p = (NioEndpoint)key.attachment();
347            log.log( Level.INFO, "Closing channel endpoint:{0}.", p );
348            Object o = endpointKeys.remove(p);
349
350            log.log( Level.FINE, "Endpoint keys size:{0}", endpointKeys.size() );
351
352            key.cancel();
353            c.close();
354            removeEndpoint( p, c );
355        }
356
357        protected void read( SelectionKey key ) throws IOException
358        {
359            NioEndpoint p = (NioEndpoint)key.attachment();
360            SocketChannel c = (SocketChannel)key.channel();
361            working.clear();
362
363            int size;
364            try {
365                size = c.read(working);
366            } catch( IOException e ) {
367                // The remove end forcibly closed the connection...
368                // close out our end and cancel the key
369                cancel( key, c );
370                return;
371            }
372
373            if( size == -1 ) {
374                // The remote end shut down cleanly...
375                // close out our end and cancel the key
376                cancel( key, c );
377                return;
378            }
379
380            newData( p, c, working, size );
381        }
382
383        protected void write( SelectionKey key ) throws IOException
384        {
385            NioEndpoint p = (NioEndpoint)key.attachment();
386            SocketChannel c = (SocketChannel)key.channel();
387
388            // We will send what we can and move on.
389            ByteBuffer current = p.peekPending();
390            if( current == NioEndpoint.CLOSE_MARKER ) {
391                // This connection wants to be closed now
392                closeEndpoint(p);
393
394                // Nothing more to do
395                return;
396            }
397
398            c.write( current );
399
400            // If we wrote all of that packet then we need to remove it
401            if( current.remaining() == 0 ) {
402                p.removePending();
403            }
404
405            // If we happened to empty the pending queue then let's read
406            // again.
407            if( !p.hasPending() ) {
408                key.interestOps( SelectionKey.OP_READ );
409            }
410        }
411
412        protected void select() throws IOException
413        {
414            selector.select();
415
416            for( Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext(); ) {
417                SelectionKey key = i.next();
418                i.remove();
419
420                if( !key.isValid() )
421                    {
422                    // When does this happen?
423                    log.log( Level.INFO, "Key is not valid:{0}.", key );
424                    continue;
425                    }
426
427                try {
428                    if( key.isAcceptable() )
429                        accept(key);
430                    else if( key.isWritable() )
431                        write(key);
432                    else if( key.isReadable() )
433                        read(key);
434                } catch( IOException e ) {
435                    if( !go.get() )
436                        return;  // error likely due to shutting down
437                    reportError( e );
438
439                    // And at this level, errors likely mean the key is now
440                    // dead and it doesn't hurt to kick them anyway.  If we
441                    // find IOExceptions that are not fatal, this can be
442                    // readdressed
443                    cancel( key, (SocketChannel)key.channel() );
444                }
445            }
446        }
447
448        public void run()
449        {
450            log.log( Level.INFO, "Kernel started for connection:{0}.", address );
451
452            // An atomic is safest and costs almost nothing
453            while( go.get() ) {
454                // Setup any queued option changes
455                setupSelectorOptions();
456
457                // Check for available keys and process them
458                try {
459                    select();
460                } catch( ClosedSelectorException e ) {
461                    if( !go.get() )
462                        return;  // it's because we're shutting down
463                    throw new KernelException( "Premature selector closing", e );
464                } catch( IOException e ) {
465                    if( !go.get() )
466                        return;  // error likely due to shutting down
467                    reportError( e );
468                }
469            }
470        }
471    }
472}
473