// syncer.cc revision 3345a6884c488ff3a535c2c9acdd33d74b37e311
1// Copyright (c) 2010 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "chrome/browser/sync/engine/syncer.h" 6 7#include "base/message_loop.h" 8#include "base/time.h" 9#include "chrome/browser/chrome_thread.h" 10#include "chrome/browser/sync/engine/apply_updates_command.h" 11#include "chrome/browser/sync/engine/build_and_process_conflict_sets_command.h" 12#include "chrome/browser/sync/engine/build_commit_command.h" 13#include "chrome/browser/sync/engine/cleanup_disabled_types_command.h" 14#include "chrome/browser/sync/engine/clear_data_command.h" 15#include "chrome/browser/sync/engine/conflict_resolver.h" 16#include "chrome/browser/sync/engine/download_updates_command.h" 17#include "chrome/browser/sync/engine/get_commit_ids_command.h" 18#include "chrome/browser/sync/engine/net/server_connection_manager.h" 19#include "chrome/browser/sync/engine/post_commit_message_command.h" 20#include "chrome/browser/sync/engine/process_commit_response_command.h" 21#include "chrome/browser/sync/engine/process_updates_command.h" 22#include "chrome/browser/sync/engine/resolve_conflicts_command.h" 23#include "chrome/browser/sync/engine/store_timestamps_command.h" 24#include "chrome/browser/sync/engine/syncer_end_command.h" 25#include "chrome/browser/sync/engine/syncer_types.h" 26#include "chrome/browser/sync/engine/syncer_util.h" 27#include "chrome/browser/sync/engine/syncproto.h" 28#include "chrome/browser/sync/engine/verify_updates_command.h" 29#include "chrome/browser/sync/syncable/directory_manager.h" 30#include "chrome/browser/sync/syncable/syncable-inl.h" 31#include "chrome/browser/sync/syncable/syncable.h" 32 33using base::TimeDelta; 34using sync_pb::ClientCommand; 35using syncable::Blob; 36using syncable::IS_UNAPPLIED_UPDATE; 37using syncable::SERVER_CTIME; 38using syncable::SERVER_IS_DEL; 39using syncable::SERVER_IS_DIR; 40using syncable::SERVER_MTIME; 41using 
syncable::SERVER_NON_UNIQUE_NAME; 42using syncable::SERVER_PARENT_ID; 43using syncable::SERVER_POSITION_IN_PARENT; 44using syncable::SERVER_SPECIFICS; 45using syncable::SERVER_VERSION; 46using syncable::SYNCER; 47using syncable::ScopedDirLookup; 48using syncable::WriteTransaction; 49 50namespace browser_sync { 51 52using sessions::StatusController; 53using sessions::SyncSession; 54using sessions::ConflictProgress; 55 56Syncer::Syncer(sessions::SyncSessionContext* context) 57 : early_exit_requested_(false), 58 max_commit_batch_size_(kDefaultMaxCommitBatchSize), 59 syncer_event_channel_(new SyncerEventChannel()), 60 resolver_scoper_(context, &resolver_), 61 event_channel_scoper_(context, syncer_event_channel_.get()), 62 context_(context), 63 updates_source_(sync_pb::GetUpdatesCallerInfo::UNKNOWN), 64 pre_conflict_resolution_closure_(NULL) { 65 shutdown_channel_.reset(new ShutdownChannel()); 66 67 ScopedDirLookup dir(context->directory_manager(), context->account_name()); 68 // The directory must be good here. 
69 CHECK(dir.good()); 70} 71 72Syncer::~Syncer() { 73 syncer_event_channel_->Notify( 74 SyncerEvent(SyncerEvent::SHUTDOWN_USE_WITH_CARE)); 75 shutdown_channel_->Notify(SyncerShutdownEvent(this)); 76} 77 78bool Syncer::ExitRequested() { 79 AutoLock lock(early_exit_requested_lock_); 80 return early_exit_requested_; 81} 82 83void Syncer::RequestEarlyExit() { 84 AutoLock lock(early_exit_requested_lock_); 85 early_exit_requested_ = true; 86} 87 88void Syncer::RequestNudge(int milliseconds) { 89 SyncerEvent event(SyncerEvent::REQUEST_SYNC_NUDGE); 90 event.nudge_delay_milliseconds = milliseconds; 91 syncer_event_channel_->Notify(event); 92} 93 94bool Syncer::SyncShare(sessions::SyncSession::Delegate* delegate) { 95 sessions::SyncSession session(context_, delegate); 96 return SyncShare(&session); 97} 98 99bool Syncer::SyncShare(sessions::SyncSession* session) { 100 sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source = 101 TestAndSetUpdatesSource(); 102 session->set_source(source); 103 if (sync_pb::GetUpdatesCallerInfo::CLEAR_PRIVATE_DATA == source) { 104 SyncShare(session, CLEAR_PRIVATE_DATA, SYNCER_END); 105 return false; 106 } else { 107 // This isn't perfect, as we can end up bundling extensions activity 108 // intended for the next session into the current one. We could do a 109 // test-and-reset as with the source, but note that also falls short if 110 // the commit request fails (e.g. due to lost connection), as we will 111 // fall all the way back to the syncer thread main loop in that case, and 112 // wind up creating a new session when a connection is established, losing 113 // the records set here on the original attempt. This should provide us 114 // with the right data "most of the time", and we're only using this for 115 // analysis purposes, so Law of Large Numbers FTW. 
116 context_->extensions_monitor()->GetAndClearRecords( 117 session->mutable_extensions_activity()); 118 SyncShare(session, SYNCER_BEGIN, SYNCER_END); 119 return session->HasMoreToSync(); 120 } 121} 122 123bool Syncer::SyncShare(SyncerStep first_step, SyncerStep last_step, 124 sessions::SyncSession::Delegate* delegate) { 125 sessions::SyncSession session(context_, delegate); 126 SyncShare(&session, first_step, last_step); 127 return session.HasMoreToSync(); 128} 129 130void Syncer::SyncShare(sessions::SyncSession* session, 131 const SyncerStep first_step, 132 const SyncerStep last_step) { 133 SyncerStep current_step = first_step; 134 135 SyncerStep next_step = current_step; 136 while (!ExitRequested()) { 137 switch (current_step) { 138 case SYNCER_BEGIN: 139 LOG(INFO) << "Syncer Begin"; 140 next_step = CLEANUP_DISABLED_TYPES; 141 break; 142 case CLEANUP_DISABLED_TYPES: { 143 LOG(INFO) << "Cleaning up disabled types"; 144 CleanupDisabledTypesCommand cleanup; 145 cleanup.Execute(session); 146 next_step = DOWNLOAD_UPDATES; 147 break; 148 } 149 case DOWNLOAD_UPDATES: { 150 LOG(INFO) << "Downloading Updates"; 151 DownloadUpdatesCommand download_updates; 152 download_updates.Execute(session); 153 next_step = PROCESS_CLIENT_COMMAND; 154 break; 155 } 156 case PROCESS_CLIENT_COMMAND: { 157 LOG(INFO) << "Processing Client Command"; 158 ProcessClientCommand(session); 159 next_step = VERIFY_UPDATES; 160 break; 161 } 162 case VERIFY_UPDATES: { 163 LOG(INFO) << "Verifying Updates"; 164 VerifyUpdatesCommand verify_updates; 165 verify_updates.Execute(session); 166 next_step = PROCESS_UPDATES; 167 break; 168 } 169 case PROCESS_UPDATES: { 170 LOG(INFO) << "Processing Updates"; 171 ProcessUpdatesCommand process_updates; 172 process_updates.Execute(session); 173 next_step = STORE_TIMESTAMPS; 174 break; 175 } 176 case STORE_TIMESTAMPS: { 177 LOG(INFO) << "Storing timestamps"; 178 StoreTimestampsCommand store_timestamps; 179 store_timestamps.Execute(session); 180 // We should download 
all of the updates before attempting to process 181 // them. 182 if (session->status_controller()->ServerSaysNothingMoreToDownload() || 183 !session->status_controller()->download_updates_succeeded()) { 184 next_step = APPLY_UPDATES; 185 } else { 186 next_step = DOWNLOAD_UPDATES; 187 } 188 break; 189 } 190 case APPLY_UPDATES: { 191 LOG(INFO) << "Applying Updates"; 192 ApplyUpdatesCommand apply_updates; 193 apply_updates.Execute(session); 194 next_step = BUILD_COMMIT_REQUEST; 195 break; 196 } 197 // These two steps are combined since they are executed within the same 198 // write transaction. 199 case BUILD_COMMIT_REQUEST: { 200 session->status_controller()->set_syncing(true); 201 202 LOG(INFO) << "Processing Commit Request"; 203 ScopedDirLookup dir(context_->directory_manager(), 204 context_->account_name()); 205 if (!dir.good()) { 206 LOG(ERROR) << "Scoped dir lookup failed!"; 207 return; 208 } 209 WriteTransaction trans(dir, SYNCER, __FILE__, __LINE__); 210 sessions::ScopedSetSessionWriteTransaction set_trans(session, &trans); 211 212 LOG(INFO) << "Getting the Commit IDs"; 213 GetCommitIdsCommand get_commit_ids_command(max_commit_batch_size_); 214 get_commit_ids_command.Execute(session); 215 216 if (!session->status_controller()->commit_ids().empty()) { 217 LOG(INFO) << "Building a commit message"; 218 BuildCommitCommand build_commit_command; 219 build_commit_command.Execute(session); 220 221 next_step = POST_COMMIT_MESSAGE; 222 } else { 223 next_step = BUILD_AND_PROCESS_CONFLICT_SETS; 224 } 225 226 break; 227 } 228 case POST_COMMIT_MESSAGE: { 229 LOG(INFO) << "Posting a commit request"; 230 PostCommitMessageCommand post_commit_command; 231 post_commit_command.Execute(session); 232 next_step = PROCESS_COMMIT_RESPONSE; 233 break; 234 } 235 case PROCESS_COMMIT_RESPONSE: { 236 LOG(INFO) << "Processing the commit response"; 237 session->status_controller()->reset_num_conflicting_commits(); 238 ProcessCommitResponseCommand process_response_command; 239 
process_response_command.Execute(session); 240 next_step = BUILD_AND_PROCESS_CONFLICT_SETS; 241 break; 242 } 243 case BUILD_AND_PROCESS_CONFLICT_SETS: { 244 LOG(INFO) << "Building and Processing Conflict Sets"; 245 BuildAndProcessConflictSetsCommand build_process_conflict_sets; 246 build_process_conflict_sets.Execute(session); 247 if (session->status_controller()->conflict_sets_built()) 248 next_step = SYNCER_END; 249 else 250 next_step = RESOLVE_CONFLICTS; 251 break; 252 } 253 case RESOLVE_CONFLICTS: { 254 LOG(INFO) << "Resolving Conflicts"; 255 256 // Trigger the pre_conflict_resolution_closure_, which is a testing 257 // hook for the unit tests, if it is non-NULL. 258 if (pre_conflict_resolution_closure_) { 259 pre_conflict_resolution_closure_->Run(); 260 } 261 262 StatusController* status = session->status_controller(); 263 status->reset_conflicts_resolved(); 264 ResolveConflictsCommand resolve_conflicts_command; 265 resolve_conflicts_command.Execute(session); 266 if (status->HasConflictingUpdates()) 267 next_step = APPLY_UPDATES_TO_RESOLVE_CONFLICTS; 268 else 269 next_step = SYNCER_END; 270 break; 271 } 272 case APPLY_UPDATES_TO_RESOLVE_CONFLICTS: { 273 StatusController* status = session->status_controller(); 274 LOG(INFO) << "Applying updates to resolve conflicts"; 275 ApplyUpdatesCommand apply_updates; 276 int before_conflicting_updates = status->TotalNumConflictingItems(); 277 apply_updates.Execute(session); 278 int after_conflicting_updates = status->TotalNumConflictingItems(); 279 status->update_conflicts_resolved(before_conflicting_updates > 280 after_conflicting_updates); 281 if (status->conflicts_resolved()) 282 next_step = RESOLVE_CONFLICTS; 283 else 284 next_step = SYNCER_END; 285 break; 286 } 287 case CLEAR_PRIVATE_DATA: { 288 LOG(INFO) << "Clear Private Data"; 289 ClearDataCommand clear_data_command; 290 clear_data_command.Execute(session); 291 next_step = SYNCER_END; 292 } 293 case SYNCER_END: { 294 LOG(INFO) << "Syncer End"; 295 SyncerEndCommand 
syncer_end_command; 296 // This will set "syncing" to false, and send out a notification. 297 syncer_end_command.Execute(session); 298 goto post_while; 299 } 300 default: 301 LOG(ERROR) << "Unknown command: " << current_step; 302 } 303 if (last_step == current_step) 304 break; 305 current_step = next_step; 306 } 307 post_while: 308 return; 309} 310 311void Syncer::ProcessClientCommand(sessions::SyncSession* session) { 312 const ClientToServerResponse& response = 313 session->status_controller()->updates_response(); 314 if (!response.has_client_command()) 315 return; 316 const ClientCommand& command = response.client_command(); 317 318 // The server limits the number of items a client can commit in one batch. 319 if (command.has_max_commit_batch_size()) 320 max_commit_batch_size_ = command.max_commit_batch_size(); 321 if (command.has_set_sync_long_poll_interval()) { 322 session->delegate()->OnReceivedLongPollIntervalUpdate( 323 TimeDelta::FromSeconds(command.set_sync_long_poll_interval())); 324 } 325 if (command.has_set_sync_poll_interval()) { 326 session->delegate()->OnReceivedShortPollIntervalUpdate( 327 TimeDelta::FromSeconds(command.set_sync_poll_interval())); 328 } 329} 330 331void CopyServerFields(syncable::Entry* src, syncable::MutableEntry* dest) { 332 dest->Put(SERVER_NON_UNIQUE_NAME, src->Get(SERVER_NON_UNIQUE_NAME)); 333 dest->Put(SERVER_PARENT_ID, src->Get(SERVER_PARENT_ID)); 334 dest->Put(SERVER_MTIME, src->Get(SERVER_MTIME)); 335 dest->Put(SERVER_CTIME, src->Get(SERVER_CTIME)); 336 dest->Put(SERVER_VERSION, src->Get(SERVER_VERSION)); 337 dest->Put(SERVER_IS_DIR, src->Get(SERVER_IS_DIR)); 338 dest->Put(SERVER_IS_DEL, src->Get(SERVER_IS_DEL)); 339 dest->Put(IS_UNAPPLIED_UPDATE, src->Get(IS_UNAPPLIED_UPDATE)); 340 dest->Put(SERVER_SPECIFICS, src->Get(SERVER_SPECIFICS)); 341 dest->Put(SERVER_POSITION_IN_PARENT, src->Get(SERVER_POSITION_IN_PARENT)); 342} 343 344void ClearServerData(syncable::MutableEntry* entry) { 345 entry->Put(SERVER_NON_UNIQUE_NAME, ""); 
346 entry->Put(SERVER_PARENT_ID, syncable::kNullId); 347 entry->Put(SERVER_MTIME, 0); 348 entry->Put(SERVER_CTIME, 0); 349 entry->Put(SERVER_VERSION, 0); 350 entry->Put(SERVER_IS_DIR, false); 351 entry->Put(SERVER_IS_DEL, false); 352 entry->Put(IS_UNAPPLIED_UPDATE, false); 353 entry->Put(SERVER_SPECIFICS, sync_pb::EntitySpecifics::default_instance()); 354 entry->Put(SERVER_POSITION_IN_PARENT, 0); 355} 356 357} // namespace browser_sync 358