1//
2//  ========================================================================
3//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4//  ------------------------------------------------------------------------
5//  All rights reserved. This program and the accompanying materials
6//  are made available under the terms of the Eclipse Public License v1.0
7//  and Apache License v2.0 which accompanies this distribution.
8//
9//      The Eclipse Public License is available at
10//      http://www.eclipse.org/legal/epl-v10.html
11//
12//      The Apache License v2.0 is available at
13//      http://www.opensource.org/licenses/apache2.0.php
14//
15//  You may elect to redistribute this code under either of these licenses.
16//  ========================================================================
17//
18
19package org.eclipse.jetty.server.nio;
20
21import java.io.IOException;
22import java.net.InetSocketAddress;
23import java.net.Socket;
24import java.nio.channels.ByteChannel;
25import java.nio.channels.SelectionKey;
26import java.nio.channels.ServerSocketChannel;
27import java.nio.channels.SocketChannel;
28import java.util.Set;
29
30import org.eclipse.jetty.http.HttpException;
31import org.eclipse.jetty.io.Buffer;
32import org.eclipse.jetty.io.ConnectedEndPoint;
33import org.eclipse.jetty.io.Connection;
34import org.eclipse.jetty.io.EndPoint;
35import org.eclipse.jetty.io.EofException;
36import org.eclipse.jetty.io.nio.ChannelEndPoint;
37import org.eclipse.jetty.server.BlockingHttpConnection;
38import org.eclipse.jetty.server.Request;
39import org.eclipse.jetty.util.ConcurrentHashSet;
40import org.eclipse.jetty.util.log.Log;
41import org.eclipse.jetty.util.log.Logger;
42
43
44/* ------------------------------------------------------------------------------- */
45/**  Blocking NIO connector.
46 * This connector uses efficient NIO buffers with a traditional blocking thread model.
47 * Direct NIO buffers are used and a thread is allocated per connections.
48 *
49 * This connector is best used when there are a few very active connections.
50 *
51 * @org.apache.xbean.XBean element="blockingNioConnector" description="Creates a blocking NIO based socket connector"
52 *
53 *
54 *
55 */
56public class BlockingChannelConnector extends AbstractNIOConnector
57{
58    private static final Logger LOG = Log.getLogger(BlockingChannelConnector.class);
59
60    private transient ServerSocketChannel _acceptChannel;
61    private final Set<BlockingChannelEndPoint> _endpoints = new ConcurrentHashSet<BlockingChannelEndPoint>();
62
63
64    /* ------------------------------------------------------------ */
65    /** Constructor.
66     *
67     */
68    public BlockingChannelConnector()
69    {
70    }
71
72    /* ------------------------------------------------------------ */
73    public Object getConnection()
74    {
75        return _acceptChannel;
76    }
77
78    /* ------------------------------------------------------------ */
79    /**
80     * @see org.eclipse.jetty.server.AbstractConnector#doStart()
81     */
82    @Override
83    protected void doStart() throws Exception
84    {
85        super.doStart();
86        getThreadPool().dispatch(new Runnable()
87        {
88
89            public void run()
90            {
91                while (isRunning())
92                {
93                    try
94                    {
95                        Thread.sleep(400);
96                        long now=System.currentTimeMillis();
97                        for (BlockingChannelEndPoint endp : _endpoints)
98                        {
99                            endp.checkIdleTimestamp(now);
100                        }
101                    }
102                    catch(InterruptedException e)
103                    {
104                        LOG.ignore(e);
105                    }
106                    catch(Exception e)
107                    {
108                        LOG.warn(e);
109                    }
110                }
111            }
112
113        });
114
115    }
116
117
118    /* ------------------------------------------------------------ */
119    public void open() throws IOException
120    {
121        // Create a new server socket and set to non blocking mode
122        _acceptChannel= ServerSocketChannel.open();
123        _acceptChannel.configureBlocking(true);
124
125        // Bind the server socket to the local host and port
126        InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
127        _acceptChannel.socket().bind(addr,getAcceptQueueSize());
128    }
129
130    /* ------------------------------------------------------------ */
131    public void close() throws IOException
132    {
133        if (_acceptChannel != null)
134            _acceptChannel.close();
135        _acceptChannel=null;
136    }
137
138    /* ------------------------------------------------------------ */
139    @Override
140    public void accept(int acceptorID)
141    	throws IOException, InterruptedException
142    {
143        SocketChannel channel = _acceptChannel.accept();
144        channel.configureBlocking(true);
145        Socket socket=channel.socket();
146        configure(socket);
147
148        BlockingChannelEndPoint connection=new BlockingChannelEndPoint(channel);
149        connection.dispatch();
150    }
151
152    /* ------------------------------------------------------------------------------- */
153    @Override
154    public void customize(EndPoint endpoint, Request request)
155        throws IOException
156    {
157        super.customize(endpoint, request);
158        endpoint.setMaxIdleTime(_maxIdleTime);
159        configure(((SocketChannel)endpoint.getTransport()).socket());
160    }
161
162
163    /* ------------------------------------------------------------------------------- */
164    public int getLocalPort()
165    {
166        if (_acceptChannel==null || !_acceptChannel.isOpen())
167            return -1;
168        return _acceptChannel.socket().getLocalPort();
169    }
170
171    /* ------------------------------------------------------------------------------- */
172    /* ------------------------------------------------------------------------------- */
173    /* ------------------------------------------------------------------------------- */
174    private class BlockingChannelEndPoint extends ChannelEndPoint implements Runnable, ConnectedEndPoint
175    {
176        private Connection _connection;
177        private int _timeout;
178        private volatile long _idleTimestamp;
179
180        BlockingChannelEndPoint(ByteChannel channel)
181            throws IOException
182        {
183            super(channel,BlockingChannelConnector.this._maxIdleTime);
184            _connection = new BlockingHttpConnection(BlockingChannelConnector.this,this,getServer());
185        }
186
187        /* ------------------------------------------------------------ */
188        /** Get the connection.
189         * @return the connection
190         */
191        public Connection getConnection()
192        {
193            return _connection;
194        }
195
196        /* ------------------------------------------------------------ */
197        public void setConnection(Connection connection)
198        {
199            _connection=connection;
200        }
201
202        /* ------------------------------------------------------------ */
203        public void checkIdleTimestamp(long now)
204        {
205            if (_idleTimestamp!=0 && _timeout>0 && now>(_idleTimestamp+_timeout))
206            {
207                idleExpired();
208            }
209        }
210
211        /* ------------------------------------------------------------ */
212        protected void idleExpired()
213        {
214            try
215            {
216                super.close();
217            }
218            catch (IOException e)
219            {
220                LOG.ignore(e);
221            }
222        }
223
224        /* ------------------------------------------------------------ */
225        void dispatch() throws IOException
226        {
227            if (!getThreadPool().dispatch(this))
228            {
229                LOG.warn("dispatch failed for  {}",_connection);
230                super.close();
231            }
232        }
233
234        /* ------------------------------------------------------------ */
235        /**
236         * @see org.eclipse.jetty.io.nio.ChannelEndPoint#fill(org.eclipse.jetty.io.Buffer)
237         */
238        @Override
239        public int fill(Buffer buffer) throws IOException
240        {
241            _idleTimestamp=System.currentTimeMillis();
242            return super.fill(buffer);
243        }
244
245        /* ------------------------------------------------------------ */
246        /**
247         * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer)
248         */
249        @Override
250        public int flush(Buffer buffer) throws IOException
251        {
252            _idleTimestamp=System.currentTimeMillis();
253            return super.flush(buffer);
254        }
255
256        /* ------------------------------------------------------------ */
257        /**
258         * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer)
259         */
260        @Override
261        public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
262        {
263            _idleTimestamp=System.currentTimeMillis();
264            return super.flush(header,buffer,trailer);
265        }
266
267        /* ------------------------------------------------------------ */
268        public void run()
269        {
270            try
271            {
272                _timeout=getMaxIdleTime();
273                connectionOpened(_connection);
274                _endpoints.add(this);
275
276                while (isOpen())
277                {
278                    _idleTimestamp=System.currentTimeMillis();
279                    if (_connection.isIdle())
280                    {
281                        if (getServer().getThreadPool().isLowOnThreads())
282                        {
283                            int lrmit = getLowResourcesMaxIdleTime();
284                            if (lrmit>=0 && _timeout!= lrmit)
285                            {
286                                _timeout=lrmit;
287                            }
288                        }
289                    }
290                    else
291                    {
292                        if (_timeout!=getMaxIdleTime())
293                        {
294                            _timeout=getMaxIdleTime();
295                        }
296                    }
297
298                    _connection = _connection.handle();
299
300                }
301            }
302            catch (EofException e)
303            {
304                LOG.debug("EOF", e);
305                try{BlockingChannelEndPoint.this.close();}
306                catch(IOException e2){LOG.ignore(e2);}
307            }
308            catch (HttpException e)
309            {
310                LOG.debug("BAD", e);
311                try{super.close();}
312                catch(IOException e2){LOG.ignore(e2);}
313            }
314            catch(Throwable e)
315            {
316                LOG.warn("handle failed",e);
317                try{super.close();}
318                catch(IOException e2){LOG.ignore(e2);}
319            }
320            finally
321            {
322                connectionClosed(_connection);
323                _endpoints.remove(this);
324
325                // wait for client to close, but if not, close ourselves.
326                try
327                {
328                    if (!_socket.isClosed())
329                    {
330                        long timestamp=System.currentTimeMillis();
331                        int max_idle=getMaxIdleTime();
332
333                        _socket.setSoTimeout(getMaxIdleTime());
334                        int c=0;
335                        do
336                        {
337                            c = _socket.getInputStream().read();
338                        }
339                        while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle);
340                        if (!_socket.isClosed())
341                            _socket.close();
342                    }
343                }
344                catch(IOException e)
345                {
346                    LOG.ignore(e);
347                }
348            }
349        }
350
351        /* ------------------------------------------------------------ */
352        @Override
353        public String toString()
354        {
355            return String.format("BCEP@%x{l(%s)<->r(%s),open=%b,ishut=%b,oshut=%b}-{%s}",
356                    hashCode(),
357                    _socket.getRemoteSocketAddress(),
358                    _socket.getLocalSocketAddress(),
359                    isOpen(),
360                    isInputShutdown(),
361                    isOutputShutdown(),
362                    _connection);
363        }
364
365    }
366}
367