multi_threaded_proxy_resolver.cc revision 513209b27ff55e2841eac0e4120199c23acce758
1// Copyright (c) 2010 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/proxy/multi_threaded_proxy_resolver.h" 6 7#include "base/message_loop.h" 8#include "base/string_util.h" 9#include "base/stringprintf.h" 10#include "base/thread.h" 11#include "net/base/capturing_net_log.h" 12#include "net/base/net_errors.h" 13#include "net/proxy/proxy_info.h" 14 15// TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script 16// data when SetPacScript fails. That will reclaim memory when 17// testing bogus scripts. 18 19namespace net { 20 21namespace { 22 23class PurgeMemoryTask : public base::RefCountedThreadSafe<PurgeMemoryTask> { 24 public: 25 explicit PurgeMemoryTask(ProxyResolver* resolver) : resolver_(resolver) {} 26 void PurgeMemory() { resolver_->PurgeMemory(); } 27 private: 28 friend class base::RefCountedThreadSafe<PurgeMemoryTask>; 29 ~PurgeMemoryTask() {} 30 ProxyResolver* resolver_; 31}; 32 33} // namespace 34 35// An "executor" is a job-runner for PAC requests. It encapsulates a worker 36// thread and a synchronous ProxyResolver (which will be operated on said 37// thread.) 38class MultiThreadedProxyResolver::Executor 39 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > { 40 public: 41 // |coordinator| must remain valid throughout our lifetime. It is used to 42 // signal when the executor is ready to receive work by calling 43 // |coordinator->OnExecutorReady()|. 44 // The constructor takes ownership of |resolver|. 45 // |thread_number| is an identifier used when naming the worker thread. 46 Executor(MultiThreadedProxyResolver* coordinator, 47 ProxyResolver* resolver, 48 int thread_number); 49 50 // Submit a job to this executor. 51 void StartJob(Job* job); 52 53 // Callback for when a job has completed running on the executor's thread. 54 void OnJobCompleted(Job* job); 55 56 // Cleanup the executor. Cancels all outstanding work, and frees the thread 57 // and resolver. 58 void Destroy(); 59 60 void PurgeMemory(); 61 62 // Returns the outstanding job, or NULL. 63 Job* outstanding_job() const { return outstanding_job_.get(); } 64 65 ProxyResolver* resolver() { return resolver_.get(); } 66 67 int thread_number() const { return thread_number_; } 68 69 private: 70 friend class base::RefCountedThreadSafe<Executor>; 71 ~Executor(); 72 73 MultiThreadedProxyResolver* coordinator_; 74 const int thread_number_; 75 76 // The currently active job for this executor (either a SetPacScript or 77 // GetProxyForURL task). 78 scoped_refptr<Job> outstanding_job_; 79 80 // The synchronous resolver implementation. 81 scoped_ptr<ProxyResolver> resolver_; 82 83 // The thread where |resolver_| is run on. 84 // Note that declaration ordering is important here. |thread_| needs to be 85 // destroyed *before* |resolver_|, in case |resolver_| is currently 86 // executing on |thread_|. 87 scoped_ptr<base::Thread> thread_; 88}; 89 90// MultiThreadedProxyResolver::Job --------------------------------------------- 91 92class MultiThreadedProxyResolver::Job 93 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> { 94 public: 95 // Identifies the subclass of Job (only being used for debugging purposes). 96 enum Type { 97 TYPE_GET_PROXY_FOR_URL, 98 TYPE_SET_PAC_SCRIPT, 99 TYPE_SET_PAC_SCRIPT_INTERNAL, 100 }; 101 102 Job(Type type, CompletionCallback* user_callback) 103 : type_(type), 104 user_callback_(user_callback), 105 executor_(NULL), 106 was_cancelled_(false) { 107 } 108 109 void set_executor(Executor* executor) { 110 executor_ = executor; 111 } 112 113 // The "executor" is the job runner that is scheduling this job. If 114 // this job has not been submitted to an executor yet, this will be 115 // NULL (and we know it hasn't started yet). 116 Executor* executor() { 117 return executor_; 118 } 119 120 // Mark the job as having been cancelled. 121 void Cancel() { 122 was_cancelled_ = true; 123 } 124 125 // Returns true if Cancel() has been called. 126 bool was_cancelled() const { return was_cancelled_; } 127 128 Type type() const { return type_; } 129 130 // Returns true if this job still has a user callback. Some jobs 131 // do not have a user callback, because they were helper jobs 132 // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL). 133 // 134 // Otherwise jobs that correspond with user-initiated work will 135 // have a non-NULL callback up until the callback is run. 136 bool has_user_callback() const { return user_callback_ != NULL; } 137 138 // This method is called when the job is inserted into a wait queue 139 // because no executors were ready to accept it. 140 virtual void WaitingForThread() {} 141 142 // This method is called just before the job is posted to the work thread. 143 virtual void FinishedWaitingForThread() {} 144 145 // This method is called on the worker thread to do the job's work. On 146 // completion, implementors are expected to call OnJobCompleted() on 147 // |origin_loop|. 148 virtual void Run(MessageLoop* origin_loop) = 0; 149 150 protected: 151 void OnJobCompleted() { 152 // |executor_| will be NULL if the executor has already been deleted. 153 if (executor_) 154 executor_->OnJobCompleted(this); 155 } 156 157 void RunUserCallback(int result) { 158 DCHECK(has_user_callback()); 159 CompletionCallback* callback = user_callback_; 160 // Null the callback so has_user_callback() will now return false. 161 user_callback_ = NULL; 162 callback->Run(result); 163 } 164 165 friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>; 166 167 virtual ~Job() {} 168 169 private: 170 const Type type_; 171 CompletionCallback* user_callback_; 172 Executor* executor_; 173 bool was_cancelled_; 174}; 175 176// MultiThreadedProxyResolver::SetPacScriptJob --------------------------------- 177 178// Runs on the worker thread to call ProxyResolver::SetPacScript. 179class MultiThreadedProxyResolver::SetPacScriptJob 180 : public MultiThreadedProxyResolver::Job { 181 public: 182 SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData>& script_data, 183 CompletionCallback* callback) 184 : Job(callback ? TYPE_SET_PAC_SCRIPT : TYPE_SET_PAC_SCRIPT_INTERNAL, 185 callback), 186 script_data_(script_data) { 187 } 188 189 // Runs on the worker thread. 190 virtual void Run(MessageLoop* origin_loop) { 191 ProxyResolver* resolver = executor()->resolver(); 192 int rv = resolver->SetPacScript(script_data_, NULL); 193 194 DCHECK_NE(rv, ERR_IO_PENDING); 195 origin_loop->PostTask( 196 FROM_HERE, 197 NewRunnableMethod(this, &SetPacScriptJob::RequestComplete, rv)); 198 } 199 200 private: 201 // Runs the completion callback on the origin thread. 202 void RequestComplete(int result_code) { 203 // The task may have been cancelled after it was started. 204 if (!was_cancelled() && has_user_callback()) { 205 RunUserCallback(result_code); 206 } 207 OnJobCompleted(); 208 } 209 210 const scoped_refptr<ProxyResolverScriptData> script_data_; 211}; 212 213// MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------ 214 215class MultiThreadedProxyResolver::GetProxyForURLJob 216 : public MultiThreadedProxyResolver::Job { 217 public: 218 // |url| -- the URL of the query. 219 // |results| -- the structure to fill with proxy resolve results. 220 GetProxyForURLJob(const GURL& url, 221 ProxyInfo* results, 222 CompletionCallback* callback, 223 const BoundNetLog& net_log) 224 : Job(TYPE_GET_PROXY_FOR_URL, callback), 225 results_(results), 226 net_log_(net_log), 227 url_(url), 228 was_waiting_for_thread_(false) { 229 DCHECK(callback); 230 } 231 232 BoundNetLog* net_log() { return &net_log_; } 233 234 virtual void WaitingForThread() { 235 was_waiting_for_thread_ = true; 236 net_log_.BeginEvent( 237 NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL); 238 } 239 240 virtual void FinishedWaitingForThread() { 241 DCHECK(executor()); 242 243 if (was_waiting_for_thread_) { 244 net_log_.EndEvent( 245 NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL); 246 } 247 248 net_log_.AddEvent( 249 NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD, 250 make_scoped_refptr(new NetLogIntegerParameter( 251 "thread_number", executor()->thread_number()))); 252 } 253 254 // Runs on the worker thread. 255 virtual void Run(MessageLoop* origin_loop) { 256 const size_t kNetLogBound = 50u; 257 worker_log_.reset(new CapturingNetLog(kNetLogBound)); 258 BoundNetLog bound_worker_log(NetLog::Source(), worker_log_.get()); 259 260 ProxyResolver* resolver = executor()->resolver(); 261 int rv = resolver->GetProxyForURL( 262 url_, &results_buf_, NULL, NULL, bound_worker_log); 263 DCHECK_NE(rv, ERR_IO_PENDING); 264 265 origin_loop->PostTask( 266 FROM_HERE, 267 NewRunnableMethod(this, &GetProxyForURLJob::QueryComplete, rv)); 268 } 269 270 private: 271 // Runs the completion callback on the origin thread. 272 void QueryComplete(int result_code) { 273 // The Job may have been cancelled after it was started. 274 if (!was_cancelled()) { 275 // Merge the load log that was generated on the worker thread, into the 276 // main log. 277 CapturingBoundNetLog bound_worker_log(NetLog::Source(), 278 worker_log_.release()); 279 bound_worker_log.AppendTo(net_log_); 280 281 if (result_code >= OK) { // Note: unit-tests use values > 0. 282 results_->Use(results_buf_); 283 } 284 RunUserCallback(result_code); 285 } 286 OnJobCompleted(); 287 } 288 289 // Must only be used on the "origin" thread. 290 ProxyInfo* results_; 291 BoundNetLog net_log_; 292 const GURL url_; 293 294 // Usable from within DoQuery on the worker thread. 295 ProxyInfo results_buf_; 296 297 // Used to pass the captured events between DoQuery [worker thread] and 298 // QueryComplete [origin thread]. 299 scoped_ptr<CapturingNetLog> worker_log_; 300 301 bool was_waiting_for_thread_; 302}; 303 304// MultiThreadedProxyResolver::Executor ---------------------------------------- 305 306MultiThreadedProxyResolver::Executor::Executor( 307 MultiThreadedProxyResolver* coordinator, 308 ProxyResolver* resolver, 309 int thread_number) 310 : coordinator_(coordinator), 311 thread_number_(thread_number), 312 resolver_(resolver) { 313 DCHECK(coordinator); 314 DCHECK(resolver); 315 // Start up the thread. 316 // Note that it is safe to pass a temporary C-String to Thread(), as it will 317 // make a copy. 318 std::string thread_name = 319 base::StringPrintf("PAC thread #%d", thread_number); 320 thread_.reset(new base::Thread(thread_name.c_str())); 321 thread_->Start(); 322} 323 324void MultiThreadedProxyResolver::Executor::StartJob(Job* job) { 325 DCHECK(!outstanding_job_); 326 outstanding_job_ = job; 327 328 // Run the job. Once it has completed (regardless of whether it was 329 // cancelled), it will invoke OnJobCompleted() on this thread. 330 job->set_executor(this); 331 job->FinishedWaitingForThread(); 332 thread_->message_loop()->PostTask( 333 FROM_HERE, 334 NewRunnableMethod(job, &Job::Run, MessageLoop::current())); 335} 336 337void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) { 338 DCHECK_EQ(job, outstanding_job_.get()); 339 outstanding_job_ = NULL; 340 coordinator_->OnExecutorReady(this); 341} 342 343void MultiThreadedProxyResolver::Executor::Destroy() { 344 DCHECK(coordinator_); 345 346 // Give the resolver an opportunity to shutdown from THIS THREAD before 347 // joining on the resolver thread. This allows certain implementations 348 // to avoid deadlocks. 349 resolver_->Shutdown(); 350 351 // Join the worker thread. 352 thread_.reset(); 353 354 // Cancel any outstanding job. 355 if (outstanding_job_) { 356 outstanding_job_->Cancel(); 357 // Orphan the job (since this executor may be deleted soon). 358 outstanding_job_->set_executor(NULL); 359 } 360 361 // It is now safe to free the ProxyResolver, since all the tasks that 362 // were using it on the resolver thread have completed. 363 resolver_.reset(); 364 365 // Null some stuff as a precaution. 366 coordinator_ = NULL; 367 outstanding_job_ = NULL; 368} 369 370void MultiThreadedProxyResolver::Executor::PurgeMemory() { 371 scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get())); 372 thread_->message_loop()->PostTask( 373 FROM_HERE, 374 NewRunnableMethod(helper.get(), &PurgeMemoryTask::PurgeMemory)); 375} 376 377MultiThreadedProxyResolver::Executor::~Executor() { 378 // The important cleanup happens as part of Destroy(), which should always be 379 // called first. 380 DCHECK(!coordinator_) << "Destroy() was not called"; 381 DCHECK(!thread_.get()); 382 DCHECK(!resolver_.get()); 383 DCHECK(!outstanding_job_); 384} 385 386// MultiThreadedProxyResolver -------------------------------------------------- 387 388MultiThreadedProxyResolver::MultiThreadedProxyResolver( 389 ProxyResolverFactory* resolver_factory, 390 size_t max_num_threads) 391 : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()), 392 resolver_factory_(resolver_factory), 393 max_num_threads_(max_num_threads) { 394 DCHECK_GE(max_num_threads, 1u); 395} 396 397MultiThreadedProxyResolver::~MultiThreadedProxyResolver() { 398 // We will cancel all outstanding requests. 399 pending_jobs_.clear(); 400 ReleaseAllExecutors(); 401} 402 403int MultiThreadedProxyResolver::GetProxyForURL(const GURL& url, 404 ProxyInfo* results, 405 CompletionCallback* callback, 406 RequestHandle* request, 407 const BoundNetLog& net_log) { 408 DCHECK(CalledOnValidThread()); 409 DCHECK(callback); 410 DCHECK(current_script_data_.get()) 411 << "Resolver is un-initialized. Must call SetPacScript() first!"; 412 413 scoped_refptr<GetProxyForURLJob> job( 414 new GetProxyForURLJob(url, results, callback, net_log)); 415 416 // Completion will be notified through |callback|, unless the caller cancels 417 // the request using |request|. 418 if (request) 419 *request = reinterpret_cast<RequestHandle>(job.get()); 420 421 // If there is an executor that is ready to run this request, submit it! 422 Executor* executor = FindIdleExecutor(); 423 if (executor) { 424 DCHECK_EQ(0u, pending_jobs_.size()); 425 executor->StartJob(job); 426 return ERR_IO_PENDING; 427 } 428 429 // Otherwise queue this request. (We will schedule it to a thread once one 430 // becomes available). 431 job->WaitingForThread(); 432 pending_jobs_.push_back(job); 433 434 // If we haven't already reached the thread limit, provision a new thread to 435 // drain the requests more quickly. 436 if (executors_.size() < max_num_threads_) { 437 executor = AddNewExecutor(); 438 executor->StartJob( 439 new SetPacScriptJob(current_script_data_, NULL)); 440 } 441 442 return ERR_IO_PENDING; 443} 444 445void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) { 446 DCHECK(CalledOnValidThread()); 447 DCHECK(req); 448 449 Job* job = reinterpret_cast<Job*>(req); 450 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type()); 451 452 if (job->executor()) { 453 // If the job was already submitted to the executor, just mark it 454 // as cancelled so the user callback isn't run on completion. 455 job->Cancel(); 456 } else { 457 // Otherwise the job is just sitting in a queue. 458 PendingJobsQueue::iterator it = 459 std::find(pending_jobs_.begin(), pending_jobs_.end(), job); 460 DCHECK(it != pending_jobs_.end()); 461 pending_jobs_.erase(it); 462 } 463} 464 465void MultiThreadedProxyResolver::CancelSetPacScript() { 466 DCHECK(CalledOnValidThread()); 467 DCHECK_EQ(0u, pending_jobs_.size()); 468 DCHECK_EQ(1u, executors_.size()); 469 DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT, 470 executors_[0]->outstanding_job()->type()); 471 472 // Defensively clear some data which shouldn't be getting used 473 // anymore. 474 current_script_data_ = NULL; 475 476 ReleaseAllExecutors(); 477} 478 479void MultiThreadedProxyResolver::PurgeMemory() { 480 DCHECK(CalledOnValidThread()); 481 for (ExecutorList::iterator it = executors_.begin(); 482 it != executors_.end(); ++it) { 483 Executor* executor = *it; 484 executor->PurgeMemory(); 485 } 486} 487 488int MultiThreadedProxyResolver::SetPacScript( 489 const scoped_refptr<ProxyResolverScriptData>& script_data, 490 CompletionCallback* callback) { 491 DCHECK(CalledOnValidThread()); 492 DCHECK(callback); 493 494 // Save the script details, so we can provision new executors later. 495 current_script_data_ = script_data; 496 497 // The user should not have any outstanding requests when they call 498 // SetPacScript(). 499 CheckNoOutstandingUserRequests(); 500 501 // Destroy all of the current threads and their proxy resolvers. 502 ReleaseAllExecutors(); 503 504 // Provision a new executor, and run the SetPacScript request. On completion 505 // notification will be sent through |callback|. 506 Executor* executor = AddNewExecutor(); 507 executor->StartJob(new SetPacScriptJob(script_data, callback)); 508 return ERR_IO_PENDING; 509} 510 511void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const { 512 DCHECK(CalledOnValidThread()); 513 CHECK_EQ(0u, pending_jobs_.size()); 514 515 for (ExecutorList::const_iterator it = executors_.begin(); 516 it != executors_.end(); ++it) { 517 const Executor* executor = *it; 518 Job* job = executor->outstanding_job(); 519 // The "has_user_callback()" is to exclude jobs for which the callback 520 // has already been invoked, or was not user-initiated (as in the case of 521 // lazy thread provisions). User-initiated jobs may !has_user_callback() 522 // when the callback has already been run. (Since we only clear the 523 // outstanding job AFTER the callback has been invoked, it is possible 524 // for a new request to be started from within the callback). 525 CHECK(!job || job->was_cancelled() || !job->has_user_callback()); 526 } 527} 528 529void MultiThreadedProxyResolver::ReleaseAllExecutors() { 530 DCHECK(CalledOnValidThread()); 531 for (ExecutorList::iterator it = executors_.begin(); 532 it != executors_.end(); ++it) { 533 Executor* executor = *it; 534 executor->Destroy(); 535 } 536 executors_.clear(); 537} 538 539MultiThreadedProxyResolver::Executor* 540MultiThreadedProxyResolver::FindIdleExecutor() { 541 DCHECK(CalledOnValidThread()); 542 for (ExecutorList::iterator it = executors_.begin(); 543 it != executors_.end(); ++it) { 544 Executor* executor = *it; 545 if (!executor->outstanding_job()) 546 return executor; 547 } 548 return NULL; 549} 550 551MultiThreadedProxyResolver::Executor* 552MultiThreadedProxyResolver::AddNewExecutor() { 553 DCHECK(CalledOnValidThread()); 554 DCHECK_LT(executors_.size(), max_num_threads_); 555 // The "thread number" is used to give the thread a unique name. 556 int thread_number = executors_.size(); 557 ProxyResolver* resolver = resolver_factory_->CreateProxyResolver(); 558 Executor* executor = new Executor( 559 this, resolver, thread_number); 560 executors_.push_back(make_scoped_refptr(executor)); 561 return executor; 562} 563 564void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) { 565 DCHECK(CalledOnValidThread()); 566 if (pending_jobs_.empty()) 567 return; 568 569 // Get the next job to process (FIFO). Transfer it from the pending queue 570 // to the executor. 571 scoped_refptr<Job> job = pending_jobs_.front(); 572 pending_jobs_.pop_front(); 573 executor->StartJob(job); 574} 575 576} // namespace net 577