/*
 * Copyright (C) 2011 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.volley;

import android.os.Handler;
import android.os.Looper;

import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A request dispatch queue with a thread pool of dispatchers.
 *
 * Calling {@link #add(Request)} will enqueue the given Request for dispatch,
 * resolving from either cache or network on a worker thread, and then delivering
 * a parsed response on the main thread.
 */
@SuppressWarnings("rawtypes")
public class RequestQueue {

    /** Used for generating monotonically-increasing sequence numbers for requests. */
    private final AtomicInteger mSequenceGenerator = new AtomicInteger();

    /**
     * Staging area for requests that already have a duplicate request in flight.
     *
     * <ul>
     *     <li>containsKey(cacheKey) indicates that there is a request in flight for the given cache
     *         key.</li>
     *     <li>get(cacheKey) returns waiting requests for the given cache key. The in flight request
     *         is <em>not</em> contained in that list. Is null if no requests are staged.</li>
     * </ul>
     */
    private final Map<String, Queue<Request>> mWaitingRequests =
            new HashMap<String, Queue<Request>>();

    /**
     * The set of all requests currently being processed by this RequestQueue. A Request
     * will be in this set if it is waiting in any queue or currently being processed by
     * any dispatcher.
     */
    private final Set<Request> mCurrentRequests = new HashSet<Request>();

    /** The cache triage queue. */
    private final PriorityBlockingQueue<Request> mCacheQueue =
            new PriorityBlockingQueue<Request>();

    /** The queue of requests that are actually going out to the network. */
    private final PriorityBlockingQueue<Request> mNetworkQueue =
            new PriorityBlockingQueue<Request>();

    /** Number of network request dispatcher threads to start. */
    private static final int DEFAULT_NETWORK_THREAD_POOL_SIZE = 4;

    /** Cache interface for retrieving and storing responses. */
    private final Cache mCache;

    /** Network interface for performing requests. */
    private final Network mNetwork;

    /** Response delivery mechanism. */
    private final ResponseDelivery mDelivery;

    /** The network dispatchers. Slots are populated lazily by {@link #start()}. */
    private final NetworkDispatcher[] mDispatchers;

    /** The cache dispatcher. Null until {@link #start()} has been called. */
    private CacheDispatcher mCacheDispatcher;

    /**
     * Creates the worker pool. Processing will not begin until {@link #start()} is called.
     *
     * @param cache A Cache to use for persisting responses to disk
     * @param network A Network interface for performing HTTP requests
     * @param threadPoolSize Number of network dispatcher threads to create
     * @param delivery A ResponseDelivery interface for posting responses and errors
     */
    public RequestQueue(Cache cache, Network network, int threadPoolSize,
            ResponseDelivery delivery) {
        mCache = cache;
        mNetwork = network;
        mDispatchers = new NetworkDispatcher[threadPoolSize];
        mDelivery = delivery;
    }

    /**
     * Creates the worker pool. Processing will not begin until {@link #start()} is called.
     * Responses are delivered on the main (UI) thread via {@link Looper#getMainLooper()}.
     *
     * @param cache A Cache to use for persisting responses to disk
     * @param network A Network interface for performing HTTP requests
     * @param threadPoolSize Number of network dispatcher threads to create
     */
    public RequestQueue(Cache cache, Network network, int threadPoolSize) {
        this(cache, network, threadPoolSize,
                new ExecutorDelivery(new Handler(Looper.getMainLooper())));
    }

    /**
     * Creates the worker pool with the default thread pool size. Processing will not begin
     * until {@link #start()} is called.
     *
     * @param cache A Cache to use for persisting responses to disk
     * @param network A Network interface for performing HTTP requests
     */
    public RequestQueue(Cache cache, Network network) {
        this(cache, network, DEFAULT_NETWORK_THREAD_POOL_SIZE);
    }

    /**
     * Starts the dispatchers in this queue.
     */
    public void start() {
        stop();  // Make sure any currently running dispatchers are stopped.
        // Create the cache dispatcher and start it.
        mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery);
        mCacheDispatcher.start();

