1// Copyright 2014 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "sync/engine/non_blocking_type_processor_core.h" 6 7#include "base/bind.h" 8#include "base/format_macros.h" 9#include "base/logging.h" 10#include "base/strings/stringprintf.h" 11#include "sync/engine/commit_contribution.h" 12#include "sync/engine/non_blocking_type_commit_contribution.h" 13#include "sync/engine/non_blocking_type_processor_interface.h" 14#include "sync/engine/sync_thread_sync_entity.h" 15#include "sync/syncable/syncable_util.h" 16#include "sync/util/time.h" 17 18namespace syncer { 19 20NonBlockingTypeProcessorCore::NonBlockingTypeProcessorCore( 21 ModelType type, 22 const DataTypeState& initial_state, 23 scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface) 24 : type_(type), 25 data_type_state_(initial_state), 26 processor_interface_(processor_interface.Pass()), 27 entities_deleter_(&entities_), 28 weak_ptr_factory_(this) { 29} 30 31NonBlockingTypeProcessorCore::~NonBlockingTypeProcessorCore() { 32} 33 34ModelType NonBlockingTypeProcessorCore::GetModelType() const { 35 DCHECK(CalledOnValidThread()); 36 return type_; 37} 38 39// UpdateHandler implementation. 40void NonBlockingTypeProcessorCore::GetDownloadProgress( 41 sync_pb::DataTypeProgressMarker* progress_marker) const { 42 DCHECK(CalledOnValidThread()); 43 progress_marker->CopyFrom(data_type_state_.progress_marker); 44} 45 46void NonBlockingTypeProcessorCore::GetDataTypeContext( 47 sync_pb::DataTypeContext* context) const { 48 DCHECK(CalledOnValidThread()); 49 context->CopyFrom(data_type_state_.type_context); 50} 51 52SyncerError NonBlockingTypeProcessorCore::ProcessGetUpdatesResponse( 53 const sync_pb::DataTypeProgressMarker& progress_marker, 54 const sync_pb::DataTypeContext& mutated_context, 55 const SyncEntityList& applicable_updates, 56 sessions::StatusController* status) { 57 DCHECK(CalledOnValidThread()); 58 59 // TODO(rlarocque): Handle data type context conflicts. 60 data_type_state_.type_context = mutated_context; 61 data_type_state_.progress_marker = progress_marker; 62 63 UpdateResponseDataList response_datas; 64 65 for (SyncEntityList::const_iterator update_it = applicable_updates.begin(); 66 update_it != applicable_updates.end(); 67 ++update_it) { 68 const sync_pb::SyncEntity* update_entity = *update_it; 69 if (!update_entity->server_defined_unique_tag().empty()) { 70 // We can't commit an item unless we know its parent ID. This is where 71 // we learn that ID and remember it forever. 72 DCHECK_EQ(ModelTypeToRootTag(type_), 73 update_entity->server_defined_unique_tag()); 74 if (!data_type_state_.type_root_id.empty()) { 75 DCHECK_EQ(data_type_state_.type_root_id, update_entity->id_string()); 76 } 77 data_type_state_.type_root_id = update_entity->id_string(); 78 } else { 79 // Normal updates are handled here. 80 const std::string& client_tag_hash = 81 update_entity->client_defined_unique_tag(); 82 DCHECK(!client_tag_hash.empty()); 83 EntityMap::const_iterator map_it = entities_.find(client_tag_hash); 84 if (map_it == entities_.end()) { 85 SyncThreadSyncEntity* entity = 86 SyncThreadSyncEntity::FromServerUpdate(update_entity->id_string(), 87 client_tag_hash, 88 update_entity->version()); 89 entities_.insert(std::make_pair(client_tag_hash, entity)); 90 } else { 91 SyncThreadSyncEntity* entity = map_it->second; 92 entity->ReceiveUpdate(update_entity->version()); 93 } 94 95 // Prepare the message for the model thread. 96 UpdateResponseData response_data; 97 response_data.id = update_entity->id_string(); 98 response_data.client_tag_hash = client_tag_hash; 99 response_data.response_version = update_entity->version(); 100 response_data.ctime = ProtoTimeToTime(update_entity->ctime()); 101 response_data.mtime = ProtoTimeToTime(update_entity->mtime()); 102 response_data.non_unique_name = update_entity->name(); 103 response_data.deleted = update_entity->deleted(); 104 response_data.specifics = update_entity->specifics(); 105 106 response_datas.push_back(response_data); 107 } 108 } 109 110 // Forward these updates to the model thread so it can do the rest. 111 processor_interface_->ReceiveUpdateResponse(data_type_state_, response_datas); 112 113 return SYNCER_OK; 114} 115 116void NonBlockingTypeProcessorCore::ApplyUpdates( 117 sessions::StatusController* status) { 118 DCHECK(CalledOnValidThread()); 119 // This function is called only when we've finished a download cycle, ie. we 120 // got a response with changes_remaining == 0. If this is our first download 121 // cycle, we should update our state so the NonBlockingTypeProcessor knows 122 // that it's safe to commit items now. 123 if (!data_type_state_.initial_sync_done) { 124 data_type_state_.initial_sync_done = true; 125 126 UpdateResponseDataList empty_update_list; 127 processor_interface_->ReceiveUpdateResponse(data_type_state_, 128 empty_update_list); 129 } 130} 131 132void NonBlockingTypeProcessorCore::PassiveApplyUpdates( 133 sessions::StatusController* status) { 134 NOTREACHED() 135 << "Non-blocking types should never apply updates on sync thread. " 136 << "ModelType is: " << ModelTypeToString(type_); 137} 138 139void NonBlockingTypeProcessorCore::EnqueueForCommit( 140 const CommitRequestDataList& list) { 141 DCHECK(CalledOnValidThread()); 142 143 DCHECK(CanCommitItems()) 144 << "Asked to commit items before type was initialized. " 145 << "ModelType is: " << ModelTypeToString(type_); 146 147 for (CommitRequestDataList::const_iterator it = list.begin(); 148 it != list.end(); 149 ++it) { 150 StorePendingCommit(*it); 151 } 152} 153 154// CommitContributor implementation. 155scoped_ptr<CommitContribution> 156NonBlockingTypeProcessorCore::GetContribution(size_t max_entries) { 157 DCHECK(CalledOnValidThread()); 158 159 size_t space_remaining = max_entries; 160 std::vector<int64> sequence_numbers; 161 google::protobuf::RepeatedPtrField<sync_pb::SyncEntity> commit_entities; 162 163 if (!CanCommitItems()) 164 return scoped_ptr<CommitContribution>(); 165 166 // TODO(rlarocque): Avoid iterating here. 167 for (EntityMap::const_iterator it = entities_.begin(); 168 it != entities_.end() && space_remaining > 0; 169 ++it) { 170 SyncThreadSyncEntity* entity = it->second; 171 if (entity->IsCommitPending()) { 172 sync_pb::SyncEntity* commit_entity = commit_entities.Add(); 173 int64 sequence_number = -1; 174 175 entity->PrepareCommitProto(commit_entity, &sequence_number); 176 HelpInitializeCommitEntity(commit_entity); 177 sequence_numbers.push_back(sequence_number); 178 179 space_remaining--; 180 } 181 } 182 183 if (commit_entities.size() == 0) 184 return scoped_ptr<CommitContribution>(); 185 186 return scoped_ptr<CommitContribution>(new NonBlockingTypeCommitContribution( 187 data_type_state_.type_context, commit_entities, sequence_numbers, this)); 188} 189 190void NonBlockingTypeProcessorCore::StorePendingCommit( 191 const CommitRequestData& request) { 192 if (!request.deleted) { 193 DCHECK_EQ(type_, GetModelTypeFromSpecifics(request.specifics)); 194 } 195 196 EntityMap::iterator map_it = entities_.find(request.client_tag_hash); 197 if (map_it == entities_.end()) { 198 SyncThreadSyncEntity* entity = 199 SyncThreadSyncEntity::FromCommitRequest(request.id, 200 request.client_tag_hash, 201 request.sequence_number, 202 request.base_version, 203 request.ctime, 204 request.mtime, 205 request.non_unique_name, 206 request.deleted, 207 request.specifics); 208 entities_.insert(std::make_pair(request.client_tag_hash, entity)); 209 } else { 210 SyncThreadSyncEntity* entity = map_it->second; 211 entity->RequestCommit(request.id, 212 request.client_tag_hash, 213 request.sequence_number, 214 request.base_version, 215 request.ctime, 216 request.mtime, 217 request.non_unique_name, 218 request.deleted, 219 request.specifics); 220 } 221 222 // TODO: Nudge SyncScheduler. 223} 224 225void NonBlockingTypeProcessorCore::OnCommitResponse( 226 const CommitResponseDataList& response_list) { 227 for (CommitResponseDataList::const_iterator response_it = 228 response_list.begin(); 229 response_it != response_list.end(); 230 ++response_it) { 231 const std::string client_tag_hash = response_it->client_tag_hash; 232 EntityMap::iterator map_it = entities_.find(client_tag_hash); 233 234 // There's no way we could have committed an entry we know nothing about. 235 if (map_it == entities_.end()) { 236 NOTREACHED() << "Received commit response for item unknown to us." 237 << " Model type: " << ModelTypeToString(type_) 238 << " ID: " << response_it->id; 239 continue; 240 } 241 242 SyncThreadSyncEntity* entity = map_it->second; 243 entity->ReceiveCommitResponse(response_it->id, 244 response_it->response_version, 245 response_it->sequence_number); 246 } 247 248 // Send the responses back to the model thread. It needs to know which 249 // items have been successfully committed so it can save that information in 250 // permanent storage. 251 processor_interface_->ReceiveCommitResponse(data_type_state_, response_list); 252} 253 254base::WeakPtr<NonBlockingTypeProcessorCore> 255NonBlockingTypeProcessorCore::AsWeakPtr() { 256 return weak_ptr_factory_.GetWeakPtr(); 257} 258 259bool NonBlockingTypeProcessorCore::CanCommitItems() const { 260 // We can't commit anything until we know the type's parent node. 261 // We'll get it in the first update response. 262 return !data_type_state_.type_root_id.empty() && 263 data_type_state_.initial_sync_done; 264} 265 266void NonBlockingTypeProcessorCore::HelpInitializeCommitEntity( 267 sync_pb::SyncEntity* sync_entity) { 268 // Initial commits need our help to generate a client ID. 269 if (!sync_entity->has_id_string()) { 270 DCHECK_EQ(kUncommittedVersion, sync_entity->version()); 271 const int64 id = data_type_state_.next_client_id++; 272 sync_entity->set_id_string( 273 base::StringPrintf("%s-%" PRId64, ModelTypeToString(type_), id)); 274 } 275 276 // Always include enough specifics to identify the type. Do this even in 277 // deletion requests, where the specifics are otherwise invalid. 278 if (!sync_entity->has_specifics()) { 279 AddDefaultFieldValue(type_, sync_entity->mutable_specifics()); 280 } 281 282 // We're always responsible for the parent ID. 283 sync_entity->set_parent_id_string(data_type_state_.type_root_id); 284} 285 286} // namespace syncer 287