/* * $HeadURL: http://svn.apache.org/repos/asf/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java $ * $Revision: 677240 $ * $Date: 2008-07-16 04:25:47 -0700 (Wed, 16 Jul 2008) $ * * ==================================================================== * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * . * */ package org.apache.http.impl.conn.tsccm; import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.Queue; import java.util.LinkedList; import java.util.Map; import java.util.concurrent.locks.Condition; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.http.conn.routing.HttpRoute; import org.apache.http.conn.ClientConnectionOperator; import org.apache.http.conn.ConnectionPoolTimeoutException; import org.apache.http.conn.params.ConnPerRoute; import org.apache.http.conn.params.ConnManagerParams; import org.apache.http.params.HttpParams; /** * A connection pool that maintains connections by route. * This class is derived from MultiThreadedHttpConnectionManager * in HttpClient 3.x, see there for original authors. It implements the same * algorithm for connection re-use and connection-per-host enforcement: * * Note that access to the pool datastructures is synchronized via the * {@link AbstractConnPool#poolLock poolLock} in the base class, * not via synchronized methods. * * @author Roland Weber * @author Michael Becke * @author and others * * @deprecated Please use {@link java.net.URL#openConnection} instead. * Please visit this webpage * for further details. */ @Deprecated public class ConnPoolByRoute extends AbstractConnPool { private final Log log = LogFactory.getLog(getClass()); /** Connection operator for this pool */ protected final ClientConnectionOperator operator; /** The list of free connections */ protected Queue freeConnections; /** The list of WaitingThreads waiting for a connection */ protected Queue waitingThreads; /** * A map of route-specific pools. * Keys are of class {@link HttpRoute}, * values of class {@link RouteSpecificPool}. */ protected final Map routeToPool; protected final int maxTotalConnections; private final ConnPerRoute connPerRoute; /** * Creates a new connection pool, managed by route. */ public ConnPoolByRoute(final ClientConnectionOperator operator, final HttpParams params) { super(); if (operator == null) { throw new IllegalArgumentException("Connection operator may not be null"); } this.operator = operator; freeConnections = createFreeConnQueue(); waitingThreads = createWaitingThreadQueue(); routeToPool = createRouteToPoolMap(); maxTotalConnections = ConnManagerParams .getMaxTotalConnections(params); connPerRoute = ConnManagerParams .getMaxConnectionsPerRoute(params); } /** * Creates the queue for {@link #freeConnections}. * Called once by the constructor. * * @return a queue */ protected Queue createFreeConnQueue() { return new LinkedList(); } /** * Creates the queue for {@link #waitingThreads}. * Called once by the constructor. * * @return a queue */ protected Queue createWaitingThreadQueue() { return new LinkedList(); } /** * Creates the map for {@link #routeToPool}. * Called once by the constructor. * * @return a map */ protected Map createRouteToPoolMap() { return new HashMap(); } /** * Creates a new route-specific pool. * Called by {@link #getRoutePool} when necessary. * * @param route the route * * @return the new pool */ protected RouteSpecificPool newRouteSpecificPool(HttpRoute route) { return new RouteSpecificPool(route, connPerRoute.getMaxForRoute(route)); } /** * Creates a new waiting thread. * Called by {@link #getRoutePool} when necessary. * * @param cond the condition to wait for * @param rospl the route specific pool, or null * * @return a waiting thread representation */ protected WaitingThread newWaitingThread(Condition cond, RouteSpecificPool rospl) { return new WaitingThread(cond, rospl); } /** * Get a route-specific pool of available connections. * * @param route the route * @param create whether to create the pool if it doesn't exist * * @return the pool for the argument route, * never null if create is true */ protected RouteSpecificPool getRoutePool(HttpRoute route, boolean create) { RouteSpecificPool rospl = null; poolLock.lock(); try { rospl = routeToPool.get(route); if ((rospl == null) && create) { // no pool for this route yet (or anymore) rospl = newRouteSpecificPool(route); routeToPool.put(route, rospl); } } finally { poolLock.unlock(); } return rospl; } //@@@ consider alternatives for gathering statistics public int getConnectionsInPool(HttpRoute route) { poolLock.lock(); try { // don't allow a pool to be created here! RouteSpecificPool rospl = getRoutePool(route, false); return (rospl != null) ? rospl.getEntryCount() : 0; } finally { poolLock.unlock(); } } @Override public PoolEntryRequest requestPoolEntry( final HttpRoute route, final Object state) { final WaitingThreadAborter aborter = new WaitingThreadAborter(); return new PoolEntryRequest() { public void abortRequest() { poolLock.lock(); try { aborter.abort(); } finally { poolLock.unlock(); } } public BasicPoolEntry getPoolEntry( long timeout, TimeUnit tunit) throws InterruptedException, ConnectionPoolTimeoutException { return getEntryBlocking(route, state, timeout, tunit, aborter); } }; } /** * Obtains a pool entry with a connection within the given timeout. * If a {@link WaitingThread} is used to block, {@link WaitingThreadAborter#setWaitingThread(WaitingThread)} * must be called before blocking, to allow the thread to be interrupted. * * @param route the route for which to get the connection * @param timeout the timeout, 0 or negative for no timeout * @param tunit the unit for the timeout, * may be null only if there is no timeout * @param aborter an object which can abort a {@link WaitingThread}. * * @return pool entry holding a connection for the route * * @throws ConnectionPoolTimeoutException * if the timeout expired * @throws InterruptedException * if the calling thread was interrupted */ protected BasicPoolEntry getEntryBlocking( HttpRoute route, Object state, long timeout, TimeUnit tunit, WaitingThreadAborter aborter) throws ConnectionPoolTimeoutException, InterruptedException { Date deadline = null; if (timeout > 0) { deadline = new Date (System.currentTimeMillis() + tunit.toMillis(timeout)); } BasicPoolEntry entry = null; poolLock.lock(); try { RouteSpecificPool rospl = getRoutePool(route, true); WaitingThread waitingThread = null; while (entry == null) { if (isShutDown) { throw new IllegalStateException ("Connection pool shut down."); } if (log.isDebugEnabled()) { log.debug("Total connections kept alive: " + freeConnections.size()); log.debug("Total issued connections: " + issuedConnections.size()); log.debug("Total allocated connection: " + numConnections + " out of " + maxTotalConnections); } // the cases to check for: // - have a free connection for that route // - allowed to create a free connection for that route // - can delete and replace a free connection for another route // - need to wait for one of the things above to come true entry = getFreeEntry(rospl, state); if (entry != null) { break; } boolean hasCapacity = rospl.getCapacity() > 0; if (log.isDebugEnabled()) { log.debug("Available capacity: " + rospl.getCapacity() + " out of " + rospl.getMaxEntries() + " [" + route + "][" + state + "]"); } if (hasCapacity && numConnections < maxTotalConnections) { entry = createEntry(rospl, operator); } else if (hasCapacity && !freeConnections.isEmpty()) { deleteLeastUsedEntry(); entry = createEntry(rospl, operator); } else { if (log.isDebugEnabled()) { log.debug("Need to wait for connection" + " [" + route + "][" + state + "]"); } if (waitingThread == null) { waitingThread = newWaitingThread(poolLock.newCondition(), rospl); aborter.setWaitingThread(waitingThread); } boolean success = false; try { rospl.queueThread(waitingThread); waitingThreads.add(waitingThread); success = waitingThread.await(deadline); } finally { // In case of 'success', we were woken up by the // connection pool and should now have a connection // waiting for us, or else we're shutting down. // Just continue in the loop, both cases are checked. rospl.removeThread(waitingThread); waitingThreads.remove(waitingThread); } // check for spurious wakeup vs. timeout if (!success && (deadline != null) && (deadline.getTime() <= System.currentTimeMillis())) { throw new ConnectionPoolTimeoutException ("Timeout waiting for connection"); } } } // while no entry } finally { poolLock.unlock(); } return entry; } // getEntry // non-javadoc, see base class AbstractConnPool @Override public void freeEntry(BasicPoolEntry entry, boolean reusable, long validDuration, TimeUnit timeUnit) { HttpRoute route = entry.getPlannedRoute(); if (log.isDebugEnabled()) { log.debug("Freeing connection" + " [" + route + "][" + entry.getState() + "]"); } poolLock.lock(); try { if (isShutDown) { // the pool is shut down, release the // connection's resources and get out of here closeConnection(entry.getConnection()); return; } // no longer issued, we keep a hard reference now issuedConnections.remove(entry.getWeakRef()); RouteSpecificPool rospl = getRoutePool(route, true); if (reusable) { rospl.freeEntry(entry); freeConnections.add(entry); idleConnHandler.add(entry.getConnection(), validDuration, timeUnit); } else { rospl.dropEntry(); numConnections--; } notifyWaitingThread(rospl); } finally { poolLock.unlock(); } } // freeEntry /** * If available, get a free pool entry for a route. * * @param rospl the route-specific pool from which to get an entry * * @return an available pool entry for the given route, or * null if none is available */ protected BasicPoolEntry getFreeEntry(RouteSpecificPool rospl, Object state) { BasicPoolEntry entry = null; poolLock.lock(); try { boolean done = false; while(!done) { entry = rospl.allocEntry(state); if (entry != null) { if (log.isDebugEnabled()) { log.debug("Getting free connection" + " [" + rospl.getRoute() + "][" + state + "]"); } freeConnections.remove(entry); boolean valid = idleConnHandler.remove(entry.getConnection()); if(!valid) { // If the free entry isn't valid anymore, get rid of it // and loop to find another one that might be valid. if(log.isDebugEnabled()) log.debug("Closing expired free connection" + " [" + rospl.getRoute() + "][" + state + "]"); closeConnection(entry.getConnection()); // We use dropEntry instead of deleteEntry because the entry // is no longer "free" (we just allocated it), and deleteEntry // can only be used to delete free entries. rospl.dropEntry(); numConnections--; } else { issuedConnections.add(entry.getWeakRef()); done = true; } } else { done = true; if (log.isDebugEnabled()) { log.debug("No free connections" + " [" + rospl.getRoute() + "][" + state + "]"); } } } } finally { poolLock.unlock(); } return entry; } /** * Creates a new pool entry. * This method assumes that the new connection will be handed * out immediately. * * @param rospl the route-specific pool for which to create the entry * @param op the operator for creating a connection * * @return the new pool entry for a new connection */ protected BasicPoolEntry createEntry(RouteSpecificPool rospl, ClientConnectionOperator op) { if (log.isDebugEnabled()) { log.debug("Creating new connection [" + rospl.getRoute() + "]"); } // the entry will create the connection when needed BasicPoolEntry entry = new BasicPoolEntry(op, rospl.getRoute(), refQueue); poolLock.lock(); try { rospl.createdEntry(entry); numConnections++; issuedConnections.add(entry.getWeakRef()); } finally { poolLock.unlock(); } return entry; } /** * Deletes a given pool entry. * This closes the pooled connection and removes all references, * so that it can be GCed. * *

Note: Does not remove the entry from the freeConnections list. * It is assumed that the caller has already handled this step.

* * * @param entry the pool entry for the connection to delete */ protected void deleteEntry(BasicPoolEntry entry) { HttpRoute route = entry.getPlannedRoute(); if (log.isDebugEnabled()) { log.debug("Deleting connection" + " [" + route + "][" + entry.getState() + "]"); } poolLock.lock(); try { closeConnection(entry.getConnection()); RouteSpecificPool rospl = getRoutePool(route, true); rospl.deleteEntry(entry); numConnections--; if (rospl.isUnused()) { routeToPool.remove(route); } idleConnHandler.remove(entry.getConnection());// not idle, but dead } finally { poolLock.unlock(); } } /** * Delete an old, free pool entry to make room for a new one. * Used to replace pool entries with ones for a different route. */ protected void deleteLeastUsedEntry() { try { poolLock.lock(); //@@@ with get() instead of remove, we could //@@@ leave the removing to deleteEntry() BasicPoolEntry entry = freeConnections.remove(); if (entry != null) { deleteEntry(entry); } else if (log.isDebugEnabled()) { log.debug("No free connection to delete."); } } finally { poolLock.unlock(); } } // non-javadoc, see base class AbstractConnPool @Override protected void handleLostEntry(HttpRoute route) { poolLock.lock(); try { RouteSpecificPool rospl = getRoutePool(route, true); rospl.dropEntry(); if (rospl.isUnused()) { routeToPool.remove(route); } numConnections--; notifyWaitingThread(rospl); } finally { poolLock.unlock(); } } /** * Notifies a waiting thread that a connection is available. * This will wake a thread waiting in the specific route pool, * if there is one. * Otherwise, a thread in the connection pool will be notified. * * @param rospl the pool in which to notify, or null */ protected void notifyWaitingThread(RouteSpecificPool rospl) { //@@@ while this strategy provides for best connection re-use, //@@@ is it fair? only do this if the connection is open? // Find the thread we are going to notify. We want to ensure that // each waiting thread is only interrupted once, so we will remove // it from all wait queues before interrupting. WaitingThread waitingThread = null; poolLock.lock(); try { if ((rospl != null) && rospl.hasThread()) { if (log.isDebugEnabled()) { log.debug("Notifying thread waiting on pool" + " [" + rospl.getRoute() + "]"); } waitingThread = rospl.nextThread(); } else if (!waitingThreads.isEmpty()) { if (log.isDebugEnabled()) { log.debug("Notifying thread waiting on any pool"); } waitingThread = waitingThreads.remove(); } else if (log.isDebugEnabled()) { log.debug("Notifying no-one, there are no waiting threads"); } if (waitingThread != null) { waitingThread.wakeup(); } } finally { poolLock.unlock(); } } //@@@ revise this cleanup stuff //@@@ move method to base class when deleteEntry() is fixed // non-javadoc, see base class AbstractConnPool @Override public void deleteClosedConnections() { poolLock.lock(); try { Iterator iter = freeConnections.iterator(); while (iter.hasNext()) { BasicPoolEntry entry = iter.next(); if (!entry.getConnection().isOpen()) { iter.remove(); deleteEntry(entry); } } } finally { poolLock.unlock(); } } // non-javadoc, see base class AbstractConnPool @Override public void shutdown() { poolLock.lock(); try { super.shutdown(); // close all free connections //@@@ move this to base class? Iterator ibpe = freeConnections.iterator(); while (ibpe.hasNext()) { BasicPoolEntry entry = ibpe.next(); ibpe.remove(); closeConnection(entry.getConnection()); } // wake up all waiting threads Iterator iwth = waitingThreads.iterator(); while (iwth.hasNext()) { WaitingThread waiter = iwth.next(); iwth.remove(); waiter.wakeup(); } routeToPool.clear(); } finally { poolLock.unlock(); } } } // class ConnPoolByRoute