multi_threaded_proxy_resolver.cc revision 513209b27ff55e2841eac0e4120199c23acce758
1// Copyright (c) 2010 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/proxy/multi_threaded_proxy_resolver.h"
6
7#include "base/message_loop.h"
8#include "base/string_util.h"
9#include "base/stringprintf.h"
10#include "base/thread.h"
11#include "net/base/capturing_net_log.h"
12#include "net/base/net_errors.h"
13#include "net/proxy/proxy_info.h"
14
15// TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
16//               data when SetPacScript fails. That will reclaim memory when
17//               testing bogus scripts.
18
19namespace net {
20
21namespace {
22
23class PurgeMemoryTask : public base::RefCountedThreadSafe<PurgeMemoryTask> {
24 public:
25  explicit PurgeMemoryTask(ProxyResolver* resolver) : resolver_(resolver) {}
26  void PurgeMemory() { resolver_->PurgeMemory(); }
27 private:
28  friend class base::RefCountedThreadSafe<PurgeMemoryTask>;
29  ~PurgeMemoryTask() {}
30  ProxyResolver* resolver_;
31};
32
33}  // namespace
34
35// An "executor" is a job-runner for PAC requests. It encapsulates a worker
36// thread and a synchronous ProxyResolver (which will be operated on said
37// thread.)
38class MultiThreadedProxyResolver::Executor
39    : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > {
40 public:
41  // |coordinator| must remain valid throughout our lifetime. It is used to
42  // signal when the executor is ready to receive work by calling
43  // |coordinator->OnExecutorReady()|.
44  // The constructor takes ownership of |resolver|.
45  // |thread_number| is an identifier used when naming the worker thread.
46  Executor(MultiThreadedProxyResolver* coordinator,
47           ProxyResolver* resolver,
48           int thread_number);
49
50  // Submit a job to this executor.
51  void StartJob(Job* job);
52
53  // Callback for when a job has completed running on the executor's thread.
54  void OnJobCompleted(Job* job);
55
56  // Cleanup the executor. Cancels all outstanding work, and frees the thread
57  // and resolver.
58  void Destroy();
59
60  void PurgeMemory();
61
62  // Returns the outstanding job, or NULL.
63  Job* outstanding_job() const { return outstanding_job_.get(); }
64
65  ProxyResolver* resolver() { return resolver_.get(); }
66
67  int thread_number() const { return thread_number_; }
68
69 private:
70  friend class base::RefCountedThreadSafe<Executor>;
71  ~Executor();
72
73  MultiThreadedProxyResolver* coordinator_;
74  const int thread_number_;
75
76  // The currently active job for this executor (either a SetPacScript or
77  // GetProxyForURL task).
78  scoped_refptr<Job> outstanding_job_;
79
80  // The synchronous resolver implementation.
81  scoped_ptr<ProxyResolver> resolver_;
82
83  // The thread where |resolver_| is run on.
84  // Note that declaration ordering is important here. |thread_| needs to be
85  // destroyed *before* |resolver_|, in case |resolver_| is currently
86  // executing on |thread_|.
87  scoped_ptr<base::Thread> thread_;
88};
89
90// MultiThreadedProxyResolver::Job ---------------------------------------------
91
92class MultiThreadedProxyResolver::Job
93    : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> {
94 public:
95  // Identifies the subclass of Job (only being used for debugging purposes).
96  enum Type {
97    TYPE_GET_PROXY_FOR_URL,
98    TYPE_SET_PAC_SCRIPT,
99    TYPE_SET_PAC_SCRIPT_INTERNAL,
100  };
101
102  Job(Type type, CompletionCallback* user_callback)
103      : type_(type),
104        user_callback_(user_callback),
105        executor_(NULL),
106        was_cancelled_(false) {
107  }
108
109  void set_executor(Executor* executor) {
110    executor_ = executor;
111  }
112
113  // The "executor" is the job runner that is scheduling this job. If
114  // this job has not been submitted to an executor yet, this will be
115  // NULL (and we know it hasn't started yet).
116  Executor* executor() {
117    return executor_;
118  }
119
120  // Mark the job as having been cancelled.
121  void Cancel() {
122    was_cancelled_ = true;
123  }
124
125  // Returns true if Cancel() has been called.
126  bool was_cancelled() const { return was_cancelled_; }
127
128  Type type() const { return type_; }
129
130  // Returns true if this job still has a user callback. Some jobs
131  // do not have a user callback, because they were helper jobs
132  // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
133  //
134  // Otherwise jobs that correspond with user-initiated work will
135  // have a non-NULL callback up until the callback is run.
136  bool has_user_callback() const { return user_callback_ != NULL; }
137
138  // This method is called when the job is inserted into a wait queue
139  // because no executors were ready to accept it.
140  virtual void WaitingForThread() {}
141
142  // This method is called just before the job is posted to the work thread.
143  virtual void FinishedWaitingForThread() {}
144
145  // This method is called on the worker thread to do the job's work. On
146  // completion, implementors are expected to call OnJobCompleted() on
147  // |origin_loop|.
148  virtual void Run(MessageLoop* origin_loop) = 0;
149
150 protected:
151  void OnJobCompleted() {
152    // |executor_| will be NULL if the executor has already been deleted.
153    if (executor_)
154      executor_->OnJobCompleted(this);
155  }
156
157  void RunUserCallback(int result) {
158    DCHECK(has_user_callback());
159    CompletionCallback* callback = user_callback_;
160    // Null the callback so has_user_callback() will now return false.
161    user_callback_ = NULL;
162    callback->Run(result);
163  }
164
165  friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>;
166
167  virtual ~Job() {}
168
169 private:
170  const Type type_;
171  CompletionCallback* user_callback_;
172  Executor* executor_;
173  bool was_cancelled_;
174};
175
176// MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
177
178// Runs on the worker thread to call ProxyResolver::SetPacScript.
179class MultiThreadedProxyResolver::SetPacScriptJob
180    : public MultiThreadedProxyResolver::Job {
181 public:
182  SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData>& script_data,
183                  CompletionCallback* callback)
184    : Job(callback ? TYPE_SET_PAC_SCRIPT : TYPE_SET_PAC_SCRIPT_INTERNAL,
185          callback),
186      script_data_(script_data) {
187  }
188
189  // Runs on the worker thread.
190  virtual void Run(MessageLoop* origin_loop) {
191    ProxyResolver* resolver = executor()->resolver();
192    int rv = resolver->SetPacScript(script_data_, NULL);
193
194    DCHECK_NE(rv, ERR_IO_PENDING);
195    origin_loop->PostTask(
196        FROM_HERE,
197        NewRunnableMethod(this, &SetPacScriptJob::RequestComplete, rv));
198  }
199
200 private:
201  // Runs the completion callback on the origin thread.
202  void RequestComplete(int result_code) {
203    // The task may have been cancelled after it was started.
204    if (!was_cancelled() && has_user_callback()) {
205      RunUserCallback(result_code);
206    }
207    OnJobCompleted();
208  }
209
210  const scoped_refptr<ProxyResolverScriptData> script_data_;
211};
212
213// MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
214
215class MultiThreadedProxyResolver::GetProxyForURLJob
216    : public MultiThreadedProxyResolver::Job {
217 public:
218  // |url|         -- the URL of the query.
219  // |results|     -- the structure to fill with proxy resolve results.
220  GetProxyForURLJob(const GURL& url,
221                    ProxyInfo* results,
222                    CompletionCallback* callback,
223                    const BoundNetLog& net_log)
224      : Job(TYPE_GET_PROXY_FOR_URL, callback),
225        results_(results),
226        net_log_(net_log),
227        url_(url),
228        was_waiting_for_thread_(false) {
229    DCHECK(callback);
230  }
231
232  BoundNetLog* net_log() { return &net_log_; }
233
234  virtual void WaitingForThread() {
235    was_waiting_for_thread_ = true;
236    net_log_.BeginEvent(
237        NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL);
238  }
239
240  virtual void FinishedWaitingForThread() {
241    DCHECK(executor());
242
243    if (was_waiting_for_thread_) {
244      net_log_.EndEvent(
245          NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL);
246    }
247
248    net_log_.AddEvent(
249        NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
250        make_scoped_refptr(new NetLogIntegerParameter(
251            "thread_number", executor()->thread_number())));
252  }
253
254  // Runs on the worker thread.
255  virtual void Run(MessageLoop* origin_loop) {
256    const size_t kNetLogBound = 50u;
257    worker_log_.reset(new CapturingNetLog(kNetLogBound));
258    BoundNetLog bound_worker_log(NetLog::Source(), worker_log_.get());
259
260    ProxyResolver* resolver = executor()->resolver();
261    int rv = resolver->GetProxyForURL(
262        url_, &results_buf_, NULL, NULL, bound_worker_log);
263    DCHECK_NE(rv, ERR_IO_PENDING);
264
265    origin_loop->PostTask(
266        FROM_HERE,
267        NewRunnableMethod(this, &GetProxyForURLJob::QueryComplete, rv));
268  }
269
270 private:
271  // Runs the completion callback on the origin thread.
272  void QueryComplete(int result_code) {
273    // The Job may have been cancelled after it was started.
274    if (!was_cancelled()) {
275      // Merge the load log that was generated on the worker thread, into the
276      // main log.
277      CapturingBoundNetLog bound_worker_log(NetLog::Source(),
278                                            worker_log_.release());
279      bound_worker_log.AppendTo(net_log_);
280
281      if (result_code >= OK) {  // Note: unit-tests use values > 0.
282        results_->Use(results_buf_);
283      }
284      RunUserCallback(result_code);
285    }
286    OnJobCompleted();
287  }
288
289  // Must only be used on the "origin" thread.
290  ProxyInfo* results_;
291  BoundNetLog net_log_;
292  const GURL url_;
293
294  // Usable from within DoQuery on the worker thread.
295  ProxyInfo results_buf_;
296
297  // Used to pass the captured events between DoQuery [worker thread] and
298  // QueryComplete [origin thread].
299  scoped_ptr<CapturingNetLog> worker_log_;
300
301  bool was_waiting_for_thread_;
302};
303
304// MultiThreadedProxyResolver::Executor ----------------------------------------
305
306MultiThreadedProxyResolver::Executor::Executor(
307    MultiThreadedProxyResolver* coordinator,
308    ProxyResolver* resolver,
309    int thread_number)
310    : coordinator_(coordinator),
311      thread_number_(thread_number),
312      resolver_(resolver) {
313  DCHECK(coordinator);
314  DCHECK(resolver);
315  // Start up the thread.
316  // Note that it is safe to pass a temporary C-String to Thread(), as it will
317  // make a copy.
318  std::string thread_name =
319      base::StringPrintf("PAC thread #%d", thread_number);
320  thread_.reset(new base::Thread(thread_name.c_str()));
321  thread_->Start();
322}
323
324void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
325  DCHECK(!outstanding_job_);
326  outstanding_job_ = job;
327
328  // Run the job. Once it has completed (regardless of whether it was
329  // cancelled), it will invoke OnJobCompleted() on this thread.
330  job->set_executor(this);
331  job->FinishedWaitingForThread();
332  thread_->message_loop()->PostTask(
333      FROM_HERE,
334      NewRunnableMethod(job, &Job::Run, MessageLoop::current()));
335}
336
337void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) {
338  DCHECK_EQ(job, outstanding_job_.get());
339  outstanding_job_ = NULL;
340  coordinator_->OnExecutorReady(this);
341}
342
343void MultiThreadedProxyResolver::Executor::Destroy() {
344  DCHECK(coordinator_);
345
346  // Give the resolver an opportunity to shutdown from THIS THREAD before
347  // joining on the resolver thread. This allows certain implementations
348  // to avoid deadlocks.
349  resolver_->Shutdown();
350
351  // Join the worker thread.
352  thread_.reset();
353
354  // Cancel any outstanding job.
355  if (outstanding_job_) {
356    outstanding_job_->Cancel();
357    // Orphan the job (since this executor may be deleted soon).
358    outstanding_job_->set_executor(NULL);
359  }
360
361  // It is now safe to free the ProxyResolver, since all the tasks that
362  // were using it on the resolver thread have completed.
363  resolver_.reset();
364
365  // Null some stuff as a precaution.
366  coordinator_ = NULL;
367  outstanding_job_ = NULL;
368}
369
370void MultiThreadedProxyResolver::Executor::PurgeMemory() {
371  scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get()));
372  thread_->message_loop()->PostTask(
373      FROM_HERE,
374      NewRunnableMethod(helper.get(), &PurgeMemoryTask::PurgeMemory));
375}
376
377MultiThreadedProxyResolver::Executor::~Executor() {
378  // The important cleanup happens as part of Destroy(), which should always be
379  // called first.
380  DCHECK(!coordinator_) << "Destroy() was not called";
381  DCHECK(!thread_.get());
382  DCHECK(!resolver_.get());
383  DCHECK(!outstanding_job_);
384}
385
386// MultiThreadedProxyResolver --------------------------------------------------
387
388MultiThreadedProxyResolver::MultiThreadedProxyResolver(
389    ProxyResolverFactory* resolver_factory,
390    size_t max_num_threads)
391    : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()),
392      resolver_factory_(resolver_factory),
393      max_num_threads_(max_num_threads) {
394  DCHECK_GE(max_num_threads, 1u);
395}
396
397MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
398  // We will cancel all outstanding requests.
399  pending_jobs_.clear();
400  ReleaseAllExecutors();
401}
402
403int MultiThreadedProxyResolver::GetProxyForURL(const GURL& url,
404                                               ProxyInfo* results,
405                                               CompletionCallback* callback,
406                                               RequestHandle* request,
407                                               const BoundNetLog& net_log) {
408  DCHECK(CalledOnValidThread());
409  DCHECK(callback);
410  DCHECK(current_script_data_.get())
411      << "Resolver is un-initialized. Must call SetPacScript() first!";
412
413  scoped_refptr<GetProxyForURLJob> job(
414      new GetProxyForURLJob(url, results, callback, net_log));
415
416  // Completion will be notified through |callback|, unless the caller cancels
417  // the request using |request|.
418  if (request)
419    *request = reinterpret_cast<RequestHandle>(job.get());
420
421  // If there is an executor that is ready to run this request, submit it!
422  Executor* executor = FindIdleExecutor();
423  if (executor) {
424    DCHECK_EQ(0u, pending_jobs_.size());
425    executor->StartJob(job);
426    return ERR_IO_PENDING;
427  }
428
429  // Otherwise queue this request. (We will schedule it to a thread once one
430  // becomes available).
431  job->WaitingForThread();
432  pending_jobs_.push_back(job);
433
434  // If we haven't already reached the thread limit, provision a new thread to
435  // drain the requests more quickly.
436  if (executors_.size() < max_num_threads_) {
437    executor = AddNewExecutor();
438    executor->StartJob(
439        new SetPacScriptJob(current_script_data_, NULL));
440  }
441
442  return ERR_IO_PENDING;
443}
444
445void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
446  DCHECK(CalledOnValidThread());
447  DCHECK(req);
448
449  Job* job = reinterpret_cast<Job*>(req);
450  DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
451
452  if (job->executor()) {
453    // If the job was already submitted to the executor, just mark it
454    // as cancelled so the user callback isn't run on completion.
455    job->Cancel();
456  } else {
457    // Otherwise the job is just sitting in a queue.
458    PendingJobsQueue::iterator it =
459        std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
460    DCHECK(it != pending_jobs_.end());
461    pending_jobs_.erase(it);
462  }
463}
464
465void MultiThreadedProxyResolver::CancelSetPacScript() {
466  DCHECK(CalledOnValidThread());
467  DCHECK_EQ(0u, pending_jobs_.size());
468  DCHECK_EQ(1u, executors_.size());
469  DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT,
470            executors_[0]->outstanding_job()->type());
471
472  // Defensively clear some data which shouldn't be getting used
473  // anymore.
474  current_script_data_ = NULL;
475
476  ReleaseAllExecutors();
477}
478
479void MultiThreadedProxyResolver::PurgeMemory() {
480  DCHECK(CalledOnValidThread());
481  for (ExecutorList::iterator it = executors_.begin();
482       it != executors_.end(); ++it) {
483    Executor* executor = *it;
484    executor->PurgeMemory();
485  }
486}
487
488int MultiThreadedProxyResolver::SetPacScript(
489    const scoped_refptr<ProxyResolverScriptData>& script_data,
490    CompletionCallback* callback) {
491  DCHECK(CalledOnValidThread());
492  DCHECK(callback);
493
494  // Save the script details, so we can provision new executors later.
495  current_script_data_ = script_data;
496
497  // The user should not have any outstanding requests when they call
498  // SetPacScript().
499  CheckNoOutstandingUserRequests();
500
501  // Destroy all of the current threads and their proxy resolvers.
502  ReleaseAllExecutors();
503
504  // Provision a new executor, and run the SetPacScript request. On completion
505  // notification will be sent through |callback|.
506  Executor* executor = AddNewExecutor();
507  executor->StartJob(new SetPacScriptJob(script_data, callback));
508  return ERR_IO_PENDING;
509}
510
511void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
512  DCHECK(CalledOnValidThread());
513  CHECK_EQ(0u, pending_jobs_.size());
514
515  for (ExecutorList::const_iterator it = executors_.begin();
516       it != executors_.end(); ++it) {
517    const Executor* executor = *it;
518    Job* job = executor->outstanding_job();
519    // The "has_user_callback()" is to exclude jobs for which the callback
520    // has already been invoked, or was not user-initiated (as in the case of
521    // lazy thread provisions). User-initiated jobs may !has_user_callback()
522    // when the callback has already been run. (Since we only clear the
523    // outstanding job AFTER the callback has been invoked, it is possible
524    // for a new request to be started from within the callback).
525    CHECK(!job || job->was_cancelled() || !job->has_user_callback());
526  }
527}
528
529void MultiThreadedProxyResolver::ReleaseAllExecutors() {
530  DCHECK(CalledOnValidThread());
531  for (ExecutorList::iterator it = executors_.begin();
532       it != executors_.end(); ++it) {
533    Executor* executor = *it;
534    executor->Destroy();
535  }
536  executors_.clear();
537}
538
539MultiThreadedProxyResolver::Executor*
540MultiThreadedProxyResolver::FindIdleExecutor() {
541  DCHECK(CalledOnValidThread());
542  for (ExecutorList::iterator it = executors_.begin();
543       it != executors_.end(); ++it) {
544    Executor* executor = *it;
545    if (!executor->outstanding_job())
546      return executor;
547  }
548  return NULL;
549}
550
551MultiThreadedProxyResolver::Executor*
552MultiThreadedProxyResolver::AddNewExecutor() {
553  DCHECK(CalledOnValidThread());
554  DCHECK_LT(executors_.size(), max_num_threads_);
555  // The "thread number" is used to give the thread a unique name.
556  int thread_number = executors_.size();
557  ProxyResolver* resolver = resolver_factory_->CreateProxyResolver();
558  Executor* executor = new Executor(
559      this, resolver, thread_number);
560  executors_.push_back(make_scoped_refptr(executor));
561  return executor;
562}
563
564void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
565  DCHECK(CalledOnValidThread());
566  if (pending_jobs_.empty())
567    return;
568
569  // Get the next job to process (FIFO). Transfer it from the pending queue
570  // to the executor.
571  scoped_refptr<Job> job = pending_jobs_.front();
572  pending_jobs_.pop_front();
573  executor->StartJob(job);
574}
575
576}  // namespace net
577