1/* 2 * Copyright (C) 2011 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17package com.android.volley; 18 19import android.os.Handler; 20import android.os.Looper; 21 22import java.util.HashMap; 23import java.util.HashSet; 24import java.util.LinkedList; 25import java.util.Map; 26import java.util.Queue; 27import java.util.Set; 28import java.util.concurrent.PriorityBlockingQueue; 29import java.util.concurrent.atomic.AtomicInteger; 30 31/** 32 * A request dispatch queue with a thread pool of dispatchers. 33 * 34 * Calling {@link #add(Request)} will enqueue the given Request for dispatch, 35 * resolving from either cache or network on a worker thread, and then delivering 36 * a parsed response on the main thread. 37 */ 38@SuppressWarnings("rawtypes") 39public class RequestQueue { 40 41 /** Used for generating monotonically-increasing sequence numbers for requests. */ 42 private AtomicInteger mSequenceGenerator = new AtomicInteger(); 43 44 /** 45 * Staging area for requests that already have a duplicate request in flight. 46 * 47 * <ul> 48 * <li>containsKey(cacheKey) indicates that there is a request in flight for the given cache 49 * key.</li> 50 * <li>get(cacheKey) returns waiting requests for the given cache key. The in flight request 51 * is <em>not</em> contained in that list. Is null if no requests are staged.</li> 52 * </ul> 53 */ 54 private final Map<String, Queue<Request>> mWaitingRequests = 55 new HashMap<String, Queue<Request>>(); 56 57 /** 58 * The set of all requests currently being processed by this RequestQueue. A Request 59 * will be in this set if it is waiting in any queue or currently being processed by 60 * any dispatcher. 61 */ 62 private final Set<Request> mCurrentRequests = new HashSet<Request>(); 63 64 /** The cache triage queue. */ 65 private final PriorityBlockingQueue<Request> mCacheQueue = 66 new PriorityBlockingQueue<Request>(); 67 68 /** The queue of requests that are actually going out to the network. */ 69 private final PriorityBlockingQueue<Request> mNetworkQueue = 70 new PriorityBlockingQueue<Request>(); 71 72 /** Number of network request dispatcher threads to start. */ 73 private static final int DEFAULT_NETWORK_THREAD_POOL_SIZE = 4; 74 75 /** Cache interface for retrieving and storing respones. */ 76 private final Cache mCache; 77 78 /** Network interface for performing requests. */ 79 private final Network mNetwork; 80 81 /** Response delivery mechanism. */ 82 private final ResponseDelivery mDelivery; 83 84 /** The network dispatchers. */ 85 private NetworkDispatcher[] mDispatchers; 86 87 /** The cache dispatcher. */ 88 private CacheDispatcher mCacheDispatcher; 89 90 /** 91 * Creates the worker pool. Processing will not begin until {@link #start()} is called. 92 * 93 * @param cache A Cache to use for persisting responses to disk 94 * @param network A Network interface for performing HTTP requests 95 * @param threadPoolSize Number of network dispatcher threads to create 96 * @param delivery A ResponseDelivery interface for posting responses and errors 97 */ 98 public RequestQueue(Cache cache, Network network, int threadPoolSize, 99 ResponseDelivery delivery) { 100 mCache = cache; 101 mNetwork = network; 102 mDispatchers = new NetworkDispatcher[threadPoolSize]; 103 mDelivery = delivery; 104 } 105 106 /** 107 * Creates the worker pool. Processing will not begin until {@link #start()} is called. 108 * 109 * @param cache A Cache to use for persisting responses to disk 110 * @param network A Network interface for performing HTTP requests 111 * @param threadPoolSize Number of network dispatcher threads to create 112 */ 113 public RequestQueue(Cache cache, Network network, int threadPoolSize) { 114 this(cache, network, threadPoolSize, 115 new ExecutorDelivery(new Handler(Looper.getMainLooper()))); 116 } 117 118 /** 119 * Creates the worker pool. Processing will not begin until {@link #start()} is called. 120 * 121 * @param cache A Cache to use for persisting responses to disk 122 * @param network A Network interface for performing HTTP requests 123 */ 124 public RequestQueue(Cache cache, Network network) { 125 this(cache, network, DEFAULT_NETWORK_THREAD_POOL_SIZE); 126 } 127 128 /** 129 * Starts the dispatchers in this queue. 130 */ 131 public void start() { 132 stop(); // Make sure any currently running dispatchers are stopped. 133 // Create the cache dispatcher and start it. 134 mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery); 135 mCacheDispatcher.start(); 136 137 // Create network dispatchers (and corresponding threads) up to the pool size. 138 for (int i = 0; i < mDispatchers.length; i++) { 139 NetworkDispatcher networkDispatcher = new NetworkDispatcher(mNetworkQueue, mNetwork, 140 mCache, mDelivery); 141 mDispatchers[i] = networkDispatcher; 142 networkDispatcher.start(); 143 } 144 } 145 146 /** 147 * Stops the cache and network dispatchers. 148 */ 149 public void stop() { 150 if (mCacheDispatcher != null) { 151 mCacheDispatcher.quit(); 152 } 153 for (int i = 0; i < mDispatchers.length; i++) { 154 if (mDispatchers[i] != null) { 155 mDispatchers[i].quit(); 156 } 157 } 158 } 159 160 /** 161 * Gets a sequence number. 162 */ 163 public int getSequenceNumber() { 164 return mSequenceGenerator.incrementAndGet(); 165 } 166 167 /** 168 * Gets the {@link Cache} instance being used. 169 */ 170 public Cache getCache() { 171 return mCache; 172 } 173 174 /** 175 * A simple predicate or filter interface for Requests, for use by 176 * {@link RequestQueue#cancelAll(RequestFilter)}. 177 */ 178 public interface RequestFilter { 179 public boolean apply(Request<?> request); 180 } 181 182 /** 183 * Cancels all requests in this queue for which the given filter applies. 184 * @param filter The filtering function to use 185 */ 186 public void cancelAll(RequestFilter filter) { 187 synchronized (mCurrentRequests) { 188 for (Request<?> request : mCurrentRequests) { 189 if (filter.apply(request)) { 190 request.cancel(); 191 } 192 } 193 } 194 } 195 196 /** 197 * Cancels all requests in this queue with the given tag. Tag must be non-null 198 * and equality is by identity. 199 */ 200 public void cancelAll(final Object tag) { 201 if (tag == null) { 202 throw new IllegalArgumentException("Cannot cancelAll with a null tag"); 203 } 204 cancelAll(new RequestFilter() { 205 @Override 206 public boolean apply(Request<?> request) { 207 return request.getTag() == tag; 208 } 209 }); 210 } 211 212 /** 213 * Adds a Request to the dispatch queue. 214 * @param request The request to service 215 * @return The passed-in request 216 */ 217 public Request add(Request request) { 218 // Tag the request as belonging to this queue and add it to the set of current requests. 219 request.setRequestQueue(this); 220 synchronized (mCurrentRequests) { 221 mCurrentRequests.add(request); 222 } 223 224 // Process requests in the order they are added. 225 request.setSequence(getSequenceNumber()); 226 request.addMarker("add-to-queue"); 227 228 // If the request is uncacheable, skip the cache queue and go straight to the network. 229 if (!request.shouldCache()) { 230 mNetworkQueue.add(request); 231 return request; 232 } 233 234 // Insert request into stage if there's already a request with the same cache key in flight. 235 synchronized (mWaitingRequests) { 236 String cacheKey = request.getCacheKey(); 237 if (mWaitingRequests.containsKey(cacheKey)) { 238 // There is already a request in flight. Queue up. 239 Queue<Request> stagedRequests = mWaitingRequests.get(cacheKey); 240 if (stagedRequests == null) { 241 stagedRequests = new LinkedList<Request>(); 242 } 243 stagedRequests.add(request); 244 mWaitingRequests.put(cacheKey, stagedRequests); 245 if (VolleyLog.DEBUG) { 246 VolleyLog.v("Request for cacheKey=%s is in flight, putting on hold.", cacheKey); 247 } 248 } else { 249 // Insert 'null' queue for this cacheKey, indicating there is now a request in 250 // flight. 251 mWaitingRequests.put(cacheKey, null); 252 mCacheQueue.add(request); 253 } 254 return request; 255 } 256 } 257 258 /** 259 * Called from {@link Request#finish(String)}, indicating that processing of the given request 260 * has finished. 261 * 262 * <p>Releases waiting requests for <code>request.getCacheKey()</code> if 263 * <code>request.shouldCache()</code>.</p> 264 */ 265 void finish(Request request) { 266 // Remove from the set of requests currently being processed. 267 synchronized (mCurrentRequests) { 268 mCurrentRequests.remove(request); 269 } 270 271 if (request.shouldCache()) { 272 synchronized (mWaitingRequests) { 273 String cacheKey = request.getCacheKey(); 274 Queue<Request> waitingRequests = mWaitingRequests.remove(cacheKey); 275 if (waitingRequests != null) { 276 if (VolleyLog.DEBUG) { 277 VolleyLog.v("Releasing %d waiting requests for cacheKey=%s.", 278 waitingRequests.size(), cacheKey); 279 } 280 // Process all queued up requests. They won't be considered as in flight, but 281 // that's not a problem as the cache has been primed by 'request'. 282 mCacheQueue.addAll(waitingRequests); 283 } 284 } 285 } 286 } 287} 288