        // Create network dispatchers (and corresponding threads) up to the pool size.
        for (int i = 0; i < mDispatchers.length; i++) {
            NetworkDispatcher networkDispatcher = new NetworkDispatcher(mNetworkQueue, mNetwork,
                    mCache, mDelivery);
            mDispatchers[i] = networkDispatcher;
            networkDispatcher.start();
        }
    }

    /**
     * Stops the cache and network dispatchers.
     */
    public void stop() {
        if (mCacheDispatcher != null) {
            mCacheDispatcher.quit();
        }
        for (int i = 0; i < mDispatchers.length; i++) {
            if (mDispatchers[i] != null) {
                mDispatchers[i].quit();
            }
        }
    }

    /**
     * Gets a sequence number.
     */
    public int getSequenceNumber() {
        return mSequenceGenerator.incrementAndGet();
    }

    /**
     * A simple predicate or filter interface for Requests, for use by
     * {@link RequestQueue#cancelAll(RequestFilter)}.
     */
    public interface RequestFilter {
        boolean apply(Request<?> request);
    }

    /**
     * Cancels all requests in this queue for which the given filter applies.
     * @param filter The filtering function to use
     */
    public void cancelAll(RequestFilter filter) {
        synchronized (mCurrentRequests) {
            for (Request<?> request : mCurrentRequests) {
                if (filter.apply(request)) {
                    request.cancel();
                }
            }
        }
    }

    /**
     * Cancels all requests in this queue with the given tag. Tag must be non-null
     * and equality is by identity.
     */
    public void cancelAll(final Object tag) {
        if (tag == null) {
            throw new IllegalArgumentException("Cannot cancelAll with a null tag");
        }
        cancelAll(new RequestFilter() {
            @Override
            public boolean apply(Request<?> request) {
                return request.getTag() == tag;
            }
        });
    }

    /**
     * Adds a Request to the dispatch queue.
     * @param request The request to service
     * @return The passed-in request
     */
    public Request add(Request request) {
        // Tag the request as belonging to this queue and add it to the set of current requests.
        request.setRequestQueue(this);
        synchronized (mCurrentRequests) {
            mCurrentRequests.add(request);
        }

        // Process requests in the order they are added.
        request.setSequence(getSequenceNumber());
        request.addMarker("add-to-queue");

        // If the request is uncacheable, skip the cache queue and go straight to the network.
        if (!request.shouldCache()) {
            mNetworkQueue.add(request);
            return request;
        }

        // Insert request into stage if there's already a request with the same cache key in flight.
        synchronized (mWaitingRequests) {
            String cacheKey = request.getCacheKey();
            if (mWaitingRequests.containsKey(cacheKey)) {
                // There is already a request in flight. Queue up.
                Queue<Request> stagedRequests = mWaitingRequests.get(cacheKey);
                if (stagedRequests == null) {
                    stagedRequests = new LinkedList<Request>();
                }
                stagedRequests.add(request);
                mWaitingRequests.put(cacheKey, stagedRequests);
                if (VolleyLog.DEBUG) {
                    VolleyLog.v("Request for cacheKey=%s is in flight, putting on hold.", cacheKey);
                }
            } else {
                // Insert 'null' queue for this cacheKey, indicating there is now a request in
                // flight.
                mWaitingRequests.put(cacheKey, null);
                mCacheQueue.add(request);
            }
            return request;
        }
    }

    /**
     * Called from {@link Request#finish(String)}, indicating that processing of the given request
     * has finished.
     *
     * <p>Releases waiting requests for <code>request.getCacheKey()</code> if
     * <code>request.shouldCache()</code>.</p>
     */
    void finish(Request request) {
        // Remove from the set of requests currently being processed.
        synchronized (mCurrentRequests) {
            mCurrentRequests.remove(request);
        }

        if (request.shouldCache()) {
            synchronized (mWaitingRequests) {
                String cacheKey = request.getCacheKey();
                Queue<Request> waitingRequests = mWaitingRequests.remove(cacheKey);
                if (waitingRequests != null) {
                    if (VolleyLog.DEBUG) {
                        VolleyLog.v("Releasing %d waiting requests for cacheKey=%s.",
                                waitingRequests.size(), cacheKey);
                    }
                    // Process all queued up requests. They won't be considered as in flight, but
                    // that's not a problem as the cache has been primed by 'request'.
                    mCacheQueue.addAll(waitingRequests);
                }
            }
        }
    }
}