1/*
2 * $HeadURL: http://svn.apache.org/repos/asf/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java $
3 * $Revision: 677240 $
4 * $Date: 2008-07-16 04:25:47 -0700 (Wed, 16 Jul 2008) $
5 *
6 * ====================================================================
7 *
8 *  Licensed to the Apache Software Foundation (ASF) under one or more
9 *  contributor license agreements.  See the NOTICE file distributed with
10 *  this work for additional information regarding copyright ownership.
11 *  The ASF licenses this file to You under the Apache License, Version 2.0
12 *  (the "License"); you may not use this file except in compliance with
13 *  the License.  You may obtain a copy of the License at
14 *
15 *      http://www.apache.org/licenses/LICENSE-2.0
16 *
17 *  Unless required by applicable law or agreed to in writing, software
18 *  distributed under the License is distributed on an "AS IS" BASIS,
19 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 *  See the License for the specific language governing permissions and
21 *  limitations under the License.
22 * ====================================================================
23 *
24 * This software consists of voluntary contributions made by many
25 * individuals on behalf of the Apache Software Foundation.  For more
26 * information on the Apache Software Foundation, please see
27 * <http://www.apache.org/>.
28 *
29 */
30
31package org.apache.http.impl.conn.tsccm;
32
33import java.util.Date;
34import java.util.HashMap;
35import java.util.Iterator;
36import java.util.Queue;
37import java.util.LinkedList;
38import java.util.Map;
39import java.util.concurrent.locks.Condition;
40import java.util.concurrent.TimeUnit;
41
42import org.apache.commons.logging.Log;
43import org.apache.commons.logging.LogFactory;
44import org.apache.http.conn.routing.HttpRoute;
45import org.apache.http.conn.ClientConnectionOperator;
46import org.apache.http.conn.ConnectionPoolTimeoutException;
47import org.apache.http.conn.params.ConnPerRoute;
48import org.apache.http.conn.params.ConnManagerParams;
49import org.apache.http.params.HttpParams;
50
51
52/**
53 * A connection pool that maintains connections by route.
54 * This class is derived from <code>MultiThreadedHttpConnectionManager</code>
55 * in HttpClient 3.x, see there for original authors. It implements the same
56 * algorithm for connection re-use and connection-per-host enforcement:
57 * <ul>
58 * <li>connections are re-used only for the exact same route</li>
59 * <li>connection limits are enforced per route rather than per host</li>
60 * </ul>
61 * Note that access to the pool datastructures is synchronized via the
62 * {@link AbstractConnPool#poolLock poolLock} in the base class,
63 * not via <code>synchronized</code> methods.
64 *
65 * @author <a href="mailto:rolandw at apache.org">Roland Weber</a>
66 * @author <a href="mailto:becke@u.washington.edu">Michael Becke</a>
67 * @author and others
68 *
69 * @deprecated Please use {@link java.net.URL#openConnection} instead.
70 *     Please visit <a href="http://android-developers.blogspot.com/2011/09/androids-http-clients.html">this webpage</a>
71 *     for further details.
72 */
73@Deprecated
74public class ConnPoolByRoute extends AbstractConnPool {
75
    /** Logger for this pool, obtained for the runtime class of the instance. */
    private final Log log = LogFactory.getLog(getClass());

    /**
     * Connection operator for this pool.
     * Set once by the constructor, which rejects <code>null</code>.
     */
    protected final ClientConnectionOperator operator;

    /**
     * The queue of free connections, across all routes.
     * Created by {@link #createFreeConnQueue} in the constructor.
     */
    protected Queue<BasicPoolEntry> freeConnections;

    /**
     * The queue of WaitingThreads waiting for a connection, across all routes.
     * Created by {@link #createWaitingThreadQueue} in the constructor.
     */
    protected Queue<WaitingThread> waitingThreads;

    /**
     * A map of route-specific pools.
     * Keys are of class {@link HttpRoute},
     * values of class {@link RouteSpecificPool}.
     */
    protected final Map<HttpRoute, RouteSpecificPool> routeToPool;

    /**
     * Upper limit for the total number of connections in this pool.
     * Read from the parameters via
     * <code>ConnManagerParams.getMaxTotalConnections</code> in the constructor.
     */
    protected final int maxTotalConnections;

    /**
     * Source of the per-route connection limit.
     * Queried with <code>getMaxForRoute</code> whenever a new
     * route-specific pool is created.
     */
    private final ConnPerRoute connPerRoute;
97
98    /**
99     * Creates a new connection pool, managed by route.
100     */
101    public ConnPoolByRoute(final ClientConnectionOperator operator, final HttpParams params) {
102        super();
103        if (operator == null) {
104            throw new IllegalArgumentException("Connection operator may not be null");
105        }
106        this.operator = operator;
107
108        freeConnections = createFreeConnQueue();
109        waitingThreads  = createWaitingThreadQueue();
110        routeToPool     = createRouteToPoolMap();
111        maxTotalConnections = ConnManagerParams
112            .getMaxTotalConnections(params);
113        connPerRoute = ConnManagerParams
114            .getMaxConnectionsPerRoute(params);
115    }
116
117
118    /**
119     * Creates the queue for {@link #freeConnections}.
120     * Called once by the constructor.
121     *
122     * @return  a queue
123     */
124    protected Queue<BasicPoolEntry> createFreeConnQueue() {
125        return new LinkedList<BasicPoolEntry>();
126    }
127
128    /**
129     * Creates the queue for {@link #waitingThreads}.
130     * Called once by the constructor.
131     *
132     * @return  a queue
133     */
134    protected Queue<WaitingThread> createWaitingThreadQueue() {
135        return new LinkedList<WaitingThread>();
136    }
137
138    /**
139     * Creates the map for {@link #routeToPool}.
140     * Called once by the constructor.
141     *
142     * @return  a map
143     */
144    protected Map<HttpRoute, RouteSpecificPool> createRouteToPoolMap() {
145        return new HashMap<HttpRoute, RouteSpecificPool>();
146    }
147
148
149    /**
150     * Creates a new route-specific pool.
151     * Called by {@link #getRoutePool} when necessary.
152     *
153     * @param route     the route
154     *
155     * @return  the new pool
156     */
157    protected RouteSpecificPool newRouteSpecificPool(HttpRoute route) {
158        return new RouteSpecificPool(route, connPerRoute.getMaxForRoute(route));
159    }
160
161
162    /**
163     * Creates a new waiting thread.
164     * Called by {@link #getRoutePool} when necessary.
165     *
166     * @param cond      the condition to wait for
167     * @param rospl     the route specific pool, or <code>null</code>
168     *
169     * @return  a waiting thread representation
170     */
171    protected WaitingThread newWaitingThread(Condition cond,
172                                             RouteSpecificPool rospl) {
173        return new WaitingThread(cond, rospl);
174    }
175
176
177    /**
178     * Get a route-specific pool of available connections.
179     *
180     * @param route   the route
181     * @param create    whether to create the pool if it doesn't exist
182     *
183     * @return  the pool for the argument route,
184     *     never <code>null</code> if <code>create</code> is <code>true</code>
185     */
186    protected RouteSpecificPool getRoutePool(HttpRoute route,
187                                             boolean create) {
188        RouteSpecificPool rospl = null;
189        poolLock.lock();
190        try {
191
192            rospl = routeToPool.get(route);
193            if ((rospl == null) && create) {
194                // no pool for this route yet (or anymore)
195                rospl = newRouteSpecificPool(route);
196                routeToPool.put(route, rospl);
197            }
198
199        } finally {
200            poolLock.unlock();
201        }
202
203        return rospl;
204    }
205
206
207    //@@@ consider alternatives for gathering statistics
208    public int getConnectionsInPool(HttpRoute route) {
209
210        poolLock.lock();
211        try {
212            // don't allow a pool to be created here!
213            RouteSpecificPool rospl = getRoutePool(route, false);
214            return (rospl != null) ? rospl.getEntryCount() : 0;
215
216        } finally {
217            poolLock.unlock();
218        }
219    }
220
221    @Override
222    public PoolEntryRequest requestPoolEntry(
223            final HttpRoute route,
224            final Object state) {
225
226        final WaitingThreadAborter aborter = new WaitingThreadAborter();
227
228        return new PoolEntryRequest() {
229
230            public void abortRequest() {
231                poolLock.lock();
232                try {
233                    aborter.abort();
234                } finally {
235                    poolLock.unlock();
236                }
237            }
238
239            public BasicPoolEntry getPoolEntry(
240                    long timeout,
241                    TimeUnit tunit)
242                        throws InterruptedException, ConnectionPoolTimeoutException {
243                return getEntryBlocking(route, state, timeout, tunit, aborter);
244            }
245
246        };
247    }
248
249    /**
250     * Obtains a pool entry with a connection within the given timeout.
251     * If a {@link WaitingThread} is used to block, {@link WaitingThreadAborter#setWaitingThread(WaitingThread)}
252     * must be called before blocking, to allow the thread to be interrupted.
253     *
254     * @param route     the route for which to get the connection
255     * @param timeout   the timeout, 0 or negative for no timeout
256     * @param tunit     the unit for the <code>timeout</code>,
257     *                  may be <code>null</code> only if there is no timeout
258     * @param aborter   an object which can abort a {@link WaitingThread}.
259     *
260     * @return  pool entry holding a connection for the route
261     *
262     * @throws ConnectionPoolTimeoutException
263     *         if the timeout expired
264     * @throws InterruptedException
265     *         if the calling thread was interrupted
266     */
267    protected BasicPoolEntry getEntryBlocking(
268                                   HttpRoute route, Object state,
269                                   long timeout, TimeUnit tunit,
270                                   WaitingThreadAborter aborter)
271        throws ConnectionPoolTimeoutException, InterruptedException {
272
273        Date deadline = null;
274        if (timeout > 0) {
275            deadline = new Date
276                (System.currentTimeMillis() + tunit.toMillis(timeout));
277        }
278
279        BasicPoolEntry entry = null;
280        poolLock.lock();
281        try {
282
283            RouteSpecificPool rospl = getRoutePool(route, true);
284            WaitingThread waitingThread = null;
285
286            while (entry == null) {
287
288                if (isShutDown) {
289                    throw new IllegalStateException
290                        ("Connection pool shut down.");
291                }
292
293                if (log.isDebugEnabled()) {
294                    log.debug("Total connections kept alive: " + freeConnections.size());
295                    log.debug("Total issued connections: " + issuedConnections.size());
296                    log.debug("Total allocated connection: " + numConnections + " out of " + maxTotalConnections);
297                }
298
299                // the cases to check for:
300                // - have a free connection for that route
301                // - allowed to create a free connection for that route
302                // - can delete and replace a free connection for another route
303                // - need to wait for one of the things above to come true
304
305                entry = getFreeEntry(rospl, state);
306                if (entry != null) {
307                    break;
308                }
309
310                boolean hasCapacity = rospl.getCapacity() > 0;
311
312                if (log.isDebugEnabled()) {
313                    log.debug("Available capacity: " + rospl.getCapacity()
314                            + " out of " + rospl.getMaxEntries()
315                            + " [" + route + "][" + state + "]");
316                }
317
318                if (hasCapacity && numConnections < maxTotalConnections) {
319
320                    entry = createEntry(rospl, operator);
321
322                } else if (hasCapacity && !freeConnections.isEmpty()) {
323
324                    deleteLeastUsedEntry();
325                    entry = createEntry(rospl, operator);
326
327                } else {
328
329                    if (log.isDebugEnabled()) {
330                        log.debug("Need to wait for connection" +
331                                " [" + route + "][" + state + "]");
332                    }
333
334                    if (waitingThread == null) {
335                        waitingThread =
336                            newWaitingThread(poolLock.newCondition(), rospl);
337                        aborter.setWaitingThread(waitingThread);
338                    }
339
340                    boolean success = false;
341                    try {
342                        rospl.queueThread(waitingThread);
343                        waitingThreads.add(waitingThread);
344                        success = waitingThread.await(deadline);
345
346                    } finally {
347                        // In case of 'success', we were woken up by the
348                        // connection pool and should now have a connection
349                        // waiting for us, or else we're shutting down.
350                        // Just continue in the loop, both cases are checked.
351                        rospl.removeThread(waitingThread);
352                        waitingThreads.remove(waitingThread);
353                    }
354
355                    // check for spurious wakeup vs. timeout
356                    if (!success && (deadline != null) &&
357                        (deadline.getTime() <= System.currentTimeMillis())) {
358                        throw new ConnectionPoolTimeoutException
359                            ("Timeout waiting for connection");
360                    }
361                }
362            } // while no entry
363
364        } finally {
365            poolLock.unlock();
366        }
367
368        return entry;
369
370    } // getEntry
371
372
373    // non-javadoc, see base class AbstractConnPool
374    @Override
375    public void freeEntry(BasicPoolEntry entry, boolean reusable, long validDuration, TimeUnit timeUnit) {
376
377        HttpRoute route = entry.getPlannedRoute();
378        if (log.isDebugEnabled()) {
379            log.debug("Freeing connection" +
380                    " [" + route + "][" + entry.getState() + "]");
381        }
382
383        poolLock.lock();
384        try {
385            if (isShutDown) {
386                // the pool is shut down, release the
387                // connection's resources and get out of here
388                closeConnection(entry.getConnection());
389                return;
390            }
391
392            // no longer issued, we keep a hard reference now
393            issuedConnections.remove(entry.getWeakRef());
394
395            RouteSpecificPool rospl = getRoutePool(route, true);
396
397            if (reusable) {
398                rospl.freeEntry(entry);
399                freeConnections.add(entry);
400                idleConnHandler.add(entry.getConnection(), validDuration, timeUnit);
401            } else {
402                rospl.dropEntry();
403                numConnections--;
404            }
405
406            notifyWaitingThread(rospl);
407
408        } finally {
409            poolLock.unlock();
410        }
411
412    } // freeEntry
413
414
415
416    /**
417     * If available, get a free pool entry for a route.
418     *
419     * @param rospl       the route-specific pool from which to get an entry
420     *
421     * @return  an available pool entry for the given route, or
422     *          <code>null</code> if none is available
423     */
424    protected BasicPoolEntry getFreeEntry(RouteSpecificPool rospl, Object state) {
425
426        BasicPoolEntry entry = null;
427        poolLock.lock();
428        try {
429            boolean done = false;
430            while(!done) {
431
432                entry = rospl.allocEntry(state);
433
434                if (entry != null) {
435                    if (log.isDebugEnabled()) {
436                        log.debug("Getting free connection"
437                                + " [" + rospl.getRoute() + "][" + state + "]");
438
439                    }
440                    freeConnections.remove(entry);
441                    boolean valid = idleConnHandler.remove(entry.getConnection());
442                    if(!valid) {
443                        // If the free entry isn't valid anymore, get rid of it
444                        // and loop to find another one that might be valid.
445                        if(log.isDebugEnabled())
446                            log.debug("Closing expired free connection"
447                                    + " [" + rospl.getRoute() + "][" + state + "]");
448                        closeConnection(entry.getConnection());
449                        // We use dropEntry instead of deleteEntry because the entry
450                        // is no longer "free" (we just allocated it), and deleteEntry
451                        // can only be used to delete free entries.
452                        rospl.dropEntry();
453                        numConnections--;
454                    } else {
455                        issuedConnections.add(entry.getWeakRef());
456                        done = true;
457                    }
458
459                } else {
460                    done = true;
461                    if (log.isDebugEnabled()) {
462                        log.debug("No free connections"
463                                + " [" + rospl.getRoute() + "][" + state + "]");
464                    }
465                }
466            }
467        } finally {
468            poolLock.unlock();
469        }
470
471        return entry;
472    }
473
474
475    /**
476     * Creates a new pool entry.
477     * This method assumes that the new connection will be handed
478     * out immediately.
479     *
480     * @param rospl       the route-specific pool for which to create the entry
481     * @param op        the operator for creating a connection
482     *
483     * @return  the new pool entry for a new connection
484     */
485    protected BasicPoolEntry createEntry(RouteSpecificPool rospl,
486                                         ClientConnectionOperator op) {
487
488        if (log.isDebugEnabled()) {
489            log.debug("Creating new connection [" + rospl.getRoute() + "]");
490        }
491
492        // the entry will create the connection when needed
493        BasicPoolEntry entry =
494            new BasicPoolEntry(op, rospl.getRoute(), refQueue);
495
496        poolLock.lock();
497        try {
498
499            rospl.createdEntry(entry);
500            numConnections++;
501
502            issuedConnections.add(entry.getWeakRef());
503
504        } finally {
505            poolLock.unlock();
506        }
507
508        return entry;
509    }
510
511
512    /**
513     * Deletes a given pool entry.
514     * This closes the pooled connection and removes all references,
515     * so that it can be GCed.
516     *
517     * <p><b>Note:</b> Does not remove the entry from the freeConnections list.
518     * It is assumed that the caller has already handled this step.</p>
519     * <!-- @@@ is that a good idea? or rather fix it? -->
520     *
521     * @param entry         the pool entry for the connection to delete
522     */
523    protected void deleteEntry(BasicPoolEntry entry) {
524
525        HttpRoute route = entry.getPlannedRoute();
526
527        if (log.isDebugEnabled()) {
528            log.debug("Deleting connection"
529                    + " [" + route + "][" + entry.getState() + "]");
530        }
531
532        poolLock.lock();
533        try {
534
535            closeConnection(entry.getConnection());
536
537            RouteSpecificPool rospl = getRoutePool(route, true);
538            rospl.deleteEntry(entry);
539            numConnections--;
540            if (rospl.isUnused()) {
541                routeToPool.remove(route);
542            }
543
544            idleConnHandler.remove(entry.getConnection());// not idle, but dead
545
546        } finally {
547            poolLock.unlock();
548        }
549    }
550
551
552    /**
553     * Delete an old, free pool entry to make room for a new one.
554     * Used to replace pool entries with ones for a different route.
555     */
556    protected void deleteLeastUsedEntry() {
557
558        try {
559            poolLock.lock();
560
561            //@@@ with get() instead of remove, we could
562            //@@@ leave the removing to deleteEntry()
563            BasicPoolEntry entry = freeConnections.remove();
564
565            if (entry != null) {
566                deleteEntry(entry);
567            } else if (log.isDebugEnabled()) {
568                log.debug("No free connection to delete.");
569            }
570
571        } finally {
572            poolLock.unlock();
573        }
574    }
575
576
577    // non-javadoc, see base class AbstractConnPool
578    @Override
579    protected void handleLostEntry(HttpRoute route) {
580
581        poolLock.lock();
582        try {
583
584            RouteSpecificPool rospl = getRoutePool(route, true);
585            rospl.dropEntry();
586            if (rospl.isUnused()) {
587                routeToPool.remove(route);
588            }
589
590            numConnections--;
591            notifyWaitingThread(rospl);
592
593        } finally {
594            poolLock.unlock();
595        }
596    }
597
598
599    /**
600     * Notifies a waiting thread that a connection is available.
601     * This will wake a thread waiting in the specific route pool,
602     * if there is one.
603     * Otherwise, a thread in the connection pool will be notified.
604     *
605     * @param rospl     the pool in which to notify, or <code>null</code>
606     */
607    protected void notifyWaitingThread(RouteSpecificPool rospl) {
608
609        //@@@ while this strategy provides for best connection re-use,
610        //@@@ is it fair? only do this if the connection is open?
611        // Find the thread we are going to notify. We want to ensure that
612        // each waiting thread is only interrupted once, so we will remove
613        // it from all wait queues before interrupting.
614        WaitingThread waitingThread = null;
615
616        poolLock.lock();
617        try {
618
619            if ((rospl != null) && rospl.hasThread()) {
620                if (log.isDebugEnabled()) {
621                    log.debug("Notifying thread waiting on pool" +
622                            " [" + rospl.getRoute() + "]");
623                }
624                waitingThread = rospl.nextThread();
625            } else if (!waitingThreads.isEmpty()) {
626                if (log.isDebugEnabled()) {
627                    log.debug("Notifying thread waiting on any pool");
628                }
629                waitingThread = waitingThreads.remove();
630            } else if (log.isDebugEnabled()) {
631                log.debug("Notifying no-one, there are no waiting threads");
632            }
633
634            if (waitingThread != null) {
635                waitingThread.wakeup();
636            }
637
638        } finally {
639            poolLock.unlock();
640        }
641    }
642
643
644    //@@@ revise this cleanup stuff
645    //@@@ move method to base class when deleteEntry() is fixed
646    // non-javadoc, see base class AbstractConnPool
647    @Override
648    public void deleteClosedConnections() {
649
650        poolLock.lock();
651        try {
652
653            Iterator<BasicPoolEntry>  iter = freeConnections.iterator();
654            while (iter.hasNext()) {
655                BasicPoolEntry entry = iter.next();
656                if (!entry.getConnection().isOpen()) {
657                    iter.remove();
658                    deleteEntry(entry);
659                }
660            }
661
662        } finally {
663            poolLock.unlock();
664        }
665    }
666
667
668    // non-javadoc, see base class AbstractConnPool
669    @Override
670    public void shutdown() {
671
672        poolLock.lock();
673        try {
674
675            super.shutdown();
676
677            // close all free connections
678            //@@@ move this to base class?
679            Iterator<BasicPoolEntry> ibpe = freeConnections.iterator();
680            while (ibpe.hasNext()) {
681                BasicPoolEntry entry = ibpe.next();
682                ibpe.remove();
683                closeConnection(entry.getConnection());
684            }
685
686            // wake up all waiting threads
687            Iterator<WaitingThread> iwth = waitingThreads.iterator();
688            while (iwth.hasNext()) {
689                WaitingThread waiter = iwth.next();
690                iwth.remove();
691                waiter.wakeup();
692            }
693
694            routeToPool.clear();
695
696        } finally {
697            poolLock.unlock();
698        }
699    }
700
701
702} // class ConnPoolByRoute
703
704