1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "sync/sessions/model_type_registry.h"
6
7#include "base/bind.h"
8#include "base/observer_list.h"
9#include "base/thread_task_runner_handle.h"
10#include "sync/engine/directory_commit_contributor.h"
11#include "sync/engine/directory_update_handler.h"
12#include "sync/engine/model_type_sync_proxy.h"
13#include "sync/engine/model_type_sync_proxy_impl.h"
14#include "sync/engine/model_type_sync_worker.h"
15#include "sync/engine/model_type_sync_worker_impl.h"
16#include "sync/internal_api/public/non_blocking_sync_common.h"
17#include "sync/sessions/directory_type_debug_info_emitter.h"
18#include "sync/util/cryptographer.h"
19
20namespace syncer {
21
22namespace {
23
24class ModelTypeSyncProxyWrapper : public ModelTypeSyncProxy {
25 public:
26  ModelTypeSyncProxyWrapper(
27      const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy,
28      const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner);
29  virtual ~ModelTypeSyncProxyWrapper();
30
31  virtual void OnCommitCompleted(
32      const DataTypeState& type_state,
33      const CommitResponseDataList& response_list) OVERRIDE;
34  virtual void OnUpdateReceived(
35      const DataTypeState& type_state,
36      const UpdateResponseDataList& response_list,
37      const UpdateResponseDataList& pending_updates) OVERRIDE;
38
39 private:
40  base::WeakPtr<ModelTypeSyncProxyImpl> processor_;
41  scoped_refptr<base::SequencedTaskRunner> processor_task_runner_;
42};
43
44ModelTypeSyncProxyWrapper::ModelTypeSyncProxyWrapper(
45    const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy,
46    const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner)
47    : processor_(proxy), processor_task_runner_(processor_task_runner) {
48}
49
50ModelTypeSyncProxyWrapper::~ModelTypeSyncProxyWrapper() {
51}
52
53void ModelTypeSyncProxyWrapper::OnCommitCompleted(
54    const DataTypeState& type_state,
55    const CommitResponseDataList& response_list) {
56  processor_task_runner_->PostTask(
57      FROM_HERE,
58      base::Bind(&ModelTypeSyncProxyImpl::OnCommitCompleted,
59                 processor_,
60                 type_state,
61                 response_list));
62}
63
64void ModelTypeSyncProxyWrapper::OnUpdateReceived(
65    const DataTypeState& type_state,
66    const UpdateResponseDataList& response_list,
67    const UpdateResponseDataList& pending_updates) {
68  processor_task_runner_->PostTask(
69      FROM_HERE,
70      base::Bind(&ModelTypeSyncProxyImpl::OnUpdateReceived,
71                 processor_,
72                 type_state,
73                 response_list,
74                 pending_updates));
75}
76
77class ModelTypeSyncWorkerWrapper : public ModelTypeSyncWorker {
78 public:
79  ModelTypeSyncWorkerWrapper(
80      const base::WeakPtr<ModelTypeSyncWorkerImpl>& worker,
81      const scoped_refptr<base::SequencedTaskRunner>& sync_thread);
82  virtual ~ModelTypeSyncWorkerWrapper();
83
84  virtual void EnqueueForCommit(const CommitRequestDataList& list) OVERRIDE;
85
86 private:
87  base::WeakPtr<ModelTypeSyncWorkerImpl> worker_;
88  scoped_refptr<base::SequencedTaskRunner> sync_thread_;
89};
90
91ModelTypeSyncWorkerWrapper::ModelTypeSyncWorkerWrapper(
92    const base::WeakPtr<ModelTypeSyncWorkerImpl>& worker,
93    const scoped_refptr<base::SequencedTaskRunner>& sync_thread)
94    : worker_(worker), sync_thread_(sync_thread) {
95}
96
97ModelTypeSyncWorkerWrapper::~ModelTypeSyncWorkerWrapper() {
98}
99
100void ModelTypeSyncWorkerWrapper::EnqueueForCommit(
101    const CommitRequestDataList& list) {
102  sync_thread_->PostTask(
103      FROM_HERE,
104      base::Bind(&ModelTypeSyncWorkerImpl::EnqueueForCommit, worker_, list));
105}
106
107}  // namespace
108
109ModelTypeRegistry::ModelTypeRegistry(
110    const std::vector<scoped_refptr<ModelSafeWorker> >& workers,
111    syncable::Directory* directory,
112    NudgeHandler* nudge_handler)
113    : directory_(directory),
114      nudge_handler_(nudge_handler),
115      weak_ptr_factory_(this) {
116  for (size_t i = 0u; i < workers.size(); ++i) {
117    workers_map_.insert(
118        std::make_pair(workers[i]->GetModelSafeGroup(), workers[i]));
119  }
120}
121
122ModelTypeRegistry::~ModelTypeRegistry() {
123}
124
125void ModelTypeRegistry::SetEnabledDirectoryTypes(
126    const ModelSafeRoutingInfo& routing_info) {
127  // Remove all existing directory processors and delete them.  The
128  // DebugInfoEmitters are not deleted here, since we want to preserve their
129  // counters.
130  for (ModelTypeSet::Iterator it = enabled_directory_types_.First();
131       it.Good(); it.Inc()) {
132    size_t result1 = update_handler_map_.erase(it.Get());
133    size_t result2 = commit_contributor_map_.erase(it.Get());
134    DCHECK_EQ(1U, result1);
135    DCHECK_EQ(1U, result2);
136  }
137
138  // Clear the old instances of directory update handlers and commit
139  // contributors, deleting their contents in the processs.
140  directory_update_handlers_.clear();
141  directory_commit_contributors_.clear();
142
143  // Create new ones and add them to the appropriate containers.
144  for (ModelSafeRoutingInfo::const_iterator routing_iter = routing_info.begin();
145       routing_iter != routing_info.end(); ++routing_iter) {
146    ModelType type = routing_iter->first;
147    ModelSafeGroup group = routing_iter->second;
148    std::map<ModelSafeGroup, scoped_refptr<ModelSafeWorker> >::iterator
149        worker_it = workers_map_.find(group);
150    DCHECK(worker_it != workers_map_.end());
151    scoped_refptr<ModelSafeWorker> worker = worker_it->second;
152
153    // DebugInfoEmitters are never deleted.  Use existing one if we have it.
154    DirectoryTypeDebugInfoEmitter* emitter = NULL;
155    DirectoryTypeDebugInfoEmitterMap::iterator it =
156        directory_type_debug_info_emitter_map_.find(type);
157    if (it != directory_type_debug_info_emitter_map_.end()) {
158      emitter = it->second;
159    } else {
160      emitter = new DirectoryTypeDebugInfoEmitter(directory_, type,
161                                                  &type_debug_info_observers_);
162      directory_type_debug_info_emitter_map_.insert(
163          std::make_pair(type, emitter));
164      directory_type_debug_info_emitters_.push_back(emitter);
165    }
166
167    DirectoryCommitContributor* committer =
168        new DirectoryCommitContributor(directory_, type, emitter);
169    DirectoryUpdateHandler* updater =
170        new DirectoryUpdateHandler(directory_, type, worker, emitter);
171
172    // These containers take ownership of their contents.
173    directory_commit_contributors_.push_back(committer);
174    directory_update_handlers_.push_back(updater);
175
176    bool inserted1 =
177        update_handler_map_.insert(std::make_pair(type, updater)).second;
178    DCHECK(inserted1) << "Attempt to override existing type handler in map";
179
180    bool inserted2 =
181        commit_contributor_map_.insert(std::make_pair(type, committer)).second;
182    DCHECK(inserted2) << "Attempt to override existing type handler in map";
183  }
184
185  enabled_directory_types_ = GetRoutingInfoTypes(routing_info);
186  DCHECK(Intersection(GetEnabledDirectoryTypes(),
187                      GetEnabledNonBlockingTypes()).Empty());
188}
189
190void ModelTypeRegistry::ConnectSyncTypeToWorker(
191    ModelType type,
192    const DataTypeState& data_type_state,
193    const UpdateResponseDataList& saved_pending_updates,
194    const scoped_refptr<base::SequencedTaskRunner>& type_task_runner,
195    const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy_impl) {
196  DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type);
197
198  // Initialize Worker -> Proxy communication channel.
199  scoped_ptr<ModelTypeSyncProxy> proxy(
200      new ModelTypeSyncProxyWrapper(proxy_impl, type_task_runner));
201  scoped_ptr<Cryptographer> cryptographer_copy;
202  if (encrypted_types_.Has(type))
203    cryptographer_copy.reset(new Cryptographer(*cryptographer_));
204
205  scoped_ptr<ModelTypeSyncWorkerImpl> worker(
206      new ModelTypeSyncWorkerImpl(type,
207                                  data_type_state,
208                                  saved_pending_updates,
209                                  cryptographer_copy.Pass(),
210                                  nudge_handler_,
211                                  proxy.Pass()));
212
213  // Initialize Proxy -> Worker communication channel.
214  scoped_ptr<ModelTypeSyncWorker> wrapped_worker(
215      new ModelTypeSyncWorkerWrapper(worker->AsWeakPtr(),
216                                     scoped_refptr<base::SequencedTaskRunner>(
217                                         base::ThreadTaskRunnerHandle::Get())));
218  type_task_runner->PostTask(FROM_HERE,
219                             base::Bind(&ModelTypeSyncProxyImpl::OnConnect,
220                                        proxy_impl,
221                                        base::Passed(&wrapped_worker)));
222
223  DCHECK(update_handler_map_.find(type) == update_handler_map_.end());
224  DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end());
225
226  update_handler_map_.insert(std::make_pair(type, worker.get()));
227  commit_contributor_map_.insert(std::make_pair(type, worker.get()));
228
229  // The container takes ownership.
230  model_type_sync_workers_.push_back(worker.release());
231
232  DCHECK(Intersection(GetEnabledDirectoryTypes(),
233                      GetEnabledNonBlockingTypes()).Empty());
234}
235
236void ModelTypeRegistry::DisconnectSyncWorker(ModelType type) {
237  DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type);
238  DCHECK(update_handler_map_.find(type) != update_handler_map_.end());
239  DCHECK(commit_contributor_map_.find(type) != commit_contributor_map_.end());
240
241  size_t updaters_erased = update_handler_map_.erase(type);
242  size_t committers_erased = commit_contributor_map_.erase(type);
243
244  DCHECK_EQ(1U, updaters_erased);
245  DCHECK_EQ(1U, committers_erased);
246
247  // Remove from the ScopedVector, deleting the worker in the process.
248  for (ScopedVector<ModelTypeSyncWorkerImpl>::iterator it =
249           model_type_sync_workers_.begin();
250       it != model_type_sync_workers_.end();
251       ++it) {
252    if ((*it)->GetModelType() == type) {
253      model_type_sync_workers_.erase(it);
254      break;
255    }
256  }
257}
258
259ModelTypeSet ModelTypeRegistry::GetEnabledTypes() const {
260  return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes());
261}
262
263UpdateHandlerMap* ModelTypeRegistry::update_handler_map() {
264  return &update_handler_map_;
265}
266
267CommitContributorMap* ModelTypeRegistry::commit_contributor_map() {
268  return &commit_contributor_map_;
269}
270
271DirectoryTypeDebugInfoEmitterMap*
272ModelTypeRegistry::directory_type_debug_info_emitter_map() {
273  return &directory_type_debug_info_emitter_map_;
274}
275
276void ModelTypeRegistry::RegisterDirectoryTypeDebugInfoObserver(
277    syncer::TypeDebugInfoObserver* observer) {
278  if (!type_debug_info_observers_.HasObserver(observer))
279    type_debug_info_observers_.AddObserver(observer);
280}
281
282void ModelTypeRegistry::UnregisterDirectoryTypeDebugInfoObserver(
283    syncer::TypeDebugInfoObserver* observer) {
284  type_debug_info_observers_.RemoveObserver(observer);
285}
286
287bool ModelTypeRegistry::HasDirectoryTypeDebugInfoObserver(
288    syncer::TypeDebugInfoObserver* observer) {
289  return type_debug_info_observers_.HasObserver(observer);
290}
291
292void ModelTypeRegistry::RequestEmitDebugInfo() {
293  for (DirectoryTypeDebugInfoEmitterMap::iterator it =
294       directory_type_debug_info_emitter_map_.begin();
295       it != directory_type_debug_info_emitter_map_.end(); ++it) {
296    it->second->EmitCommitCountersUpdate();
297    it->second->EmitUpdateCountersUpdate();
298    it->second->EmitStatusCountersUpdate();
299  }
300}
301
302base::WeakPtr<SyncContext> ModelTypeRegistry::AsWeakPtr() {
303  return weak_ptr_factory_.GetWeakPtr();
304}
305
306void ModelTypeRegistry::OnPassphraseRequired(
307    PassphraseRequiredReason reason,
308    const sync_pb::EncryptedData& pending_keys) {
309}
310
311void ModelTypeRegistry::OnPassphraseAccepted() {
312}
313
314void ModelTypeRegistry::OnBootstrapTokenUpdated(
315    const std::string& bootstrap_token,
316    BootstrapTokenType type) {
317}
318
319void ModelTypeRegistry::OnEncryptedTypesChanged(ModelTypeSet encrypted_types,
320                                                bool encrypt_everything) {
321  encrypted_types_ = encrypted_types;
322  OnEncryptionStateChanged();
323}
324
325void ModelTypeRegistry::OnEncryptionComplete() {
326}
327
328void ModelTypeRegistry::OnCryptographerStateChanged(
329    Cryptographer* cryptographer) {
330  cryptographer_.reset(new Cryptographer(*cryptographer));
331  OnEncryptionStateChanged();
332}
333
334void ModelTypeRegistry::OnPassphraseTypeChanged(PassphraseType type,
335                                                base::Time passphrase_time) {
336}
337
338ModelTypeSet ModelTypeRegistry::GetEnabledDirectoryTypes() const {
339  return enabled_directory_types_;
340}
341
342void ModelTypeRegistry::OnEncryptionStateChanged() {
343  for (ScopedVector<ModelTypeSyncWorkerImpl>::iterator it =
344           model_type_sync_workers_.begin();
345       it != model_type_sync_workers_.end();
346       ++it) {
347    if (encrypted_types_.Has((*it)->GetModelType())) {
348      (*it)->UpdateCryptographer(
349          make_scoped_ptr(new Cryptographer(*cryptographer_)));
350    }
351  }
352}
353
354ModelTypeSet ModelTypeRegistry::GetEnabledNonBlockingTypes() const {
355  ModelTypeSet enabled_off_thread_types;
356  for (ScopedVector<ModelTypeSyncWorkerImpl>::const_iterator it =
357           model_type_sync_workers_.begin();
358       it != model_type_sync_workers_.end();
359       ++it) {
360    enabled_off_thread_types.Put((*it)->GetModelType());
361  }
362  return enabled_off_thread_types;
363}
364
365}  // namespace syncer
366