model_type_registry.cc revision effb81e5f8246d0db0270817048dc992db66e9fb
1// Copyright 2014 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "sync/sessions/model_type_registry.h" 6 7#include "base/bind.h" 8#include "base/message_loop/message_loop_proxy.h" 9#include "sync/engine/directory_commit_contributor.h" 10#include "sync/engine/directory_update_handler.h" 11#include "sync/engine/non_blocking_type_processor_core.h" 12#include "sync/internal_api/public/non_blocking_type_processor.h" 13 14namespace syncer { 15 16ModelTypeRegistry::ModelTypeRegistry() : directory_(NULL) {} 17 18ModelTypeRegistry::ModelTypeRegistry( 19 const std::vector<scoped_refptr<ModelSafeWorker> >& workers, 20 syncable::Directory* directory) 21 : directory_(directory) { 22 for (size_t i = 0u; i < workers.size(); ++i) { 23 workers_map_.insert( 24 std::make_pair(workers[i]->GetModelSafeGroup(), workers[i])); 25 } 26} 27 28ModelTypeRegistry::~ModelTypeRegistry() {} 29 30void ModelTypeRegistry::SetEnabledDirectoryTypes( 31 const ModelSafeRoutingInfo& routing_info) { 32 // Remove all existing directory processors and delete them. 33 for (ModelTypeSet::Iterator it = enabled_directory_types_.First(); 34 it.Good(); it.Inc()) { 35 size_t result1 = update_handler_map_.erase(it.Get()); 36 size_t result2 = commit_contributor_map_.erase(it.Get()); 37 DCHECK_EQ(1U, result1); 38 DCHECK_EQ(1U, result2); 39 } 40 41 // Clear the old instances of directory update handlers and commit 42 // contributors, deleting their contents in the processs. 43 directory_update_handlers_.clear(); 44 directory_commit_contributors_.clear(); 45 46 // Create new ones and add them to the appropriate containers. 47 for (ModelSafeRoutingInfo::const_iterator routing_iter = routing_info.begin(); 48 routing_iter != routing_info.end(); ++routing_iter) { 49 ModelType type = routing_iter->first; 50 ModelSafeGroup group = routing_iter->second; 51 std::map<ModelSafeGroup, scoped_refptr<ModelSafeWorker> >::iterator 52 worker_it = workers_map_.find(group); 53 DCHECK(worker_it != workers_map_.end()); 54 scoped_refptr<ModelSafeWorker> worker = worker_it->second; 55 56 DirectoryCommitContributor* committer = 57 new DirectoryCommitContributor(directory_, type); 58 DirectoryUpdateHandler* updater = 59 new DirectoryUpdateHandler(directory_, type, worker); 60 61 // These containers take ownership of their contents. 62 directory_commit_contributors_.push_back(committer); 63 directory_update_handlers_.push_back(updater); 64 65 bool inserted1 = 66 update_handler_map_.insert(std::make_pair(type, updater)).second; 67 DCHECK(inserted1) << "Attempt to override existing type handler in map"; 68 69 bool inserted2 = 70 commit_contributor_map_.insert(std::make_pair(type, committer)).second; 71 DCHECK(inserted2) << "Attempt to override existing type handler in map"; 72 } 73 74 enabled_directory_types_ = GetRoutingInfoTypes(routing_info); 75 DCHECK(Intersection(GetEnabledDirectoryTypes(), 76 GetEnabledNonBlockingTypes()).Empty()); 77} 78 79void ModelTypeRegistry::InitializeNonBlockingType( 80 ModelType type, 81 scoped_refptr<base::SequencedTaskRunner> type_task_runner, 82 base::WeakPtr<NonBlockingTypeProcessor> processor) { 83 DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type); 84 85 // Initialize CoreProcessor -> Processor communication channel. 86 scoped_ptr<NonBlockingTypeProcessorCore> core( 87 new NonBlockingTypeProcessorCore(type, type_task_runner, processor)); 88 89 // Initialize Processor -> CoreProcessor communication channel. 90 type_task_runner->PostTask( 91 FROM_HERE, 92 base::Bind(&NonBlockingTypeProcessor::OnConnect, 93 processor->AsWeakPtr(), 94 core->AsWeakPtr(), 95 scoped_refptr<base::SequencedTaskRunner>( 96 base::MessageLoopProxy::current()))); 97 98 DCHECK(update_handler_map_.find(type) == update_handler_map_.end()); 99 DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end()); 100 101 update_handler_map_.insert(std::make_pair(type, core.get())); 102 commit_contributor_map_.insert(std::make_pair(type, core.get())); 103 104 // The container takes ownership. 105 non_blocking_type_processor_cores_.push_back(core.release()); 106 107 DCHECK(Intersection(GetEnabledDirectoryTypes(), 108 GetEnabledNonBlockingTypes()).Empty()); 109} 110 111void ModelTypeRegistry::RemoveNonBlockingType(ModelType type) { 112 DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type); 113 DCHECK(update_handler_map_.find(type) != update_handler_map_.end()); 114 DCHECK(commit_contributor_map_.find(type) != commit_contributor_map_.end()); 115 116 size_t updaters_erased = update_handler_map_.erase(type); 117 size_t committers_erased = commit_contributor_map_.erase(type); 118 119 DCHECK_EQ(1U, updaters_erased); 120 DCHECK_EQ(1U, committers_erased); 121 122 // Remove from the ScopedVector, deleting the core in the process. 123 for (ScopedVector<NonBlockingTypeProcessorCore>::iterator it = 124 non_blocking_type_processor_cores_.begin(); 125 it != non_blocking_type_processor_cores_.end(); ++it) { 126 if ((*it)->GetModelType() == type) { 127 non_blocking_type_processor_cores_.erase(it); 128 break; 129 } 130 } 131} 132 133ModelTypeSet ModelTypeRegistry::GetEnabledTypes() const { 134 return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes()); 135} 136 137UpdateHandlerMap* ModelTypeRegistry::update_handler_map() { 138 return &update_handler_map_; 139} 140 141CommitContributorMap* ModelTypeRegistry::commit_contributor_map() { 142 return &commit_contributor_map_; 143} 144 145ModelTypeSet ModelTypeRegistry::GetEnabledDirectoryTypes() const { 146 return enabled_directory_types_; 147} 148 149ModelTypeSet ModelTypeRegistry::GetEnabledNonBlockingTypes() const { 150 ModelTypeSet enabled_off_thread_types; 151 for (ScopedVector<NonBlockingTypeProcessorCore>::const_iterator it = 152 non_blocking_type_processor_cores_.begin(); 153 it != non_blocking_type_processor_cores_.end(); ++it) { 154 enabled_off_thread_types.Put((*it)->GetModelType()); 155 } 156 return enabled_off_thread_types; 157} 158 159} // namespace syncer 160