1// Copyright 2014 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "sync/engine/model_type_sync_worker_impl.h" 6 7#include "base/bind.h" 8#include "base/format_macros.h" 9#include "base/logging.h" 10#include "base/strings/stringprintf.h" 11#include "sync/engine/commit_contribution.h" 12#include "sync/engine/entity_tracker.h" 13#include "sync/engine/model_type_sync_proxy.h" 14#include "sync/engine/non_blocking_type_commit_contribution.h" 15#include "sync/syncable/syncable_util.h" 16#include "sync/util/cryptographer.h" 17#include "sync/util/time.h" 18 19namespace syncer { 20 21ModelTypeSyncWorkerImpl::ModelTypeSyncWorkerImpl( 22 ModelType type, 23 const DataTypeState& initial_state, 24 const UpdateResponseDataList& saved_pending_updates, 25 scoped_ptr<Cryptographer> cryptographer, 26 NudgeHandler* nudge_handler, 27 scoped_ptr<ModelTypeSyncProxy> type_sync_proxy) 28 : type_(type), 29 data_type_state_(initial_state), 30 type_sync_proxy_(type_sync_proxy.Pass()), 31 cryptographer_(cryptographer.Pass()), 32 nudge_handler_(nudge_handler), 33 entities_deleter_(&entities_), 34 weak_ptr_factory_(this) { 35 // Request an initial sync if it hasn't been completed yet. 36 if (!data_type_state_.initial_sync_done) { 37 nudge_handler_->NudgeForInitialDownload(type_); 38 } 39 40 for (UpdateResponseDataList::const_iterator it = 41 saved_pending_updates.begin(); 42 it != saved_pending_updates.end(); 43 ++it) { 44 EntityTracker* entity_tracker = EntityTracker::FromServerUpdate( 45 it->id, it->client_tag_hash, it->response_version); 46 entity_tracker->ReceivePendingUpdate(*it); 47 entities_.insert(std::make_pair(it->client_tag_hash, entity_tracker)); 48 } 49 50 if (cryptographer_) { 51 DVLOG(1) << ModelTypeToString(type_) << ": Starting with encryption key " 52 << cryptographer_->GetDefaultNigoriKeyName(); 53 OnCryptographerUpdated(); 54 } 55} 56 57ModelTypeSyncWorkerImpl::~ModelTypeSyncWorkerImpl() { 58} 59 60ModelType ModelTypeSyncWorkerImpl::GetModelType() const { 61 DCHECK(CalledOnValidThread()); 62 return type_; 63} 64 65bool ModelTypeSyncWorkerImpl::IsEncryptionRequired() const { 66 return !!cryptographer_; 67} 68 69void ModelTypeSyncWorkerImpl::UpdateCryptographer( 70 scoped_ptr<Cryptographer> cryptographer) { 71 DCHECK(cryptographer); 72 cryptographer_ = cryptographer.Pass(); 73 74 // Update our state and that of the proxy. 75 OnCryptographerUpdated(); 76 77 // Nudge the scheduler if we're now allowed to commit. 78 if (CanCommitItems()) 79 nudge_handler_->NudgeForCommit(type_); 80} 81 82// UpdateHandler implementation. 83void ModelTypeSyncWorkerImpl::GetDownloadProgress( 84 sync_pb::DataTypeProgressMarker* progress_marker) const { 85 DCHECK(CalledOnValidThread()); 86 progress_marker->CopyFrom(data_type_state_.progress_marker); 87} 88 89void ModelTypeSyncWorkerImpl::GetDataTypeContext( 90 sync_pb::DataTypeContext* context) const { 91 DCHECK(CalledOnValidThread()); 92 context->CopyFrom(data_type_state_.type_context); 93} 94 95SyncerError ModelTypeSyncWorkerImpl::ProcessGetUpdatesResponse( 96 const sync_pb::DataTypeProgressMarker& progress_marker, 97 const sync_pb::DataTypeContext& mutated_context, 98 const SyncEntityList& applicable_updates, 99 sessions::StatusController* status) { 100 DCHECK(CalledOnValidThread()); 101 102 // TODO(rlarocque): Handle data type context conflicts. 103 data_type_state_.type_context = mutated_context; 104 data_type_state_.progress_marker = progress_marker; 105 106 UpdateResponseDataList response_datas; 107 UpdateResponseDataList pending_updates; 108 109 for (SyncEntityList::const_iterator update_it = applicable_updates.begin(); 110 update_it != applicable_updates.end(); 111 ++update_it) { 112 const sync_pb::SyncEntity* update_entity = *update_it; 113 if (!update_entity->server_defined_unique_tag().empty()) { 114 // We can't commit an item unless we know its parent ID. This is where 115 // we learn that ID and remember it forever. 116 DCHECK_EQ(ModelTypeToRootTag(type_), 117 update_entity->server_defined_unique_tag()); 118 if (!data_type_state_.type_root_id.empty()) { 119 DCHECK_EQ(data_type_state_.type_root_id, update_entity->id_string()); 120 } 121 data_type_state_.type_root_id = update_entity->id_string(); 122 } else { 123 // Normal updates are handled here. 124 const std::string& client_tag_hash = 125 update_entity->client_defined_unique_tag(); 126 DCHECK(!client_tag_hash.empty()); 127 128 EntityTracker* entity_tracker = NULL; 129 EntityMap::const_iterator map_it = entities_.find(client_tag_hash); 130 if (map_it == entities_.end()) { 131 entity_tracker = 132 EntityTracker::FromServerUpdate(update_entity->id_string(), 133 client_tag_hash, 134 update_entity->version()); 135 entities_.insert(std::make_pair(client_tag_hash, entity_tracker)); 136 } else { 137 entity_tracker = map_it->second; 138 } 139 140 // Prepare the message for the model thread. 141 UpdateResponseData response_data; 142 response_data.id = update_entity->id_string(); 143 response_data.client_tag_hash = client_tag_hash; 144 response_data.response_version = update_entity->version(); 145 response_data.ctime = ProtoTimeToTime(update_entity->ctime()); 146 response_data.mtime = ProtoTimeToTime(update_entity->mtime()); 147 response_data.non_unique_name = update_entity->name(); 148 response_data.deleted = update_entity->deleted(); 149 150 const sync_pb::EntitySpecifics& specifics = update_entity->specifics(); 151 152 if (!specifics.has_encrypted()) { 153 // No encryption. 154 entity_tracker->ReceiveUpdate(update_entity->version()); 155 response_data.specifics = specifics; 156 response_datas.push_back(response_data); 157 } else if (specifics.has_encrypted() && cryptographer_ && 158 cryptographer_->CanDecrypt(specifics.encrypted())) { 159 // Encrypted, but we know the key. 160 if (DecryptSpecifics( 161 cryptographer_.get(), specifics, &response_data.specifics)) { 162 entity_tracker->ReceiveUpdate(update_entity->version()); 163 response_data.encryption_key_name = specifics.encrypted().key_name(); 164 response_datas.push_back(response_data); 165 } 166 } else if (specifics.has_encrypted() && 167 (!cryptographer_ || 168 !cryptographer_->CanDecrypt(specifics.encrypted()))) { 169 // Can't decrypt right now. Ask the entity tracker to handle it. 170 response_data.specifics = specifics; 171 if (entity_tracker->ReceivePendingUpdate(response_data)) { 172 // Send to the model thread for safe-keeping across restarts if the 173 // tracker decides the update is worth keeping. 174 pending_updates.push_back(response_data); 175 } 176 } 177 } 178 } 179 180 DVLOG(1) << ModelTypeToString(type_) << ": " 181 << base::StringPrintf( 182 "Delivering %zd applicable and %zd pending updates.", 183 response_datas.size(), 184 pending_updates.size()); 185 186 // Forward these updates to the model thread so it can do the rest. 187 type_sync_proxy_->OnUpdateReceived( 188 data_type_state_, response_datas, pending_updates); 189 190 return SYNCER_OK; 191} 192 193void ModelTypeSyncWorkerImpl::ApplyUpdates(sessions::StatusController* status) { 194 DCHECK(CalledOnValidThread()); 195 // This function is called only when we've finished a download cycle, ie. we 196 // got a response with changes_remaining == 0. If this is our first download 197 // cycle, we should update our state so the ModelTypeSyncProxy knows that 198 // it's safe to commit items now. 199 if (!data_type_state_.initial_sync_done) { 200 DVLOG(1) << "Delivering 'initial sync done' ping."; 201 202 data_type_state_.initial_sync_done = true; 203 204 type_sync_proxy_->OnUpdateReceived( 205 data_type_state_, UpdateResponseDataList(), UpdateResponseDataList()); 206 } 207} 208 209void ModelTypeSyncWorkerImpl::PassiveApplyUpdates( 210 sessions::StatusController* status) { 211 NOTREACHED() 212 << "Non-blocking types should never apply updates on sync thread. " 213 << "ModelType is: " << ModelTypeToString(type_); 214} 215 216void ModelTypeSyncWorkerImpl::EnqueueForCommit( 217 const CommitRequestDataList& list) { 218 DCHECK(CalledOnValidThread()); 219 220 DCHECK(IsTypeInitialized()) 221 << "Asked to commit items before type was initialized. " 222 << "ModelType is: " << ModelTypeToString(type_); 223 224 for (CommitRequestDataList::const_iterator it = list.begin(); 225 it != list.end(); 226 ++it) { 227 StorePendingCommit(*it); 228 } 229 230 if (CanCommitItems()) 231 nudge_handler_->NudgeForCommit(type_); 232} 233 234// CommitContributor implementation. 235scoped_ptr<CommitContribution> ModelTypeSyncWorkerImpl::GetContribution( 236 size_t max_entries) { 237 DCHECK(CalledOnValidThread()); 238 239 size_t space_remaining = max_entries; 240 std::vector<int64> sequence_numbers; 241 google::protobuf::RepeatedPtrField<sync_pb::SyncEntity> commit_entities; 242 243 if (!CanCommitItems()) 244 return scoped_ptr<CommitContribution>(); 245 246 // TODO(rlarocque): Avoid iterating here. 247 for (EntityMap::const_iterator it = entities_.begin(); 248 it != entities_.end() && space_remaining > 0; 249 ++it) { 250 EntityTracker* entity = it->second; 251 if (entity->IsCommitPending()) { 252 sync_pb::SyncEntity* commit_entity = commit_entities.Add(); 253 int64 sequence_number = -1; 254 255 entity->PrepareCommitProto(commit_entity, &sequence_number); 256 HelpInitializeCommitEntity(commit_entity); 257 sequence_numbers.push_back(sequence_number); 258 259 space_remaining--; 260 } 261 } 262 263 if (commit_entities.size() == 0) 264 return scoped_ptr<CommitContribution>(); 265 266 return scoped_ptr<CommitContribution>(new NonBlockingTypeCommitContribution( 267 data_type_state_.type_context, commit_entities, sequence_numbers, this)); 268} 269 270void ModelTypeSyncWorkerImpl::StorePendingCommit( 271 const CommitRequestData& request) { 272 if (!request.deleted) { 273 DCHECK_EQ(type_, GetModelTypeFromSpecifics(request.specifics)); 274 } 275 276 EntityMap::iterator map_it = entities_.find(request.client_tag_hash); 277 if (map_it == entities_.end()) { 278 EntityTracker* entity = 279 EntityTracker::FromCommitRequest(request.id, 280 request.client_tag_hash, 281 request.sequence_number, 282 request.base_version, 283 request.ctime, 284 request.mtime, 285 request.non_unique_name, 286 request.deleted, 287 request.specifics); 288 entities_.insert(std::make_pair(request.client_tag_hash, entity)); 289 } else { 290 EntityTracker* entity = map_it->second; 291 entity->RequestCommit(request.id, 292 request.client_tag_hash, 293 request.sequence_number, 294 request.base_version, 295 request.ctime, 296 request.mtime, 297 request.non_unique_name, 298 request.deleted, 299 request.specifics); 300 } 301} 302 303void ModelTypeSyncWorkerImpl::OnCommitResponse( 304 const CommitResponseDataList& response_list) { 305 for (CommitResponseDataList::const_iterator response_it = 306 response_list.begin(); 307 response_it != response_list.end(); 308 ++response_it) { 309 const std::string client_tag_hash = response_it->client_tag_hash; 310 EntityMap::iterator map_it = entities_.find(client_tag_hash); 311 312 // There's no way we could have committed an entry we know nothing about. 313 if (map_it == entities_.end()) { 314 NOTREACHED() << "Received commit response for item unknown to us." 315 << " Model type: " << ModelTypeToString(type_) 316 << " ID: " << response_it->id; 317 continue; 318 } 319 320 EntityTracker* entity = map_it->second; 321 entity->ReceiveCommitResponse(response_it->id, 322 response_it->response_version, 323 response_it->sequence_number); 324 } 325 326 // Send the responses back to the model thread. It needs to know which 327 // items have been successfully committed so it can save that information in 328 // permanent storage. 329 type_sync_proxy_->OnCommitCompleted(data_type_state_, response_list); 330} 331 332base::WeakPtr<ModelTypeSyncWorkerImpl> ModelTypeSyncWorkerImpl::AsWeakPtr() { 333 return weak_ptr_factory_.GetWeakPtr(); 334} 335 336bool ModelTypeSyncWorkerImpl::IsTypeInitialized() const { 337 return !data_type_state_.type_root_id.empty() && 338 data_type_state_.initial_sync_done; 339} 340 341bool ModelTypeSyncWorkerImpl::CanCommitItems() const { 342 // We can't commit anything until we know the type's parent node. 343 // We'll get it in the first update response. 344 if (!IsTypeInitialized()) 345 return false; 346 347 // Don't commit if we should be encrypting but don't have the required keys. 348 if (IsEncryptionRequired() && 349 (!cryptographer_ || !cryptographer_->is_ready())) { 350 return false; 351 } 352 353 return true; 354} 355 356void ModelTypeSyncWorkerImpl::HelpInitializeCommitEntity( 357 sync_pb::SyncEntity* sync_entity) { 358 DCHECK(CanCommitItems()); 359 360 // Initial commits need our help to generate a client ID. 361 if (!sync_entity->has_id_string()) { 362 DCHECK_EQ(kUncommittedVersion, sync_entity->version()); 363 const int64 id = data_type_state_.next_client_id++; 364 sync_entity->set_id_string( 365 base::StringPrintf("%s-%" PRId64, ModelTypeToString(type_), id)); 366 } 367 368 // Encrypt the specifics and hide the title if necessary. 369 if (IsEncryptionRequired()) { 370 // IsEncryptionRequired() && CanCommitItems() implies 371 // that the cryptographer is valid and ready to encrypt. 372 sync_pb::EntitySpecifics encrypted_specifics; 373 bool result = cryptographer_->Encrypt( 374 sync_entity->specifics(), encrypted_specifics.mutable_encrypted()); 375 DCHECK(result); 376 sync_entity->mutable_specifics()->CopyFrom(encrypted_specifics); 377 sync_entity->set_name("encrypted"); 378 } 379 380 // Always include enough specifics to identify the type. Do this even in 381 // deletion requests, where the specifics are otherwise invalid. 382 AddDefaultFieldValue(type_, sync_entity->mutable_specifics()); 383 384 // We're always responsible for the parent ID. 385 sync_entity->set_parent_id_string(data_type_state_.type_root_id); 386} 387 388void ModelTypeSyncWorkerImpl::OnCryptographerUpdated() { 389 DCHECK(cryptographer_); 390 391 bool new_encryption_key = false; 392 UpdateResponseDataList response_datas; 393 394 const std::string& new_key_name = cryptographer_->GetDefaultNigoriKeyName(); 395 396 // Handle a change in encryption key. 397 if (data_type_state_.encryption_key_name != new_key_name) { 398 DVLOG(1) << ModelTypeToString(type_) << ": Updating encryption key " 399 << data_type_state_.encryption_key_name << " -> " << new_key_name; 400 data_type_state_.encryption_key_name = new_key_name; 401 new_encryption_key = true; 402 } 403 404 for (EntityMap::const_iterator it = entities_.begin(); it != entities_.end(); 405 ++it) { 406 if (it->second->HasPendingUpdate()) { 407 const UpdateResponseData& saved_pending = it->second->GetPendingUpdate(); 408 409 // We assume all pending updates are encrypted items for which we 410 // don't have the key. 411 DCHECK(saved_pending.specifics.has_encrypted()); 412 413 if (cryptographer_->CanDecrypt(saved_pending.specifics.encrypted())) { 414 UpdateResponseData decrypted_response = saved_pending; 415 if (DecryptSpecifics(cryptographer_.get(), 416 saved_pending.specifics, 417 &decrypted_response.specifics)) { 418 decrypted_response.encryption_key_name = 419 saved_pending.specifics.encrypted().key_name(); 420 response_datas.push_back(decrypted_response); 421 422 it->second->ClearPendingUpdate(); 423 } 424 } 425 } 426 } 427 428 if (new_encryption_key || response_datas.size() > 0) { 429 DVLOG(1) << ModelTypeToString(type_) << ": " 430 << base::StringPrintf( 431 "Delivering encryption key and %zd decrypted updates.", 432 response_datas.size()); 433 type_sync_proxy_->OnUpdateReceived( 434 data_type_state_, response_datas, UpdateResponseDataList()); 435 } 436} 437 438bool ModelTypeSyncWorkerImpl::DecryptSpecifics( 439 Cryptographer* cryptographer, 440 const sync_pb::EntitySpecifics& in, 441 sync_pb::EntitySpecifics* out) { 442 DCHECK(in.has_encrypted()); 443 DCHECK(cryptographer->CanDecrypt(in.encrypted())); 444 445 std::string plaintext; 446 plaintext = cryptographer->DecryptToString(in.encrypted()); 447 if (plaintext.empty()) { 448 LOG(ERROR) << "Failed to decrypt a decryptable entity"; 449 return false; 450 } 451 if (!out->ParseFromString(plaintext)) { 452 LOG(ERROR) << "Failed to parse decrypted entity"; 453 return false; 454 } 455 return true; 456} 457 458} // namespace syncer 459