1// Copyright (c) 2010 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/sync/engine/syncer.h"
6
7#include "base/message_loop.h"
8#include "base/time.h"
9#include "chrome/browser/sync/engine/apply_updates_command.h"
10#include "chrome/browser/sync/engine/build_and_process_conflict_sets_command.h"
11#include "chrome/browser/sync/engine/build_commit_command.h"
12#include "chrome/browser/sync/engine/cleanup_disabled_types_command.h"
13#include "chrome/browser/sync/engine/clear_data_command.h"
14#include "chrome/browser/sync/engine/conflict_resolver.h"
15#include "chrome/browser/sync/engine/download_updates_command.h"
16#include "chrome/browser/sync/engine/get_commit_ids_command.h"
17#include "chrome/browser/sync/engine/net/server_connection_manager.h"
18#include "chrome/browser/sync/engine/post_commit_message_command.h"
19#include "chrome/browser/sync/engine/process_commit_response_command.h"
20#include "chrome/browser/sync/engine/process_updates_command.h"
21#include "chrome/browser/sync/engine/resolve_conflicts_command.h"
22#include "chrome/browser/sync/engine/store_timestamps_command.h"
23#include "chrome/browser/sync/engine/syncer_end_command.h"
24#include "chrome/browser/sync/engine/syncer_types.h"
25#include "chrome/browser/sync/engine/syncer_util.h"
26#include "chrome/browser/sync/engine/syncproto.h"
27#include "chrome/browser/sync/engine/verify_updates_command.h"
28#include "chrome/browser/sync/syncable/directory_manager.h"
29#include "chrome/browser/sync/syncable/syncable-inl.h"
30#include "chrome/browser/sync/syncable/syncable.h"
31
32using base::TimeDelta;
33using sync_pb::ClientCommand;
34using syncable::Blob;
35using syncable::IS_UNAPPLIED_UPDATE;
36using syncable::SERVER_CTIME;
37using syncable::SERVER_IS_DEL;
38using syncable::SERVER_IS_DIR;
39using syncable::SERVER_MTIME;
40using syncable::SERVER_NON_UNIQUE_NAME;
41using syncable::SERVER_PARENT_ID;
42using syncable::SERVER_POSITION_IN_PARENT;
43using syncable::SERVER_SPECIFICS;
44using syncable::SERVER_VERSION;
45using syncable::SYNCER;
46using syncable::ScopedDirLookup;
47using syncable::WriteTransaction;
48
49namespace browser_sync {
50
51using sessions::ScopedSessionContextConflictResolver;
52using sessions::StatusController;
53using sessions::SyncSession;
54using sessions::ConflictProgress;
55
56Syncer::Syncer()
57    : early_exit_requested_(false),
58      pre_conflict_resolution_closure_(NULL) {
59}
60
61Syncer::~Syncer() {}
62
63bool Syncer::ExitRequested() {
64  base::AutoLock lock(early_exit_requested_lock_);
65  return early_exit_requested_;
66}
67
68void Syncer::RequestEarlyExit() {
69  base::AutoLock lock(early_exit_requested_lock_);
70  early_exit_requested_ = true;
71}
72
73// TODO(tim): Deprecated.
74void Syncer::SyncShare(sessions::SyncSession* session) {
75  ScopedDirLookup dir(session->context()->directory_manager(),
76                      session->context()->account_name());
77  // The directory must be good here.
78  CHECK(dir.good());
79
80  const sessions::SyncSourceInfo& source(session->source());
81  if (sync_pb::GetUpdatesCallerInfo::CLEAR_PRIVATE_DATA ==
82      source.updates_source) {
83    SyncShare(session, CLEAR_PRIVATE_DATA, SYNCER_END);
84    return;
85  } else {
86    SyncShare(session, SYNCER_BEGIN, SYNCER_END);
87  }
88}
89
90void Syncer::SyncShare(sessions::SyncSession* session,
91                       const SyncerStep first_step,
92                       const SyncerStep last_step) {
93  ScopedDirLookup dir(session->context()->directory_manager(),
94                      session->context()->account_name());
95  // The directory must be good here.
96  CHECK(dir.good());
97
98  ScopedSessionContextConflictResolver scoped(session->context(),
99                                              &resolver_);
100  SyncerStep current_step = first_step;
101
102  SyncerStep next_step = current_step;
103  while (!ExitRequested()) {
104    switch (current_step) {
105      case SYNCER_BEGIN:
106        VLOG(1) << "Syncer Begin";
107        // This isn't perfect, as we can end up bundling extensions activity
108        // intended for the next session into the current one.  We could do a
109        // test-and-reset as with the source, but note that also falls short if
110        // the commit request fails (e.g. due to lost connection), as we will
111        // fall all the way back to the syncer thread main loop in that case,
112        // creating a new session when a connection is established, losing the
113        // records set here on the original attempt.  This should provide us
114        // with the right data "most of the time", and we're only using this
115        // for analysis purposes, so Law of Large Numbers FTW.
116        session->context()->extensions_monitor()->GetAndClearRecords(
117            session->mutable_extensions_activity());
118        next_step = CLEANUP_DISABLED_TYPES;
119        break;
120      case CLEANUP_DISABLED_TYPES: {
121        VLOG(1) << "Cleaning up disabled types";
122        CleanupDisabledTypesCommand cleanup;
123        cleanup.Execute(session);
124        next_step = DOWNLOAD_UPDATES;
125        break;
126      }
127      case DOWNLOAD_UPDATES: {
128        VLOG(1) << "Downloading Updates";
129        DownloadUpdatesCommand download_updates;
130        download_updates.Execute(session);
131        next_step = PROCESS_CLIENT_COMMAND;
132        break;
133      }
134      case PROCESS_CLIENT_COMMAND: {
135        VLOG(1) << "Processing Client Command";
136        ProcessClientCommand(session);
137        next_step = VERIFY_UPDATES;
138        break;
139      }
140      case VERIFY_UPDATES: {
141        VLOG(1) << "Verifying Updates";
142        VerifyUpdatesCommand verify_updates;
143        verify_updates.Execute(session);
144        next_step = PROCESS_UPDATES;
145        break;
146      }
147      case PROCESS_UPDATES: {
148        VLOG(1) << "Processing Updates";
149        ProcessUpdatesCommand process_updates;
150        process_updates.Execute(session);
151        next_step = STORE_TIMESTAMPS;
152        break;
153      }
154      case STORE_TIMESTAMPS: {
155        VLOG(1) << "Storing timestamps";
156        StoreTimestampsCommand store_timestamps;
157        store_timestamps.Execute(session);
158        // We should download all of the updates before attempting to process
159        // them.
160        if (session->status_controller()->ServerSaysNothingMoreToDownload() ||
161            !session->status_controller()->download_updates_succeeded()) {
162          next_step = APPLY_UPDATES;
163        } else {
164          next_step = DOWNLOAD_UPDATES;
165        }
166        break;
167      }
168      case APPLY_UPDATES: {
169        VLOG(1) << "Applying Updates";
170        ApplyUpdatesCommand apply_updates;
171        apply_updates.Execute(session);
172        next_step = BUILD_COMMIT_REQUEST;
173        break;
174      }
175      // These two steps are combined since they are executed within the same
176      // write transaction.
177      case BUILD_COMMIT_REQUEST: {
178        session->status_controller()->set_syncing(true);
179
180        VLOG(1) << "Processing Commit Request";
181        ScopedDirLookup dir(session->context()->directory_manager(),
182                            session->context()->account_name());
183        if (!dir.good()) {
184          LOG(ERROR) << "Scoped dir lookup failed!";
185          return;
186        }
187        WriteTransaction trans(dir, SYNCER, __FILE__, __LINE__);
188        sessions::ScopedSetSessionWriteTransaction set_trans(session, &trans);
189
190        VLOG(1) << "Getting the Commit IDs";
191        GetCommitIdsCommand get_commit_ids_command(
192            session->context()->max_commit_batch_size());
193        get_commit_ids_command.Execute(session);
194
195        if (!session->status_controller()->commit_ids().empty()) {
196          VLOG(1) << "Building a commit message";
197          BuildCommitCommand build_commit_command;
198          build_commit_command.Execute(session);
199
200          next_step = POST_COMMIT_MESSAGE;
201        } else {
202          next_step = BUILD_AND_PROCESS_CONFLICT_SETS;
203        }
204
205        break;
206      }
207      case POST_COMMIT_MESSAGE: {
208        VLOG(1) << "Posting a commit request";
209        PostCommitMessageCommand post_commit_command;
210        post_commit_command.Execute(session);
211        next_step = PROCESS_COMMIT_RESPONSE;
212        break;
213      }
214      case PROCESS_COMMIT_RESPONSE: {
215        VLOG(1) << "Processing the commit response";
216        session->status_controller()->reset_num_conflicting_commits();
217        ProcessCommitResponseCommand process_response_command;
218        process_response_command.Execute(session);
219        next_step = BUILD_AND_PROCESS_CONFLICT_SETS;
220        break;
221      }
222      case BUILD_AND_PROCESS_CONFLICT_SETS: {
223        VLOG(1) << "Building and Processing Conflict Sets";
224        BuildAndProcessConflictSetsCommand build_process_conflict_sets;
225        build_process_conflict_sets.Execute(session);
226        if (session->status_controller()->conflict_sets_built())
227          next_step = SYNCER_END;
228        else
229          next_step = RESOLVE_CONFLICTS;
230        break;
231      }
232      case RESOLVE_CONFLICTS: {
233        VLOG(1) << "Resolving Conflicts";
234
235        // Trigger the pre_conflict_resolution_closure_, which is a testing
236        // hook for the unit tests, if it is non-NULL.
237        if (pre_conflict_resolution_closure_) {
238          pre_conflict_resolution_closure_->Run();
239        }
240
241        StatusController* status = session->status_controller();
242        status->reset_conflicts_resolved();
243        ResolveConflictsCommand resolve_conflicts_command;
244        resolve_conflicts_command.Execute(session);
245        if (status->HasConflictingUpdates())
246          next_step = APPLY_UPDATES_TO_RESOLVE_CONFLICTS;
247        else
248          next_step = SYNCER_END;
249        break;
250      }
251      case APPLY_UPDATES_TO_RESOLVE_CONFLICTS: {
252        StatusController* status = session->status_controller();
253        VLOG(1) << "Applying updates to resolve conflicts";
254        ApplyUpdatesCommand apply_updates;
255        int before_conflicting_updates = status->TotalNumConflictingItems();
256        apply_updates.Execute(session);
257        int after_conflicting_updates = status->TotalNumConflictingItems();
258        status->update_conflicts_resolved(before_conflicting_updates >
259                                          after_conflicting_updates);
260        if (status->conflicts_resolved())
261          next_step = RESOLVE_CONFLICTS;
262        else
263          next_step = SYNCER_END;
264        break;
265      }
266      case CLEAR_PRIVATE_DATA: {
267        VLOG(1) << "Clear Private Data";
268        ClearDataCommand clear_data_command;
269        clear_data_command.Execute(session);
270        next_step = SYNCER_END;
271        break;
272      }
273      case SYNCER_END: {
274        break;
275      }
276      default:
277        LOG(ERROR) << "Unknown command: " << current_step;
278    }
279    if (last_step == current_step)
280      break;
281    current_step = next_step;
282  }
283
284  VLOG(1) << "Syncer End";
285  SyncerEndCommand syncer_end_command;
286  syncer_end_command.Execute(session);
287  return;
288}
289
290void Syncer::ProcessClientCommand(sessions::SyncSession* session) {
291  const ClientToServerResponse& response =
292      session->status_controller()->updates_response();
293  if (!response.has_client_command())
294    return;
295  const ClientCommand& command = response.client_command();
296
297  // The server limits the number of items a client can commit in one batch.
298  if (command.has_max_commit_batch_size()) {
299    session->context()->set_max_commit_batch_size(
300        command.max_commit_batch_size());
301  }
302  if (command.has_set_sync_long_poll_interval()) {
303    session->delegate()->OnReceivedLongPollIntervalUpdate(
304        TimeDelta::FromSeconds(command.set_sync_long_poll_interval()));
305  }
306  if (command.has_set_sync_poll_interval()) {
307    session->delegate()->OnReceivedShortPollIntervalUpdate(
308        TimeDelta::FromSeconds(command.set_sync_poll_interval()));
309  }
310}
311
312void CopyServerFields(syncable::Entry* src, syncable::MutableEntry* dest) {
313  dest->Put(SERVER_NON_UNIQUE_NAME, src->Get(SERVER_NON_UNIQUE_NAME));
314  dest->Put(SERVER_PARENT_ID, src->Get(SERVER_PARENT_ID));
315  dest->Put(SERVER_MTIME, src->Get(SERVER_MTIME));
316  dest->Put(SERVER_CTIME, src->Get(SERVER_CTIME));
317  dest->Put(SERVER_VERSION, src->Get(SERVER_VERSION));
318  dest->Put(SERVER_IS_DIR, src->Get(SERVER_IS_DIR));
319  dest->Put(SERVER_IS_DEL, src->Get(SERVER_IS_DEL));
320  dest->Put(IS_UNAPPLIED_UPDATE, src->Get(IS_UNAPPLIED_UPDATE));
321  dest->Put(SERVER_SPECIFICS, src->Get(SERVER_SPECIFICS));
322  dest->Put(SERVER_POSITION_IN_PARENT, src->Get(SERVER_POSITION_IN_PARENT));
323}
324
325void ClearServerData(syncable::MutableEntry* entry) {
326  entry->Put(SERVER_NON_UNIQUE_NAME, "");
327  entry->Put(SERVER_PARENT_ID, syncable::kNullId);
328  entry->Put(SERVER_MTIME, 0);
329  entry->Put(SERVER_CTIME, 0);
330  entry->Put(SERVER_VERSION, 0);
331  entry->Put(SERVER_IS_DIR, false);
332  entry->Put(SERVER_IS_DEL, false);
333  entry->Put(IS_UNAPPLIED_UPDATE, false);
334  entry->Put(SERVER_SPECIFICS, sync_pb::EntitySpecifics::default_instance());
335  entry->Put(SERVER_POSITION_IN_PARENT, 0);
336}
337
338}  // namespace browser_sync
339