syncer.cc revision 3345a6884c488ff3a535c2c9acdd33d74b37e311
1// Copyright (c) 2010 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/sync/engine/syncer.h"
6
7#include "base/message_loop.h"
8#include "base/time.h"
9#include "chrome/browser/chrome_thread.h"
10#include "chrome/browser/sync/engine/apply_updates_command.h"
11#include "chrome/browser/sync/engine/build_and_process_conflict_sets_command.h"
12#include "chrome/browser/sync/engine/build_commit_command.h"
13#include "chrome/browser/sync/engine/cleanup_disabled_types_command.h"
14#include "chrome/browser/sync/engine/clear_data_command.h"
15#include "chrome/browser/sync/engine/conflict_resolver.h"
16#include "chrome/browser/sync/engine/download_updates_command.h"
17#include "chrome/browser/sync/engine/get_commit_ids_command.h"
18#include "chrome/browser/sync/engine/net/server_connection_manager.h"
19#include "chrome/browser/sync/engine/post_commit_message_command.h"
20#include "chrome/browser/sync/engine/process_commit_response_command.h"
21#include "chrome/browser/sync/engine/process_updates_command.h"
22#include "chrome/browser/sync/engine/resolve_conflicts_command.h"
23#include "chrome/browser/sync/engine/store_timestamps_command.h"
24#include "chrome/browser/sync/engine/syncer_end_command.h"
25#include "chrome/browser/sync/engine/syncer_types.h"
26#include "chrome/browser/sync/engine/syncer_util.h"
27#include "chrome/browser/sync/engine/syncproto.h"
28#include "chrome/browser/sync/engine/verify_updates_command.h"
29#include "chrome/browser/sync/syncable/directory_manager.h"
30#include "chrome/browser/sync/syncable/syncable-inl.h"
31#include "chrome/browser/sync/syncable/syncable.h"
32
33using base::TimeDelta;
34using sync_pb::ClientCommand;
35using syncable::Blob;
36using syncable::IS_UNAPPLIED_UPDATE;
37using syncable::SERVER_CTIME;
38using syncable::SERVER_IS_DEL;
39using syncable::SERVER_IS_DIR;
40using syncable::SERVER_MTIME;
41using syncable::SERVER_NON_UNIQUE_NAME;
42using syncable::SERVER_PARENT_ID;
43using syncable::SERVER_POSITION_IN_PARENT;
44using syncable::SERVER_SPECIFICS;
45using syncable::SERVER_VERSION;
46using syncable::SYNCER;
47using syncable::ScopedDirLookup;
48using syncable::WriteTransaction;
49
50namespace browser_sync {
51
52using sessions::StatusController;
53using sessions::SyncSession;
54using sessions::ConflictProgress;
55
56Syncer::Syncer(sessions::SyncSessionContext* context)
57    : early_exit_requested_(false),
58      max_commit_batch_size_(kDefaultMaxCommitBatchSize),
59      syncer_event_channel_(new SyncerEventChannel()),
60      resolver_scoper_(context, &resolver_),
61      event_channel_scoper_(context, syncer_event_channel_.get()),
62      context_(context),
63      updates_source_(sync_pb::GetUpdatesCallerInfo::UNKNOWN),
64      pre_conflict_resolution_closure_(NULL) {
65  shutdown_channel_.reset(new ShutdownChannel());
66
67  ScopedDirLookup dir(context->directory_manager(), context->account_name());
68  // The directory must be good here.
69  CHECK(dir.good());
70}
71
72Syncer::~Syncer() {
73  syncer_event_channel_->Notify(
74      SyncerEvent(SyncerEvent::SHUTDOWN_USE_WITH_CARE));
75  shutdown_channel_->Notify(SyncerShutdownEvent(this));
76}
77
78bool Syncer::ExitRequested() {
79  AutoLock lock(early_exit_requested_lock_);
80  return early_exit_requested_;
81}
82
83void Syncer::RequestEarlyExit() {
84  AutoLock lock(early_exit_requested_lock_);
85  early_exit_requested_ = true;
86}
87
88void Syncer::RequestNudge(int milliseconds) {
89  SyncerEvent event(SyncerEvent::REQUEST_SYNC_NUDGE);
90  event.nudge_delay_milliseconds = milliseconds;
91  syncer_event_channel_->Notify(event);
92}
93
94bool Syncer::SyncShare(sessions::SyncSession::Delegate* delegate) {
95  sessions::SyncSession session(context_, delegate);
96  return SyncShare(&session);
97}
98
99bool Syncer::SyncShare(sessions::SyncSession* session) {
100  sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source =
101      TestAndSetUpdatesSource();
102  session->set_source(source);
103  if (sync_pb::GetUpdatesCallerInfo::CLEAR_PRIVATE_DATA == source) {
104    SyncShare(session, CLEAR_PRIVATE_DATA, SYNCER_END);
105    return false;
106  } else {
107    // This isn't perfect, as we can end up bundling extensions activity
108    // intended for the next session into the current one.  We could do a
109    // test-and-reset as with the source, but note that also falls short if
110    // the commit request fails (e.g. due to lost connection), as we will
111    // fall all the way back to the syncer thread main loop in that case, and
112    // wind up creating a new session when a connection is established, losing
113    // the records set here on the original attempt.  This should provide us
114    // with the right data "most of the time", and we're only using this for
115    // analysis purposes, so Law of Large Numbers FTW.
116    context_->extensions_monitor()->GetAndClearRecords(
117        session->mutable_extensions_activity());
118    SyncShare(session, SYNCER_BEGIN, SYNCER_END);
119    return session->HasMoreToSync();
120  }
121}
122
123bool Syncer::SyncShare(SyncerStep first_step, SyncerStep last_step,
124                       sessions::SyncSession::Delegate* delegate) {
125  sessions::SyncSession session(context_, delegate);
126  SyncShare(&session, first_step, last_step);
127  return session.HasMoreToSync();
128}
129
130void Syncer::SyncShare(sessions::SyncSession* session,
131                       const SyncerStep first_step,
132                       const SyncerStep last_step) {
133  SyncerStep current_step = first_step;
134
135  SyncerStep next_step = current_step;
136  while (!ExitRequested()) {
137    switch (current_step) {
138      case SYNCER_BEGIN:
139        LOG(INFO) << "Syncer Begin";
140        next_step = CLEANUP_DISABLED_TYPES;
141        break;
142      case CLEANUP_DISABLED_TYPES: {
143        LOG(INFO) << "Cleaning up disabled types";
144        CleanupDisabledTypesCommand cleanup;
145        cleanup.Execute(session);
146        next_step = DOWNLOAD_UPDATES;
147        break;
148      }
149      case DOWNLOAD_UPDATES: {
150        LOG(INFO) << "Downloading Updates";
151        DownloadUpdatesCommand download_updates;
152        download_updates.Execute(session);
153        next_step = PROCESS_CLIENT_COMMAND;
154        break;
155      }
156      case PROCESS_CLIENT_COMMAND: {
157        LOG(INFO) << "Processing Client Command";
158        ProcessClientCommand(session);
159        next_step = VERIFY_UPDATES;
160        break;
161      }
162      case VERIFY_UPDATES: {
163        LOG(INFO) << "Verifying Updates";
164        VerifyUpdatesCommand verify_updates;
165        verify_updates.Execute(session);
166        next_step = PROCESS_UPDATES;
167        break;
168      }
169      case PROCESS_UPDATES: {
170        LOG(INFO) << "Processing Updates";
171        ProcessUpdatesCommand process_updates;
172        process_updates.Execute(session);
173        next_step = STORE_TIMESTAMPS;
174        break;
175      }
176      case STORE_TIMESTAMPS: {
177        LOG(INFO) << "Storing timestamps";
178        StoreTimestampsCommand store_timestamps;
179        store_timestamps.Execute(session);
180        // We should download all of the updates before attempting to process
181        // them.
182        if (session->status_controller()->ServerSaysNothingMoreToDownload() ||
183            !session->status_controller()->download_updates_succeeded()) {
184          next_step = APPLY_UPDATES;
185        } else {
186          next_step = DOWNLOAD_UPDATES;
187        }
188        break;
189      }
190      case APPLY_UPDATES: {
191        LOG(INFO) << "Applying Updates";
192        ApplyUpdatesCommand apply_updates;
193        apply_updates.Execute(session);
194        next_step = BUILD_COMMIT_REQUEST;
195        break;
196      }
197      // These two steps are combined since they are executed within the same
198      // write transaction.
199      case BUILD_COMMIT_REQUEST: {
200        session->status_controller()->set_syncing(true);
201
202        LOG(INFO) << "Processing Commit Request";
203        ScopedDirLookup dir(context_->directory_manager(),
204                            context_->account_name());
205        if (!dir.good()) {
206          LOG(ERROR) << "Scoped dir lookup failed!";
207          return;
208        }
209        WriteTransaction trans(dir, SYNCER, __FILE__, __LINE__);
210        sessions::ScopedSetSessionWriteTransaction set_trans(session, &trans);
211
212        LOG(INFO) << "Getting the Commit IDs";
213        GetCommitIdsCommand get_commit_ids_command(max_commit_batch_size_);
214        get_commit_ids_command.Execute(session);
215
216        if (!session->status_controller()->commit_ids().empty()) {
217          LOG(INFO) << "Building a commit message";
218          BuildCommitCommand build_commit_command;
219          build_commit_command.Execute(session);
220
221          next_step = POST_COMMIT_MESSAGE;
222        } else {
223          next_step = BUILD_AND_PROCESS_CONFLICT_SETS;
224        }
225
226        break;
227      }
228      case POST_COMMIT_MESSAGE: {
229        LOG(INFO) << "Posting a commit request";
230        PostCommitMessageCommand post_commit_command;
231        post_commit_command.Execute(session);
232        next_step = PROCESS_COMMIT_RESPONSE;
233        break;
234      }
235      case PROCESS_COMMIT_RESPONSE: {
236        LOG(INFO) << "Processing the commit response";
237        session->status_controller()->reset_num_conflicting_commits();
238        ProcessCommitResponseCommand process_response_command;
239        process_response_command.Execute(session);
240        next_step = BUILD_AND_PROCESS_CONFLICT_SETS;
241        break;
242      }
243      case BUILD_AND_PROCESS_CONFLICT_SETS: {
244        LOG(INFO) << "Building and Processing Conflict Sets";
245        BuildAndProcessConflictSetsCommand build_process_conflict_sets;
246        build_process_conflict_sets.Execute(session);
247        if (session->status_controller()->conflict_sets_built())
248          next_step = SYNCER_END;
249        else
250          next_step = RESOLVE_CONFLICTS;
251        break;
252      }
253      case RESOLVE_CONFLICTS: {
254        LOG(INFO) << "Resolving Conflicts";
255
256        // Trigger the pre_conflict_resolution_closure_, which is a testing
257        // hook for the unit tests, if it is non-NULL.
258        if (pre_conflict_resolution_closure_) {
259          pre_conflict_resolution_closure_->Run();
260        }
261
262        StatusController* status = session->status_controller();
263        status->reset_conflicts_resolved();
264        ResolveConflictsCommand resolve_conflicts_command;
265        resolve_conflicts_command.Execute(session);
266        if (status->HasConflictingUpdates())
267          next_step = APPLY_UPDATES_TO_RESOLVE_CONFLICTS;
268        else
269          next_step = SYNCER_END;
270        break;
271      }
272      case APPLY_UPDATES_TO_RESOLVE_CONFLICTS: {
273        StatusController* status = session->status_controller();
274        LOG(INFO) << "Applying updates to resolve conflicts";
275        ApplyUpdatesCommand apply_updates;
276        int before_conflicting_updates = status->TotalNumConflictingItems();
277        apply_updates.Execute(session);
278        int after_conflicting_updates = status->TotalNumConflictingItems();
279        status->update_conflicts_resolved(before_conflicting_updates >
280                                          after_conflicting_updates);
281        if (status->conflicts_resolved())
282          next_step = RESOLVE_CONFLICTS;
283        else
284          next_step = SYNCER_END;
285        break;
286      }
287      case CLEAR_PRIVATE_DATA: {
288        LOG(INFO) << "Clear Private Data";
289        ClearDataCommand clear_data_command;
290        clear_data_command.Execute(session);
291        next_step = SYNCER_END;
292      }
293      case SYNCER_END: {
294        LOG(INFO) << "Syncer End";
295        SyncerEndCommand syncer_end_command;
296        // This will set "syncing" to false, and send out a notification.
297        syncer_end_command.Execute(session);
298        goto post_while;
299      }
300      default:
301        LOG(ERROR) << "Unknown command: " << current_step;
302    }
303    if (last_step == current_step)
304      break;
305    current_step = next_step;
306  }
307 post_while:
308  return;
309}
310
311void Syncer::ProcessClientCommand(sessions::SyncSession* session) {
312  const ClientToServerResponse& response =
313      session->status_controller()->updates_response();
314  if (!response.has_client_command())
315    return;
316  const ClientCommand& command = response.client_command();
317
318  // The server limits the number of items a client can commit in one batch.
319  if (command.has_max_commit_batch_size())
320    max_commit_batch_size_ = command.max_commit_batch_size();
321  if (command.has_set_sync_long_poll_interval()) {
322    session->delegate()->OnReceivedLongPollIntervalUpdate(
323        TimeDelta::FromSeconds(command.set_sync_long_poll_interval()));
324  }
325  if (command.has_set_sync_poll_interval()) {
326    session->delegate()->OnReceivedShortPollIntervalUpdate(
327        TimeDelta::FromSeconds(command.set_sync_poll_interval()));
328  }
329}
330
331void CopyServerFields(syncable::Entry* src, syncable::MutableEntry* dest) {
332  dest->Put(SERVER_NON_UNIQUE_NAME, src->Get(SERVER_NON_UNIQUE_NAME));
333  dest->Put(SERVER_PARENT_ID, src->Get(SERVER_PARENT_ID));
334  dest->Put(SERVER_MTIME, src->Get(SERVER_MTIME));
335  dest->Put(SERVER_CTIME, src->Get(SERVER_CTIME));
336  dest->Put(SERVER_VERSION, src->Get(SERVER_VERSION));
337  dest->Put(SERVER_IS_DIR, src->Get(SERVER_IS_DIR));
338  dest->Put(SERVER_IS_DEL, src->Get(SERVER_IS_DEL));
339  dest->Put(IS_UNAPPLIED_UPDATE, src->Get(IS_UNAPPLIED_UPDATE));
340  dest->Put(SERVER_SPECIFICS, src->Get(SERVER_SPECIFICS));
341  dest->Put(SERVER_POSITION_IN_PARENT, src->Get(SERVER_POSITION_IN_PARENT));
342}
343
344void ClearServerData(syncable::MutableEntry* entry) {
345  entry->Put(SERVER_NON_UNIQUE_NAME, "");
346  entry->Put(SERVER_PARENT_ID, syncable::kNullId);
347  entry->Put(SERVER_MTIME, 0);
348  entry->Put(SERVER_CTIME, 0);
349  entry->Put(SERVER_VERSION, 0);
350  entry->Put(SERVER_IS_DIR, false);
351  entry->Put(SERVER_IS_DEL, false);
352  entry->Put(IS_UNAPPLIED_UPDATE, false);
353  entry->Put(SERVER_SPECIFICS, sync_pb::EntitySpecifics::default_instance());
354  entry->Put(SERVER_POSITION_IN_PARENT, 0);
355}
356
357}  // namespace browser_sync
358