1//
2//  ========================================================================
3//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4//  ------------------------------------------------------------------------
5//  All rights reserved. This program and the accompanying materials
6//  are made available under the terms of the Eclipse Public License v1.0
7//  and Apache License v2.0 which accompanies this distribution.
8//
9//      The Eclipse Public License is available at
10//      http://www.eclipse.org/legal/epl-v10.html
11//
12//      The Apache License v2.0 is available at
13//      http://www.opensource.org/licenses/apache2.0.php
14//
15//  You may elect to redistribute this code under either of these licenses.
16//  ========================================================================
17//
18
19package org.eclipse.jetty.util.thread;
20
21import java.util.concurrent.ArrayBlockingQueue;
22import java.util.concurrent.BlockingQueue;
23import java.util.concurrent.ExecutorService;
24import java.util.concurrent.LinkedBlockingQueue;
25import java.util.concurrent.RejectedExecutionException;
26import java.util.concurrent.SynchronousQueue;
27import java.util.concurrent.ThreadPoolExecutor;
28import java.util.concurrent.TimeUnit;
29
30import org.eclipse.jetty.util.component.AbstractLifeCycle;
31import org.eclipse.jetty.util.component.LifeCycle;
32import org.eclipse.jetty.util.log.Log;
33import org.eclipse.jetty.util.log.Logger;
34
35/* ------------------------------------------------------------ */
36/**
37 * Jetty ThreadPool using java 5 ThreadPoolExecutor
38 * This class wraps a {@link ExecutorService} as a {@link ThreadPool} and
39 * {@link LifeCycle} interfaces so that it may be used by the Jetty <code>org.eclipse.jetty.server.Server</code>
40 */
41public class ExecutorThreadPool extends AbstractLifeCycle implements ThreadPool, LifeCycle
42{
43    private static final Logger LOG = Log.getLogger(ExecutorThreadPool.class);
44    private final ExecutorService _executor;
45
46    /* ------------------------------------------------------------ */
47    public ExecutorThreadPool(ExecutorService executor)
48    {
49        _executor = executor;
50    }
51
52    /* ------------------------------------------------------------ */
53    /**
54     * Wraps an {@link ThreadPoolExecutor}.
55     * Max pool size is 256, pool thread timeout after 60 seconds and
56     * an unbounded {@link LinkedBlockingQueue} is used for the job queue;
57     */
58    public ExecutorThreadPool()
59    {
60        // Using an unbounded queue makes the maxThreads parameter useless
61        // Refer to ThreadPoolExecutor javadocs for details
62        this(new ThreadPoolExecutor(256, 256, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()));
63    }
64
65    /* ------------------------------------------------------------ */
66    /**
67     * Wraps an {@link ThreadPoolExecutor}.
68     * Max pool size is 256, pool thread timeout after 60 seconds, and core pool size is 32 when queueSize >= 0.
69     * @param queueSize can be -1 for using an unbounded {@link LinkedBlockingQueue}, 0 for using a
70     * {@link SynchronousQueue}, greater than 0 for using a {@link ArrayBlockingQueue} of the given size.
71     */
72    public ExecutorThreadPool(int queueSize)
73    {
74        this(queueSize < 0 ? new ThreadPoolExecutor(256, 256, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) :
75                queueSize == 0 ? new ThreadPoolExecutor(32, 256, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()) :
76                        new ThreadPoolExecutor(32, 256, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize)));
77    }
78
79    /* ------------------------------------------------------------ */
80    /**
81     * Wraps an {@link ThreadPoolExecutor} using
82     * an unbounded {@link LinkedBlockingQueue} is used for the jobs queue;
83     * @param corePoolSize must be equal to maximumPoolSize
84     * @param maximumPoolSize the maximum number of threads to allow in the pool
85     * @param keepAliveTime the max time a thread can remain idle, in milliseconds
86     */
87    public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime)
88    {
89        this(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS);
90    }
91
92    /* ------------------------------------------------------------ */
93    /**
94     * Wraps an {@link ThreadPoolExecutor} using
95     * an unbounded {@link LinkedBlockingQueue} is used for the jobs queue.
96     * @param corePoolSize must be equal to maximumPoolSize
97     * @param maximumPoolSize the maximum number of threads to allow in the pool
98     * @param keepAliveTime the max time a thread can remain idle
99     * @param unit the unit for the keepAliveTime
100     */
101    public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit)
102    {
103        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>());
104    }
105
106    /* ------------------------------------------------------------ */
107
108    /**
109     * Wraps an {@link ThreadPoolExecutor}
110     * @param corePoolSize the number of threads to keep in the pool, even if they are idle
111     * @param maximumPoolSize the maximum number of threads to allow in the pool
112     * @param keepAliveTime the max time a thread can remain idle
113     * @param unit the unit for the keepAliveTime
114     * @param workQueue the queue to use for holding tasks before they are executed
115     */
116    public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
117    {
118        this(new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue));
119    }
120
121    /* ------------------------------------------------------------ */
122    public boolean dispatch(Runnable job)
123    {
124        try
125        {
126            _executor.execute(job);
127            return true;
128        }
129        catch(RejectedExecutionException e)
130        {
131            LOG.warn(e);
132            return false;
133        }
134    }
135
136    /* ------------------------------------------------------------ */
137    public int getIdleThreads()
138    {
139        if (_executor instanceof ThreadPoolExecutor)
140        {
141            final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor;
142            return tpe.getPoolSize() - tpe.getActiveCount();
143        }
144        return -1;
145    }
146
147    /* ------------------------------------------------------------ */
148    public int getThreads()
149    {
150        if (_executor instanceof ThreadPoolExecutor)
151        {
152            final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor;
153            return tpe.getPoolSize();
154        }
155        return -1;
156    }
157
158    /* ------------------------------------------------------------ */
159    public boolean isLowOnThreads()
160    {
161        if (_executor instanceof ThreadPoolExecutor)
162        {
163            final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor;
164            // getActiveCount() locks the thread pool, so execute it last
165            return tpe.getPoolSize() == tpe.getMaximumPoolSize() &&
166                    tpe.getQueue().size() >= tpe.getPoolSize() - tpe.getActiveCount();
167        }
168        return false;
169    }
170
171    /* ------------------------------------------------------------ */
172    public void join() throws InterruptedException
173    {
174        _executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
175    }
176
177    /* ------------------------------------------------------------ */
178    @Override
179    protected void doStop() throws Exception
180    {
181        super.doStop();
182        _executor.shutdownNow();
183    }
184}
185