syncer.cc revision 21d179b334e59e9a3bfcaed4c4430bef1bc5759d
1// Copyright (c) 2010 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "chrome/browser/sync/engine/syncer.h" 6 7#include "base/message_loop.h" 8#include "base/time.h" 9#include "chrome/browser/sync/engine/apply_updates_command.h" 10#include "chrome/browser/sync/engine/build_and_process_conflict_sets_command.h" 11#include "chrome/browser/sync/engine/build_commit_command.h" 12#include "chrome/browser/sync/engine/cleanup_disabled_types_command.h" 13#include "chrome/browser/sync/engine/clear_data_command.h" 14#include "chrome/browser/sync/engine/conflict_resolver.h" 15#include "chrome/browser/sync/engine/download_updates_command.h" 16#include "chrome/browser/sync/engine/get_commit_ids_command.h" 17#include "chrome/browser/sync/engine/net/server_connection_manager.h" 18#include "chrome/browser/sync/engine/post_commit_message_command.h" 19#include "chrome/browser/sync/engine/process_commit_response_command.h" 20#include "chrome/browser/sync/engine/process_updates_command.h" 21#include "chrome/browser/sync/engine/resolve_conflicts_command.h" 22#include "chrome/browser/sync/engine/store_timestamps_command.h" 23#include "chrome/browser/sync/engine/syncer_end_command.h" 24#include "chrome/browser/sync/engine/syncer_types.h" 25#include "chrome/browser/sync/engine/syncer_util.h" 26#include "chrome/browser/sync/engine/syncproto.h" 27#include "chrome/browser/sync/engine/verify_updates_command.h" 28#include "chrome/browser/sync/syncable/directory_manager.h" 29#include "chrome/browser/sync/syncable/syncable-inl.h" 30#include "chrome/browser/sync/syncable/syncable.h" 31 32using base::TimeDelta; 33using sync_pb::ClientCommand; 34using syncable::Blob; 35using syncable::IS_UNAPPLIED_UPDATE; 36using syncable::SERVER_CTIME; 37using syncable::SERVER_IS_DEL; 38using syncable::SERVER_IS_DIR; 39using syncable::SERVER_MTIME; 40using syncable::SERVER_NON_UNIQUE_NAME; 41using syncable::SERVER_PARENT_ID; 42using syncable::SERVER_POSITION_IN_PARENT; 43using syncable::SERVER_SPECIFICS; 44using syncable::SERVER_VERSION; 45using syncable::SYNCER; 46using syncable::ScopedDirLookup; 47using syncable::WriteTransaction; 48 49namespace browser_sync { 50 51using sessions::ScopedSessionContextConflictResolver; 52using sessions::StatusController; 53using sessions::SyncSession; 54using sessions::ConflictProgress; 55 56Syncer::Syncer() 57 : early_exit_requested_(false), 58 max_commit_batch_size_(kDefaultMaxCommitBatchSize), 59 pre_conflict_resolution_closure_(NULL) { 60} 61 62Syncer::~Syncer() {} 63 64bool Syncer::ExitRequested() { 65 AutoLock lock(early_exit_requested_lock_); 66 return early_exit_requested_; 67} 68 69void Syncer::RequestEarlyExit() { 70 AutoLock lock(early_exit_requested_lock_); 71 early_exit_requested_ = true; 72} 73 74void Syncer::SyncShare(sessions::SyncSession* session) { 75 ScopedDirLookup dir(session->context()->directory_manager(), 76 session->context()->account_name()); 77 // The directory must be good here. 78 CHECK(dir.good()); 79 80 const sessions::SyncSourceInfo& source(session->source()); 81 if (sync_pb::GetUpdatesCallerInfo::CLEAR_PRIVATE_DATA == source.first) { 82 SyncShare(session, CLEAR_PRIVATE_DATA, SYNCER_END); 83 return; 84 } else { 85 // This isn't perfect, as we can end up bundling extensions activity 86 // intended for the next session into the current one. We could do a 87 // test-and-reset as with the source, but note that also falls short if 88 // the commit request fails (e.g. due to lost connection), as we will 89 // fall all the way back to the syncer thread main loop in that case, and 90 // wind up creating a new session when a connection is established, losing 91 // the records set here on the original attempt. This should provide us 92 // with the right data "most of the time", and we're only using this for 93 // analysis purposes, so Law of Large Numbers FTW. 94 session->context()->extensions_monitor()->GetAndClearRecords( 95 session->mutable_extensions_activity()); 96 SyncShare(session, SYNCER_BEGIN, SYNCER_END); 97 } 98} 99 100void Syncer::SyncShare(sessions::SyncSession* session, 101 const SyncerStep first_step, 102 const SyncerStep last_step) { 103 ScopedSessionContextConflictResolver scoped(session->context(), 104 &resolver_); 105 SyncerStep current_step = first_step; 106 107 SyncerStep next_step = current_step; 108 while (!ExitRequested()) { 109 switch (current_step) { 110 case SYNCER_BEGIN: 111 VLOG(1) << "Syncer Begin"; 112 next_step = CLEANUP_DISABLED_TYPES; 113 break; 114 case CLEANUP_DISABLED_TYPES: { 115 VLOG(1) << "Cleaning up disabled types"; 116 CleanupDisabledTypesCommand cleanup; 117 cleanup.Execute(session); 118 next_step = DOWNLOAD_UPDATES; 119 break; 120 } 121 case DOWNLOAD_UPDATES: { 122 VLOG(1) << "Downloading Updates"; 123 DownloadUpdatesCommand download_updates; 124 download_updates.Execute(session); 125 next_step = PROCESS_CLIENT_COMMAND; 126 break; 127 } 128 case PROCESS_CLIENT_COMMAND: { 129 VLOG(1) << "Processing Client Command"; 130 ProcessClientCommand(session); 131 next_step = VERIFY_UPDATES; 132 break; 133 } 134 case VERIFY_UPDATES: { 135 VLOG(1) << "Verifying Updates"; 136 VerifyUpdatesCommand verify_updates; 137 verify_updates.Execute(session); 138 next_step = PROCESS_UPDATES; 139 break; 140 } 141 case PROCESS_UPDATES: { 142 VLOG(1) << "Processing Updates"; 143 ProcessUpdatesCommand process_updates; 144 process_updates.Execute(session); 145 next_step = STORE_TIMESTAMPS; 146 break; 147 } 148 case STORE_TIMESTAMPS: { 149 VLOG(1) << "Storing timestamps"; 150 StoreTimestampsCommand store_timestamps; 151 store_timestamps.Execute(session); 152 // We should download all of the updates before attempting to process 153 // them. 154 if (session->status_controller()->ServerSaysNothingMoreToDownload() || 155 !session->status_controller()->download_updates_succeeded()) { 156 next_step = APPLY_UPDATES; 157 } else { 158 next_step = DOWNLOAD_UPDATES; 159 } 160 break; 161 } 162 case APPLY_UPDATES: { 163 VLOG(1) << "Applying Updates"; 164 ApplyUpdatesCommand apply_updates; 165 apply_updates.Execute(session); 166 next_step = BUILD_COMMIT_REQUEST; 167 break; 168 } 169 // These two steps are combined since they are executed within the same 170 // write transaction. 171 case BUILD_COMMIT_REQUEST: { 172 session->status_controller()->set_syncing(true); 173 174 VLOG(1) << "Processing Commit Request"; 175 ScopedDirLookup dir(session->context()->directory_manager(), 176 session->context()->account_name()); 177 if (!dir.good()) { 178 LOG(ERROR) << "Scoped dir lookup failed!"; 179 return; 180 } 181 WriteTransaction trans(dir, SYNCER, __FILE__, __LINE__); 182 sessions::ScopedSetSessionWriteTransaction set_trans(session, &trans); 183 184 VLOG(1) << "Getting the Commit IDs"; 185 GetCommitIdsCommand get_commit_ids_command(max_commit_batch_size_); 186 get_commit_ids_command.Execute(session); 187 188 if (!session->status_controller()->commit_ids().empty()) { 189 VLOG(1) << "Building a commit message"; 190 BuildCommitCommand build_commit_command; 191 build_commit_command.Execute(session); 192 193 next_step = POST_COMMIT_MESSAGE; 194 } else { 195 next_step = BUILD_AND_PROCESS_CONFLICT_SETS; 196 } 197 198 break; 199 } 200 case POST_COMMIT_MESSAGE: { 201 VLOG(1) << "Posting a commit request"; 202 PostCommitMessageCommand post_commit_command; 203 post_commit_command.Execute(session); 204 next_step = PROCESS_COMMIT_RESPONSE; 205 break; 206 } 207 case PROCESS_COMMIT_RESPONSE: { 208 VLOG(1) << "Processing the commit response"; 209 session->status_controller()->reset_num_conflicting_commits(); 210 ProcessCommitResponseCommand process_response_command; 211 process_response_command.Execute(session); 212 next_step = BUILD_AND_PROCESS_CONFLICT_SETS; 213 break; 214 } 215 case BUILD_AND_PROCESS_CONFLICT_SETS: { 216 VLOG(1) << "Building and Processing Conflict Sets"; 217 BuildAndProcessConflictSetsCommand build_process_conflict_sets; 218 build_process_conflict_sets.Execute(session); 219 if (session->status_controller()->conflict_sets_built()) 220 next_step = SYNCER_END; 221 else 222 next_step = RESOLVE_CONFLICTS; 223 break; 224 } 225 case RESOLVE_CONFLICTS: { 226 VLOG(1) << "Resolving Conflicts"; 227 228 // Trigger the pre_conflict_resolution_closure_, which is a testing 229 // hook for the unit tests, if it is non-NULL. 230 if (pre_conflict_resolution_closure_) { 231 pre_conflict_resolution_closure_->Run(); 232 } 233 234 StatusController* status = session->status_controller(); 235 status->reset_conflicts_resolved(); 236 ResolveConflictsCommand resolve_conflicts_command; 237 resolve_conflicts_command.Execute(session); 238 if (status->HasConflictingUpdates()) 239 next_step = APPLY_UPDATES_TO_RESOLVE_CONFLICTS; 240 else 241 next_step = SYNCER_END; 242 break; 243 } 244 case APPLY_UPDATES_TO_RESOLVE_CONFLICTS: { 245 StatusController* status = session->status_controller(); 246 VLOG(1) << "Applying updates to resolve conflicts"; 247 ApplyUpdatesCommand apply_updates; 248 int before_conflicting_updates = status->TotalNumConflictingItems(); 249 apply_updates.Execute(session); 250 int after_conflicting_updates = status->TotalNumConflictingItems(); 251 status->update_conflicts_resolved(before_conflicting_updates > 252 after_conflicting_updates); 253 if (status->conflicts_resolved()) 254 next_step = RESOLVE_CONFLICTS; 255 else 256 next_step = SYNCER_END; 257 break; 258 } 259 case CLEAR_PRIVATE_DATA: { 260 VLOG(1) << "Clear Private Data"; 261 ClearDataCommand clear_data_command; 262 clear_data_command.Execute(session); 263 next_step = SYNCER_END; 264 } 265 case SYNCER_END: { 266 VLOG(1) << "Syncer End"; 267 SyncerEndCommand syncer_end_command; 268 // This will set "syncing" to false, and send out a notification. 269 syncer_end_command.Execute(session); 270 goto post_while; 271 } 272 default: 273 LOG(ERROR) << "Unknown command: " << current_step; 274 } 275 if (last_step == current_step) 276 break; 277 current_step = next_step; 278 } 279 post_while: 280 return; 281} 282 283void Syncer::ProcessClientCommand(sessions::SyncSession* session) { 284 const ClientToServerResponse& response = 285 session->status_controller()->updates_response(); 286 if (!response.has_client_command()) 287 return; 288 const ClientCommand& command = response.client_command(); 289 290 // The server limits the number of items a client can commit in one batch. 291 if (command.has_max_commit_batch_size()) 292 max_commit_batch_size_ = command.max_commit_batch_size(); 293 if (command.has_set_sync_long_poll_interval()) { 294 session->delegate()->OnReceivedLongPollIntervalUpdate( 295 TimeDelta::FromSeconds(command.set_sync_long_poll_interval())); 296 } 297 if (command.has_set_sync_poll_interval()) { 298 session->delegate()->OnReceivedShortPollIntervalUpdate( 299 TimeDelta::FromSeconds(command.set_sync_poll_interval())); 300 } 301} 302 303void CopyServerFields(syncable::Entry* src, syncable::MutableEntry* dest) { 304 dest->Put(SERVER_NON_UNIQUE_NAME, src->Get(SERVER_NON_UNIQUE_NAME)); 305 dest->Put(SERVER_PARENT_ID, src->Get(SERVER_PARENT_ID)); 306 dest->Put(SERVER_MTIME, src->Get(SERVER_MTIME)); 307 dest->Put(SERVER_CTIME, src->Get(SERVER_CTIME)); 308 dest->Put(SERVER_VERSION, src->Get(SERVER_VERSION)); 309 dest->Put(SERVER_IS_DIR, src->Get(SERVER_IS_DIR)); 310 dest->Put(SERVER_IS_DEL, src->Get(SERVER_IS_DEL)); 311 dest->Put(IS_UNAPPLIED_UPDATE, src->Get(IS_UNAPPLIED_UPDATE)); 312 dest->Put(SERVER_SPECIFICS, src->Get(SERVER_SPECIFICS)); 313 dest->Put(SERVER_POSITION_IN_PARENT, src->Get(SERVER_POSITION_IN_PARENT)); 314} 315 316void ClearServerData(syncable::MutableEntry* entry) { 317 entry->Put(SERVER_NON_UNIQUE_NAME, ""); 318 entry->Put(SERVER_PARENT_ID, syncable::kNullId); 319 entry->Put(SERVER_MTIME, 0); 320 entry->Put(SERVER_CTIME, 0); 321 entry->Put(SERVER_VERSION, 0); 322 entry->Put(SERVER_IS_DIR, false); 323 entry->Put(SERVER_IS_DEL, false); 324 entry->Put(IS_UNAPPLIED_UPDATE, false); 325 entry->Put(SERVER_SPECIFICS, sync_pb::EntitySpecifics::default_instance()); 326 entry->Put(SERVER_POSITION_IN_PARENT, 0); 327} 328 329} // namespace browser_sync 330