1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "sync/engine/non_blocking_type_processor.h"
6
7#include "base/bind.h"
8#include "base/location.h"
9#include "base/message_loop/message_loop_proxy.h"
10#include "sync/engine/model_thread_sync_entity.h"
11#include "sync/engine/non_blocking_type_processor_core_interface.h"
12#include "sync/internal_api/public/sync_core_proxy.h"
13#include "sync/syncable/syncable_util.h"
14
15namespace syncer {
16
17NonBlockingTypeProcessor::NonBlockingTypeProcessor(ModelType type)
18    : type_(type),
19      is_preferred_(false),
20      is_connected_(false),
21      entities_deleter_(&entities_),
22      weak_ptr_factory_for_ui_(this),
23      weak_ptr_factory_for_sync_(this) {
24}
25
26NonBlockingTypeProcessor::~NonBlockingTypeProcessor() {
27}
28
29bool NonBlockingTypeProcessor::IsPreferred() const {
30  DCHECK(CalledOnValidThread());
31  return is_preferred_;
32}
33
34bool NonBlockingTypeProcessor::IsConnected() const {
35  DCHECK(CalledOnValidThread());
36  return is_connected_;
37}
38
39ModelType NonBlockingTypeProcessor::GetModelType() const {
40  DCHECK(CalledOnValidThread());
41  return type_;
42}
43
44void NonBlockingTypeProcessor::Enable(
45    scoped_ptr<SyncCoreProxy> sync_core_proxy) {
46  DCHECK(CalledOnValidThread());
47  DVLOG(1) << "Asked to enable " << ModelTypeToString(type_);
48
49  is_preferred_ = true;
50
51  // TODO(rlarocque): At some point, this should be loaded from storage.
52  data_type_state_.progress_marker.set_data_type_id(
53      GetSpecificsFieldNumberFromModelType(type_));
54
55  sync_core_proxy_ = sync_core_proxy.Pass();
56  sync_core_proxy_->ConnectTypeToCore(GetModelType(),
57                                      data_type_state_,
58                                      weak_ptr_factory_for_sync_.GetWeakPtr());
59}
60
61void NonBlockingTypeProcessor::Disable() {
62  DCHECK(CalledOnValidThread());
63  is_preferred_ = false;
64  Disconnect();
65}
66
67void NonBlockingTypeProcessor::Disconnect() {
68  DCHECK(CalledOnValidThread());
69  DVLOG(1) << "Asked to disconnect " << ModelTypeToString(type_);
70  is_connected_ = false;
71
72  if (sync_core_proxy_) {
73    sync_core_proxy_->Disconnect(GetModelType());
74    sync_core_proxy_.reset();
75  }
76
77  weak_ptr_factory_for_sync_.InvalidateWeakPtrs();
78  core_interface_.reset();
79}
80
81base::WeakPtr<NonBlockingTypeProcessor>
82NonBlockingTypeProcessor::AsWeakPtrForUI() {
83  DCHECK(CalledOnValidThread());
84  return weak_ptr_factory_for_ui_.GetWeakPtr();
85}
86
87void NonBlockingTypeProcessor::OnConnect(
88    scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface) {
89  DCHECK(CalledOnValidThread());
90  DVLOG(1) << "Successfully connected " << ModelTypeToString(type_);
91
92  is_connected_ = true;
93  core_interface_ = core_interface.Pass();
94
95  FlushPendingCommitRequests();
96}
97
98void NonBlockingTypeProcessor::Put(const std::string& client_tag,
99                                   const sync_pb::EntitySpecifics& specifics) {
100  DCHECK_EQ(type_, GetModelTypeFromSpecifics(specifics));
101
102  const std::string client_tag_hash(
103      syncable::GenerateSyncableHash(type_, client_tag));
104
105  EntityMap::iterator it = entities_.find(client_tag_hash);
106  if (it == entities_.end()) {
107    scoped_ptr<ModelThreadSyncEntity> entity(
108        ModelThreadSyncEntity::NewLocalItem(
109            client_tag, specifics, base::Time::Now()));
110    entities_.insert(std::make_pair(client_tag_hash, entity.release()));
111  } else {
112    ModelThreadSyncEntity* entity = it->second;
113    entity->MakeLocalChange(specifics);
114  }
115
116  FlushPendingCommitRequests();
117}
118
119void NonBlockingTypeProcessor::Delete(const std::string& client_tag) {
120  const std::string client_tag_hash(
121      syncable::GenerateSyncableHash(type_, client_tag));
122
123  EntityMap::iterator it = entities_.find(client_tag_hash);
124  if (it == entities_.end()) {
125    // That's unusual, but not necessarily a bad thing.
126    // Missing is as good as deleted as far as the model is concerned.
127    DLOG(WARNING) << "Attempted to delete missing item."
128                  << " client tag: " << client_tag;
129  } else {
130    ModelThreadSyncEntity* entity = it->second;
131    entity->Delete();
132  }
133
134  FlushPendingCommitRequests();
135}
136
137void NonBlockingTypeProcessor::FlushPendingCommitRequests() {
138  CommitRequestDataList commit_requests;
139
140  // Don't bother sending anything if there's no one to send to.
141  if (!IsConnected())
142    return;
143
144  // Don't send anything if the type is not ready to handle commits.
145  if (!data_type_state_.initial_sync_done)
146    return;
147
148  // TODO(rlarocque): Do something smarter than iterate here.
149  for (EntityMap::iterator it = entities_.begin(); it != entities_.end();
150       ++it) {
151    if (it->second->RequiresCommitRequest()) {
152      CommitRequestData request;
153      it->second->InitializeCommitRequestData(&request);
154      commit_requests.push_back(request);
155      it->second->SetCommitRequestInProgress();
156    }
157  }
158
159  if (!commit_requests.empty())
160    core_interface_->RequestCommits(commit_requests);
161}
162
163void NonBlockingTypeProcessor::OnCommitCompletion(
164    const DataTypeState& type_state,
165    const CommitResponseDataList& response_list) {
166  data_type_state_ = type_state;
167
168  for (CommitResponseDataList::const_iterator list_it = response_list.begin();
169       list_it != response_list.end();
170       ++list_it) {
171    const CommitResponseData& response_data = *list_it;
172    const std::string& client_tag_hash = response_data.client_tag_hash;
173
174    EntityMap::iterator it = entities_.find(client_tag_hash);
175    if (it == entities_.end()) {
176      NOTREACHED() << "Received commit response for missing item."
177                   << " type: " << type_ << " client_tag: " << client_tag_hash;
178      return;
179    } else {
180      it->second->ReceiveCommitResponse(response_data.id,
181                                        response_data.sequence_number,
182                                        response_data.response_version);
183    }
184  }
185}
186
187void NonBlockingTypeProcessor::OnUpdateReceived(
188    const DataTypeState& data_type_state,
189    const UpdateResponseDataList& response_list) {
190  bool initial_sync_just_finished =
191      !data_type_state_.initial_sync_done && data_type_state.initial_sync_done;
192
193  data_type_state_ = data_type_state;
194
195  for (UpdateResponseDataList::const_iterator list_it = response_list.begin();
196       list_it != response_list.end();
197       ++list_it) {
198    const UpdateResponseData& response_data = *list_it;
199    const std::string& client_tag_hash = response_data.client_tag_hash;
200
201    EntityMap::iterator it = entities_.find(client_tag_hash);
202    if (it == entities_.end()) {
203      scoped_ptr<ModelThreadSyncEntity> entity =
204          ModelThreadSyncEntity::FromServerUpdate(
205              response_data.id,
206              response_data.client_tag_hash,
207              response_data.non_unique_name,
208              response_data.response_version,
209              response_data.specifics,
210              response_data.deleted,
211              response_data.ctime,
212              response_data.mtime);
213      entities_.insert(std::make_pair(client_tag_hash, entity.release()));
214    } else {
215      ModelThreadSyncEntity* entity = it->second;
216      entity->ApplyUpdateFromServer(response_data.response_version,
217                                    response_data.deleted,
218                                    response_data.specifics,
219                                    response_data.mtime);
220      // TODO: Do something special when conflicts are detected.
221    }
222  }
223
224  if (initial_sync_just_finished)
225    FlushPendingCommitRequests();
226
227  // TODO: Inform the model of the new or updated data.
228}
229
230}  // namespace syncer
231