1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/chromeos/drive/job_scheduler.h"
6
7#include "base/message_loop/message_loop.h"
8#include "base/prefs/pref_service.h"
9#include "base/rand_util.h"
10#include "base/strings/string_number_conversions.h"
11#include "base/strings/stringprintf.h"
12#include "chrome/browser/drive/event_logger.h"
13#include "chrome/common/pref_names.h"
14#include "content/public/browser/browser_thread.h"
15#include "google_apis/drive/drive_api_parser.h"
16
17using content::BrowserThread;
18
19namespace drive {
20
21namespace {
22
// Retry policy for jobs that fail because of throttling or a server error.
//
// The back-off delay is shared by all jobs and doubles on every failure, up
// to 2^kMaxThrottleCount seconds. Each individual job is retried at most
// kMaxRetryCount times.
//
// The API documentation (https://developers.google.com/drive/handle-errors)
// suggests that both limits be equal. The retry limit is currently twice the
// throttle limit so that upload related jobs are retried a sufficient number
// of times. crbug.com/269918
const int kMaxThrottleCount = 4;
const int kMaxRetryCount = kMaxThrottleCount * 2;
33
// DefaultValueCreator<T>::GetDefaultValue() yields a value-initialized T.
template <typename T>
struct DefaultValueCreator {
  static T GetDefaultValue() {
    return T();
  }
};

// For a "const T&" parameter type the default is still produced by value, so
// this specialization simply reuses the creator for plain T.
template <typename T>
struct DefaultValueCreator<const T&> : DefaultValueCreator<T> {};
41
42// Helper of CreateErrorRunCallback implementation.
43// Provides:
44// - ResultType; the type of the Callback which should be returned by
45//     CreateErrorRunCallback.
46// - Run(): a static function which takes the original |callback| and |error|,
47//     and runs the |callback|.Run() with the error code and default values
48//     for remaining arguments.
49template<typename CallbackType> struct CreateErrorRunCallbackHelper;
50
51// CreateErrorRunCallback with two arguments.
52template<typename P1>
53struct CreateErrorRunCallbackHelper<void(google_apis::GDataErrorCode, P1)> {
54  static void Run(
55      const base::Callback<void(google_apis::GDataErrorCode, P1)>& callback,
56      google_apis::GDataErrorCode error) {
57    callback.Run(error, DefaultValueCreator<P1>::GetDefaultValue());
58  }
59};
60
61// Returns a callback with the tail parameter bound to its default value.
62// In other words, returned_callback.Run(error) runs callback.Run(error, T()).
63template<typename CallbackType>
64base::Callback<void(google_apis::GDataErrorCode)>
65CreateErrorRunCallback(const base::Callback<CallbackType>& callback) {
66  return base::Bind(&CreateErrorRunCallbackHelper<CallbackType>::Run, callback);
67}
68
69// Parameter struct for RunUploadNewFile.
70struct UploadNewFileParams {
71  std::string parent_resource_id;
72  base::FilePath local_file_path;
73  std::string title;
74  std::string content_type;
75  DriveUploader::UploadNewFileOptions options;
76  UploadCompletionCallback callback;
77  google_apis::ProgressCallback progress_callback;
78};
79
80// Helper function to work around the arity limitation of base::Bind.
81google_apis::CancelCallback RunUploadNewFile(
82    DriveUploaderInterface* uploader,
83    const UploadNewFileParams& params) {
84  return uploader->UploadNewFile(params.parent_resource_id,
85                                 params.local_file_path,
86                                 params.title,
87                                 params.content_type,
88                                 params.options,
89                                 params.callback,
90                                 params.progress_callback);
91}
92
93// Parameter struct for RunUploadExistingFile.
94struct UploadExistingFileParams {
95  std::string resource_id;
96  base::FilePath local_file_path;
97  std::string content_type;
98  DriveUploader::UploadExistingFileOptions options;
99  std::string etag;
100  UploadCompletionCallback callback;
101  google_apis::ProgressCallback progress_callback;
102};
103
104// Helper function to work around the arity limitation of base::Bind.
105google_apis::CancelCallback RunUploadExistingFile(
106    DriveUploaderInterface* uploader,
107    const UploadExistingFileParams& params) {
108  return uploader->UploadExistingFile(params.resource_id,
109                                      params.local_file_path,
110                                      params.content_type,
111                                      params.options,
112                                      params.callback,
113                                      params.progress_callback);
114}
115
116// Parameter struct for RunResumeUploadFile.
117struct ResumeUploadFileParams {
118  GURL upload_location;
119  base::FilePath local_file_path;
120  std::string content_type;
121  UploadCompletionCallback callback;
122  google_apis::ProgressCallback progress_callback;
123};
124
125// Helper function to adjust the return type.
126google_apis::CancelCallback RunResumeUploadFile(
127    DriveUploaderInterface* uploader,
128    const ResumeUploadFileParams& params) {
129  return uploader->ResumeUploadFile(params.upload_location,
130                                    params.local_file_path,
131                                    params.content_type,
132                                    params.callback,
133                                    params.progress_callback);
134}
135
136}  // namespace
137
138// Metadata jobs are cheap, so we run them concurrently. File jobs run serially.
139const int JobScheduler::kMaxJobCount[] = {
140  5,  // METADATA_QUEUE
141  1,  // FILE_QUEUE
142};
143
144JobScheduler::JobEntry::JobEntry(JobType type)
145    : job_info(type),
146      context(ClientContext(USER_INITIATED)),
147      retry_count(0) {
148  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
149}
150
151JobScheduler::JobEntry::~JobEntry() {
152  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
153}
154
155struct JobScheduler::ResumeUploadParams {
156  base::FilePath drive_file_path;
157  base::FilePath local_file_path;
158  std::string content_type;
159};
160
161JobScheduler::JobScheduler(
162    PrefService* pref_service,
163    EventLogger* logger,
164    DriveServiceInterface* drive_service,
165    base::SequencedTaskRunner* blocking_task_runner)
166    : throttle_count_(0),
167      wait_until_(base::Time::Now()),
168      disable_throttling_(false),
169      logger_(logger),
170      drive_service_(drive_service),
171      uploader_(new DriveUploader(drive_service, blocking_task_runner)),
172      pref_service_(pref_service),
173      weak_ptr_factory_(this) {
174  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
175
176  for (int i = 0; i < NUM_QUEUES; ++i)
177    queue_[i].reset(new JobQueue(kMaxJobCount[i], NUM_CONTEXT_TYPES));
178
179  net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
180}
181
182JobScheduler::~JobScheduler() {
183  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
184
185  size_t num_queued_jobs = 0;
186  for (int i = 0; i < NUM_QUEUES; ++i)
187    num_queued_jobs += queue_[i]->GetNumberOfJobs();
188  DCHECK_EQ(num_queued_jobs, job_map_.size());
189
190  net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
191}
192
193std::vector<JobInfo> JobScheduler::GetJobInfoList() {
194  std::vector<JobInfo> job_info_list;
195  for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
196    job_info_list.push_back(iter.GetCurrentValue()->job_info);
197  return job_info_list;
198}
199
200void JobScheduler::AddObserver(JobListObserver* observer) {
201  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
202  observer_list_.AddObserver(observer);
203}
204
205void JobScheduler::RemoveObserver(JobListObserver* observer) {
206  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
207  observer_list_.RemoveObserver(observer);
208}
209
210void JobScheduler::CancelJob(JobID job_id) {
211  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
212
213  JobEntry* job = job_map_.Lookup(job_id);
214  if (job) {
215    if (job->job_info.state == STATE_RUNNING) {
216      // If the job is running an HTTP request, cancel it via |cancel_callback|
217      // returned from the request, and wait for termination in the normal
218      // callback handler, OnJobDone.
219      if (!job->cancel_callback.is_null())
220        job->cancel_callback.Run();
221    } else {
222      AbortNotRunningJob(job, google_apis::GDATA_CANCELLED);
223    }
224  }
225}
226
227void JobScheduler::CancelAllJobs() {
228  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
229
230  // CancelJob may remove the entry from |job_map_|. That's OK. IDMap supports
231  // removable during iteration.
232  for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
233    CancelJob(iter.GetCurrentKey());
234}
235
236void JobScheduler::GetAboutResource(
237    const google_apis::AboutResourceCallback& callback) {
238  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
239  DCHECK(!callback.is_null());
240
241  JobEntry* new_job = CreateNewJob(TYPE_GET_ABOUT_RESOURCE);
242  new_job->task = base::Bind(
243      &DriveServiceInterface::GetAboutResource,
244      base::Unretained(drive_service_),
245      base::Bind(&JobScheduler::OnGetAboutResourceJobDone,
246                 weak_ptr_factory_.GetWeakPtr(),
247                 new_job->job_info.job_id,
248                 callback));
249  new_job->abort_callback = CreateErrorRunCallback(callback);
250  StartJob(new_job);
251}
252
253void JobScheduler::GetAppList(const google_apis::AppListCallback& callback) {
254  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
255  DCHECK(!callback.is_null());
256
257  JobEntry* new_job = CreateNewJob(TYPE_GET_APP_LIST);
258  new_job->task = base::Bind(
259      &DriveServiceInterface::GetAppList,
260      base::Unretained(drive_service_),
261      base::Bind(&JobScheduler::OnGetAppListJobDone,
262                 weak_ptr_factory_.GetWeakPtr(),
263                 new_job->job_info.job_id,
264                 callback));
265  new_job->abort_callback = CreateErrorRunCallback(callback);
266  StartJob(new_job);
267}
268
269void JobScheduler::GetAllFileList(
270    const google_apis::FileListCallback& callback) {
271  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
272  DCHECK(!callback.is_null());
273
274  JobEntry* new_job = CreateNewJob(TYPE_GET_ALL_RESOURCE_LIST);
275  new_job->task = base::Bind(
276      &DriveServiceInterface::GetAllFileList,
277      base::Unretained(drive_service_),
278      base::Bind(&JobScheduler::OnGetFileListJobDone,
279                 weak_ptr_factory_.GetWeakPtr(),
280                 new_job->job_info.job_id,
281                 callback));
282  new_job->abort_callback = CreateErrorRunCallback(callback);
283  StartJob(new_job);
284}
285
286void JobScheduler::GetFileListInDirectory(
287    const std::string& directory_resource_id,
288    const google_apis::FileListCallback& callback) {
289  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
290  DCHECK(!callback.is_null());
291
292  JobEntry* new_job = CreateNewJob(
293      TYPE_GET_RESOURCE_LIST_IN_DIRECTORY);
294  new_job->task = base::Bind(
295      &DriveServiceInterface::GetFileListInDirectory,
296      base::Unretained(drive_service_),
297      directory_resource_id,
298      base::Bind(&JobScheduler::OnGetFileListJobDone,
299                 weak_ptr_factory_.GetWeakPtr(),
300                 new_job->job_info.job_id,
301                 callback));
302  new_job->abort_callback = CreateErrorRunCallback(callback);
303  StartJob(new_job);
304}
305
306void JobScheduler::Search(const std::string& search_query,
307                          const google_apis::FileListCallback& callback) {
308  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
309  DCHECK(!callback.is_null());
310
311  JobEntry* new_job = CreateNewJob(TYPE_SEARCH);
312  new_job->task = base::Bind(
313      &DriveServiceInterface::Search,
314      base::Unretained(drive_service_),
315      search_query,
316      base::Bind(&JobScheduler::OnGetFileListJobDone,
317                 weak_ptr_factory_.GetWeakPtr(),
318                 new_job->job_info.job_id,
319                 callback));
320  new_job->abort_callback = CreateErrorRunCallback(callback);
321  StartJob(new_job);
322}
323
324void JobScheduler::GetChangeList(
325    int64 start_changestamp,
326    const google_apis::ChangeListCallback& callback) {
327  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
328  DCHECK(!callback.is_null());
329
330  JobEntry* new_job = CreateNewJob(TYPE_GET_CHANGE_LIST);
331  new_job->task = base::Bind(
332      &DriveServiceInterface::GetChangeList,
333      base::Unretained(drive_service_),
334      start_changestamp,
335      base::Bind(&JobScheduler::OnGetChangeListJobDone,
336                 weak_ptr_factory_.GetWeakPtr(),
337                 new_job->job_info.job_id,
338                 callback));
339  new_job->abort_callback = CreateErrorRunCallback(callback);
340  StartJob(new_job);
341}
342
343void JobScheduler::GetRemainingChangeList(
344    const GURL& next_link,
345    const google_apis::ChangeListCallback& callback) {
346  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
347  DCHECK(!callback.is_null());
348
349  JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_CHANGE_LIST);
350  new_job->task = base::Bind(
351      &DriveServiceInterface::GetRemainingChangeList,
352      base::Unretained(drive_service_),
353      next_link,
354      base::Bind(&JobScheduler::OnGetChangeListJobDone,
355                 weak_ptr_factory_.GetWeakPtr(),
356                 new_job->job_info.job_id,
357                 callback));
358  new_job->abort_callback = CreateErrorRunCallback(callback);
359  StartJob(new_job);
360}
361
362void JobScheduler::GetRemainingFileList(
363    const GURL& next_link,
364    const google_apis::FileListCallback& callback) {
365  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
366  DCHECK(!callback.is_null());
367
368  JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_FILE_LIST);
369  new_job->task = base::Bind(
370      &DriveServiceInterface::GetRemainingFileList,
371      base::Unretained(drive_service_),
372      next_link,
373      base::Bind(&JobScheduler::OnGetFileListJobDone,
374                 weak_ptr_factory_.GetWeakPtr(),
375                 new_job->job_info.job_id,
376                 callback));
377  new_job->abort_callback = CreateErrorRunCallback(callback);
378  StartJob(new_job);
379}
380
381void JobScheduler::GetFileResource(
382    const std::string& resource_id,
383    const ClientContext& context,
384    const google_apis::FileResourceCallback& callback) {
385  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
386  DCHECK(!callback.is_null());
387
388  JobEntry* new_job = CreateNewJob(TYPE_GET_RESOURCE_ENTRY);
389  new_job->context = context;
390  new_job->task = base::Bind(
391      &DriveServiceInterface::GetFileResource,
392      base::Unretained(drive_service_),
393      resource_id,
394      base::Bind(&JobScheduler::OnGetFileResourceJobDone,
395                 weak_ptr_factory_.GetWeakPtr(),
396                 new_job->job_info.job_id,
397                 callback));
398  new_job->abort_callback = CreateErrorRunCallback(callback);
399  StartJob(new_job);
400}
401
402void JobScheduler::GetShareUrl(
403    const std::string& resource_id,
404    const GURL& embed_origin,
405    const ClientContext& context,
406    const google_apis::GetShareUrlCallback& callback) {
407  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
408  DCHECK(!callback.is_null());
409
410  JobEntry* new_job = CreateNewJob(TYPE_GET_SHARE_URL);
411  new_job->context = context;
412  new_job->task = base::Bind(
413      &DriveServiceInterface::GetShareUrl,
414      base::Unretained(drive_service_),
415      resource_id,
416      embed_origin,
417      base::Bind(&JobScheduler::OnGetShareUrlJobDone,
418                 weak_ptr_factory_.GetWeakPtr(),
419                 new_job->job_info.job_id,
420                 callback));
421  new_job->abort_callback = CreateErrorRunCallback(callback);
422  StartJob(new_job);
423}
424
425void JobScheduler::TrashResource(
426    const std::string& resource_id,
427    const ClientContext& context,
428    const google_apis::EntryActionCallback& callback) {
429  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
430  DCHECK(!callback.is_null());
431
432  JobEntry* new_job = CreateNewJob(TYPE_TRASH_RESOURCE);
433  new_job->context = context;
434  new_job->task = base::Bind(
435      &DriveServiceInterface::TrashResource,
436      base::Unretained(drive_service_),
437      resource_id,
438      base::Bind(&JobScheduler::OnEntryActionJobDone,
439                 weak_ptr_factory_.GetWeakPtr(),
440                 new_job->job_info.job_id,
441                 callback));
442  new_job->abort_callback = callback;
443  StartJob(new_job);
444}
445
446void JobScheduler::CopyResource(
447    const std::string& resource_id,
448    const std::string& parent_resource_id,
449    const std::string& new_title,
450    const base::Time& last_modified,
451    const google_apis::FileResourceCallback& callback) {
452  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
453  DCHECK(!callback.is_null());
454
455  JobEntry* new_job = CreateNewJob(TYPE_COPY_RESOURCE);
456  new_job->task = base::Bind(
457      &DriveServiceInterface::CopyResource,
458      base::Unretained(drive_service_),
459      resource_id,
460      parent_resource_id,
461      new_title,
462      last_modified,
463      base::Bind(&JobScheduler::OnGetFileResourceJobDone,
464                 weak_ptr_factory_.GetWeakPtr(),
465                 new_job->job_info.job_id,
466                 callback));
467  new_job->abort_callback = CreateErrorRunCallback(callback);
468  StartJob(new_job);
469}
470
471void JobScheduler::UpdateResource(
472    const std::string& resource_id,
473    const std::string& parent_resource_id,
474    const std::string& new_title,
475    const base::Time& last_modified,
476    const base::Time& last_viewed_by_me,
477    const ClientContext& context,
478    const google_apis::FileResourceCallback& callback) {
479  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
480  DCHECK(!callback.is_null());
481
482  JobEntry* new_job = CreateNewJob(TYPE_UPDATE_RESOURCE);
483  new_job->context = context;
484  new_job->task = base::Bind(
485      &DriveServiceInterface::UpdateResource,
486      base::Unretained(drive_service_),
487      resource_id,
488      parent_resource_id,
489      new_title,
490      last_modified,
491      last_viewed_by_me,
492      base::Bind(&JobScheduler::OnGetFileResourceJobDone,
493                 weak_ptr_factory_.GetWeakPtr(),
494                 new_job->job_info.job_id,
495                 callback));
496  new_job->abort_callback = CreateErrorRunCallback(callback);
497  StartJob(new_job);
498}
499
500void JobScheduler::AddResourceToDirectory(
501    const std::string& parent_resource_id,
502    const std::string& resource_id,
503    const google_apis::EntryActionCallback& callback) {
504  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
505  DCHECK(!callback.is_null());
506
507  JobEntry* new_job = CreateNewJob(TYPE_ADD_RESOURCE_TO_DIRECTORY);
508  new_job->task = base::Bind(
509      &DriveServiceInterface::AddResourceToDirectory,
510      base::Unretained(drive_service_),
511      parent_resource_id,
512      resource_id,
513      base::Bind(&JobScheduler::OnEntryActionJobDone,
514                 weak_ptr_factory_.GetWeakPtr(),
515                 new_job->job_info.job_id,
516                 callback));
517  new_job->abort_callback = callback;
518  StartJob(new_job);
519}
520
521void JobScheduler::RemoveResourceFromDirectory(
522    const std::string& parent_resource_id,
523    const std::string& resource_id,
524    const ClientContext& context,
525    const google_apis::EntryActionCallback& callback) {
526  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
527
528  JobEntry* new_job = CreateNewJob(TYPE_REMOVE_RESOURCE_FROM_DIRECTORY);
529  new_job->context = context;
530  new_job->task = base::Bind(
531      &DriveServiceInterface::RemoveResourceFromDirectory,
532      base::Unretained(drive_service_),
533      parent_resource_id,
534      resource_id,
535      base::Bind(&JobScheduler::OnEntryActionJobDone,
536                 weak_ptr_factory_.GetWeakPtr(),
537                 new_job->job_info.job_id,
538                 callback));
539  new_job->abort_callback = callback;
540  StartJob(new_job);
541}
542
543void JobScheduler::AddNewDirectory(
544    const std::string& parent_resource_id,
545    const std::string& directory_title,
546    const DriveServiceInterface::AddNewDirectoryOptions& options,
547    const ClientContext& context,
548    const google_apis::FileResourceCallback& callback) {
549  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
550
551  JobEntry* new_job = CreateNewJob(TYPE_ADD_NEW_DIRECTORY);
552  new_job->context = context;
553  new_job->task = base::Bind(
554      &DriveServiceInterface::AddNewDirectory,
555      base::Unretained(drive_service_),
556      parent_resource_id,
557      directory_title,
558      options,
559      base::Bind(&JobScheduler::OnGetFileResourceJobDone,
560                 weak_ptr_factory_.GetWeakPtr(),
561                 new_job->job_info.job_id,
562                 callback));
563  new_job->abort_callback = CreateErrorRunCallback(callback);
564  StartJob(new_job);
565}
566
567JobID JobScheduler::DownloadFile(
568    const base::FilePath& virtual_path,
569    int64 expected_file_size,
570    const base::FilePath& local_cache_path,
571    const std::string& resource_id,
572    const ClientContext& context,
573    const google_apis::DownloadActionCallback& download_action_callback,
574    const google_apis::GetContentCallback& get_content_callback) {
575  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
576
577  JobEntry* new_job = CreateNewJob(TYPE_DOWNLOAD_FILE);
578  new_job->job_info.file_path = virtual_path;
579  new_job->job_info.num_total_bytes = expected_file_size;
580  new_job->context = context;
581  new_job->task = base::Bind(
582      &DriveServiceInterface::DownloadFile,
583      base::Unretained(drive_service_),
584      local_cache_path,
585      resource_id,
586      base::Bind(&JobScheduler::OnDownloadActionJobDone,
587                 weak_ptr_factory_.GetWeakPtr(),
588                 new_job->job_info.job_id,
589                 download_action_callback),
590      get_content_callback,
591      base::Bind(&JobScheduler::UpdateProgress,
592                 weak_ptr_factory_.GetWeakPtr(),
593                 new_job->job_info.job_id));
594  new_job->abort_callback = CreateErrorRunCallback(download_action_callback);
595  StartJob(new_job);
596  return new_job->job_info.job_id;
597}
598
599void JobScheduler::UploadNewFile(
600    const std::string& parent_resource_id,
601    const base::FilePath& drive_file_path,
602    const base::FilePath& local_file_path,
603    const std::string& title,
604    const std::string& content_type,
605    const DriveUploader::UploadNewFileOptions& options,
606    const ClientContext& context,
607    const google_apis::FileResourceCallback& callback) {
608  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
609
610  JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_NEW_FILE);
611  new_job->job_info.file_path = drive_file_path;
612  new_job->context = context;
613
614  UploadNewFileParams params;
615  params.parent_resource_id = parent_resource_id;
616  params.local_file_path = local_file_path;
617  params.title = title;
618  params.content_type = content_type;
619  params.options = options;
620
621  ResumeUploadParams resume_params;
622  resume_params.local_file_path = params.local_file_path;
623  resume_params.content_type = params.content_type;
624
625  params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
626                               weak_ptr_factory_.GetWeakPtr(),
627                               new_job->job_info.job_id,
628                               resume_params,
629                               callback);
630  params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
631                                        weak_ptr_factory_.GetWeakPtr(),
632                                        new_job->job_info.job_id);
633  new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params);
634  new_job->abort_callback = CreateErrorRunCallback(callback);
635  StartJob(new_job);
636}
637
638void JobScheduler::UploadExistingFile(
639    const std::string& resource_id,
640    const base::FilePath& drive_file_path,
641    const base::FilePath& local_file_path,
642    const std::string& content_type,
643    const DriveUploader::UploadExistingFileOptions& options,
644    const ClientContext& context,
645    const google_apis::FileResourceCallback& callback) {
646  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
647
648  JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_EXISTING_FILE);
649  new_job->job_info.file_path = drive_file_path;
650  new_job->context = context;
651
652  UploadExistingFileParams params;
653  params.resource_id = resource_id;
654  params.local_file_path = local_file_path;
655  params.content_type = content_type;
656  params.options = options;
657
658  ResumeUploadParams resume_params;
659  resume_params.local_file_path = params.local_file_path;
660  resume_params.content_type = params.content_type;
661
662  params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
663                               weak_ptr_factory_.GetWeakPtr(),
664                               new_job->job_info.job_id,
665                               resume_params,
666                               callback);
667  params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
668                                        weak_ptr_factory_.GetWeakPtr(),
669                                        new_job->job_info.job_id);
670  new_job->task = base::Bind(&RunUploadExistingFile, uploader_.get(), params);
671  new_job->abort_callback = CreateErrorRunCallback(callback);
672  StartJob(new_job);
673}
674
675void JobScheduler::AddPermission(
676    const std::string& resource_id,
677    const std::string& email,
678    google_apis::drive::PermissionRole role,
679    const google_apis::EntryActionCallback& callback) {
680  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
681  DCHECK(!callback.is_null());
682
683  JobEntry* new_job = CreateNewJob(TYPE_ADD_PERMISSION);
684  new_job->task = base::Bind(&DriveServiceInterface::AddPermission,
685                             base::Unretained(drive_service_),
686                             resource_id,
687                             email,
688                             role,
689                             base::Bind(&JobScheduler::OnEntryActionJobDone,
690                                        weak_ptr_factory_.GetWeakPtr(),
691                                        new_job->job_info.job_id,
692                                        callback));
693  new_job->abort_callback = callback;
694  StartJob(new_job);
695}
696
697JobScheduler::JobEntry* JobScheduler::CreateNewJob(JobType type) {
698  JobEntry* job = new JobEntry(type);
699  job->job_info.job_id = job_map_.Add(job);  // Takes the ownership of |job|.
700  return job;
701}
702
703void JobScheduler::StartJob(JobEntry* job) {
704  DCHECK(!job->task.is_null());
705
706  QueueJob(job->job_info.job_id);
707  NotifyJobAdded(job->job_info);
708  DoJobLoop(GetJobQueueType(job->job_info.job_type));
709}
710
711void JobScheduler::QueueJob(JobID job_id) {
712  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
713
714  JobEntry* job_entry = job_map_.Lookup(job_id);
715  DCHECK(job_entry);
716  const JobInfo& job_info = job_entry->job_info;
717
718  const QueueType queue_type = GetJobQueueType(job_info.job_type);
719  queue_[queue_type]->Push(job_id, job_entry->context.type);
720
721  const std::string retry_prefix = job_entry->retry_count > 0 ?
722      base::StringPrintf(" (retry %d)", job_entry->retry_count) : "";
723  logger_->Log(logging::LOG_INFO,
724               "Job queued%s: %s - %s",
725               retry_prefix.c_str(),
726               job_info.ToString().c_str(),
727               GetQueueInfo(queue_type).c_str());
728}
729
730void JobScheduler::DoJobLoop(QueueType queue_type) {
731  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
732
733  const int accepted_priority = GetCurrentAcceptedPriority(queue_type);
734
735  // Abort all USER_INITAITED jobs when not accepted.
736  if (accepted_priority < USER_INITIATED) {
737    std::vector<JobID> jobs;
738    queue_[queue_type]->GetQueuedJobs(USER_INITIATED, &jobs);
739    for (size_t i = 0; i < jobs.size(); ++i) {
740      JobEntry* job = job_map_.Lookup(jobs[i]);
741      DCHECK(job);
742      AbortNotRunningJob(job, google_apis::GDATA_NO_CONNECTION);
743    }
744  }
745
746  // Wait when throttled.
747  const base::Time now = base::Time::Now();
748  if (now < wait_until_) {
749    base::MessageLoopProxy::current()->PostDelayedTask(
750        FROM_HERE,
751        base::Bind(&JobScheduler::DoJobLoop,
752                   weak_ptr_factory_.GetWeakPtr(),
753                   queue_type),
754        wait_until_ - now);
755    return;
756  }
757
758  // Run the job with the highest priority in the queue.
759  JobID job_id = -1;
760  if (!queue_[queue_type]->PopForRun(accepted_priority, &job_id))
761    return;
762
763  JobEntry* entry = job_map_.Lookup(job_id);
764  DCHECK(entry);
765
766  JobInfo* job_info = &entry->job_info;
767  job_info->state = STATE_RUNNING;
768  job_info->start_time = now;
769  NotifyJobUpdated(*job_info);
770
771  entry->cancel_callback = entry->task.Run();
772
773  UpdateWait();
774
775  logger_->Log(logging::LOG_INFO,
776               "Job started: %s - %s",
777               job_info->ToString().c_str(),
778               GetQueueInfo(queue_type).c_str());
779}
780
781int JobScheduler::GetCurrentAcceptedPriority(QueueType queue_type) {
782  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
783
784  const int kNoJobShouldRun = -1;
785
786  // Should stop if Drive was disabled while running the fetch loop.
787  if (pref_service_->GetBoolean(prefs::kDisableDrive))
788    return kNoJobShouldRun;
789
790  // Should stop if the network is not online.
791  if (net::NetworkChangeNotifier::IsOffline())
792    return kNoJobShouldRun;
793
794  // For the file queue, if it is on cellular network, only user initiated
795  // operations are allowed to start.
796  if (queue_type == FILE_QUEUE &&
797      pref_service_->GetBoolean(prefs::kDisableDriveOverCellular) &&
798      net::NetworkChangeNotifier::IsConnectionCellular(
799          net::NetworkChangeNotifier::GetConnectionType()))
800    return USER_INITIATED;
801
802  // Otherwise, every operations including background tasks are allowed.
803  return BACKGROUND;
804}
805
806void JobScheduler::UpdateWait() {
807  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
808
809  if (disable_throttling_ || throttle_count_ == 0)
810    return;
811
812  // Exponential backoff: https://developers.google.com/drive/handle-errors.
813  base::TimeDelta delay =
814      base::TimeDelta::FromSeconds(1 << (throttle_count_ - 1)) +
815      base::TimeDelta::FromMilliseconds(base::RandInt(0, 1000));
816  VLOG(1) << "Throttling for " << delay.InMillisecondsF();
817
818  wait_until_ = std::max(wait_until_, base::Time::Now() + delay);
819}
820
821bool JobScheduler::OnJobDone(JobID job_id, google_apis::GDataErrorCode error) {
822  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
823
824  JobEntry* job_entry = job_map_.Lookup(job_id);
825  DCHECK(job_entry);
826  JobInfo* job_info = &job_entry->job_info;
827  QueueType queue_type = GetJobQueueType(job_info->job_type);
828  queue_[queue_type]->MarkFinished(job_id);
829
830  const base::TimeDelta elapsed = base::Time::Now() - job_info->start_time;
831  bool success = (GDataToFileError(error) == FILE_ERROR_OK);
832  logger_->Log(success ? logging::LOG_INFO : logging::LOG_WARNING,
833               "Job done: %s => %s (elapsed time: %sms) - %s",
834               job_info->ToString().c_str(),
835               GDataErrorCodeToString(error).c_str(),
836               base::Int64ToString(elapsed.InMilliseconds()).c_str(),
837               GetQueueInfo(queue_type).c_str());
838
839  // Retry, depending on the error.
840  const bool is_server_error =
841      error == google_apis::HTTP_SERVICE_UNAVAILABLE ||
842      error == google_apis::HTTP_INTERNAL_SERVER_ERROR;
843  if (is_server_error) {
844    if (throttle_count_ < kMaxThrottleCount)
845      ++throttle_count_;
846    UpdateWait();
847  } else {
848    throttle_count_ = 0;
849  }
850
851  const bool should_retry =
852      is_server_error && job_entry->retry_count < kMaxRetryCount;
853  if (should_retry) {
854    job_entry->cancel_callback.Reset();
855    job_info->state = STATE_RETRY;
856    NotifyJobUpdated(*job_info);
857
858    ++job_entry->retry_count;
859
860    // Requeue the job.
861    QueueJob(job_id);
862  } else {
863    NotifyJobDone(*job_info, error);
864    // The job has finished, no retry will happen in the scheduler. Now we can
865    // remove the job info from the map.
866    job_map_.Remove(job_id);
867  }
868
869  // Post a task to continue the job loop.  This allows us to finish handling
870  // the current job before starting the next one.
871  base::MessageLoopProxy::current()->PostTask(FROM_HERE,
872      base::Bind(&JobScheduler::DoJobLoop,
873                 weak_ptr_factory_.GetWeakPtr(),
874                 queue_type));
875  return !should_retry;
876}
877
878void JobScheduler::OnGetFileListJobDone(
879    JobID job_id,
880    const google_apis::FileListCallback& callback,
881    google_apis::GDataErrorCode error,
882    scoped_ptr<google_apis::FileList> file_list) {
883  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
884  DCHECK(!callback.is_null());
885
886  if (OnJobDone(job_id, error))
887    callback.Run(error, file_list.Pass());
888}
889
890void JobScheduler::OnGetChangeListJobDone(
891    JobID job_id,
892    const google_apis::ChangeListCallback& callback,
893    google_apis::GDataErrorCode error,
894    scoped_ptr<google_apis::ChangeList> change_list) {
895  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
896  DCHECK(!callback.is_null());
897
898  if (OnJobDone(job_id, error))
899    callback.Run(error, change_list.Pass());
900}
901
902void JobScheduler::OnGetFileResourceJobDone(
903    JobID job_id,
904    const google_apis::FileResourceCallback& callback,
905    google_apis::GDataErrorCode error,
906    scoped_ptr<google_apis::FileResource> entry) {
907  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
908  DCHECK(!callback.is_null());
909
910  if (OnJobDone(job_id, error))
911    callback.Run(error, entry.Pass());
912}
913
914void JobScheduler::OnGetAboutResourceJobDone(
915    JobID job_id,
916    const google_apis::AboutResourceCallback& callback,
917    google_apis::GDataErrorCode error,
918    scoped_ptr<google_apis::AboutResource> about_resource) {
919  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
920  DCHECK(!callback.is_null());
921
922  if (OnJobDone(job_id, error))
923    callback.Run(error, about_resource.Pass());
924}
925
926void JobScheduler::OnGetShareUrlJobDone(
927    JobID job_id,
928    const google_apis::GetShareUrlCallback& callback,
929    google_apis::GDataErrorCode error,
930    const GURL& share_url) {
931  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
932  DCHECK(!callback.is_null());
933
934  if (OnJobDone(job_id, error))
935    callback.Run(error, share_url);
936}
937
938void JobScheduler::OnGetAppListJobDone(
939    JobID job_id,
940    const google_apis::AppListCallback& callback,
941    google_apis::GDataErrorCode error,
942    scoped_ptr<google_apis::AppList> app_list) {
943  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
944  DCHECK(!callback.is_null());
945
946  if (OnJobDone(job_id, error))
947    callback.Run(error, app_list.Pass());
948}
949
950void JobScheduler::OnEntryActionJobDone(
951    JobID job_id,
952    const google_apis::EntryActionCallback& callback,
953    google_apis::GDataErrorCode error) {
954  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
955  DCHECK(!callback.is_null());
956
957  if (OnJobDone(job_id, error))
958    callback.Run(error);
959}
960
961void JobScheduler::OnDownloadActionJobDone(
962    JobID job_id,
963    const google_apis::DownloadActionCallback& callback,
964    google_apis::GDataErrorCode error,
965    const base::FilePath& temp_file) {
966  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
967  DCHECK(!callback.is_null());
968
969  if (OnJobDone(job_id, error))
970    callback.Run(error, temp_file);
971}
972
973void JobScheduler::OnUploadCompletionJobDone(
974    JobID job_id,
975    const ResumeUploadParams& resume_params,
976    const google_apis::FileResourceCallback& callback,
977    google_apis::GDataErrorCode error,
978    const GURL& upload_location,
979    scoped_ptr<google_apis::FileResource> entry) {
980  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
981  DCHECK(!callback.is_null());
982
983  if (!upload_location.is_empty()) {
984    // If upload_location is available, update the task to resume the
985    // upload process from the terminated point.
986    // When we need to retry, the error code should be HTTP_SERVICE_UNAVAILABLE
987    // so OnJobDone called below will be in charge to re-queue the job.
988    JobEntry* job_entry = job_map_.Lookup(job_id);
989    DCHECK(job_entry);
990
991    ResumeUploadFileParams params;
992    params.upload_location = upload_location;
993    params.local_file_path = resume_params.local_file_path;
994    params.content_type = resume_params.content_type;
995    params.callback = base::Bind(&JobScheduler::OnResumeUploadFileDone,
996                                 weak_ptr_factory_.GetWeakPtr(),
997                                 job_id,
998                                 job_entry->task,
999                                 callback);
1000    params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
1001                                          weak_ptr_factory_.GetWeakPtr(),
1002                                          job_id);
1003    job_entry->task = base::Bind(&RunResumeUploadFile, uploader_.get(), params);
1004  }
1005
1006  if (OnJobDone(job_id, error))
1007    callback.Run(error, entry.Pass());
1008}
1009
1010void JobScheduler::OnResumeUploadFileDone(
1011    JobID job_id,
1012    const base::Callback<google_apis::CancelCallback()>& original_task,
1013    const google_apis::FileResourceCallback& callback,
1014    google_apis::GDataErrorCode error,
1015    const GURL& upload_location,
1016    scoped_ptr<google_apis::FileResource> entry) {
1017  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1018  DCHECK(!original_task.is_null());
1019  DCHECK(!callback.is_null());
1020
1021  if (upload_location.is_empty()) {
1022    // If upload_location is not available, we should discard it and stop trying
1023    // to resume. Restore the original task.
1024    JobEntry* job_entry = job_map_.Lookup(job_id);
1025    DCHECK(job_entry);
1026    job_entry->task = original_task;
1027  }
1028
1029  if (OnJobDone(job_id, error))
1030    callback.Run(error, entry.Pass());
1031}
1032
1033void JobScheduler::UpdateProgress(JobID job_id, int64 progress, int64 total) {
1034  JobEntry* job_entry = job_map_.Lookup(job_id);
1035  DCHECK(job_entry);
1036
1037  job_entry->job_info.num_completed_bytes = progress;
1038  if (total != -1)
1039    job_entry->job_info.num_total_bytes = total;
1040  NotifyJobUpdated(job_entry->job_info);
1041}
1042
1043void JobScheduler::OnConnectionTypeChanged(
1044    net::NetworkChangeNotifier::ConnectionType type) {
1045  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1046
1047  // Resume the job loop.
1048  // Note that we don't need to check the network connection status as it will
1049  // be checked in GetCurrentAcceptedPriority().
1050  for (int i = METADATA_QUEUE; i < NUM_QUEUES; ++i)
1051    DoJobLoop(static_cast<QueueType>(i));
1052}
1053
1054JobScheduler::QueueType JobScheduler::GetJobQueueType(JobType type) {
1055  switch (type) {
1056    case TYPE_GET_ABOUT_RESOURCE:
1057    case TYPE_GET_APP_LIST:
1058    case TYPE_GET_ALL_RESOURCE_LIST:
1059    case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY:
1060    case TYPE_SEARCH:
1061    case TYPE_GET_CHANGE_LIST:
1062    case TYPE_GET_REMAINING_CHANGE_LIST:
1063    case TYPE_GET_REMAINING_FILE_LIST:
1064    case TYPE_GET_RESOURCE_ENTRY:
1065    case TYPE_GET_SHARE_URL:
1066    case TYPE_TRASH_RESOURCE:
1067    case TYPE_COPY_RESOURCE:
1068    case TYPE_UPDATE_RESOURCE:
1069    case TYPE_ADD_RESOURCE_TO_DIRECTORY:
1070    case TYPE_REMOVE_RESOURCE_FROM_DIRECTORY:
1071    case TYPE_ADD_NEW_DIRECTORY:
1072    case TYPE_CREATE_FILE:
1073    case TYPE_ADD_PERMISSION:
1074      return METADATA_QUEUE;
1075
1076    case TYPE_DOWNLOAD_FILE:
1077    case TYPE_UPLOAD_NEW_FILE:
1078    case TYPE_UPLOAD_EXISTING_FILE:
1079      return FILE_QUEUE;
1080  }
1081  NOTREACHED();
1082  return FILE_QUEUE;
1083}
1084
1085void JobScheduler::AbortNotRunningJob(JobEntry* job,
1086                                      google_apis::GDataErrorCode error) {
1087  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1088
1089  const base::TimeDelta elapsed = base::Time::Now() - job->job_info.start_time;
1090  const QueueType queue_type = GetJobQueueType(job->job_info.job_type);
1091  logger_->Log(logging::LOG_INFO,
1092               "Job aborted: %s => %s (elapsed time: %sms) - %s",
1093               job->job_info.ToString().c_str(),
1094               GDataErrorCodeToString(error).c_str(),
1095               base::Int64ToString(elapsed.InMilliseconds()).c_str(),
1096               GetQueueInfo(queue_type).c_str());
1097
1098  base::Callback<void(google_apis::GDataErrorCode)> callback =
1099      job->abort_callback;
1100  queue_[GetJobQueueType(job->job_info.job_type)]->Remove(job->job_info.job_id);
1101  NotifyJobDone(job->job_info, error);
1102  job_map_.Remove(job->job_info.job_id);
1103  base::MessageLoopProxy::current()->PostTask(FROM_HERE,
1104                                              base::Bind(callback, error));
1105}
1106
// Calls OnJobAdded(|job_info|) on the observers in |observer_list_|.
void JobScheduler::NotifyJobAdded(const JobInfo& job_info) {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
  FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobAdded(job_info));
}
1111
// Calls OnJobDone() on the observers in |observer_list_|, passing |job_info|
// and |error| converted with GDataToFileError().
void JobScheduler::NotifyJobDone(const JobInfo& job_info,
                                 google_apis::GDataErrorCode error) {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
  FOR_EACH_OBSERVER(JobListObserver, observer_list_,
                    OnJobDone(job_info, GDataToFileError(error)));
}
1118
// Calls OnJobUpdated(|job_info|) on the observers in |observer_list_|.
void JobScheduler::NotifyJobUpdated(const JobInfo& job_info) {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
  FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobUpdated(job_info));
}
1123
1124std::string JobScheduler::GetQueueInfo(QueueType type) const {
1125  return QueueTypeToString(type) + " " + queue_[type]->ToString();
1126}
1127
1128// static
1129std::string JobScheduler::QueueTypeToString(QueueType type) {
1130  switch (type) {
1131    case METADATA_QUEUE:
1132      return "METADATA_QUEUE";
1133    case FILE_QUEUE:
1134      return "FILE_QUEUE";
1135    case NUM_QUEUES:
1136      break;  // This value is just a sentinel. Should never be used.
1137  }
1138  NOTREACHED();
1139  return "";
1140}
1141
1142}  // namespace drive
1143