model_type_registry.cc revision effb81e5f8246d0db0270817048dc992db66e9fb
1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "sync/sessions/model_type_registry.h"
6
7#include "base/bind.h"
8#include "base/message_loop/message_loop_proxy.h"
9#include "sync/engine/directory_commit_contributor.h"
10#include "sync/engine/directory_update_handler.h"
11#include "sync/engine/non_blocking_type_processor_core.h"
12#include "sync/internal_api/public/non_blocking_type_processor.h"
13
14namespace syncer {
15
16ModelTypeRegistry::ModelTypeRegistry() : directory_(NULL) {}
17
18ModelTypeRegistry::ModelTypeRegistry(
19    const std::vector<scoped_refptr<ModelSafeWorker> >& workers,
20    syncable::Directory* directory)
21    : directory_(directory) {
22  for (size_t i = 0u; i < workers.size(); ++i) {
23    workers_map_.insert(
24        std::make_pair(workers[i]->GetModelSafeGroup(), workers[i]));
25  }
26}
27
28ModelTypeRegistry::~ModelTypeRegistry() {}
29
30void ModelTypeRegistry::SetEnabledDirectoryTypes(
31    const ModelSafeRoutingInfo& routing_info) {
32  // Remove all existing directory processors and delete them.
33  for (ModelTypeSet::Iterator it = enabled_directory_types_.First();
34       it.Good(); it.Inc()) {
35    size_t result1 = update_handler_map_.erase(it.Get());
36    size_t result2 = commit_contributor_map_.erase(it.Get());
37    DCHECK_EQ(1U, result1);
38    DCHECK_EQ(1U, result2);
39  }
40
41  // Clear the old instances of directory update handlers and commit
42  // contributors, deleting their contents in the processs.
43  directory_update_handlers_.clear();
44  directory_commit_contributors_.clear();
45
46  // Create new ones and add them to the appropriate containers.
47  for (ModelSafeRoutingInfo::const_iterator routing_iter = routing_info.begin();
48       routing_iter != routing_info.end(); ++routing_iter) {
49    ModelType type = routing_iter->first;
50    ModelSafeGroup group = routing_iter->second;
51    std::map<ModelSafeGroup, scoped_refptr<ModelSafeWorker> >::iterator
52        worker_it = workers_map_.find(group);
53    DCHECK(worker_it != workers_map_.end());
54    scoped_refptr<ModelSafeWorker> worker = worker_it->second;
55
56    DirectoryCommitContributor* committer =
57        new DirectoryCommitContributor(directory_, type);
58    DirectoryUpdateHandler* updater =
59        new DirectoryUpdateHandler(directory_, type, worker);
60
61    // These containers take ownership of their contents.
62    directory_commit_contributors_.push_back(committer);
63    directory_update_handlers_.push_back(updater);
64
65    bool inserted1 =
66        update_handler_map_.insert(std::make_pair(type, updater)).second;
67    DCHECK(inserted1) << "Attempt to override existing type handler in map";
68
69    bool inserted2 =
70        commit_contributor_map_.insert(std::make_pair(type, committer)).second;
71    DCHECK(inserted2) << "Attempt to override existing type handler in map";
72  }
73
74  enabled_directory_types_ = GetRoutingInfoTypes(routing_info);
75  DCHECK(Intersection(GetEnabledDirectoryTypes(),
76                      GetEnabledNonBlockingTypes()).Empty());
77}
78
79void ModelTypeRegistry::InitializeNonBlockingType(
80    ModelType type,
81    scoped_refptr<base::SequencedTaskRunner> type_task_runner,
82    base::WeakPtr<NonBlockingTypeProcessor> processor) {
83  DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type);
84
85  // Initialize CoreProcessor -> Processor communication channel.
86  scoped_ptr<NonBlockingTypeProcessorCore> core(
87      new NonBlockingTypeProcessorCore(type, type_task_runner, processor));
88
89  // Initialize Processor -> CoreProcessor communication channel.
90  type_task_runner->PostTask(
91      FROM_HERE,
92      base::Bind(&NonBlockingTypeProcessor::OnConnect,
93                 processor->AsWeakPtr(),
94                 core->AsWeakPtr(),
95                 scoped_refptr<base::SequencedTaskRunner>(
96                     base::MessageLoopProxy::current())));
97
98  DCHECK(update_handler_map_.find(type) == update_handler_map_.end());
99  DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end());
100
101  update_handler_map_.insert(std::make_pair(type, core.get()));
102  commit_contributor_map_.insert(std::make_pair(type, core.get()));
103
104  // The container takes ownership.
105  non_blocking_type_processor_cores_.push_back(core.release());
106
107  DCHECK(Intersection(GetEnabledDirectoryTypes(),
108                      GetEnabledNonBlockingTypes()).Empty());
109}
110
111void ModelTypeRegistry::RemoveNonBlockingType(ModelType type) {
112  DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type);
113  DCHECK(update_handler_map_.find(type) != update_handler_map_.end());
114  DCHECK(commit_contributor_map_.find(type) != commit_contributor_map_.end());
115
116  size_t updaters_erased = update_handler_map_.erase(type);
117  size_t committers_erased = commit_contributor_map_.erase(type);
118
119  DCHECK_EQ(1U, updaters_erased);
120  DCHECK_EQ(1U, committers_erased);
121
122  // Remove from the ScopedVector, deleting the core in the process.
123  for (ScopedVector<NonBlockingTypeProcessorCore>::iterator it =
124       non_blocking_type_processor_cores_.begin();
125       it != non_blocking_type_processor_cores_.end(); ++it) {
126    if ((*it)->GetModelType() == type) {
127      non_blocking_type_processor_cores_.erase(it);
128      break;
129    }
130  }
131}
132
133ModelTypeSet ModelTypeRegistry::GetEnabledTypes() const {
134  return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes());
135}
136
137UpdateHandlerMap* ModelTypeRegistry::update_handler_map() {
138  return &update_handler_map_;
139}
140
141CommitContributorMap* ModelTypeRegistry::commit_contributor_map() {
142  return &commit_contributor_map_;
143}
144
145ModelTypeSet ModelTypeRegistry::GetEnabledDirectoryTypes() const {
146  return enabled_directory_types_;
147}
148
149ModelTypeSet ModelTypeRegistry::GetEnabledNonBlockingTypes() const {
150  ModelTypeSet enabled_off_thread_types;
151  for (ScopedVector<NonBlockingTypeProcessorCore>::const_iterator it =
152           non_blocking_type_processor_cores_.begin();
153       it != non_blocking_type_processor_cores_.end(); ++it) {
154    enabled_off_thread_types.Put((*it)->GetModelType());
155  }
156  return enabled_off_thread_types;
157}
158
159}  // namespace syncer
160