1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/chromeos/drive/job_scheduler.h"
6
7#include "base/message_loop/message_loop.h"
8#include "base/prefs/pref_service.h"
9#include "base/rand_util.h"
10#include "base/strings/string_number_conversions.h"
11#include "base/strings/stringprintf.h"
12#include "chrome/browser/chromeos/drive/file_system_util.h"
13#include "chrome/browser/chromeos/drive/logging.h"
14#include "chrome/browser/google_apis/drive_api_parser.h"
15#include "chrome/browser/google_apis/task_util.h"
16#include "chrome/common/pref_names.h"
17#include "content/public/browser/browser_thread.h"
18
19using content::BrowserThread;
20
21namespace drive {
22
23namespace {
24
25const int kMaxThrottleCount = 4;
26
27// According to the API documentation, this should be the same as
28// kMaxThrottleCount (https://developers.google.com/drive/handle-errors).
29// But currently multiplied by 2 to ensure upload related jobs retried for a
30// sufficient number of times. crbug.com/269918
31const int kMaxRetryCount = 2*kMaxThrottleCount;
32
33// Parameter struct for RunUploadNewFile.
34struct UploadNewFileParams {
35  std::string parent_resource_id;
36  base::FilePath local_file_path;
37  std::string title;
38  std::string content_type;
39  UploadCompletionCallback callback;
40  google_apis::ProgressCallback progress_callback;
41};
42
43// Helper function to work around the arity limitation of base::Bind.
44google_apis::CancelCallback RunUploadNewFile(
45    DriveUploaderInterface* uploader,
46    const UploadNewFileParams& params) {
47  return uploader->UploadNewFile(params.parent_resource_id,
48                                 params.local_file_path,
49                                 params.title,
50                                 params.content_type,
51                                 params.callback,
52                                 params.progress_callback);
53}
54
55// Parameter struct for RunUploadExistingFile.
56struct UploadExistingFileParams {
57  std::string resource_id;
58  base::FilePath local_file_path;
59  std::string content_type;
60  std::string etag;
61  UploadCompletionCallback callback;
62  google_apis::ProgressCallback progress_callback;
63};
64
65// Helper function to work around the arity limitation of base::Bind.
66google_apis::CancelCallback RunUploadExistingFile(
67    DriveUploaderInterface* uploader,
68    const UploadExistingFileParams& params) {
69  return uploader->UploadExistingFile(params.resource_id,
70                                      params.local_file_path,
71                                      params.content_type,
72                                      params.etag,
73                                      params.callback,
74                                      params.progress_callback);
75}
76
77// Parameter struct for RunResumeUploadFile.
78struct ResumeUploadFileParams {
79  GURL upload_location;
80  base::FilePath local_file_path;
81  std::string content_type;
82  UploadCompletionCallback callback;
83  google_apis::ProgressCallback progress_callback;
84};
85
86// Helper function to adjust the return type.
87google_apis::CancelCallback RunResumeUploadFile(
88    DriveUploaderInterface* uploader,
89    const ResumeUploadFileParams& params) {
90  return uploader->ResumeUploadFile(params.upload_location,
91                                    params.local_file_path,
92                                    params.content_type,
93                                    params.callback,
94                                    params.progress_callback);
95}
96
97}  // namespace
98
99const int JobScheduler::kMaxJobCount[] = {
100  5,  // METADATA_QUEUE
101  1,  // FILE_QUEUE
102};
103
104JobScheduler::JobEntry::JobEntry(JobType type)
105    : job_info(type),
106      context(ClientContext(USER_INITIATED)),
107      retry_count(0) {
108  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
109}
110
111JobScheduler::JobEntry::~JobEntry() {
112  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
113}
114
115struct JobScheduler::ResumeUploadParams {
116  base::FilePath drive_file_path;
117  base::FilePath local_file_path;
118  std::string content_type;
119};
120
121JobScheduler::JobScheduler(
122    PrefService* pref_service,
123    DriveServiceInterface* drive_service,
124    base::SequencedTaskRunner* blocking_task_runner)
125    : throttle_count_(0),
126      wait_until_(base::Time::Now()),
127      disable_throttling_(false),
128      drive_service_(drive_service),
129      uploader_(new DriveUploader(drive_service, blocking_task_runner)),
130      pref_service_(pref_service),
131      weak_ptr_factory_(this) {
132  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
133
134  for (int i = 0; i < NUM_QUEUES; ++i)
135    queue_[i].reset(new JobQueue(kMaxJobCount[i], NUM_CONTEXT_TYPES));
136
137  net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
138}
139
140JobScheduler::~JobScheduler() {
141  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
142
143  size_t num_queued_jobs = 0;
144  for (int i = 0; i < NUM_QUEUES; ++i)
145    num_queued_jobs += queue_[i]->GetNumberOfJobs();
146  DCHECK_EQ(num_queued_jobs, job_map_.size());
147
148  net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
149}
150
151std::vector<JobInfo> JobScheduler::GetJobInfoList() {
152  std::vector<JobInfo> job_info_list;
153  for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
154    job_info_list.push_back(iter.GetCurrentValue()->job_info);
155  return job_info_list;
156}
157
158void JobScheduler::AddObserver(JobListObserver* observer) {
159  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
160  observer_list_.AddObserver(observer);
161}
162
163void JobScheduler::RemoveObserver(JobListObserver* observer) {
164  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
165  observer_list_.RemoveObserver(observer);
166}
167
168void JobScheduler::CancelJob(JobID job_id) {
169  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
170
171  JobEntry* job = job_map_.Lookup(job_id);
172  if (job) {
173    if (job->job_info.state == STATE_RUNNING) {
174      // If the job is running an HTTP request, cancel it via |cancel_callback|
175      // returned from the request, and wait for termination in the normal
176      // callback handler, OnJobDone.
177      if (!job->cancel_callback.is_null())
178        job->cancel_callback.Run();
179    } else {
180      AbortNotRunningJob(job, google_apis::GDATA_CANCELLED);
181    }
182  }
183}
184
185void JobScheduler::CancelAllJobs() {
186  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
187
188  // CancelJob may remove the entry from |job_map_|. That's OK. IDMap supports
189  // removable during iteration.
190  for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
191    CancelJob(iter.GetCurrentKey());
192}
193
194void JobScheduler::GetAboutResource(
195    const google_apis::GetAboutResourceCallback& callback) {
196  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
197  DCHECK(!callback.is_null());
198
199  JobEntry* new_job = CreateNewJob(TYPE_GET_ABOUT_RESOURCE);
200  new_job->task = base::Bind(
201      &DriveServiceInterface::GetAboutResource,
202      base::Unretained(drive_service_),
203      base::Bind(&JobScheduler::OnGetAboutResourceJobDone,
204                 weak_ptr_factory_.GetWeakPtr(),
205                 new_job->job_info.job_id,
206                 callback));
207  new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
208  StartJob(new_job);
209}
210
211void JobScheduler::GetAppList(
212    const google_apis::GetAppListCallback& callback) {
213  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
214  DCHECK(!callback.is_null());
215
216  JobEntry* new_job = CreateNewJob(TYPE_GET_APP_LIST);
217  new_job->task = base::Bind(
218      &DriveServiceInterface::GetAppList,
219      base::Unretained(drive_service_),
220      base::Bind(&JobScheduler::OnGetAppListJobDone,
221                 weak_ptr_factory_.GetWeakPtr(),
222                 new_job->job_info.job_id,
223                 callback));
224  new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
225  StartJob(new_job);
226}
227
228void JobScheduler::GetAllResourceList(
229    const google_apis::GetResourceListCallback& callback) {
230  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
231  DCHECK(!callback.is_null());
232
233  JobEntry* new_job = CreateNewJob(TYPE_GET_ALL_RESOURCE_LIST);
234  new_job->task = base::Bind(
235      &DriveServiceInterface::GetAllResourceList,
236      base::Unretained(drive_service_),
237      base::Bind(&JobScheduler::OnGetResourceListJobDone,
238                 weak_ptr_factory_.GetWeakPtr(),
239                 new_job->job_info.job_id,
240                 callback));
241  new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
242  StartJob(new_job);
243}
244
245void JobScheduler::GetResourceListInDirectory(
246    const std::string& directory_resource_id,
247    const google_apis::GetResourceListCallback& callback) {
248  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
249  DCHECK(!callback.is_null());
250
251  JobEntry* new_job = CreateNewJob(
252      TYPE_GET_RESOURCE_LIST_IN_DIRECTORY);
253  new_job->task = base::Bind(
254      &DriveServiceInterface::GetResourceListInDirectory,
255      base::Unretained(drive_service_),
256      directory_resource_id,
257      base::Bind(&JobScheduler::OnGetResourceListJobDone,
258                 weak_ptr_factory_.GetWeakPtr(),
259                 new_job->job_info.job_id,
260                 callback));
261  new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
262  StartJob(new_job);
263}
264
265void JobScheduler::Search(
266    const std::string& search_query,
267    const google_apis::GetResourceListCallback& callback) {
268  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
269  DCHECK(!callback.is_null());
270
271  JobEntry* new_job = CreateNewJob(TYPE_SEARCH);
272  new_job->task = base::Bind(
273      &DriveServiceInterface::Search,
274      base::Unretained(drive_service_),
275      search_query,
276      base::Bind(&JobScheduler::OnGetResourceListJobDone,
277                 weak_ptr_factory_.GetWeakPtr(),
278                 new_job->job_info.job_id,
279                 callback));
280  new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
281  StartJob(new_job);
282}
283
284void JobScheduler::GetChangeList(
285    int64 start_changestamp,
286    const google_apis::GetResourceListCallback& callback) {
287  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
288  DCHECK(!callback.is_null());
289
290  JobEntry* new_job = CreateNewJob(TYPE_GET_CHANGE_LIST);
291  new_job->task = base::Bind(
292      &DriveServiceInterface::GetChangeList,
293      base::Unretained(drive_service_),
294      start_changestamp,
295      base::Bind(&JobScheduler::OnGetResourceListJobDone,
296                 weak_ptr_factory_.GetWeakPtr(),
297                 new_job->job_info.job_id,
298                 callback));
299  new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
300  StartJob(new_job);
301}
302
303void JobScheduler::ContinueGetResourceList(
304    const GURL& next_url,
305    const google_apis::GetResourceListCallback& callback) {
306  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
307  DCHECK(!callback.is_null());
308
309  JobEntry* new_job = CreateNewJob(TYPE_CONTINUE_GET_RESOURCE_LIST);
310  new_job->task = base::Bind(
311      &DriveServiceInterface::ContinueGetResourceList,
312      base::Unretained(drive_service_),
313      next_url,
314      base::Bind(&JobScheduler::OnGetResourceListJobDone,
315                 weak_ptr_factory_.GetWeakPtr(),
316                 new_job->job_info.job_id,
317                 callback));
318  new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
319  StartJob(new_job);
320}
321
322void JobScheduler::GetResourceEntry(
323    const std::string& resource_id,
324    const ClientContext& context,
325    const google_apis::GetResourceEntryCallback& callback) {
326  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
327  DCHECK(!callback.is_null());
328
329  JobEntry* new_job = CreateNewJob(TYPE_GET_RESOURCE_ENTRY);
330  new_job->context = context;
331  new_job->task = base::Bind(
332      &DriveServiceInterface::GetResourceEntry,
333      base::Unretained(drive_service_),
334      resource_id,
335      base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
336                 weak_ptr_factory_.GetWeakPtr(),
337                 new_job->job_info.job_id,
338                 callback));
339  new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
340  StartJob(new_job);
341}
342
343void JobScheduler::GetShareUrl(
344    const std::string& resource_id,
345    const GURL& embed_origin,
346    const ClientContext& context,
347    const google_apis::GetShareUrlCallback& callback) {
348  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
349  DCHECK(!callback.is_null());
350
351  JobEntry* new_job = CreateNewJob(TYPE_GET_SHARE_URL);
352  new_job->context = context;
353  new_job->task = base::Bind(
354      &DriveServiceInterface::GetShareUrl,
355      base::Unretained(drive_service_),
356      resource_id,
357      embed_origin,
358      base::Bind(&JobScheduler::OnGetShareUrlJobDone,
359                 weak_ptr_factory_.GetWeakPtr(),
360                 new_job->job_info.job_id,
361                 callback));
362  new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
363  StartJob(new_job);
364}
365
366void JobScheduler::DeleteResource(
367    const std::string& resource_id,
368    const google_apis::EntryActionCallback& callback) {
369  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
370  DCHECK(!callback.is_null());
371
372  JobEntry* new_job = CreateNewJob(TYPE_DELETE_RESOURCE);
373  new_job->task = base::Bind(
374      &DriveServiceInterface::DeleteResource,
375      base::Unretained(drive_service_),
376      resource_id,
377      "",  // etag
378      base::Bind(&JobScheduler::OnEntryActionJobDone,
379                 weak_ptr_factory_.GetWeakPtr(),
380                 new_job->job_info.job_id,
381                 callback));
382  new_job->abort_callback = callback;
383  StartJob(new_job);
384}
385
386void JobScheduler::CopyResource(
387    const std::string& resource_id,
388    const std::string& parent_resource_id,
389    const std::string& new_title,
390    const google_apis::GetResourceEntryCallback& callback) {
391  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
392  DCHECK(!callback.is_null());
393
394  JobEntry* new_job = CreateNewJob(TYPE_COPY_RESOURCE);
395  new_job->task = base::Bind(
396      &DriveServiceInterface::CopyResource,
397      base::Unretained(drive_service_),
398      resource_id,
399      parent_resource_id,
400      new_title,
401      base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
402                 weak_ptr_factory_.GetWeakPtr(),
403                 new_job->job_info.job_id,
404                 callback));
405  new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
406  StartJob(new_job);
407}
408
409void JobScheduler::CopyHostedDocument(
410    const std::string& resource_id,
411    const std::string& new_title,
412    const google_apis::GetResourceEntryCallback& callback) {
413  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
414  DCHECK(!callback.is_null());
415
416  JobEntry* new_job = CreateNewJob(TYPE_COPY_HOSTED_DOCUMENT);
417  new_job->task = base::Bind(
418      &DriveServiceInterface::CopyHostedDocument,
419      base::Unretained(drive_service_),
420      resource_id,
421      new_title,
422      base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
423                 weak_ptr_factory_.GetWeakPtr(),
424                 new_job->job_info.job_id,
425                 callback));
426  new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
427  StartJob(new_job);
428}
429
430void JobScheduler::RenameResource(
431    const std::string& resource_id,
432    const std::string& new_title,
433    const google_apis::EntryActionCallback& callback) {
434  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
435  DCHECK(!callback.is_null());
436
437  JobEntry* new_job = CreateNewJob(TYPE_RENAME_RESOURCE);
438  new_job->task = base::Bind(
439      &DriveServiceInterface::RenameResource,
440      base::Unretained(drive_service_),
441      resource_id,
442      new_title,
443      base::Bind(&JobScheduler::OnEntryActionJobDone,
444                 weak_ptr_factory_.GetWeakPtr(),
445                 new_job->job_info.job_id,
446                 callback));
447  new_job->abort_callback = callback;
448  StartJob(new_job);
449}
450
451void JobScheduler::TouchResource(
452    const std::string& resource_id,
453    const base::Time& modified_date,
454    const base::Time& last_viewed_by_me_date,
455    const google_apis::GetResourceEntryCallback& callback) {
456  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
457  DCHECK(!callback.is_null());
458
459  JobEntry* new_job = CreateNewJob(TYPE_TOUCH_RESOURCE);
460  new_job->task = base::Bind(
461      &DriveServiceInterface::TouchResource,
462      base::Unretained(drive_service_),
463      resource_id,
464      modified_date,
465      last_viewed_by_me_date,
466      base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
467                 weak_ptr_factory_.GetWeakPtr(),
468                 new_job->job_info.job_id,
469                 callback));
470  new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
471  StartJob(new_job);
472}
473
474void JobScheduler::AddResourceToDirectory(
475    const std::string& parent_resource_id,
476    const std::string& resource_id,
477    const google_apis::EntryActionCallback& callback) {
478  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
479  DCHECK(!callback.is_null());
480
481  JobEntry* new_job = CreateNewJob(TYPE_ADD_RESOURCE_TO_DIRECTORY);
482  new_job->task = base::Bind(
483      &DriveServiceInterface::AddResourceToDirectory,
484      base::Unretained(drive_service_),
485      parent_resource_id,
486      resource_id,
487      base::Bind(&JobScheduler::OnEntryActionJobDone,
488                 weak_ptr_factory_.GetWeakPtr(),
489                 new_job->job_info.job_id,
490                 callback));
491  new_job->abort_callback = callback;
492  StartJob(new_job);
493}
494
495void JobScheduler::RemoveResourceFromDirectory(
496    const std::string& parent_resource_id,
497    const std::string& resource_id,
498    const google_apis::EntryActionCallback& callback) {
499  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
500
501  JobEntry* new_job = CreateNewJob(TYPE_REMOVE_RESOURCE_FROM_DIRECTORY);
502  new_job->task = base::Bind(
503      &DriveServiceInterface::RemoveResourceFromDirectory,
504      base::Unretained(drive_service_),
505      parent_resource_id,
506      resource_id,
507      base::Bind(&JobScheduler::OnEntryActionJobDone,
508                 weak_ptr_factory_.GetWeakPtr(),
509                 new_job->job_info.job_id,
510                 callback));
511  new_job->abort_callback = callback;
512  StartJob(new_job);
513}
514
515void JobScheduler::AddNewDirectory(
516    const std::string& parent_resource_id,
517    const std::string& directory_title,
518    const google_apis::GetResourceEntryCallback& callback) {
519  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
520
521  JobEntry* new_job = CreateNewJob(TYPE_ADD_NEW_DIRECTORY);
522  new_job->task = base::Bind(
523      &DriveServiceInterface::AddNewDirectory,
524      base::Unretained(drive_service_),
525      parent_resource_id,
526      directory_title,
527      base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
528                 weak_ptr_factory_.GetWeakPtr(),
529                 new_job->job_info.job_id,
530                 callback));
531  new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
532  StartJob(new_job);
533}
534
535JobID JobScheduler::DownloadFile(
536    const base::FilePath& virtual_path,
537    const base::FilePath& local_cache_path,
538    const std::string& resource_id,
539    const ClientContext& context,
540    const google_apis::DownloadActionCallback& download_action_callback,
541    const google_apis::GetContentCallback& get_content_callback) {
542  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
543
544  JobEntry* new_job = CreateNewJob(TYPE_DOWNLOAD_FILE);
545  new_job->job_info.file_path = virtual_path;
546  new_job->context = context;
547  new_job->task = base::Bind(
548      &DriveServiceInterface::DownloadFile,
549      base::Unretained(drive_service_),
550      local_cache_path,
551      resource_id,
552      base::Bind(&JobScheduler::OnDownloadActionJobDone,
553                 weak_ptr_factory_.GetWeakPtr(),
554                 new_job->job_info.job_id,
555                 download_action_callback),
556      get_content_callback,
557      base::Bind(&JobScheduler::UpdateProgress,
558                 weak_ptr_factory_.GetWeakPtr(),
559                 new_job->job_info.job_id));
560  new_job->abort_callback =
561      google_apis::CreateErrorRunCallback(download_action_callback);
562  StartJob(new_job);
563  return new_job->job_info.job_id;
564}
565
566void JobScheduler::UploadNewFile(
567    const std::string& parent_resource_id,
568    const base::FilePath& drive_file_path,
569    const base::FilePath& local_file_path,
570    const std::string& title,
571    const std::string& content_type,
572    const ClientContext& context,
573    const google_apis::GetResourceEntryCallback& callback) {
574  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
575
576  JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_NEW_FILE);
577  new_job->job_info.file_path = drive_file_path;
578  new_job->context = context;
579
580  UploadNewFileParams params;
581  params.parent_resource_id = parent_resource_id;
582  params.local_file_path = local_file_path;
583  params.title = title;
584  params.content_type = content_type;
585
586  ResumeUploadParams resume_params;
587  resume_params.local_file_path = params.local_file_path;
588  resume_params.content_type = params.content_type;
589
590  params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
591                               weak_ptr_factory_.GetWeakPtr(),
592                               new_job->job_info.job_id,
593                               resume_params,
594                               callback);
595  params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
596                                        weak_ptr_factory_.GetWeakPtr(),
597                                        new_job->job_info.job_id);
598  new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params);
599  new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
600  StartJob(new_job);
601}
602
603void JobScheduler::UploadExistingFile(
604    const std::string& resource_id,
605    const base::FilePath& drive_file_path,
606    const base::FilePath& local_file_path,
607    const std::string& content_type,
608    const std::string& etag,
609    const ClientContext& context,
610    const google_apis::GetResourceEntryCallback& callback) {
611  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
612
613  JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_EXISTING_FILE);
614  new_job->job_info.file_path = drive_file_path;
615  new_job->context = context;
616
617  UploadExistingFileParams params;
618  params.resource_id = resource_id;
619  params.local_file_path = local_file_path;
620  params.content_type = content_type;
621  params.etag = etag;
622
623  ResumeUploadParams resume_params;
624  resume_params.local_file_path = params.local_file_path;
625  resume_params.content_type = params.content_type;
626
627  params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
628                               weak_ptr_factory_.GetWeakPtr(),
629                               new_job->job_info.job_id,
630                               resume_params,
631                               callback);
632  params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
633                                        weak_ptr_factory_.GetWeakPtr(),
634                                        new_job->job_info.job_id);
635  new_job->task = base::Bind(&RunUploadExistingFile, uploader_.get(), params);
636  new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
637  StartJob(new_job);
638}
639
640void JobScheduler::CreateFile(
641    const std::string& parent_resource_id,
642    const base::FilePath& drive_file_path,
643    const std::string& title,
644    const std::string& content_type,
645    const ClientContext& context,
646    const google_apis::GetResourceEntryCallback& callback) {
647  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
648
649  const base::FilePath kDevNull(FILE_PATH_LITERAL("/dev/null"));
650
651  JobEntry* new_job = CreateNewJob(TYPE_CREATE_FILE);
652  new_job->job_info.file_path = drive_file_path;
653  new_job->context = context;
654
655  UploadNewFileParams params;
656  params.parent_resource_id = parent_resource_id;
657  params.local_file_path = kDevNull;  // Upload an empty file.
658  params.title = title;
659  params.content_type = content_type;
660
661  ResumeUploadParams resume_params;
662  resume_params.local_file_path = params.local_file_path;
663  resume_params.content_type = params.content_type;
664
665  params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
666                               weak_ptr_factory_.GetWeakPtr(),
667                               new_job->job_info.job_id,
668                               resume_params,
669                               callback);
670  params.progress_callback = google_apis::ProgressCallback();
671
672  new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params);
673  new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
674  StartJob(new_job);
675}
676
677JobScheduler::JobEntry* JobScheduler::CreateNewJob(JobType type) {
678  JobEntry* job = new JobEntry(type);
679  job->job_info.job_id = job_map_.Add(job);  // Takes the ownership of |job|.
680  return job;
681}
682
683void JobScheduler::StartJob(JobEntry* job) {
684  DCHECK(!job->task.is_null());
685
686  QueueJob(job->job_info.job_id);
687  NotifyJobAdded(job->job_info);
688  DoJobLoop(GetJobQueueType(job->job_info.job_type));
689}
690
691void JobScheduler::QueueJob(JobID job_id) {
692  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
693
694  JobEntry* job_entry = job_map_.Lookup(job_id);
695  DCHECK(job_entry);
696  const JobInfo& job_info = job_entry->job_info;
697
698  QueueType queue_type = GetJobQueueType(job_info.job_type);
699  queue_[queue_type]->Push(job_id, job_entry->context.type);
700
701  const std::string retry_prefix = job_entry->retry_count > 0 ?
702      base::StringPrintf(" (retry %d)", job_entry->retry_count) : "";
703  util::Log(logging::LOG_INFO,
704            "Job queued%s: %s - %s",
705            retry_prefix.c_str(),
706            job_info.ToString().c_str(),
707            GetQueueInfo(queue_type).c_str());
708}
709
710void JobScheduler::DoJobLoop(QueueType queue_type) {
711  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
712
713  const int accepted_priority = GetCurrentAcceptedPriority(queue_type);
714
715  // Abort all USER_INITAITED jobs when not accepted.
716  if (accepted_priority < USER_INITIATED) {
717    std::vector<JobID> jobs;
718    queue_[queue_type]->GetQueuedJobs(USER_INITIATED, &jobs);
719    for (size_t i = 0; i < jobs.size(); ++i) {
720      JobEntry* job = job_map_.Lookup(jobs[i]);
721      DCHECK(job);
722      AbortNotRunningJob(job, google_apis::GDATA_NO_CONNECTION);
723    }
724  }
725
726  // Wait when throttled.
727  const base::Time now = base::Time::Now();
728  if (now < wait_until_) {
729    base::MessageLoopProxy::current()->PostDelayedTask(
730        FROM_HERE,
731        base::Bind(&JobScheduler::DoJobLoop,
732                   weak_ptr_factory_.GetWeakPtr(),
733                   queue_type),
734        wait_until_ - now);
735    return;
736  }
737
738  // Run the job with the highest priority in the queue.
739  JobID job_id = -1;
740  if (!queue_[queue_type]->PopForRun(accepted_priority, &job_id))
741    return;
742
743  JobEntry* entry = job_map_.Lookup(job_id);
744  DCHECK(entry);
745
746  JobInfo* job_info = &entry->job_info;
747  job_info->state = STATE_RUNNING;
748  job_info->start_time = now;
749  NotifyJobUpdated(*job_info);
750
751  entry->cancel_callback = entry->task.Run();
752
753  UpdateWait();
754
755  util::Log(logging::LOG_INFO,
756            "Job started: %s - %s",
757            job_info->ToString().c_str(),
758            GetQueueInfo(queue_type).c_str());
759}
760
761int JobScheduler::GetCurrentAcceptedPriority(QueueType queue_type) {
762  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
763
764  const int kNoJobShouldRun = -1;
765
766  // Should stop if Drive was disabled while running the fetch loop.
767  if (pref_service_->GetBoolean(prefs::kDisableDrive))
768    return kNoJobShouldRun;
769
770  // Should stop if the network is not online.
771  if (net::NetworkChangeNotifier::IsOffline())
772    return kNoJobShouldRun;
773
774  // For the file queue, if it is on cellular network, only user initiated
775  // operations are allowed to start.
776  if (queue_type == FILE_QUEUE &&
777      pref_service_->GetBoolean(prefs::kDisableDriveOverCellular) &&
778      net::NetworkChangeNotifier::IsConnectionCellular(
779          net::NetworkChangeNotifier::GetConnectionType()))
780    return USER_INITIATED;
781
782  // Otherwise, every operations including background tasks are allowed.
783  return BACKGROUND;
784}
785
786void JobScheduler::UpdateWait() {
787  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
788
789  if (disable_throttling_ || throttle_count_ == 0)
790    return;
791
792  // Exponential backoff: https://developers.google.com/drive/handle-errors.
793  base::TimeDelta delay =
794      base::TimeDelta::FromSeconds(1 << (throttle_count_ - 1)) +
795      base::TimeDelta::FromMilliseconds(base::RandInt(0, 1000));
796  VLOG(1) << "Throttling for " << delay.InMillisecondsF();
797
798  wait_until_ = std::max(wait_until_, base::Time::Now() + delay);
799}
800
801bool JobScheduler::OnJobDone(JobID job_id, google_apis::GDataErrorCode error) {
802  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
803
804  JobEntry* job_entry = job_map_.Lookup(job_id);
805  DCHECK(job_entry);
806  JobInfo* job_info = &job_entry->job_info;
807  QueueType queue_type = GetJobQueueType(job_info->job_type);
808  queue_[queue_type]->MarkFinished(job_id);
809
810  const base::TimeDelta elapsed = base::Time::Now() - job_info->start_time;
811  bool success = (GDataToFileError(error) == FILE_ERROR_OK);
812  util::Log(success ? logging::LOG_INFO : logging::LOG_WARNING,
813            "Job done: %s => %s (elapsed time: %sms) - %s",
814            job_info->ToString().c_str(),
815            GDataErrorCodeToString(error).c_str(),
816            base::Int64ToString(elapsed.InMilliseconds()).c_str(),
817            GetQueueInfo(queue_type).c_str());
818
819  // Retry, depending on the error.
820  const bool is_server_error =
821      error == google_apis::HTTP_SERVICE_UNAVAILABLE ||
822      error == google_apis::HTTP_INTERNAL_SERVER_ERROR;
823  if (is_server_error) {
824    if (throttle_count_ < kMaxThrottleCount)
825      ++throttle_count_;
826    UpdateWait();
827  } else {
828    throttle_count_ = 0;
829  }
830
831  const bool should_retry =
832      is_server_error && job_entry->retry_count < kMaxRetryCount;
833  if (should_retry) {
834    job_entry->cancel_callback.Reset();
835    job_info->state = STATE_RETRY;
836    NotifyJobUpdated(*job_info);
837
838    ++job_entry->retry_count;
839
840    // Requeue the job.
841    QueueJob(job_id);
842  } else {
843    NotifyJobDone(*job_info, error);
844    // The job has finished, no retry will happen in the scheduler. Now we can
845    // remove the job info from the map.
846    job_map_.Remove(job_id);
847  }
848
849  // Post a task to continue the job loop.  This allows us to finish handling
850  // the current job before starting the next one.
851  base::MessageLoopProxy::current()->PostTask(FROM_HERE,
852      base::Bind(&JobScheduler::DoJobLoop,
853                 weak_ptr_factory_.GetWeakPtr(),
854                 queue_type));
855  return !should_retry;
856}
857
858void JobScheduler::OnGetResourceListJobDone(
859    JobID job_id,
860    const google_apis::GetResourceListCallback& callback,
861    google_apis::GDataErrorCode error,
862    scoped_ptr<google_apis::ResourceList> resource_list) {
863  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
864  DCHECK(!callback.is_null());
865
866  if (OnJobDone(job_id, error))
867    callback.Run(error, resource_list.Pass());
868}
869
870void JobScheduler::OnGetResourceEntryJobDone(
871    JobID job_id,
872    const google_apis::GetResourceEntryCallback& callback,
873    google_apis::GDataErrorCode error,
874    scoped_ptr<google_apis::ResourceEntry> entry) {
875  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
876  DCHECK(!callback.is_null());
877
878  if (OnJobDone(job_id, error))
879    callback.Run(error, entry.Pass());
880}
881
882void JobScheduler::OnGetAboutResourceJobDone(
883    JobID job_id,
884    const google_apis::GetAboutResourceCallback& callback,
885    google_apis::GDataErrorCode error,
886    scoped_ptr<google_apis::AboutResource> about_resource) {
887  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
888  DCHECK(!callback.is_null());
889
890  if (OnJobDone(job_id, error))
891    callback.Run(error, about_resource.Pass());
892}
893
894void JobScheduler::OnGetShareUrlJobDone(
895    JobID job_id,
896    const google_apis::GetShareUrlCallback& callback,
897    google_apis::GDataErrorCode error,
898    const GURL& share_url) {
899  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
900  DCHECK(!callback.is_null());
901
902  if (OnJobDone(job_id, error))
903    callback.Run(error, share_url);
904}
905
906void JobScheduler::OnGetAppListJobDone(
907    JobID job_id,
908    const google_apis::GetAppListCallback& callback,
909    google_apis::GDataErrorCode error,
910    scoped_ptr<google_apis::AppList> app_list) {
911  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
912  DCHECK(!callback.is_null());
913
914  if (OnJobDone(job_id, error))
915    callback.Run(error, app_list.Pass());
916}
917
918void JobScheduler::OnEntryActionJobDone(
919    JobID job_id,
920    const google_apis::EntryActionCallback& callback,
921    google_apis::GDataErrorCode error) {
922  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
923  DCHECK(!callback.is_null());
924
925  if (OnJobDone(job_id, error))
926    callback.Run(error);
927}
928
929void JobScheduler::OnDownloadActionJobDone(
930    JobID job_id,
931    const google_apis::DownloadActionCallback& callback,
932    google_apis::GDataErrorCode error,
933    const base::FilePath& temp_file) {
934  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
935  DCHECK(!callback.is_null());
936
937  if (OnJobDone(job_id, error))
938    callback.Run(error, temp_file);
939}
940
941void JobScheduler::OnUploadCompletionJobDone(
942    JobID job_id,
943    const ResumeUploadParams& resume_params,
944    const google_apis::GetResourceEntryCallback& callback,
945    google_apis::GDataErrorCode error,
946    const GURL& upload_location,
947    scoped_ptr<google_apis::ResourceEntry> resource_entry) {
948  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
949  DCHECK(!callback.is_null());
950
951  if (!upload_location.is_empty()) {
952    // If upload_location is available, update the task to resume the
953    // upload process from the terminated point.
954    // When we need to retry, the error code should be HTTP_SERVICE_UNAVAILABLE
955    // so OnJobDone called below will be in charge to re-queue the job.
956    JobEntry* job_entry = job_map_.Lookup(job_id);
957    DCHECK(job_entry);
958
959    ResumeUploadFileParams params;
960    params.upload_location = upload_location;
961    params.local_file_path = resume_params.local_file_path;
962    params.content_type = resume_params.content_type;
963    params.callback = base::Bind(&JobScheduler::OnResumeUploadFileDone,
964                                 weak_ptr_factory_.GetWeakPtr(),
965                                 job_id,
966                                 job_entry->task,
967                                 callback);
968    params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
969                                          weak_ptr_factory_.GetWeakPtr(),
970                                          job_id);
971    job_entry->task = base::Bind(&RunResumeUploadFile, uploader_.get(), params);
972  }
973
974  if (OnJobDone(job_id, error))
975    callback.Run(error, resource_entry.Pass());
976}
977
978void JobScheduler::OnResumeUploadFileDone(
979    JobID job_id,
980    const base::Callback<google_apis::CancelCallback()>& original_task,
981    const google_apis::GetResourceEntryCallback& callback,
982    google_apis::GDataErrorCode error,
983    const GURL& upload_location,
984    scoped_ptr<google_apis::ResourceEntry> resource_entry) {
985  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
986  DCHECK(!original_task.is_null());
987  DCHECK(!callback.is_null());
988
989  if (upload_location.is_empty()) {
990    // If upload_location is not available, we should discard it and stop trying
991    // to resume. Restore the original task.
992    JobEntry* job_entry = job_map_.Lookup(job_id);
993    DCHECK(job_entry);
994    job_entry->task = original_task;
995  }
996
997  if (OnJobDone(job_id, error))
998    callback.Run(error, resource_entry.Pass());
999}
1000
1001void JobScheduler::UpdateProgress(JobID job_id, int64 progress, int64 total) {
1002  JobEntry* job_entry = job_map_.Lookup(job_id);
1003  DCHECK(job_entry);
1004
1005  job_entry->job_info.num_completed_bytes = progress;
1006  job_entry->job_info.num_total_bytes = total;
1007  NotifyJobUpdated(job_entry->job_info);
1008}
1009
1010void JobScheduler::OnConnectionTypeChanged(
1011    net::NetworkChangeNotifier::ConnectionType type) {
1012  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1013
1014  // Resume the job loop.
1015  // Note that we don't need to check the network connection status as it will
1016  // be checked in GetCurrentAcceptedPriority().
1017  for (int i = METADATA_QUEUE; i < NUM_QUEUES; ++i)
1018    DoJobLoop(static_cast<QueueType>(i));
1019}
1020
1021JobScheduler::QueueType JobScheduler::GetJobQueueType(JobType type) {
1022  switch (type) {
1023    case TYPE_GET_ABOUT_RESOURCE:
1024    case TYPE_GET_APP_LIST:
1025    case TYPE_GET_ALL_RESOURCE_LIST:
1026    case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY:
1027    case TYPE_SEARCH:
1028    case TYPE_GET_CHANGE_LIST:
1029    case TYPE_CONTINUE_GET_RESOURCE_LIST:
1030    case TYPE_GET_RESOURCE_ENTRY:
1031    case TYPE_GET_SHARE_URL:
1032    case TYPE_DELETE_RESOURCE:
1033    case TYPE_COPY_RESOURCE:
1034    case TYPE_COPY_HOSTED_DOCUMENT:
1035    case TYPE_RENAME_RESOURCE:
1036    case TYPE_TOUCH_RESOURCE:
1037    case TYPE_ADD_RESOURCE_TO_DIRECTORY:
1038    case TYPE_REMOVE_RESOURCE_FROM_DIRECTORY:
1039    case TYPE_ADD_NEW_DIRECTORY:
1040    case TYPE_CREATE_FILE:
1041      return METADATA_QUEUE;
1042
1043    case TYPE_DOWNLOAD_FILE:
1044    case TYPE_UPLOAD_NEW_FILE:
1045    case TYPE_UPLOAD_EXISTING_FILE:
1046      return FILE_QUEUE;
1047  }
1048  NOTREACHED();
1049  return FILE_QUEUE;
1050}
1051
1052void JobScheduler::AbortNotRunningJob(JobEntry* job,
1053                                      google_apis::GDataErrorCode error) {
1054  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1055
1056  const base::TimeDelta elapsed = base::Time::Now() - job->job_info.start_time;
1057  const QueueType queue_type = GetJobQueueType(job->job_info.job_type);
1058  util::Log(logging::LOG_INFO,
1059            "Job aborted: %s => %s (elapsed time: %sms) - %s",
1060            job->job_info.ToString().c_str(),
1061            GDataErrorCodeToString(error).c_str(),
1062            base::Int64ToString(elapsed.InMilliseconds()).c_str(),
1063            GetQueueInfo(queue_type).c_str());
1064
1065  base::Callback<void(google_apis::GDataErrorCode)> callback =
1066      job->abort_callback;
1067  queue_[GetJobQueueType(job->job_info.job_type)]->Remove(job->job_info.job_id);
1068  NotifyJobDone(job->job_info, error);
1069  job_map_.Remove(job->job_info.job_id);
1070  base::MessageLoopProxy::current()->PostTask(FROM_HERE,
1071                                              base::Bind(callback, error));
1072}
1073
1074void JobScheduler::NotifyJobAdded(const JobInfo& job_info) {
1075  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1076  FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobAdded(job_info));
1077}
1078
1079void JobScheduler::NotifyJobDone(const JobInfo& job_info,
1080                                 google_apis::GDataErrorCode error) {
1081  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1082  FOR_EACH_OBSERVER(JobListObserver, observer_list_,
1083                    OnJobDone(job_info, GDataToFileError(error)));
1084}
1085
1086void JobScheduler::NotifyJobUpdated(const JobInfo& job_info) {
1087  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1088  FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobUpdated(job_info));
1089}
1090
1091std::string JobScheduler::GetQueueInfo(QueueType type) const {
1092  return QueueTypeToString(type) + " " + queue_[type]->ToString();
1093}
1094
1095// static
1096std::string JobScheduler::QueueTypeToString(QueueType type) {
1097  switch (type) {
1098    case METADATA_QUEUE:
1099      return "METADATA_QUEUE";
1100    case FILE_QUEUE:
1101      return "FILE_QUEUE";
1102    case NUM_QUEUES:
1103      break;  // This value is just a sentinel. Should never be used.
1104  }
1105  NOTREACHED();
1106  return "";
1107}
1108
1109}  // namespace drive
1110