1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "sync/engine/non_blocking_type_processor_core.h"
6
7#include "base/bind.h"
8#include "base/format_macros.h"
9#include "base/logging.h"
10#include "base/strings/stringprintf.h"
11#include "sync/engine/commit_contribution.h"
12#include "sync/engine/non_blocking_type_commit_contribution.h"
13#include "sync/engine/non_blocking_type_processor_interface.h"
14#include "sync/engine/sync_thread_sync_entity.h"
15#include "sync/syncable/syncable_util.h"
16#include "sync/util/time.h"
17
18namespace syncer {
19
20NonBlockingTypeProcessorCore::NonBlockingTypeProcessorCore(
21    ModelType type,
22    const DataTypeState& initial_state,
23    scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface)
24    : type_(type),
25      data_type_state_(initial_state),
26      processor_interface_(processor_interface.Pass()),
27      entities_deleter_(&entities_),
28      weak_ptr_factory_(this) {
29}
30
31NonBlockingTypeProcessorCore::~NonBlockingTypeProcessorCore() {
32}
33
34ModelType NonBlockingTypeProcessorCore::GetModelType() const {
35  DCHECK(CalledOnValidThread());
36  return type_;
37}
38
39// UpdateHandler implementation.
40void NonBlockingTypeProcessorCore::GetDownloadProgress(
41    sync_pb::DataTypeProgressMarker* progress_marker) const {
42  DCHECK(CalledOnValidThread());
43  progress_marker->CopyFrom(data_type_state_.progress_marker);
44}
45
46void NonBlockingTypeProcessorCore::GetDataTypeContext(
47    sync_pb::DataTypeContext* context) const {
48  DCHECK(CalledOnValidThread());
49  context->CopyFrom(data_type_state_.type_context);
50}
51
52SyncerError NonBlockingTypeProcessorCore::ProcessGetUpdatesResponse(
53    const sync_pb::DataTypeProgressMarker& progress_marker,
54    const sync_pb::DataTypeContext& mutated_context,
55    const SyncEntityList& applicable_updates,
56    sessions::StatusController* status) {
57  DCHECK(CalledOnValidThread());
58
59  // TODO(rlarocque): Handle data type context conflicts.
60  data_type_state_.type_context = mutated_context;
61  data_type_state_.progress_marker = progress_marker;
62
63  UpdateResponseDataList response_datas;
64
65  for (SyncEntityList::const_iterator update_it = applicable_updates.begin();
66       update_it != applicable_updates.end();
67       ++update_it) {
68    const sync_pb::SyncEntity* update_entity = *update_it;
69    if (!update_entity->server_defined_unique_tag().empty()) {
70      // We can't commit an item unless we know its parent ID.  This is where
71      // we learn that ID and remember it forever.
72      DCHECK_EQ(ModelTypeToRootTag(type_),
73                update_entity->server_defined_unique_tag());
74      if (!data_type_state_.type_root_id.empty()) {
75        DCHECK_EQ(data_type_state_.type_root_id, update_entity->id_string());
76      }
77      data_type_state_.type_root_id = update_entity->id_string();
78    } else {
79      // Normal updates are handled here.
80      const std::string& client_tag_hash =
81          update_entity->client_defined_unique_tag();
82      DCHECK(!client_tag_hash.empty());
83      EntityMap::const_iterator map_it = entities_.find(client_tag_hash);
84      if (map_it == entities_.end()) {
85        SyncThreadSyncEntity* entity =
86            SyncThreadSyncEntity::FromServerUpdate(update_entity->id_string(),
87                                                   client_tag_hash,
88                                                   update_entity->version());
89        entities_.insert(std::make_pair(client_tag_hash, entity));
90      } else {
91        SyncThreadSyncEntity* entity = map_it->second;
92        entity->ReceiveUpdate(update_entity->version());
93      }
94
95      // Prepare the message for the model thread.
96      UpdateResponseData response_data;
97      response_data.id = update_entity->id_string();
98      response_data.client_tag_hash = client_tag_hash;
99      response_data.response_version = update_entity->version();
100      response_data.ctime = ProtoTimeToTime(update_entity->ctime());
101      response_data.mtime = ProtoTimeToTime(update_entity->mtime());
102      response_data.non_unique_name = update_entity->name();
103      response_data.deleted = update_entity->deleted();
104      response_data.specifics = update_entity->specifics();
105
106      response_datas.push_back(response_data);
107    }
108  }
109
110  // Forward these updates to the model thread so it can do the rest.
111  processor_interface_->ReceiveUpdateResponse(data_type_state_, response_datas);
112
113  return SYNCER_OK;
114}
115
116void NonBlockingTypeProcessorCore::ApplyUpdates(
117    sessions::StatusController* status) {
118  DCHECK(CalledOnValidThread());
119  // This function is called only when we've finished a download cycle, ie. we
120  // got a response with changes_remaining == 0.  If this is our first download
121  // cycle, we should update our state so the NonBlockingTypeProcessor knows
122  // that it's safe to commit items now.
123  if (!data_type_state_.initial_sync_done) {
124    data_type_state_.initial_sync_done = true;
125
126    UpdateResponseDataList empty_update_list;
127    processor_interface_->ReceiveUpdateResponse(data_type_state_,
128                                                empty_update_list);
129  }
130}
131
132void NonBlockingTypeProcessorCore::PassiveApplyUpdates(
133    sessions::StatusController* status) {
134  NOTREACHED()
135      << "Non-blocking types should never apply updates on sync thread.  "
136      << "ModelType is: " << ModelTypeToString(type_);
137}
138
139void NonBlockingTypeProcessorCore::EnqueueForCommit(
140    const CommitRequestDataList& list) {
141  DCHECK(CalledOnValidThread());
142
143  DCHECK(CanCommitItems())
144      << "Asked to commit items before type was initialized.  "
145      << "ModelType is: " << ModelTypeToString(type_);
146
147  for (CommitRequestDataList::const_iterator it = list.begin();
148       it != list.end();
149       ++it) {
150    StorePendingCommit(*it);
151  }
152}
153
154// CommitContributor implementation.
155scoped_ptr<CommitContribution>
156NonBlockingTypeProcessorCore::GetContribution(size_t max_entries) {
157  DCHECK(CalledOnValidThread());
158
159  size_t space_remaining = max_entries;
160  std::vector<int64> sequence_numbers;
161  google::protobuf::RepeatedPtrField<sync_pb::SyncEntity> commit_entities;
162
163  if (!CanCommitItems())
164    return scoped_ptr<CommitContribution>();
165
166  // TODO(rlarocque): Avoid iterating here.
167  for (EntityMap::const_iterator it = entities_.begin();
168       it != entities_.end() && space_remaining > 0;
169       ++it) {
170    SyncThreadSyncEntity* entity = it->second;
171    if (entity->IsCommitPending()) {
172      sync_pb::SyncEntity* commit_entity = commit_entities.Add();
173      int64 sequence_number = -1;
174
175      entity->PrepareCommitProto(commit_entity, &sequence_number);
176      HelpInitializeCommitEntity(commit_entity);
177      sequence_numbers.push_back(sequence_number);
178
179      space_remaining--;
180    }
181  }
182
183  if (commit_entities.size() == 0)
184    return scoped_ptr<CommitContribution>();
185
186  return scoped_ptr<CommitContribution>(new NonBlockingTypeCommitContribution(
187      data_type_state_.type_context, commit_entities, sequence_numbers, this));
188}
189
190void NonBlockingTypeProcessorCore::StorePendingCommit(
191    const CommitRequestData& request) {
192  if (!request.deleted) {
193    DCHECK_EQ(type_, GetModelTypeFromSpecifics(request.specifics));
194  }
195
196  EntityMap::iterator map_it = entities_.find(request.client_tag_hash);
197  if (map_it == entities_.end()) {
198    SyncThreadSyncEntity* entity =
199        SyncThreadSyncEntity::FromCommitRequest(request.id,
200                                                request.client_tag_hash,
201                                                request.sequence_number,
202                                                request.base_version,
203                                                request.ctime,
204                                                request.mtime,
205                                                request.non_unique_name,
206                                                request.deleted,
207                                                request.specifics);
208    entities_.insert(std::make_pair(request.client_tag_hash, entity));
209  } else {
210    SyncThreadSyncEntity* entity = map_it->second;
211    entity->RequestCommit(request.id,
212                          request.client_tag_hash,
213                          request.sequence_number,
214                          request.base_version,
215                          request.ctime,
216                          request.mtime,
217                          request.non_unique_name,
218                          request.deleted,
219                          request.specifics);
220  }
221
222  // TODO: Nudge SyncScheduler.
223}
224
225void NonBlockingTypeProcessorCore::OnCommitResponse(
226    const CommitResponseDataList& response_list) {
227  for (CommitResponseDataList::const_iterator response_it =
228           response_list.begin();
229       response_it != response_list.end();
230       ++response_it) {
231    const std::string client_tag_hash = response_it->client_tag_hash;
232    EntityMap::iterator map_it = entities_.find(client_tag_hash);
233
234    // There's no way we could have committed an entry we know nothing about.
235    if (map_it == entities_.end()) {
236      NOTREACHED() << "Received commit response for item unknown to us."
237                   << " Model type: " << ModelTypeToString(type_)
238                   << " ID: " << response_it->id;
239      continue;
240    }
241
242    SyncThreadSyncEntity* entity = map_it->second;
243    entity->ReceiveCommitResponse(response_it->id,
244                                  response_it->response_version,
245                                  response_it->sequence_number);
246  }
247
248  // Send the responses back to the model thread.  It needs to know which
249  // items have been successfully committed so it can save that information in
250  // permanent storage.
251  processor_interface_->ReceiveCommitResponse(data_type_state_, response_list);
252}
253
254base::WeakPtr<NonBlockingTypeProcessorCore>
255NonBlockingTypeProcessorCore::AsWeakPtr() {
256  return weak_ptr_factory_.GetWeakPtr();
257}
258
259bool NonBlockingTypeProcessorCore::CanCommitItems() const {
260  // We can't commit anything until we know the type's parent node.
261  // We'll get it in the first update response.
262  return !data_type_state_.type_root_id.empty() &&
263         data_type_state_.initial_sync_done;
264}
265
266void NonBlockingTypeProcessorCore::HelpInitializeCommitEntity(
267    sync_pb::SyncEntity* sync_entity) {
268  // Initial commits need our help to generate a client ID.
269  if (!sync_entity->has_id_string()) {
270    DCHECK_EQ(kUncommittedVersion, sync_entity->version());
271    const int64 id = data_type_state_.next_client_id++;
272    sync_entity->set_id_string(
273        base::StringPrintf("%s-%" PRId64, ModelTypeToString(type_), id));
274  }
275
276  // Always include enough specifics to identify the type.  Do this even in
277  // deletion requests, where the specifics are otherwise invalid.
278  if (!sync_entity->has_specifics()) {
279    AddDefaultFieldValue(type_, sync_entity->mutable_specifics());
280  }
281
282  // We're always responsible for the parent ID.
283  sync_entity->set_parent_id_string(data_type_state_.type_root_id);
284}
285
286}  // namespace syncer
287