1/*
2 * $HeadURL: http://svn.apache.org/repos/asf/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java $
3 * $Revision: 677240 $
4 * $Date: 2008-07-16 04:25:47 -0700 (Wed, 16 Jul 2008) $
5 *
6 * ====================================================================
7 *
8 *  Licensed to the Apache Software Foundation (ASF) under one or more
9 *  contributor license agreements.  See the NOTICE file distributed with
10 *  this work for additional information regarding copyright ownership.
11 *  The ASF licenses this file to You under the Apache License, Version 2.0
12 *  (the "License"); you may not use this file except in compliance with
13 *  the License.  You may obtain a copy of the License at
14 *
15 *      http://www.apache.org/licenses/LICENSE-2.0
16 *
17 *  Unless required by applicable law or agreed to in writing, software
18 *  distributed under the License is distributed on an "AS IS" BASIS,
19 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 *  See the License for the specific language governing permissions and
21 *  limitations under the License.
22 * ====================================================================
23 *
24 * This software consists of voluntary contributions made by many
25 * individuals on behalf of the Apache Software Foundation.  For more
26 * information on the Apache Software Foundation, please see
27 * <http://www.apache.org/>.
28 *
29 */
30
31package org.apache.http.impl.conn.tsccm;
32
33import java.util.Date;
34import java.util.HashMap;
35import java.util.Iterator;
36import java.util.Queue;
37import java.util.LinkedList;
38import java.util.Map;
39import java.util.concurrent.locks.Condition;
40import java.util.concurrent.TimeUnit;
41
42import org.apache.commons.logging.Log;
43import org.apache.commons.logging.LogFactory;
44import org.apache.http.conn.routing.HttpRoute;
45import org.apache.http.conn.ClientConnectionOperator;
46import org.apache.http.conn.ConnectionPoolTimeoutException;
47import org.apache.http.conn.params.ConnPerRoute;
48import org.apache.http.conn.params.ConnManagerParams;
49import org.apache.http.params.HttpParams;
50
51
52/**
53 * A connection pool that maintains connections by route.
54 * This class is derived from <code>MultiThreadedHttpConnectionManager</code>
55 * in HttpClient 3.x, see there for original authors. It implements the same
56 * algorithm for connection re-use and connection-per-host enforcement:
57 * <ul>
58 * <li>connections are re-used only for the exact same route</li>
59 * <li>connection limits are enforced per route rather than per host</li>
60 * </ul>
61 * Note that access to the pool datastructures is synchronized via the
62 * {@link AbstractConnPool#poolLock poolLock} in the base class,
63 * not via <code>synchronized</code> methods.
64 *
65 * @author <a href="mailto:rolandw at apache.org">Roland Weber</a>
66 * @author <a href="mailto:becke@u.washington.edu">Michael Becke</a>
67 * @author and others
68 */
69public class ConnPoolByRoute extends AbstractConnPool {
70
71    private final Log log = LogFactory.getLog(getClass());
72
73    /** Connection operator for this pool */
74    protected final ClientConnectionOperator operator;
75
76    /** The list of free connections */
77    protected Queue<BasicPoolEntry> freeConnections;
78
79    /** The list of WaitingThreads waiting for a connection */
80    protected Queue<WaitingThread> waitingThreads;
81
82    /**
83     * A map of route-specific pools.
84     * Keys are of class {@link HttpRoute},
85     * values of class {@link RouteSpecificPool}.
86     */
87    protected final Map<HttpRoute, RouteSpecificPool> routeToPool;
88
89    protected final int maxTotalConnections;
90
91    private final ConnPerRoute connPerRoute;
92
93    /**
94     * Creates a new connection pool, managed by route.
95     */
96    public ConnPoolByRoute(final ClientConnectionOperator operator, final HttpParams params) {
97        super();
98        if (operator == null) {
99            throw new IllegalArgumentException("Connection operator may not be null");
100        }
101        this.operator = operator;
102
103        freeConnections = createFreeConnQueue();
104        waitingThreads  = createWaitingThreadQueue();
105        routeToPool     = createRouteToPoolMap();
106        maxTotalConnections = ConnManagerParams
107            .getMaxTotalConnections(params);
108        connPerRoute = ConnManagerParams
109            .getMaxConnectionsPerRoute(params);
110    }
111
112
113    /**
114     * Creates the queue for {@link #freeConnections}.
115     * Called once by the constructor.
116     *
117     * @return  a queue
118     */
119    protected Queue<BasicPoolEntry> createFreeConnQueue() {
120        return new LinkedList<BasicPoolEntry>();
121    }
122
123    /**
124     * Creates the queue for {@link #waitingThreads}.
125     * Called once by the constructor.
126     *
127     * @return  a queue
128     */
129    protected Queue<WaitingThread> createWaitingThreadQueue() {
130        return new LinkedList<WaitingThread>();
131    }
132
133    /**
134     * Creates the map for {@link #routeToPool}.
135     * Called once by the constructor.
136     *
137     * @return  a map
138     */
139    protected Map<HttpRoute, RouteSpecificPool> createRouteToPoolMap() {
140        return new HashMap<HttpRoute, RouteSpecificPool>();
141    }
142
143
144    /**
145     * Creates a new route-specific pool.
146     * Called by {@link #getRoutePool} when necessary.
147     *
148     * @param route     the route
149     *
150     * @return  the new pool
151     */
152    protected RouteSpecificPool newRouteSpecificPool(HttpRoute route) {
153        return new RouteSpecificPool(route, connPerRoute.getMaxForRoute(route));
154    }
155
156
157    /**
158     * Creates a new waiting thread.
159     * Called by {@link #getRoutePool} when necessary.
160     *
161     * @param cond      the condition to wait for
162     * @param rospl     the route specific pool, or <code>null</code>
163     *
164     * @return  a waiting thread representation
165     */
166    protected WaitingThread newWaitingThread(Condition cond,
167                                             RouteSpecificPool rospl) {
168        return new WaitingThread(cond, rospl);
169    }
170
171
172    /**
173     * Get a route-specific pool of available connections.
174     *
175     * @param route   the route
176     * @param create    whether to create the pool if it doesn't exist
177     *
178     * @return  the pool for the argument route,
179     *     never <code>null</code> if <code>create</code> is <code>true</code>
180     */
181    protected RouteSpecificPool getRoutePool(HttpRoute route,
182                                             boolean create) {
183        RouteSpecificPool rospl = null;
184        poolLock.lock();
185        try {
186
187            rospl = routeToPool.get(route);
188            if ((rospl == null) && create) {
189                // no pool for this route yet (or anymore)
190                rospl = newRouteSpecificPool(route);
191                routeToPool.put(route, rospl);
192            }
193
194        } finally {
195            poolLock.unlock();
196        }
197
198        return rospl;
199    }
200
201
202    //@@@ consider alternatives for gathering statistics
203    public int getConnectionsInPool(HttpRoute route) {
204
205        poolLock.lock();
206        try {
207            // don't allow a pool to be created here!
208            RouteSpecificPool rospl = getRoutePool(route, false);
209            return (rospl != null) ? rospl.getEntryCount() : 0;
210
211        } finally {
212            poolLock.unlock();
213        }
214    }
215
216    @Override
217    public PoolEntryRequest requestPoolEntry(
218            final HttpRoute route,
219            final Object state) {
220
221        final WaitingThreadAborter aborter = new WaitingThreadAborter();
222
223        return new PoolEntryRequest() {
224
225            public void abortRequest() {
226                poolLock.lock();
227                try {
228                    aborter.abort();
229                } finally {
230                    poolLock.unlock();
231                }
232            }
233
234            public BasicPoolEntry getPoolEntry(
235                    long timeout,
236                    TimeUnit tunit)
237                        throws InterruptedException, ConnectionPoolTimeoutException {
238                return getEntryBlocking(route, state, timeout, tunit, aborter);
239            }
240
241        };
242    }
243
244    /**
245     * Obtains a pool entry with a connection within the given timeout.
246     * If a {@link WaitingThread} is used to block, {@link WaitingThreadAborter#setWaitingThread(WaitingThread)}
247     * must be called before blocking, to allow the thread to be interrupted.
248     *
249     * @param route     the route for which to get the connection
250     * @param timeout   the timeout, 0 or negative for no timeout
251     * @param tunit     the unit for the <code>timeout</code>,
252     *                  may be <code>null</code> only if there is no timeout
253     * @param aborter   an object which can abort a {@link WaitingThread}.
254     *
255     * @return  pool entry holding a connection for the route
256     *
257     * @throws ConnectionPoolTimeoutException
258     *         if the timeout expired
259     * @throws InterruptedException
260     *         if the calling thread was interrupted
261     */
262    protected BasicPoolEntry getEntryBlocking(
263                                   HttpRoute route, Object state,
264                                   long timeout, TimeUnit tunit,
265                                   WaitingThreadAborter aborter)
266        throws ConnectionPoolTimeoutException, InterruptedException {
267
268        Date deadline = null;
269        if (timeout > 0) {
270            deadline = new Date
271                (System.currentTimeMillis() + tunit.toMillis(timeout));
272        }
273
274        BasicPoolEntry entry = null;
275        poolLock.lock();
276        try {
277
278            RouteSpecificPool rospl = getRoutePool(route, true);
279            WaitingThread waitingThread = null;
280
281            while (entry == null) {
282
283                if (isShutDown) {
284                    throw new IllegalStateException
285                        ("Connection pool shut down.");
286                }
287
288                if (log.isDebugEnabled()) {
289                    log.debug("Total connections kept alive: " + freeConnections.size());
290                    log.debug("Total issued connections: " + issuedConnections.size());
291                    log.debug("Total allocated connection: " + numConnections + " out of " + maxTotalConnections);
292                }
293
294                // the cases to check for:
295                // - have a free connection for that route
296                // - allowed to create a free connection for that route
297                // - can delete and replace a free connection for another route
298                // - need to wait for one of the things above to come true
299
300                entry = getFreeEntry(rospl, state);
301                if (entry != null) {
302                    break;
303                }
304
305                boolean hasCapacity = rospl.getCapacity() > 0;
306
307                if (log.isDebugEnabled()) {
308                    log.debug("Available capacity: " + rospl.getCapacity()
309                            + " out of " + rospl.getMaxEntries()
310                            + " [" + route + "][" + state + "]");
311                }
312
313                if (hasCapacity && numConnections < maxTotalConnections) {
314
315                    entry = createEntry(rospl, operator);
316
317                } else if (hasCapacity && !freeConnections.isEmpty()) {
318
319                    deleteLeastUsedEntry();
320                    entry = createEntry(rospl, operator);
321
322                } else {
323
324                    if (log.isDebugEnabled()) {
325                        log.debug("Need to wait for connection" +
326                                " [" + route + "][" + state + "]");
327                    }
328
329                    if (waitingThread == null) {
330                        waitingThread =
331                            newWaitingThread(poolLock.newCondition(), rospl);
332                        aborter.setWaitingThread(waitingThread);
333                    }
334
335                    boolean success = false;
336                    try {
337                        rospl.queueThread(waitingThread);
338                        waitingThreads.add(waitingThread);
339                        success = waitingThread.await(deadline);
340
341                    } finally {
342                        // In case of 'success', we were woken up by the
343                        // connection pool and should now have a connection
344                        // waiting for us, or else we're shutting down.
345                        // Just continue in the loop, both cases are checked.
346                        rospl.removeThread(waitingThread);
347                        waitingThreads.remove(waitingThread);
348                    }
349
350                    // check for spurious wakeup vs. timeout
351                    if (!success && (deadline != null) &&
352                        (deadline.getTime() <= System.currentTimeMillis())) {
353                        throw new ConnectionPoolTimeoutException
354                            ("Timeout waiting for connection");
355                    }
356                }
357            } // while no entry
358
359        } finally {
360            poolLock.unlock();
361        }
362
363        return entry;
364
365    } // getEntry
366
367
368    // non-javadoc, see base class AbstractConnPool
369    @Override
370    public void freeEntry(BasicPoolEntry entry, boolean reusable, long validDuration, TimeUnit timeUnit) {
371
372        HttpRoute route = entry.getPlannedRoute();
373        if (log.isDebugEnabled()) {
374            log.debug("Freeing connection" +
375                    " [" + route + "][" + entry.getState() + "]");
376        }
377
378        poolLock.lock();
379        try {
380            if (isShutDown) {
381                // the pool is shut down, release the
382                // connection's resources and get out of here
383                closeConnection(entry.getConnection());
384                return;
385            }
386
387            // no longer issued, we keep a hard reference now
388            issuedConnections.remove(entry.getWeakRef());
389
390            RouteSpecificPool rospl = getRoutePool(route, true);
391
392            if (reusable) {
393                rospl.freeEntry(entry);
394                freeConnections.add(entry);
395                idleConnHandler.add(entry.getConnection(), validDuration, timeUnit);
396            } else {
397                rospl.dropEntry();
398                numConnections--;
399            }
400
401            notifyWaitingThread(rospl);
402
403        } finally {
404            poolLock.unlock();
405        }
406
407    } // freeEntry
408
409
410
411    /**
412     * If available, get a free pool entry for a route.
413     *
414     * @param rospl       the route-specific pool from which to get an entry
415     *
416     * @return  an available pool entry for the given route, or
417     *          <code>null</code> if none is available
418     */
419    protected BasicPoolEntry getFreeEntry(RouteSpecificPool rospl, Object state) {
420
421        BasicPoolEntry entry = null;
422        poolLock.lock();
423        try {
424            boolean done = false;
425            while(!done) {
426
427                entry = rospl.allocEntry(state);
428
429                if (entry != null) {
430                    if (log.isDebugEnabled()) {
431                        log.debug("Getting free connection"
432                                + " [" + rospl.getRoute() + "][" + state + "]");
433
434                    }
435                    freeConnections.remove(entry);
436                    boolean valid = idleConnHandler.remove(entry.getConnection());
437                    if(!valid) {
438                        // If the free entry isn't valid anymore, get rid of it
439                        // and loop to find another one that might be valid.
440                        if(log.isDebugEnabled())
441                            log.debug("Closing expired free connection"
442                                    + " [" + rospl.getRoute() + "][" + state + "]");
443                        closeConnection(entry.getConnection());
444                        // We use dropEntry instead of deleteEntry because the entry
445                        // is no longer "free" (we just allocated it), and deleteEntry
446                        // can only be used to delete free entries.
447                        rospl.dropEntry();
448                        numConnections--;
449                    } else {
450                        issuedConnections.add(entry.getWeakRef());
451                        done = true;
452                    }
453
454                } else {
455                    done = true;
456                    if (log.isDebugEnabled()) {
457                        log.debug("No free connections"
458                                + " [" + rospl.getRoute() + "][" + state + "]");
459                    }
460                }
461            }
462        } finally {
463            poolLock.unlock();
464        }
465
466        return entry;
467    }
468
469
470    /**
471     * Creates a new pool entry.
472     * This method assumes that the new connection will be handed
473     * out immediately.
474     *
475     * @param rospl       the route-specific pool for which to create the entry
476     * @param op        the operator for creating a connection
477     *
478     * @return  the new pool entry for a new connection
479     */
480    protected BasicPoolEntry createEntry(RouteSpecificPool rospl,
481                                         ClientConnectionOperator op) {
482
483        if (log.isDebugEnabled()) {
484            log.debug("Creating new connection [" + rospl.getRoute() + "]");
485        }
486
487        // the entry will create the connection when needed
488        BasicPoolEntry entry =
489            new BasicPoolEntry(op, rospl.getRoute(), refQueue);
490
491        poolLock.lock();
492        try {
493
494            rospl.createdEntry(entry);
495            numConnections++;
496
497            issuedConnections.add(entry.getWeakRef());
498
499        } finally {
500            poolLock.unlock();
501        }
502
503        return entry;
504    }
505
506
507    /**
508     * Deletes a given pool entry.
509     * This closes the pooled connection and removes all references,
510     * so that it can be GCed.
511     *
512     * <p><b>Note:</b> Does not remove the entry from the freeConnections list.
513     * It is assumed that the caller has already handled this step.</p>
514     * <!-- @@@ is that a good idea? or rather fix it? -->
515     *
516     * @param entry         the pool entry for the connection to delete
517     */
518    protected void deleteEntry(BasicPoolEntry entry) {
519
520        HttpRoute route = entry.getPlannedRoute();
521
522        if (log.isDebugEnabled()) {
523            log.debug("Deleting connection"
524                    + " [" + route + "][" + entry.getState() + "]");
525        }
526
527        poolLock.lock();
528        try {
529
530            closeConnection(entry.getConnection());
531
532            RouteSpecificPool rospl = getRoutePool(route, true);
533            rospl.deleteEntry(entry);
534            numConnections--;
535            if (rospl.isUnused()) {
536                routeToPool.remove(route);
537            }
538
539            idleConnHandler.remove(entry.getConnection());// not idle, but dead
540
541        } finally {
542            poolLock.unlock();
543        }
544    }
545
546
547    /**
548     * Delete an old, free pool entry to make room for a new one.
549     * Used to replace pool entries with ones for a different route.
550     */
551    protected void deleteLeastUsedEntry() {
552
553        try {
554            poolLock.lock();
555
556            //@@@ with get() instead of remove, we could
557            //@@@ leave the removing to deleteEntry()
558            BasicPoolEntry entry = freeConnections.remove();
559
560            if (entry != null) {
561                deleteEntry(entry);
562            } else if (log.isDebugEnabled()) {
563                log.debug("No free connection to delete.");
564            }
565
566        } finally {
567            poolLock.unlock();
568        }
569    }
570
571
572    // non-javadoc, see base class AbstractConnPool
573    @Override
574    protected void handleLostEntry(HttpRoute route) {
575
576        poolLock.lock();
577        try {
578
579            RouteSpecificPool rospl = getRoutePool(route, true);
580            rospl.dropEntry();
581            if (rospl.isUnused()) {
582                routeToPool.remove(route);
583            }
584
585            numConnections--;
586            notifyWaitingThread(rospl);
587
588        } finally {
589            poolLock.unlock();
590        }
591    }
592
593
594    /**
595     * Notifies a waiting thread that a connection is available.
596     * This will wake a thread waiting in the specific route pool,
597     * if there is one.
598     * Otherwise, a thread in the connection pool will be notified.
599     *
600     * @param rospl     the pool in which to notify, or <code>null</code>
601     */
602    protected void notifyWaitingThread(RouteSpecificPool rospl) {
603
604        //@@@ while this strategy provides for best connection re-use,
605        //@@@ is it fair? only do this if the connection is open?
606        // Find the thread we are going to notify. We want to ensure that
607        // each waiting thread is only interrupted once, so we will remove
608        // it from all wait queues before interrupting.
609        WaitingThread waitingThread = null;
610
611        poolLock.lock();
612        try {
613
614            if ((rospl != null) && rospl.hasThread()) {
615                if (log.isDebugEnabled()) {
616                    log.debug("Notifying thread waiting on pool" +
617                            " [" + rospl.getRoute() + "]");
618                }
619                waitingThread = rospl.nextThread();
620            } else if (!waitingThreads.isEmpty()) {
621                if (log.isDebugEnabled()) {
622                    log.debug("Notifying thread waiting on any pool");
623                }
624                waitingThread = waitingThreads.remove();
625            } else if (log.isDebugEnabled()) {
626                log.debug("Notifying no-one, there are no waiting threads");
627            }
628
629            if (waitingThread != null) {
630                waitingThread.wakeup();
631            }
632
633        } finally {
634            poolLock.unlock();
635        }
636    }
637
638
639    //@@@ revise this cleanup stuff
640    //@@@ move method to base class when deleteEntry() is fixed
641    // non-javadoc, see base class AbstractConnPool
642    @Override
643    public void deleteClosedConnections() {
644
645        poolLock.lock();
646        try {
647
648            Iterator<BasicPoolEntry>  iter = freeConnections.iterator();
649            while (iter.hasNext()) {
650                BasicPoolEntry entry = iter.next();
651                if (!entry.getConnection().isOpen()) {
652                    iter.remove();
653                    deleteEntry(entry);
654                }
655            }
656
657        } finally {
658            poolLock.unlock();
659        }
660    }
661
662
663    // non-javadoc, see base class AbstractConnPool
664    @Override
665    public void shutdown() {
666
667        poolLock.lock();
668        try {
669
670            super.shutdown();
671
672            // close all free connections
673            //@@@ move this to base class?
674            Iterator<BasicPoolEntry> ibpe = freeConnections.iterator();
675            while (ibpe.hasNext()) {
676                BasicPoolEntry entry = ibpe.next();
677                ibpe.remove();
678                closeConnection(entry.getConnection());
679            }
680
681            // wake up all waiting threads
682            Iterator<WaitingThread> iwth = waitingThreads.iterator();
683            while (iwth.hasNext()) {
684                WaitingThread waiter = iwth.next();
685                iwth.remove();
686                waiter.wakeup();
687            }
688
689            routeToPool.clear();
690
691        } finally {
692            poolLock.unlock();
693        }
694    }
695
696
697} // class ConnPoolByRoute
698
699