sync_worker.cc revision a02191e04bc25c4935f804f2c080ae28663d096d
1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/sync_file_system/drive_backend/sync_worker.h"
6
7#include <vector>
8
9#include "base/bind.h"
10#include "base/memory/weak_ptr.h"
11#include "base/threading/sequenced_worker_pool.h"
12#include "base/values.h"
13#include "chrome/browser/drive/drive_api_service.h"
14#include "chrome/browser/drive/drive_notification_manager.h"
15#include "chrome/browser/drive/drive_notification_manager_factory.h"
16#include "chrome/browser/drive/drive_service_interface.h"
17#include "chrome/browser/drive/drive_uploader.h"
18#include "chrome/browser/extensions/extension_service.h"
19#include "chrome/browser/profiles/profile.h"
20#include "chrome/browser/signin/profile_oauth2_token_service_factory.h"
21#include "chrome/browser/signin/signin_manager_factory.h"
22#include "chrome/browser/sync_file_system/drive_backend/conflict_resolver.h"
23#include "chrome/browser/sync_file_system/drive_backend/drive_backend_constants.h"
24#include "chrome/browser/sync_file_system/drive_backend/list_changes_task.h"
25#include "chrome/browser/sync_file_system/drive_backend/local_to_remote_syncer.h"
26#include "chrome/browser/sync_file_system/drive_backend/metadata_database.h"
27#include "chrome/browser/sync_file_system/drive_backend/register_app_task.h"
28#include "chrome/browser/sync_file_system/drive_backend/remote_to_local_syncer.h"
29#include "chrome/browser/sync_file_system/drive_backend/sync_engine.h"
30#include "chrome/browser/sync_file_system/drive_backend/sync_engine_context.h"
31#include "chrome/browser/sync_file_system/drive_backend/sync_engine_initializer.h"
32#include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
33#include "chrome/browser/sync_file_system/drive_backend/uninstall_app_task.h"
34#include "chrome/browser/sync_file_system/file_status_observer.h"
35#include "chrome/browser/sync_file_system/logger.h"
36#include "chrome/browser/sync_file_system/syncable_file_system_util.h"
37#include "components/signin/core/browser/profile_oauth2_token_service.h"
38#include "components/signin/core/browser/signin_manager.h"
39#include "content/public/browser/browser_thread.h"
40#include "extensions/browser/extension_system.h"
41#include "extensions/browser/extension_system_provider.h"
42#include "extensions/browser/extensions_browser_client.h"
43#include "extensions/common/extension.h"
44#include "google_apis/drive/drive_api_url_generator.h"
45#include "google_apis/drive/gdata_wapi_url_generator.h"
46#include "webkit/common/blob/scoped_file.h"
47#include "webkit/common/fileapi/file_system_util.h"
48
49namespace sync_file_system {
50
51class RemoteChangeProcessor;
52
53namespace drive_backend {
54
55namespace {
56
57void EmptyStatusCallback(SyncStatusCode status) {}
58
59}  // namespace
60
61scoped_ptr<SyncWorker> SyncWorker::CreateOnWorker(
62    const base::WeakPtr<drive_backend::SyncEngine>& sync_engine,
63    const base::FilePath& base_dir,
64    scoped_ptr<SyncEngineContext> sync_engine_context,
65    leveldb::Env* env_override) {
66  scoped_ptr<SyncWorker> sync_worker(
67      new SyncWorker(sync_engine,
68                     base_dir,
69                     sync_engine_context.Pass(),
70                     env_override));
71  sync_worker->Initialize();
72
73  return sync_worker.Pass();
74}
75
76SyncWorker::~SyncWorker() {}
77
78void SyncWorker::Initialize() {
79  DCHECK(!task_manager_);
80
81  task_manager_.reset(new SyncTaskManager(
82      weak_ptr_factory_.GetWeakPtr(), 0 /* maximum_background_task */));
83  task_manager_->Initialize(SYNC_STATUS_OK);
84
85  PostInitializeTask();
86
87  net::NetworkChangeNotifier::ConnectionType type =
88      net::NetworkChangeNotifier::GetConnectionType();
89  network_available_ =
90      type != net::NetworkChangeNotifier::CONNECTION_NONE;
91}
92
93void SyncWorker::RegisterOrigin(
94    const GURL& origin,
95    const SyncStatusCallback& callback) {
96  if (!GetMetadataDatabase() && GetDriveService()->HasRefreshToken())
97    PostInitializeTask();
98
99  scoped_ptr<RegisterAppTask> task(
100      new RegisterAppTask(context_.get(), origin.host()));
101  if (task->CanFinishImmediately()) {
102    callback.Run(SYNC_STATUS_OK);
103    return;
104  }
105
106  task_manager_->ScheduleSyncTask(
107      FROM_HERE,
108      task.PassAs<SyncTask>(),
109      SyncTaskManager::PRIORITY_HIGH,
110      callback);
111}
112
113void SyncWorker::EnableOrigin(
114    const GURL& origin,
115    const SyncStatusCallback& callback) {
116  task_manager_->ScheduleTask(
117      FROM_HERE,
118      base::Bind(&SyncWorker::DoEnableApp,
119                 weak_ptr_factory_.GetWeakPtr(),
120                 origin.host()),
121      SyncTaskManager::PRIORITY_HIGH,
122      callback);
123}
124
125void SyncWorker::DisableOrigin(
126    const GURL& origin,
127    const SyncStatusCallback& callback) {
128  task_manager_->ScheduleTask(
129      FROM_HERE,
130      base::Bind(&SyncWorker::DoDisableApp,
131                 weak_ptr_factory_.GetWeakPtr(),
132                 origin.host()),
133      SyncTaskManager::PRIORITY_HIGH,
134      callback);
135}
136
137void SyncWorker::UninstallOrigin(
138    const GURL& origin,
139    RemoteFileSyncService::UninstallFlag flag,
140    const SyncStatusCallback& callback) {
141  task_manager_->ScheduleSyncTask(
142      FROM_HERE,
143      scoped_ptr<SyncTask>(
144          new UninstallAppTask(context_.get(), origin.host(), flag)),
145      SyncTaskManager::PRIORITY_HIGH,
146      callback);
147}
148
149void SyncWorker::ProcessRemoteChange(
150    const SyncFileCallback& callback) {
151  RemoteToLocalSyncer* syncer = new RemoteToLocalSyncer(context_.get());
152  task_manager_->ScheduleSyncTask(
153      FROM_HERE,
154      scoped_ptr<SyncTask>(syncer),
155      SyncTaskManager::PRIORITY_MED,
156      base::Bind(&SyncWorker::DidProcessRemoteChange,
157                 weak_ptr_factory_.GetWeakPtr(),
158                 syncer, callback));
159}
160
161void SyncWorker::SetRemoteChangeProcessor(
162    RemoteChangeProcessor* processor) {
163  context_->SetRemoteChangeProcessor(processor);
164}
165
166RemoteServiceState SyncWorker::GetCurrentState() const {
167  if (!sync_enabled_)
168    return REMOTE_SERVICE_DISABLED;
169  return service_state_;
170}
171
172void SyncWorker::GetOriginStatusMap(
173    RemoteFileSyncService::OriginStatusMap* status_map) {
174  DCHECK(status_map);
175
176  if (!GetMetadataDatabase())
177    return;
178
179  std::vector<std::string> app_ids;
180  GetMetadataDatabase()->GetRegisteredAppIDs(&app_ids);
181
182  for (std::vector<std::string>::const_iterator itr = app_ids.begin();
183       itr != app_ids.end(); ++itr) {
184    const std::string& app_id = *itr;
185    GURL origin =
186        extensions::Extension::GetBaseURLFromExtensionId(app_id);
187    (*status_map)[origin] =
188        GetMetadataDatabase()->IsAppEnabled(app_id) ?
189        "Enabled" : "Disabled";
190  }
191}
192
193scoped_ptr<base::ListValue> SyncWorker::DumpFiles(const GURL& origin) {
194  if (!GetMetadataDatabase())
195    return scoped_ptr<base::ListValue>();
196  return GetMetadataDatabase()->DumpFiles(origin.host());
197}
198
199scoped_ptr<base::ListValue> SyncWorker::DumpDatabase() {
200  if (!GetMetadataDatabase())
201    return scoped_ptr<base::ListValue>();
202  return GetMetadataDatabase()->DumpDatabase();
203}
204
205void SyncWorker::SetSyncEnabled(bool enabled) {
206  if (sync_enabled_ == enabled)
207    return;
208
209  RemoteServiceState old_state = GetCurrentState();
210  sync_enabled_ = enabled;
211  if (old_state == GetCurrentState())
212    return;
213
214  // TODO(peria): PostTask()
215  sync_engine_->UpdateSyncEnabled(enabled);
216}
217
218SyncStatusCode SyncWorker::SetDefaultConflictResolutionPolicy(
219    ConflictResolutionPolicy policy) {
220  default_conflict_resolution_policy_ = policy;
221  return SYNC_STATUS_OK;
222}
223
224SyncStatusCode SyncWorker::SetConflictResolutionPolicy(
225    const GURL& origin,
226    ConflictResolutionPolicy policy) {
227  NOTIMPLEMENTED();
228  default_conflict_resolution_policy_ = policy;
229  return SYNC_STATUS_OK;
230}
231
232ConflictResolutionPolicy SyncWorker::GetDefaultConflictResolutionPolicy()
233    const {
234  return default_conflict_resolution_policy_;
235}
236
237ConflictResolutionPolicy SyncWorker::GetConflictResolutionPolicy(
238    const GURL& origin) const {
239  NOTIMPLEMENTED();
240  return default_conflict_resolution_policy_;
241}
242
243void SyncWorker::ApplyLocalChange(
244    const FileChange& local_change,
245    const base::FilePath& local_path,
246    const SyncFileMetadata& local_metadata,
247    const fileapi::FileSystemURL& url,
248    const SyncStatusCallback& callback) {
249  LocalToRemoteSyncer* syncer = new LocalToRemoteSyncer(
250      context_.get(), local_metadata, local_change, local_path, url);
251  task_manager_->ScheduleSyncTask(
252      FROM_HERE,
253      scoped_ptr<SyncTask>(syncer),
254      SyncTaskManager::PRIORITY_MED,
255      base::Bind(&SyncWorker::DidApplyLocalChange,
256                 weak_ptr_factory_.GetWeakPtr(),
257                 syncer, callback));
258}
259
260void SyncWorker::MaybeScheduleNextTask() {
261  if (GetCurrentState() == REMOTE_SERVICE_DISABLED)
262    return;
263
264  // TODO(tzik): Notify observer of OnRemoteChangeQueueUpdated.
265  // TODO(tzik): Add an interface to get the number of dirty trackers to
266  // MetadataDatabase.
267
268  MaybeStartFetchChanges();
269}
270
271void SyncWorker::NotifyLastOperationStatus(
272    SyncStatusCode status,
273    bool used_network) {
274  UpdateServiceStateFromSyncStatusCode(status, used_network);
275
276  if (GetMetadataDatabase()) {
277    // TODO(peria): Post task
278    sync_engine_->NotifyLastOperationStatus();
279  }
280}
281
282void SyncWorker::OnNotificationReceived() {
283  if (service_state_ == REMOTE_SERVICE_TEMPORARY_UNAVAILABLE)
284    UpdateServiceState(REMOTE_SERVICE_OK, "Got push notification for Drive.");
285
286  should_check_remote_change_ = true;
287  MaybeScheduleNextTask();
288}
289
290void SyncWorker::OnReadyToSendRequests(const std::string& account_id) {
291  if (service_state_ == REMOTE_SERVICE_OK)
292    return;
293  UpdateServiceState(REMOTE_SERVICE_OK, "Authenticated");
294
295  if (!GetMetadataDatabase() && !account_id.empty()) {
296    GetDriveService()->Initialize(account_id);
297    PostInitializeTask();
298    return;
299  }
300
301  should_check_remote_change_ = true;
302  MaybeScheduleNextTask();
303}
304
305void SyncWorker::OnRefreshTokenInvalid() {
306  UpdateServiceState(
307      REMOTE_SERVICE_AUTHENTICATION_REQUIRED,
308      "Found invalid refresh token.");
309}
310
311void SyncWorker::OnNetworkChanged(
312    net::NetworkChangeNotifier::ConnectionType type) {
313  bool new_network_availability =
314      type != net::NetworkChangeNotifier::CONNECTION_NONE;
315
316  if (network_available_ && !new_network_availability) {
317    UpdateServiceState(REMOTE_SERVICE_TEMPORARY_UNAVAILABLE, "Disconnected");
318  } else if (!network_available_ && new_network_availability) {
319    UpdateServiceState(REMOTE_SERVICE_OK, "Connected");
320    should_check_remote_change_ = true;
321    MaybeStartFetchChanges();
322  }
323  network_available_ = new_network_availability;
324}
325
326drive::DriveServiceInterface* SyncWorker::GetDriveService() {
327  return context_->GetDriveService();
328}
329
330drive::DriveUploaderInterface* SyncWorker::GetDriveUploader() {
331  return context_->GetDriveUploader();
332}
333
334MetadataDatabase* SyncWorker::GetMetadataDatabase() {
335  return context_->GetMetadataDatabase();
336}
337
338SyncTaskManager* SyncWorker::GetSyncTaskManager() {
339  return task_manager_.get();
340}
341
342SyncWorker::SyncWorker(
343    const base::WeakPtr<drive_backend::SyncEngine>& sync_engine,
344    const base::FilePath& base_dir,
345    scoped_ptr<SyncEngineContext> sync_engine_context,
346    leveldb::Env* env_override)
347    : base_dir_(base_dir),
348      env_override_(env_override),
349      service_state_(REMOTE_SERVICE_TEMPORARY_UNAVAILABLE),
350      should_check_conflict_(true),
351      should_check_remote_change_(true),
352      listing_remote_changes_(false),
353      sync_enabled_(false),
354      default_conflict_resolution_policy_(
355          CONFLICT_RESOLUTION_POLICY_LAST_WRITE_WIN),
356      network_available_(false),
357      context_(sync_engine_context.Pass()),
358      sync_engine_(sync_engine),
359      weak_ptr_factory_(this) {}
360
361void SyncWorker::DoDisableApp(const std::string& app_id,
362                              const SyncStatusCallback& callback) {
363  if (GetMetadataDatabase())
364    GetMetadataDatabase()->DisableApp(app_id, callback);
365  else
366    callback.Run(SYNC_STATUS_OK);
367}
368
369void SyncWorker::DoEnableApp(const std::string& app_id,
370                             const SyncStatusCallback& callback) {
371  if (GetMetadataDatabase())
372    GetMetadataDatabase()->EnableApp(app_id, callback);
373  else
374    callback.Run(SYNC_STATUS_OK);
375}
376
377void SyncWorker::PostInitializeTask() {
378  DCHECK(!GetMetadataDatabase());
379
380  // This initializer task may not run if MetadataDatabase in context_ is
381  // already initialized when it runs.
382  SyncEngineInitializer* initializer =
383      new SyncEngineInitializer(context_.get(),
384                                context_->GetBlockingTaskRunner(),
385                                base_dir_.Append(kDatabaseName),
386                                env_override_);
387  task_manager_->ScheduleSyncTask(
388      FROM_HERE,
389      scoped_ptr<SyncTask>(initializer),
390      SyncTaskManager::PRIORITY_HIGH,
391      base::Bind(&SyncWorker::DidInitialize, weak_ptr_factory_.GetWeakPtr(),
392                 initializer));
393}
394
395void SyncWorker::DidInitialize(SyncEngineInitializer* initializer,
396                               SyncStatusCode status) {
397  if (status != SYNC_STATUS_OK) {
398    if (GetDriveService()->HasRefreshToken()) {
399      UpdateServiceState(REMOTE_SERVICE_TEMPORARY_UNAVAILABLE,
400                         "Could not initialize remote service");
401    } else {
402      UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED,
403                         "Authentication required.");
404    }
405    return;
406  }
407
408  scoped_ptr<MetadataDatabase> metadata_database =
409      initializer->PassMetadataDatabase();
410  if (metadata_database)
411    context_->SetMetadataDatabase(metadata_database.Pass());
412
413  // TODO(peria): Post task
414  sync_engine_->UpdateRegisteredApps();
415}
416
417void SyncWorker::DidProcessRemoteChange(RemoteToLocalSyncer* syncer,
418                                        const SyncFileCallback& callback,
419                                        SyncStatusCode status) {
420  if (syncer->is_sync_root_deletion()) {
421    MetadataDatabase::ClearDatabase(context_->PassMetadataDatabase());
422    PostInitializeTask();
423    callback.Run(status, syncer->url());
424    return;
425  }
426
427  if (status == SYNC_STATUS_OK) {
428    // TODO(peria): Post task
429    sync_engine_->DidProcessRemoteChange(syncer);
430
431    if (syncer->sync_action() == SYNC_ACTION_DELETED &&
432        syncer->url().is_valid() &&
433        fileapi::VirtualPath::IsRootPath(syncer->url().path())) {
434      RegisterOrigin(syncer->url().origin(), base::Bind(&EmptyStatusCallback));
435    }
436    should_check_conflict_ = true;
437  }
438  callback.Run(status, syncer->url());
439}
440
441void SyncWorker::DidApplyLocalChange(LocalToRemoteSyncer* syncer,
442                                     const SyncStatusCallback& callback,
443                                     SyncStatusCode status) {
444  // TODO(peria): Post task
445  sync_engine_->DidApplyLocalChange(syncer, status);
446
447  if (status == SYNC_STATUS_UNKNOWN_ORIGIN && syncer->url().is_valid()) {
448    RegisterOrigin(syncer->url().origin(),
449                   base::Bind(&EmptyStatusCallback));
450  }
451
452  if (syncer->needs_remote_change_listing() &&
453      !listing_remote_changes_) {
454    task_manager_->ScheduleSyncTask(
455        FROM_HERE,
456        scoped_ptr<SyncTask>(new ListChangesTask(context_.get())),
457        SyncTaskManager::PRIORITY_HIGH,
458        base::Bind(&SyncWorker::DidFetchChanges,
459                   weak_ptr_factory_.GetWeakPtr()));
460    should_check_remote_change_ = false;
461    listing_remote_changes_ = true;
462    time_to_check_changes_ =
463        base::TimeTicks::Now() +
464        base::TimeDelta::FromSeconds(kListChangesRetryDelaySeconds);
465  }
466
467  if (status != SYNC_STATUS_OK &&
468      status != SYNC_STATUS_NO_CHANGE_TO_SYNC) {
469    callback.Run(status);
470    return;
471  }
472
473  if (status == SYNC_STATUS_OK)
474    should_check_conflict_ = true;
475
476  callback.Run(status);
477}
478
479void SyncWorker::MaybeStartFetchChanges() {
480  if (GetCurrentState() == REMOTE_SERVICE_DISABLED)
481    return;
482
483  if (!GetMetadataDatabase())
484    return;
485
486  if (listing_remote_changes_)
487    return;
488
489  base::TimeTicks now = base::TimeTicks::Now();
490  if (!should_check_remote_change_ && now < time_to_check_changes_) {
491    if (!GetMetadataDatabase()->HasDirtyTracker() &&
492        should_check_conflict_) {
493      should_check_conflict_ = false;
494      task_manager_->ScheduleSyncTaskIfIdle(
495          FROM_HERE,
496          scoped_ptr<SyncTask>(new ConflictResolver(context_.get())),
497          base::Bind(&SyncWorker::DidResolveConflict,
498                     weak_ptr_factory_.GetWeakPtr()));
499    }
500    return;
501  }
502
503  if (task_manager_->ScheduleSyncTaskIfIdle(
504          FROM_HERE,
505          scoped_ptr<SyncTask>(new ListChangesTask(context_.get())),
506          base::Bind(&SyncWorker::DidFetchChanges,
507                     weak_ptr_factory_.GetWeakPtr()))) {
508    should_check_remote_change_ = false;
509    listing_remote_changes_ = true;
510    time_to_check_changes_ =
511        now + base::TimeDelta::FromSeconds(kListChangesRetryDelaySeconds);
512  }
513}
514
515void SyncWorker::DidResolveConflict(SyncStatusCode status) {
516  if (status == SYNC_STATUS_OK)
517    should_check_conflict_ = true;
518}
519
520void SyncWorker::DidFetchChanges(SyncStatusCode status) {
521  if (status == SYNC_STATUS_OK)
522    should_check_conflict_ = true;
523  listing_remote_changes_ = false;
524}
525
526void SyncWorker::UpdateServiceStateFromSyncStatusCode(
527    SyncStatusCode status,
528    bool used_network) {
529  switch (status) {
530    case SYNC_STATUS_OK:
531      if (used_network)
532        UpdateServiceState(REMOTE_SERVICE_OK, std::string());
533      break;
534
535    // Authentication error.
536    case SYNC_STATUS_AUTHENTICATION_FAILED:
537      UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED,
538                         "Authentication required");
539      break;
540
541    // OAuth token error.
542    case SYNC_STATUS_ACCESS_FORBIDDEN:
543      UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED,
544                         "Access forbidden");
545      break;
546
547    // Errors which could make the service temporarily unavailable.
548    case SYNC_STATUS_SERVICE_TEMPORARILY_UNAVAILABLE:
549    case SYNC_STATUS_NETWORK_ERROR:
550    case SYNC_STATUS_ABORT:
551    case SYNC_STATUS_FAILED:
552      if (GetDriveService()->HasRefreshToken()) {
553        UpdateServiceState(REMOTE_SERVICE_TEMPORARY_UNAVAILABLE,
554                           "Network or temporary service error.");
555      } else {
556        UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED,
557                           "Authentication required");
558      }
559      break;
560
561    // Errors which would require manual user intervention to resolve.
562    case SYNC_DATABASE_ERROR_CORRUPTION:
563    case SYNC_DATABASE_ERROR_IO_ERROR:
564    case SYNC_DATABASE_ERROR_FAILED:
565      UpdateServiceState(REMOTE_SERVICE_DISABLED,
566                         "Unrecoverable database error");
567      break;
568
569    default:
570      // Other errors don't affect service state
571      break;
572  }
573}
574
575void SyncWorker::UpdateServiceState(RemoteServiceState state,
576                                    const std::string& description) {
577  RemoteServiceState old_state = GetCurrentState();
578  service_state_ = state;
579
580  if (old_state == GetCurrentState())
581    return;
582
583  util::Log(logging::LOG_VERBOSE, FROM_HERE,
584            "Service state changed: %d->%d: %s",
585            old_state, GetCurrentState(), description.c_str());
586  // TODO(peria): Post task
587  sync_engine_->UpdateServiceState(description);
588}
589
590}  // namespace drive_backend
591}  // namespace sync_file_system
592