syncer.cc revision 21d179b334e59e9a3bfcaed4c4430bef1bc5759d
1// Copyright (c) 2010 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/sync/engine/syncer.h"
6
7#include "base/message_loop.h"
8#include "base/time.h"
9#include "chrome/browser/sync/engine/apply_updates_command.h"
10#include "chrome/browser/sync/engine/build_and_process_conflict_sets_command.h"
11#include "chrome/browser/sync/engine/build_commit_command.h"
12#include "chrome/browser/sync/engine/cleanup_disabled_types_command.h"
13#include "chrome/browser/sync/engine/clear_data_command.h"
14#include "chrome/browser/sync/engine/conflict_resolver.h"
15#include "chrome/browser/sync/engine/download_updates_command.h"
16#include "chrome/browser/sync/engine/get_commit_ids_command.h"
17#include "chrome/browser/sync/engine/net/server_connection_manager.h"
18#include "chrome/browser/sync/engine/post_commit_message_command.h"
19#include "chrome/browser/sync/engine/process_commit_response_command.h"
20#include "chrome/browser/sync/engine/process_updates_command.h"
21#include "chrome/browser/sync/engine/resolve_conflicts_command.h"
22#include "chrome/browser/sync/engine/store_timestamps_command.h"
23#include "chrome/browser/sync/engine/syncer_end_command.h"
24#include "chrome/browser/sync/engine/syncer_types.h"
25#include "chrome/browser/sync/engine/syncer_util.h"
26#include "chrome/browser/sync/engine/syncproto.h"
27#include "chrome/browser/sync/engine/verify_updates_command.h"
28#include "chrome/browser/sync/syncable/directory_manager.h"
29#include "chrome/browser/sync/syncable/syncable-inl.h"
30#include "chrome/browser/sync/syncable/syncable.h"
31
32using base::TimeDelta;
33using sync_pb::ClientCommand;
34using syncable::Blob;
35using syncable::IS_UNAPPLIED_UPDATE;
36using syncable::SERVER_CTIME;
37using syncable::SERVER_IS_DEL;
38using syncable::SERVER_IS_DIR;
39using syncable::SERVER_MTIME;
40using syncable::SERVER_NON_UNIQUE_NAME;
41using syncable::SERVER_PARENT_ID;
42using syncable::SERVER_POSITION_IN_PARENT;
43using syncable::SERVER_SPECIFICS;
44using syncable::SERVER_VERSION;
45using syncable::SYNCER;
46using syncable::ScopedDirLookup;
47using syncable::WriteTransaction;
48
49namespace browser_sync {
50
51using sessions::ScopedSessionContextConflictResolver;
52using sessions::StatusController;
53using sessions::SyncSession;
54using sessions::ConflictProgress;
55
56Syncer::Syncer()
57    : early_exit_requested_(false),
58      max_commit_batch_size_(kDefaultMaxCommitBatchSize),
59      pre_conflict_resolution_closure_(NULL) {
60}
61
62Syncer::~Syncer() {}
63
64bool Syncer::ExitRequested() {
65  AutoLock lock(early_exit_requested_lock_);
66  return early_exit_requested_;
67}
68
69void Syncer::RequestEarlyExit() {
70  AutoLock lock(early_exit_requested_lock_);
71  early_exit_requested_ = true;
72}
73
74void Syncer::SyncShare(sessions::SyncSession* session) {
75  ScopedDirLookup dir(session->context()->directory_manager(),
76                      session->context()->account_name());
77  // The directory must be good here.
78  CHECK(dir.good());
79
80  const sessions::SyncSourceInfo& source(session->source());
81  if (sync_pb::GetUpdatesCallerInfo::CLEAR_PRIVATE_DATA == source.first) {
82    SyncShare(session, CLEAR_PRIVATE_DATA, SYNCER_END);
83    return;
84  } else {
85    // This isn't perfect, as we can end up bundling extensions activity
86    // intended for the next session into the current one.  We could do a
87    // test-and-reset as with the source, but note that also falls short if
88    // the commit request fails (e.g. due to lost connection), as we will
89    // fall all the way back to the syncer thread main loop in that case, and
90    // wind up creating a new session when a connection is established, losing
91    // the records set here on the original attempt.  This should provide us
92    // with the right data "most of the time", and we're only using this for
93    // analysis purposes, so Law of Large Numbers FTW.
94    session->context()->extensions_monitor()->GetAndClearRecords(
95        session->mutable_extensions_activity());
96    SyncShare(session, SYNCER_BEGIN, SYNCER_END);
97  }
98}
99
100void Syncer::SyncShare(sessions::SyncSession* session,
101                       const SyncerStep first_step,
102                       const SyncerStep last_step) {
103  ScopedSessionContextConflictResolver scoped(session->context(),
104                                              &resolver_);
105  SyncerStep current_step = first_step;
106
107  SyncerStep next_step = current_step;
108  while (!ExitRequested()) {
109    switch (current_step) {
110      case SYNCER_BEGIN:
111        VLOG(1) << "Syncer Begin";
112        next_step = CLEANUP_DISABLED_TYPES;
113        break;
114      case CLEANUP_DISABLED_TYPES: {
115        VLOG(1) << "Cleaning up disabled types";
116        CleanupDisabledTypesCommand cleanup;
117        cleanup.Execute(session);
118        next_step = DOWNLOAD_UPDATES;
119        break;
120      }
121      case DOWNLOAD_UPDATES: {
122        VLOG(1) << "Downloading Updates";
123        DownloadUpdatesCommand download_updates;
124        download_updates.Execute(session);
125        next_step = PROCESS_CLIENT_COMMAND;
126        break;
127      }
128      case PROCESS_CLIENT_COMMAND: {
129        VLOG(1) << "Processing Client Command";
130        ProcessClientCommand(session);
131        next_step = VERIFY_UPDATES;
132        break;
133      }
134      case VERIFY_UPDATES: {
135        VLOG(1) << "Verifying Updates";
136        VerifyUpdatesCommand verify_updates;
137        verify_updates.Execute(session);
138        next_step = PROCESS_UPDATES;
139        break;
140      }
141      case PROCESS_UPDATES: {
142        VLOG(1) << "Processing Updates";
143        ProcessUpdatesCommand process_updates;
144        process_updates.Execute(session);
145        next_step = STORE_TIMESTAMPS;
146        break;
147      }
148      case STORE_TIMESTAMPS: {
149        VLOG(1) << "Storing timestamps";
150        StoreTimestampsCommand store_timestamps;
151        store_timestamps.Execute(session);
152        // We should download all of the updates before attempting to process
153        // them.
154        if (session->status_controller()->ServerSaysNothingMoreToDownload() ||
155            !session->status_controller()->download_updates_succeeded()) {
156          next_step = APPLY_UPDATES;
157        } else {
158          next_step = DOWNLOAD_UPDATES;
159        }
160        break;
161      }
162      case APPLY_UPDATES: {
163        VLOG(1) << "Applying Updates";
164        ApplyUpdatesCommand apply_updates;
165        apply_updates.Execute(session);
166        next_step = BUILD_COMMIT_REQUEST;
167        break;
168      }
169      // These two steps are combined since they are executed within the same
170      // write transaction.
171      case BUILD_COMMIT_REQUEST: {
172        session->status_controller()->set_syncing(true);
173
174        VLOG(1) << "Processing Commit Request";
175        ScopedDirLookup dir(session->context()->directory_manager(),
176                            session->context()->account_name());
177        if (!dir.good()) {
178          LOG(ERROR) << "Scoped dir lookup failed!";
179          return;
180        }
181        WriteTransaction trans(dir, SYNCER, __FILE__, __LINE__);
182        sessions::ScopedSetSessionWriteTransaction set_trans(session, &trans);
183
184        VLOG(1) << "Getting the Commit IDs";
185        GetCommitIdsCommand get_commit_ids_command(max_commit_batch_size_);
186        get_commit_ids_command.Execute(session);
187
188        if (!session->status_controller()->commit_ids().empty()) {
189          VLOG(1) << "Building a commit message";
190          BuildCommitCommand build_commit_command;
191          build_commit_command.Execute(session);
192
193          next_step = POST_COMMIT_MESSAGE;
194        } else {
195          next_step = BUILD_AND_PROCESS_CONFLICT_SETS;
196        }
197
198        break;
199      }
200      case POST_COMMIT_MESSAGE: {
201        VLOG(1) << "Posting a commit request";
202        PostCommitMessageCommand post_commit_command;
203        post_commit_command.Execute(session);
204        next_step = PROCESS_COMMIT_RESPONSE;
205        break;
206      }
207      case PROCESS_COMMIT_RESPONSE: {
208        VLOG(1) << "Processing the commit response";
209        session->status_controller()->reset_num_conflicting_commits();
210        ProcessCommitResponseCommand process_response_command;
211        process_response_command.Execute(session);
212        next_step = BUILD_AND_PROCESS_CONFLICT_SETS;
213        break;
214      }
215      case BUILD_AND_PROCESS_CONFLICT_SETS: {
216        VLOG(1) << "Building and Processing Conflict Sets";
217        BuildAndProcessConflictSetsCommand build_process_conflict_sets;
218        build_process_conflict_sets.Execute(session);
219        if (session->status_controller()->conflict_sets_built())
220          next_step = SYNCER_END;
221        else
222          next_step = RESOLVE_CONFLICTS;
223        break;
224      }
225      case RESOLVE_CONFLICTS: {
226        VLOG(1) << "Resolving Conflicts";
227
228        // Trigger the pre_conflict_resolution_closure_, which is a testing
229        // hook for the unit tests, if it is non-NULL.
230        if (pre_conflict_resolution_closure_) {
231          pre_conflict_resolution_closure_->Run();
232        }
233
234        StatusController* status = session->status_controller();
235        status->reset_conflicts_resolved();
236        ResolveConflictsCommand resolve_conflicts_command;
237        resolve_conflicts_command.Execute(session);
238        if (status->HasConflictingUpdates())
239          next_step = APPLY_UPDATES_TO_RESOLVE_CONFLICTS;
240        else
241          next_step = SYNCER_END;
242        break;
243      }
244      case APPLY_UPDATES_TO_RESOLVE_CONFLICTS: {
245        StatusController* status = session->status_controller();
246        VLOG(1) << "Applying updates to resolve conflicts";
247        ApplyUpdatesCommand apply_updates;
248        int before_conflicting_updates = status->TotalNumConflictingItems();
249        apply_updates.Execute(session);
250        int after_conflicting_updates = status->TotalNumConflictingItems();
251        status->update_conflicts_resolved(before_conflicting_updates >
252                                          after_conflicting_updates);
253        if (status->conflicts_resolved())
254          next_step = RESOLVE_CONFLICTS;
255        else
256          next_step = SYNCER_END;
257        break;
258      }
259      case CLEAR_PRIVATE_DATA: {
260        VLOG(1) << "Clear Private Data";
261        ClearDataCommand clear_data_command;
262        clear_data_command.Execute(session);
263        next_step = SYNCER_END;
264      }
265      case SYNCER_END: {
266        VLOG(1) << "Syncer End";
267        SyncerEndCommand syncer_end_command;
268        // This will set "syncing" to false, and send out a notification.
269        syncer_end_command.Execute(session);
270        goto post_while;
271      }
272      default:
273        LOG(ERROR) << "Unknown command: " << current_step;
274    }
275    if (last_step == current_step)
276      break;
277    current_step = next_step;
278  }
279 post_while:
280  return;
281}
282
283void Syncer::ProcessClientCommand(sessions::SyncSession* session) {
284  const ClientToServerResponse& response =
285      session->status_controller()->updates_response();
286  if (!response.has_client_command())
287    return;
288  const ClientCommand& command = response.client_command();
289
290  // The server limits the number of items a client can commit in one batch.
291  if (command.has_max_commit_batch_size())
292    max_commit_batch_size_ = command.max_commit_batch_size();
293  if (command.has_set_sync_long_poll_interval()) {
294    session->delegate()->OnReceivedLongPollIntervalUpdate(
295        TimeDelta::FromSeconds(command.set_sync_long_poll_interval()));
296  }
297  if (command.has_set_sync_poll_interval()) {
298    session->delegate()->OnReceivedShortPollIntervalUpdate(
299        TimeDelta::FromSeconds(command.set_sync_poll_interval()));
300  }
301}
302
303void CopyServerFields(syncable::Entry* src, syncable::MutableEntry* dest) {
304  dest->Put(SERVER_NON_UNIQUE_NAME, src->Get(SERVER_NON_UNIQUE_NAME));
305  dest->Put(SERVER_PARENT_ID, src->Get(SERVER_PARENT_ID));
306  dest->Put(SERVER_MTIME, src->Get(SERVER_MTIME));
307  dest->Put(SERVER_CTIME, src->Get(SERVER_CTIME));
308  dest->Put(SERVER_VERSION, src->Get(SERVER_VERSION));
309  dest->Put(SERVER_IS_DIR, src->Get(SERVER_IS_DIR));
310  dest->Put(SERVER_IS_DEL, src->Get(SERVER_IS_DEL));
311  dest->Put(IS_UNAPPLIED_UPDATE, src->Get(IS_UNAPPLIED_UPDATE));
312  dest->Put(SERVER_SPECIFICS, src->Get(SERVER_SPECIFICS));
313  dest->Put(SERVER_POSITION_IN_PARENT, src->Get(SERVER_POSITION_IN_PARENT));
314}
315
316void ClearServerData(syncable::MutableEntry* entry) {
317  entry->Put(SERVER_NON_UNIQUE_NAME, "");
318  entry->Put(SERVER_PARENT_ID, syncable::kNullId);
319  entry->Put(SERVER_MTIME, 0);
320  entry->Put(SERVER_CTIME, 0);
321  entry->Put(SERVER_VERSION, 0);
322  entry->Put(SERVER_IS_DIR, false);
323  entry->Put(SERVER_IS_DEL, false);
324  entry->Put(IS_UNAPPLIED_UPDATE, false);
325  entry->Put(SERVER_SPECIFICS, sync_pb::EntitySpecifics::default_instance());
326  entry->Put(SERVER_POSITION_IN_PARENT, 0);
327}
328
329}  // namespace browser_sync
330