1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "sync/engine/model_type_sync_worker_impl.h"
6
7#include "base/bind.h"
8#include "base/format_macros.h"
9#include "base/logging.h"
10#include "base/strings/stringprintf.h"
11#include "sync/engine/commit_contribution.h"
12#include "sync/engine/entity_tracker.h"
13#include "sync/engine/model_type_sync_proxy.h"
14#include "sync/engine/non_blocking_type_commit_contribution.h"
15#include "sync/syncable/syncable_util.h"
16#include "sync/util/cryptographer.h"
17#include "sync/util/time.h"
18
19namespace syncer {
20
21ModelTypeSyncWorkerImpl::ModelTypeSyncWorkerImpl(
22    ModelType type,
23    const DataTypeState& initial_state,
24    const UpdateResponseDataList& saved_pending_updates,
25    scoped_ptr<Cryptographer> cryptographer,
26    NudgeHandler* nudge_handler,
27    scoped_ptr<ModelTypeSyncProxy> type_sync_proxy)
28    : type_(type),
29      data_type_state_(initial_state),
30      type_sync_proxy_(type_sync_proxy.Pass()),
31      cryptographer_(cryptographer.Pass()),
32      nudge_handler_(nudge_handler),
33      entities_deleter_(&entities_),
34      weak_ptr_factory_(this) {
35  // Request an initial sync if it hasn't been completed yet.
36  if (!data_type_state_.initial_sync_done) {
37    nudge_handler_->NudgeForInitialDownload(type_);
38  }
39
40  for (UpdateResponseDataList::const_iterator it =
41           saved_pending_updates.begin();
42       it != saved_pending_updates.end();
43       ++it) {
44    EntityTracker* entity_tracker = EntityTracker::FromServerUpdate(
45        it->id, it->client_tag_hash, it->response_version);
46    entity_tracker->ReceivePendingUpdate(*it);
47    entities_.insert(std::make_pair(it->client_tag_hash, entity_tracker));
48  }
49
50  if (cryptographer_) {
51    DVLOG(1) << ModelTypeToString(type_) << ": Starting with encryption key "
52             << cryptographer_->GetDefaultNigoriKeyName();
53    OnCryptographerUpdated();
54  }
55}
56
57ModelTypeSyncWorkerImpl::~ModelTypeSyncWorkerImpl() {
58}
59
60ModelType ModelTypeSyncWorkerImpl::GetModelType() const {
61  DCHECK(CalledOnValidThread());
62  return type_;
63}
64
65bool ModelTypeSyncWorkerImpl::IsEncryptionRequired() const {
66  return !!cryptographer_;
67}
68
69void ModelTypeSyncWorkerImpl::UpdateCryptographer(
70    scoped_ptr<Cryptographer> cryptographer) {
71  DCHECK(cryptographer);
72  cryptographer_ = cryptographer.Pass();
73
74  // Update our state and that of the proxy.
75  OnCryptographerUpdated();
76
77  // Nudge the scheduler if we're now allowed to commit.
78  if (CanCommitItems())
79    nudge_handler_->NudgeForCommit(type_);
80}
81
82// UpdateHandler implementation.
83void ModelTypeSyncWorkerImpl::GetDownloadProgress(
84    sync_pb::DataTypeProgressMarker* progress_marker) const {
85  DCHECK(CalledOnValidThread());
86  progress_marker->CopyFrom(data_type_state_.progress_marker);
87}
88
89void ModelTypeSyncWorkerImpl::GetDataTypeContext(
90    sync_pb::DataTypeContext* context) const {
91  DCHECK(CalledOnValidThread());
92  context->CopyFrom(data_type_state_.type_context);
93}
94
95SyncerError ModelTypeSyncWorkerImpl::ProcessGetUpdatesResponse(
96    const sync_pb::DataTypeProgressMarker& progress_marker,
97    const sync_pb::DataTypeContext& mutated_context,
98    const SyncEntityList& applicable_updates,
99    sessions::StatusController* status) {
100  DCHECK(CalledOnValidThread());
101
102  // TODO(rlarocque): Handle data type context conflicts.
103  data_type_state_.type_context = mutated_context;
104  data_type_state_.progress_marker = progress_marker;
105
106  UpdateResponseDataList response_datas;
107  UpdateResponseDataList pending_updates;
108
109  for (SyncEntityList::const_iterator update_it = applicable_updates.begin();
110       update_it != applicable_updates.end();
111       ++update_it) {
112    const sync_pb::SyncEntity* update_entity = *update_it;
113    if (!update_entity->server_defined_unique_tag().empty()) {
114      // We can't commit an item unless we know its parent ID.  This is where
115      // we learn that ID and remember it forever.
116      DCHECK_EQ(ModelTypeToRootTag(type_),
117                update_entity->server_defined_unique_tag());
118      if (!data_type_state_.type_root_id.empty()) {
119        DCHECK_EQ(data_type_state_.type_root_id, update_entity->id_string());
120      }
121      data_type_state_.type_root_id = update_entity->id_string();
122    } else {
123      // Normal updates are handled here.
124      const std::string& client_tag_hash =
125          update_entity->client_defined_unique_tag();
126      DCHECK(!client_tag_hash.empty());
127
128      EntityTracker* entity_tracker = NULL;
129      EntityMap::const_iterator map_it = entities_.find(client_tag_hash);
130      if (map_it == entities_.end()) {
131        entity_tracker =
132            EntityTracker::FromServerUpdate(update_entity->id_string(),
133                                            client_tag_hash,
134                                            update_entity->version());
135        entities_.insert(std::make_pair(client_tag_hash, entity_tracker));
136      } else {
137        entity_tracker = map_it->second;
138      }
139
140      // Prepare the message for the model thread.
141      UpdateResponseData response_data;
142      response_data.id = update_entity->id_string();
143      response_data.client_tag_hash = client_tag_hash;
144      response_data.response_version = update_entity->version();
145      response_data.ctime = ProtoTimeToTime(update_entity->ctime());
146      response_data.mtime = ProtoTimeToTime(update_entity->mtime());
147      response_data.non_unique_name = update_entity->name();
148      response_data.deleted = update_entity->deleted();
149
150      const sync_pb::EntitySpecifics& specifics = update_entity->specifics();
151
152      if (!specifics.has_encrypted()) {
153        // No encryption.
154        entity_tracker->ReceiveUpdate(update_entity->version());
155        response_data.specifics = specifics;
156        response_datas.push_back(response_data);
157      } else if (specifics.has_encrypted() && cryptographer_ &&
158                 cryptographer_->CanDecrypt(specifics.encrypted())) {
159        // Encrypted, but we know the key.
160        if (DecryptSpecifics(
161                cryptographer_.get(), specifics, &response_data.specifics)) {
162          entity_tracker->ReceiveUpdate(update_entity->version());
163          response_data.encryption_key_name = specifics.encrypted().key_name();
164          response_datas.push_back(response_data);
165        }
166      } else if (specifics.has_encrypted() &&
167                 (!cryptographer_ ||
168                  !cryptographer_->CanDecrypt(specifics.encrypted()))) {
169        // Can't decrypt right now.  Ask the entity tracker to handle it.
170        response_data.specifics = specifics;
171        if (entity_tracker->ReceivePendingUpdate(response_data)) {
172          // Send to the model thread for safe-keeping across restarts if the
173          // tracker decides the update is worth keeping.
174          pending_updates.push_back(response_data);
175        }
176      }
177    }
178  }
179
180  DVLOG(1) << ModelTypeToString(type_) << ": "
181           << base::StringPrintf(
182                  "Delivering %zd applicable and %zd pending updates.",
183                  response_datas.size(),
184                  pending_updates.size());
185
186  // Forward these updates to the model thread so it can do the rest.
187  type_sync_proxy_->OnUpdateReceived(
188      data_type_state_, response_datas, pending_updates);
189
190  return SYNCER_OK;
191}
192
193void ModelTypeSyncWorkerImpl::ApplyUpdates(sessions::StatusController* status) {
194  DCHECK(CalledOnValidThread());
195  // This function is called only when we've finished a download cycle, ie. we
196  // got a response with changes_remaining == 0.  If this is our first download
197  // cycle, we should update our state so the ModelTypeSyncProxy knows that
198  // it's safe to commit items now.
199  if (!data_type_state_.initial_sync_done) {
200    DVLOG(1) << "Delivering 'initial sync done' ping.";
201
202    data_type_state_.initial_sync_done = true;
203
204    type_sync_proxy_->OnUpdateReceived(
205        data_type_state_, UpdateResponseDataList(), UpdateResponseDataList());
206  }
207}
208
209void ModelTypeSyncWorkerImpl::PassiveApplyUpdates(
210    sessions::StatusController* status) {
211  NOTREACHED()
212      << "Non-blocking types should never apply updates on sync thread.  "
213      << "ModelType is: " << ModelTypeToString(type_);
214}
215
216void ModelTypeSyncWorkerImpl::EnqueueForCommit(
217    const CommitRequestDataList& list) {
218  DCHECK(CalledOnValidThread());
219
220  DCHECK(IsTypeInitialized())
221      << "Asked to commit items before type was initialized.  "
222      << "ModelType is: " << ModelTypeToString(type_);
223
224  for (CommitRequestDataList::const_iterator it = list.begin();
225       it != list.end();
226       ++it) {
227    StorePendingCommit(*it);
228  }
229
230  if (CanCommitItems())
231    nudge_handler_->NudgeForCommit(type_);
232}
233
234// CommitContributor implementation.
235scoped_ptr<CommitContribution> ModelTypeSyncWorkerImpl::GetContribution(
236    size_t max_entries) {
237  DCHECK(CalledOnValidThread());
238
239  size_t space_remaining = max_entries;
240  std::vector<int64> sequence_numbers;
241  google::protobuf::RepeatedPtrField<sync_pb::SyncEntity> commit_entities;
242
243  if (!CanCommitItems())
244    return scoped_ptr<CommitContribution>();
245
246  // TODO(rlarocque): Avoid iterating here.
247  for (EntityMap::const_iterator it = entities_.begin();
248       it != entities_.end() && space_remaining > 0;
249       ++it) {
250    EntityTracker* entity = it->second;
251    if (entity->IsCommitPending()) {
252      sync_pb::SyncEntity* commit_entity = commit_entities.Add();
253      int64 sequence_number = -1;
254
255      entity->PrepareCommitProto(commit_entity, &sequence_number);
256      HelpInitializeCommitEntity(commit_entity);
257      sequence_numbers.push_back(sequence_number);
258
259      space_remaining--;
260    }
261  }
262
263  if (commit_entities.size() == 0)
264    return scoped_ptr<CommitContribution>();
265
266  return scoped_ptr<CommitContribution>(new NonBlockingTypeCommitContribution(
267      data_type_state_.type_context, commit_entities, sequence_numbers, this));
268}
269
270void ModelTypeSyncWorkerImpl::StorePendingCommit(
271    const CommitRequestData& request) {
272  if (!request.deleted) {
273    DCHECK_EQ(type_, GetModelTypeFromSpecifics(request.specifics));
274  }
275
276  EntityMap::iterator map_it = entities_.find(request.client_tag_hash);
277  if (map_it == entities_.end()) {
278    EntityTracker* entity =
279        EntityTracker::FromCommitRequest(request.id,
280                                         request.client_tag_hash,
281                                         request.sequence_number,
282                                         request.base_version,
283                                         request.ctime,
284                                         request.mtime,
285                                         request.non_unique_name,
286                                         request.deleted,
287                                         request.specifics);
288    entities_.insert(std::make_pair(request.client_tag_hash, entity));
289  } else {
290    EntityTracker* entity = map_it->second;
291    entity->RequestCommit(request.id,
292                          request.client_tag_hash,
293                          request.sequence_number,
294                          request.base_version,
295                          request.ctime,
296                          request.mtime,
297                          request.non_unique_name,
298                          request.deleted,
299                          request.specifics);
300  }
301}
302
303void ModelTypeSyncWorkerImpl::OnCommitResponse(
304    const CommitResponseDataList& response_list) {
305  for (CommitResponseDataList::const_iterator response_it =
306           response_list.begin();
307       response_it != response_list.end();
308       ++response_it) {
309    const std::string client_tag_hash = response_it->client_tag_hash;
310    EntityMap::iterator map_it = entities_.find(client_tag_hash);
311
312    // There's no way we could have committed an entry we know nothing about.
313    if (map_it == entities_.end()) {
314      NOTREACHED() << "Received commit response for item unknown to us."
315                   << " Model type: " << ModelTypeToString(type_)
316                   << " ID: " << response_it->id;
317      continue;
318    }
319
320    EntityTracker* entity = map_it->second;
321    entity->ReceiveCommitResponse(response_it->id,
322                                  response_it->response_version,
323                                  response_it->sequence_number);
324  }
325
326  // Send the responses back to the model thread.  It needs to know which
327  // items have been successfully committed so it can save that information in
328  // permanent storage.
329  type_sync_proxy_->OnCommitCompleted(data_type_state_, response_list);
330}
331
332base::WeakPtr<ModelTypeSyncWorkerImpl> ModelTypeSyncWorkerImpl::AsWeakPtr() {
333  return weak_ptr_factory_.GetWeakPtr();
334}
335
336bool ModelTypeSyncWorkerImpl::IsTypeInitialized() const {
337  return !data_type_state_.type_root_id.empty() &&
338         data_type_state_.initial_sync_done;
339}
340
341bool ModelTypeSyncWorkerImpl::CanCommitItems() const {
342  // We can't commit anything until we know the type's parent node.
343  // We'll get it in the first update response.
344  if (!IsTypeInitialized())
345    return false;
346
347  // Don't commit if we should be encrypting but don't have the required keys.
348  if (IsEncryptionRequired() &&
349      (!cryptographer_ || !cryptographer_->is_ready())) {
350    return false;
351  }
352
353  return true;
354}
355
356void ModelTypeSyncWorkerImpl::HelpInitializeCommitEntity(
357    sync_pb::SyncEntity* sync_entity) {
358  DCHECK(CanCommitItems());
359
360  // Initial commits need our help to generate a client ID.
361  if (!sync_entity->has_id_string()) {
362    DCHECK_EQ(kUncommittedVersion, sync_entity->version());
363    const int64 id = data_type_state_.next_client_id++;
364    sync_entity->set_id_string(
365        base::StringPrintf("%s-%" PRId64, ModelTypeToString(type_), id));
366  }
367
368  // Encrypt the specifics and hide the title if necessary.
369  if (IsEncryptionRequired()) {
370    // IsEncryptionRequired() && CanCommitItems() implies
371    // that the cryptographer is valid and ready to encrypt.
372    sync_pb::EntitySpecifics encrypted_specifics;
373    bool result = cryptographer_->Encrypt(
374        sync_entity->specifics(), encrypted_specifics.mutable_encrypted());
375    DCHECK(result);
376    sync_entity->mutable_specifics()->CopyFrom(encrypted_specifics);
377    sync_entity->set_name("encrypted");
378  }
379
380  // Always include enough specifics to identify the type.  Do this even in
381  // deletion requests, where the specifics are otherwise invalid.
382  AddDefaultFieldValue(type_, sync_entity->mutable_specifics());
383
384  // We're always responsible for the parent ID.
385  sync_entity->set_parent_id_string(data_type_state_.type_root_id);
386}
387
388void ModelTypeSyncWorkerImpl::OnCryptographerUpdated() {
389  DCHECK(cryptographer_);
390
391  bool new_encryption_key = false;
392  UpdateResponseDataList response_datas;
393
394  const std::string& new_key_name = cryptographer_->GetDefaultNigoriKeyName();
395
396  // Handle a change in encryption key.
397  if (data_type_state_.encryption_key_name != new_key_name) {
398    DVLOG(1) << ModelTypeToString(type_) << ": Updating encryption key "
399             << data_type_state_.encryption_key_name << " -> " << new_key_name;
400    data_type_state_.encryption_key_name = new_key_name;
401    new_encryption_key = true;
402  }
403
404  for (EntityMap::const_iterator it = entities_.begin(); it != entities_.end();
405       ++it) {
406    if (it->second->HasPendingUpdate()) {
407      const UpdateResponseData& saved_pending = it->second->GetPendingUpdate();
408
409      // We assume all pending updates are encrypted items for which we
410      // don't have the key.
411      DCHECK(saved_pending.specifics.has_encrypted());
412
413      if (cryptographer_->CanDecrypt(saved_pending.specifics.encrypted())) {
414        UpdateResponseData decrypted_response = saved_pending;
415        if (DecryptSpecifics(cryptographer_.get(),
416                             saved_pending.specifics,
417                             &decrypted_response.specifics)) {
418          decrypted_response.encryption_key_name =
419              saved_pending.specifics.encrypted().key_name();
420          response_datas.push_back(decrypted_response);
421
422          it->second->ClearPendingUpdate();
423        }
424      }
425    }
426  }
427
428  if (new_encryption_key || response_datas.size() > 0) {
429    DVLOG(1) << ModelTypeToString(type_) << ": "
430             << base::StringPrintf(
431                    "Delivering encryption key and %zd decrypted updates.",
432                    response_datas.size());
433    type_sync_proxy_->OnUpdateReceived(
434        data_type_state_, response_datas, UpdateResponseDataList());
435  }
436}
437
438bool ModelTypeSyncWorkerImpl::DecryptSpecifics(
439    Cryptographer* cryptographer,
440    const sync_pb::EntitySpecifics& in,
441    sync_pb::EntitySpecifics* out) {
442  DCHECK(in.has_encrypted());
443  DCHECK(cryptographer->CanDecrypt(in.encrypted()));
444
445  std::string plaintext;
446  plaintext = cryptographer->DecryptToString(in.encrypted());
447  if (plaintext.empty()) {
448    LOG(ERROR) << "Failed to decrypt a decryptable entity";
449    return false;
450  }
451  if (!out->ParseFromString(plaintext)) {
452    LOG(ERROR) << "Failed to parse decrypted entity";
453    return false;
454  }
455  return true;
456}
457
458}  // namespace syncer
459