1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "sync/engine/sync_scheduler_impl.h"
6
7#include <algorithm>
8#include <cstring>
9
10#include "base/auto_reset.h"
11#include "base/bind.h"
12#include "base/bind_helpers.h"
13#include "base/compiler_specific.h"
14#include "base/location.h"
15#include "base/logging.h"
16#include "base/message_loop/message_loop.h"
17#include "sync/engine/backoff_delay_provider.h"
18#include "sync/engine/syncer.h"
19#include "sync/protocol/proto_enum_conversions.h"
20#include "sync/protocol/sync.pb.h"
21#include "sync/util/data_type_histogram.h"
22#include "sync/util/logging.h"
23
24using base::TimeDelta;
25using base::TimeTicks;
26
27namespace syncer {
28
29using sessions::SyncSession;
30using sessions::SyncSessionSnapshot;
31using sync_pb::GetUpdatesCallerInfo;
32
33namespace {
34
35bool IsConfigRelatedUpdateSourceValue(
36    GetUpdatesCallerInfo::GetUpdatesSource source) {
37  switch (source) {
38    case GetUpdatesCallerInfo::RECONFIGURATION:
39    case GetUpdatesCallerInfo::MIGRATION:
40    case GetUpdatesCallerInfo::NEW_CLIENT:
41    case GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE:
42    case GetUpdatesCallerInfo::PROGRAMMATIC:
43      return true;
44    default:
45      return false;
46  }
47}
48
49bool ShouldRequestEarlyExit(const SyncProtocolError& error) {
50  switch (error.error_type) {
51    case SYNC_SUCCESS:
52    case MIGRATION_DONE:
53    case THROTTLED:
54    case TRANSIENT_ERROR:
55      return false;
56    case NOT_MY_BIRTHDAY:
57    case CLEAR_PENDING:
58    case DISABLED_BY_ADMIN:
59    case USER_ROLLBACK:
60      // If we send terminate sync early then |sync_cycle_ended| notification
61      // would not be sent. If there were no actions then |ACTIONABLE_ERROR|
62      // notification wouldnt be sent either. Then the UI layer would be left
63      // waiting forever. So assert we would send something.
64      DCHECK_NE(error.action, UNKNOWN_ACTION);
65      return true;
66    case INVALID_CREDENTIAL:
67      // The notification for this is handled by PostAndProcessHeaders|.
68      // Server does no have to send any action for this.
69      return true;
70    // Make the default a NOTREACHED. So if a new error is introduced we
71    // think about its expected functionality.
72    default:
73      NOTREACHED();
74      return false;
75  }
76}
77
78bool IsActionableError(
79    const SyncProtocolError& error) {
80  return (error.action != UNKNOWN_ACTION);
81}
82
83}  // namespace
84
// Default constructor: only |source| is set explicitly (to UNKNOWN); every
// other member is default-initialized.
ConfigurationParams::ConfigurationParams()
    : source(GetUpdatesCallerInfo::UNKNOWN) {}
// Copies each argument into the member of the same name.  Both |ready_task|
// and |retry_task| must be non-null (DCHECKed).
ConfigurationParams::ConfigurationParams(
    const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source,
    ModelTypeSet types_to_download,
    const ModelSafeRoutingInfo& routing_info,
    const base::Closure& ready_task,
    const base::Closure& retry_task)
    : source(source),
      types_to_download(types_to_download),
      routing_info(routing_info),
      ready_task(ready_task),
      retry_task(retry_task) {
  // Inside this body the unqualified names refer to the constructor
  // parameters, which shadow the members of the same name.
  DCHECK(!ready_task.is_null());
  DCHECK(!retry_task.is_null());
}
ConfigurationParams::~ConfigurationParams() {}
102
// Default constructor: the mode is UNKNOWN and |length| is
// default-initialized.
SyncSchedulerImpl::WaitInterval::WaitInterval()
    : mode(UNKNOWN) {}

// Creates an interval of the given |mode| lasting |length|.
SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
    : mode(mode), length(length) {}

SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
110
// Expands to a switch case that returns the enumerator's name as a string
// literal.  The trailing |break| is never reached because of the return.
// #undef'd at the end of this file.
#define ENUM_CASE(x) case x: return #x; break;
112
113const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) {
114  switch (mode) {
115    ENUM_CASE(UNKNOWN);
116    ENUM_CASE(EXPONENTIAL_BACKOFF);
117    ENUM_CASE(THROTTLED);
118  }
119  NOTREACHED();
120  return "";
121}
122
123GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
124    NudgeSource source) {
125  switch (source) {
126    case NUDGE_SOURCE_NOTIFICATION:
127      return GetUpdatesCallerInfo::NOTIFICATION;
128    case NUDGE_SOURCE_LOCAL:
129      return GetUpdatesCallerInfo::LOCAL;
130    case NUDGE_SOURCE_LOCAL_REFRESH:
131      return GetUpdatesCallerInfo::DATATYPE_REFRESH;
132    case NUDGE_SOURCE_UNKNOWN:
133      return GetUpdatesCallerInfo::UNKNOWN;
134    default:
135      NOTREACHED();
136      return GetUpdatesCallerInfo::UNKNOWN;
137  }
138}
139
// Helper macros to log with the syncer thread name; useful when there
// are multiple syncer threads involved.
//
// Each macro prefixes the message with |name_| followed by ": ".  Because
// they reference |name_| directly they can only be used inside
// SyncSchedulerImpl member functions.  All three are #undef'd at the end of
// this file.

#define SLOG(severity) LOG(severity) << name_ << ": "

#define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": "

#define SDVLOG_LOC(from_here, verbose_level)             \
  DVLOG_LOC(from_here, verbose_level) << name_ << ": "
149
// Constructs a scheduler that is not yet started (|started_| is false) and
// is in NORMAL_MODE, with both poll intervals set to their defaults.
//
// |name| is used as the prefix of this object's log output.
// |delay_provider|, |context| and |syncer| are stored in the corresponding
// members; NOTE(review): ownership of these raw pointers is determined by
// the member types declared in the header, which is not visible here.
SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name,
                                     BackoffDelayProvider* delay_provider,
                                     sessions::SyncSessionContext* context,
                                     Syncer* syncer)
    : name_(name),
      started_(false),
      syncer_short_poll_interval_seconds_(
          TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
      syncer_long_poll_interval_seconds_(
          TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
      mode_(NORMAL_MODE),
      delay_provider_(delay_provider),
      syncer_(syncer),
      session_context_(context),
      no_scheduling_allowed_(false),
      do_poll_after_credentials_updated_(false),
      next_sync_session_job_priority_(NORMAL_PRIORITY),
      weak_ptr_factory_(this),
      weak_ptr_factory_for_weak_handle_(this) {
  // The weak handle uses its own factory, so Stop() invalidating
  // |weak_ptr_factory_| does not invalidate |weak_handle_this_|.
  weak_handle_this_ = MakeWeakHandle(
      weak_ptr_factory_for_weak_handle_.GetWeakPtr());
}
172
// Must be destroyed on the thread it is bound to.  Stops the scheduler so
// that timers are cancelled and outstanding weak pointers are invalidated.
SyncSchedulerImpl::~SyncSchedulerImpl() {
  DCHECK(CalledOnValidThread());
  Stop();
}
177
178void SyncSchedulerImpl::OnCredentialsUpdated() {
179  DCHECK(CalledOnValidThread());
180
181  if (HttpResponse::SYNC_AUTH_ERROR ==
182      session_context_->connection_manager()->server_status()) {
183    OnServerConnectionErrorFixed();
184  }
185}
186
187void SyncSchedulerImpl::OnConnectionStatusChange() {
188  if (HttpResponse::CONNECTION_UNAVAILABLE  ==
189      session_context_->connection_manager()->server_status()) {
190    // Optimistically assume that the connection is fixed and try
191    // connecting.
192    OnServerConnectionErrorFixed();
193  }
194}
195
// Retries contacting the server after an auth or connectivity error is
// believed to be resolved, by posting a job with canary priority.
void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
  // There could be a pending nudge or configuration job in several cases:
  //
  // 1. We're in exponential backoff.
  // 2. We're silenced / throttled.
  // 3. A nudge was saved previously due to not having a valid auth token.
  // 4. A nudge was scheduled + saved while in configuration mode.
  //
  // In all cases except (2), we want to retry contacting the server. We
  // call TryCanaryJob to achieve this, and note that nothing -- not even a
  // canary job -- can bypass a THROTTLED WaitInterval. The only thing that
  // has the authority to do that is the Unthrottle timer.
  TryCanaryJob();
}
210
211void SyncSchedulerImpl::Start(Mode mode) {
212  DCHECK(CalledOnValidThread());
213  std::string thread_name = base::MessageLoop::current()->thread_name();
214  if (thread_name.empty())
215    thread_name = "<Main thread>";
216  SDVLOG(2) << "Start called from thread "
217            << thread_name << " with mode " << GetModeString(mode);
218  if (!started_) {
219    started_ = true;
220    SendInitialSnapshot();
221  }
222
223  DCHECK(!session_context_->account_name().empty());
224  DCHECK(syncer_.get());
225  Mode old_mode = mode_;
226  mode_ = mode;
227  AdjustPolling(UPDATE_INTERVAL);  // Will kick start poll timer if needed.
228
229  if (old_mode != mode_ && mode_ == NORMAL_MODE) {
230    // We just got back to normal mode.  Let's try to run the work that was
231    // queued up while we were configuring.
232
233    // Update our current time before checking IsRetryRequired().
234    nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now());
235    if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) {
236      TrySyncSessionJob();
237    }
238  }
239}
240
241ModelTypeSet SyncSchedulerImpl::GetEnabledAndUnthrottledTypes() {
242  ModelTypeSet enabled_types = session_context_->GetEnabledTypes();
243  ModelTypeSet enabled_protocol_types =
244      Intersection(ProtocolTypes(), enabled_types);
245  ModelTypeSet throttled_types = nudge_tracker_.GetThrottledTypes();
246  return Difference(enabled_protocol_types, throttled_types);
247}
248
249void SyncSchedulerImpl::SendInitialSnapshot() {
250  DCHECK(CalledOnValidThread());
251  scoped_ptr<SyncSession> dummy(SyncSession::Build(session_context_, this));
252  SyncCycleEvent event(SyncCycleEvent::STATUS_CHANGED);
253  event.snapshot = dummy->TakeSnapshot();
254  FOR_EACH_OBSERVER(SyncEngineEventListener,
255                    *session_context_->listeners(),
256                    OnSyncCycleEvent(event));
257}
258
259namespace {
260
261// Helper to extract the routing info corresponding to types in
262// |types_to_download| from |current_routes|.
263void BuildModelSafeParams(
264    ModelTypeSet types_to_download,
265    const ModelSafeRoutingInfo& current_routes,
266    ModelSafeRoutingInfo* result_routes) {
267  for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good();
268       iter.Inc()) {
269    ModelType type = iter.Get();
270    ModelSafeRoutingInfo::const_iterator route = current_routes.find(type);
271    DCHECK(route != current_routes.end());
272    ModelSafeGroup group = route->second;
273    (*result_routes)[type] = group;
274  }
275}
276
277}  // namespace.
278
279void SyncSchedulerImpl::ScheduleConfiguration(
280    const ConfigurationParams& params) {
281  DCHECK(CalledOnValidThread());
282  DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
283  DCHECK_EQ(CONFIGURATION_MODE, mode_);
284  DCHECK(!params.ready_task.is_null());
285  CHECK(started_) << "Scheduler must be running to configure.";
286  SDVLOG(2) << "Reconfiguring syncer.";
287
288  // Only one configuration is allowed at a time. Verify we're not waiting
289  // for a pending configure job.
290  DCHECK(!pending_configure_params_);
291
292  ModelSafeRoutingInfo restricted_routes;
293  BuildModelSafeParams(params.types_to_download,
294                       params.routing_info,
295                       &restricted_routes);
296  session_context_->SetRoutingInfo(restricted_routes);
297
298  // Only reconfigure if we have types to download.
299  if (!params.types_to_download.Empty()) {
300    pending_configure_params_.reset(new ConfigurationParams(params));
301    TrySyncSessionJob();
302  } else {
303    SDVLOG(2) << "No change in routing info, calling ready task directly.";
304    params.ready_task.Run();
305  }
306}
307
308bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority) {
309  DCHECK(CalledOnValidThread());
310  if (wait_interval_ && wait_interval_->mode == WaitInterval::THROTTLED) {
311    SDVLOG(1) << "Unable to run a job because we're throttled.";
312    return false;
313  }
314
315  if (wait_interval_
316      && wait_interval_->mode == WaitInterval::EXPONENTIAL_BACKOFF
317      && priority != CANARY_PRIORITY) {
318    SDVLOG(1) << "Unable to run a job because we're backing off.";
319    return false;
320  }
321
322  if (session_context_->connection_manager()->HasInvalidAuthToken()) {
323    SDVLOG(1) << "Unable to run a job because we have no valid auth token.";
324    return false;
325  }
326
327  return true;
328}
329
330bool SyncSchedulerImpl::CanRunNudgeJobNow(JobPriority priority) {
331  DCHECK(CalledOnValidThread());
332
333  if (!CanRunJobNow(priority)) {
334    SDVLOG(1) << "Unable to run a nudge job right now";
335    return false;
336  }
337
338  const ModelTypeSet enabled_types = session_context_->GetEnabledTypes();
339  if (nudge_tracker_.GetThrottledTypes().HasAll(enabled_types)) {
340    SDVLOG(1) << "Not running a nudge because we're fully type throttled.";
341    return false;
342  }
343
344  if (mode_ == CONFIGURATION_MODE) {
345    SDVLOG(1) << "Not running nudge because we're in configuration mode.";
346    return false;
347  }
348
349  return true;
350}
351
352void SyncSchedulerImpl::ScheduleLocalNudge(
353    ModelTypeSet types,
354    const tracked_objects::Location& nudge_location) {
355  DCHECK(CalledOnValidThread());
356  DCHECK(!types.Empty());
357
358  SDVLOG_LOC(nudge_location, 2)
359      << "Scheduling sync because of local change to "
360      << ModelTypeSetToString(types);
361  UpdateNudgeTimeRecords(types);
362  base::TimeDelta nudge_delay = nudge_tracker_.RecordLocalChange(types);
363  ScheduleNudgeImpl(nudge_delay, nudge_location);
364}
365
366void SyncSchedulerImpl::ScheduleLocalRefreshRequest(
367    ModelTypeSet types,
368    const tracked_objects::Location& nudge_location) {
369  DCHECK(CalledOnValidThread());
370  DCHECK(!types.Empty());
371
372  SDVLOG_LOC(nudge_location, 2)
373      << "Scheduling sync because of local refresh request for "
374      << ModelTypeSetToString(types);
375  base::TimeDelta nudge_delay = nudge_tracker_.RecordLocalRefreshRequest(types);
376  ScheduleNudgeImpl(nudge_delay, nudge_location);
377}
378
379void SyncSchedulerImpl::ScheduleInvalidationNudge(
380    syncer::ModelType model_type,
381    scoped_ptr<InvalidationInterface> invalidation,
382    const tracked_objects::Location& nudge_location) {
383  DCHECK(CalledOnValidThread());
384
385  SDVLOG_LOC(nudge_location, 2)
386      << "Scheduling sync because we received invalidation for "
387      << ModelTypeToString(model_type);
388  base::TimeDelta nudge_delay =
389      nudge_tracker_.RecordRemoteInvalidation(model_type, invalidation.Pass());
390  ScheduleNudgeImpl(nudge_delay, nudge_location);
391}
392
393void SyncSchedulerImpl::ScheduleInitialSyncNudge(syncer::ModelType model_type) {
394  DCHECK(CalledOnValidThread());
395
396  SDVLOG(2) << "Scheduling non-blocking initial sync for "
397            << ModelTypeToString(model_type);
398  nudge_tracker_.RecordInitialSyncRequired(model_type);
399  ScheduleNudgeImpl(TimeDelta::FromSeconds(0), FROM_HERE);
400}
401
402// TODO(zea): Consider adding separate throttling/backoff for datatype
403// refresh requests.
404void SyncSchedulerImpl::ScheduleNudgeImpl(
405    const TimeDelta& delay,
406    const tracked_objects::Location& nudge_location) {
407  DCHECK(CalledOnValidThread());
408
409  if (no_scheduling_allowed_) {
410    NOTREACHED() << "Illegal to schedule job while session in progress.";
411    return;
412  }
413
414  if (!started_) {
415    SDVLOG_LOC(nudge_location, 2)
416        << "Dropping nudge, scheduler is not running.";
417    return;
418  }
419
420  SDVLOG_LOC(nudge_location, 2)
421      << "In ScheduleNudgeImpl with delay "
422      << delay.InMilliseconds() << " ms";
423
424  if (!CanRunNudgeJobNow(NORMAL_PRIORITY))
425    return;
426
427  TimeTicks incoming_run_time = TimeTicks::Now() + delay;
428  if (!scheduled_nudge_time_.is_null() &&
429    (scheduled_nudge_time_ < incoming_run_time)) {
430    // Old job arrives sooner than this one.  Don't reschedule it.
431    return;
432  }
433
434  // Either there is no existing nudge in flight or the incoming nudge should be
435  // made to arrive first (preempt) the existing nudge.  We reschedule in either
436  // case.
437  SDVLOG_LOC(nudge_location, 2)
438      << "Scheduling a nudge with "
439      << delay.InMilliseconds() << " ms delay";
440  scheduled_nudge_time_ = incoming_run_time;
441  pending_wakeup_timer_.Start(
442      nudge_location,
443      delay,
444      base::Bind(&SyncSchedulerImpl::PerformDelayedNudge,
445                 weak_ptr_factory_.GetWeakPtr()));
446}
447
448const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
449  switch (mode) {
450    ENUM_CASE(CONFIGURATION_MODE);
451    ENUM_CASE(NORMAL_MODE);
452  }
453  return "";
454}
455
// Forwards the default nudge delay to the nudge tracker.  Despite the
// parameter name, |delay_ms| is a TimeDelta, not a raw millisecond count.
void SyncSchedulerImpl::SetDefaultNudgeDelay(base::TimeDelta delay_ms) {
  DCHECK(CalledOnValidThread());
  nudge_tracker_.SetDefaultNudgeDelay(delay_ms);
}
460
461void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) {
462  DCHECK(CalledOnValidThread());
463  DCHECK(CanRunNudgeJobNow(priority));
464
465  DVLOG(2) << "Will run normal mode sync cycle with types "
466           << ModelTypeSetToString(session_context_->GetEnabledTypes());
467  scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
468  bool premature_exit = !syncer_->NormalSyncShare(
469      GetEnabledAndUnthrottledTypes(),
470      nudge_tracker_,
471      session.get());
472  AdjustPolling(FORCE_RESET);
473  // Don't run poll job till the next time poll timer fires.
474  do_poll_after_credentials_updated_ = false;
475
476  bool success = !premature_exit
477      && !sessions::HasSyncerError(
478          session->status_controller().model_neutral_state());
479
480  if (success) {
481    // That cycle took care of any outstanding work we had.
482    SDVLOG(2) << "Nudge succeeded.";
483    nudge_tracker_.RecordSuccessfulSyncCycle();
484    scheduled_nudge_time_ = base::TimeTicks();
485
486    // If we're here, then we successfully reached the server.  End all backoff.
487    wait_interval_.reset();
488    NotifyRetryTime(base::Time());
489  } else {
490    HandleFailure(session->status_controller().model_neutral_state());
491  }
492}
493
// Runs a configure sync cycle for the pending configuration.  Requires
// CONFIGURATION_MODE and a pending configuration (DCHECKed).
//
// - If a job cannot run at |priority|, the retry task (if still set) is run
//   once and then cleared, and no cycle is attempted.
// - On success the ready task is run, the pending configuration is dropped
//   and any wait interval is cleared.
// - On failure HandleFailure() decides on throttling or backoff, and the
//   retry task is run once if the scheduler is still started.
void SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) {
  DCHECK(CalledOnValidThread());
  DCHECK_EQ(mode_, CONFIGURATION_MODE);
  DCHECK(pending_configure_params_ != NULL);

  if (!CanRunJobNow(priority)) {
    SDVLOG(2) << "Unable to run configure job right now.";
    // Resetting the task after running it ensures it is invoked at most once
    // per pending configuration.
    if (!pending_configure_params_->retry_task.is_null()) {
      pending_configure_params_->retry_task.Run();
      pending_configure_params_->retry_task.Reset();
    }
    return;
  }

  SDVLOG(2) << "Will run configure SyncShare with types "
            << ModelTypeSetToString(session_context_->GetEnabledTypes());
  scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
  bool premature_exit = !syncer_->ConfigureSyncShare(
      pending_configure_params_->types_to_download,
      pending_configure_params_->source,
      session.get());
  AdjustPolling(FORCE_RESET);
  // Don't run poll job till the next time poll timer fires.
  do_poll_after_credentials_updated_ = false;

  bool success = !premature_exit
      && !sessions::HasSyncerError(
          session->status_controller().model_neutral_state());

  if (success) {
    SDVLOG(2) << "Configure succeeded.";
    pending_configure_params_->ready_task.Run();
    pending_configure_params_.reset();

    // If we're here, then we successfully reached the server.  End all backoff.
    wait_interval_.reset();
    NotifyRetryTime(base::Time());
  } else {
    HandleFailure(session->status_controller().model_neutral_state());
    // The sync cycle might have received a response from the server that
    // caused the scheduler to stop.  Stop() resets pending_configure_params_
    // and clears started_, so started_ must be checked before dereferencing.
    if (started_ && !pending_configure_params_->retry_task.is_null()) {
      pending_configure_params_->retry_task.Run();
      pending_configure_params_->retry_task.Reset();
    }
  }
}
541
542void SyncSchedulerImpl::HandleFailure(
543    const sessions::ModelNeutralState& model_neutral_state) {
544  if (IsCurrentlyThrottled()) {
545    SDVLOG(2) << "Was throttled during previous sync cycle.";
546    RestartWaiting();
547  } else if (!IsBackingOff()) {
548    // Setup our backoff if this is our first such failure.
549    TimeDelta length = delay_provider_->GetDelay(
550        delay_provider_->GetInitialDelay(model_neutral_state));
551    wait_interval_.reset(
552        new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
553    SDVLOG(2) << "Sync cycle failed.  Will back off for "
554        << wait_interval_->length.InMilliseconds() << "ms.";
555    RestartWaiting();
556  }
557}
558
559void SyncSchedulerImpl::DoPollSyncSessionJob() {
560  base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
561
562  SDVLOG(2) << "Polling with types "
563            << ModelTypeSetToString(GetEnabledAndUnthrottledTypes());
564  scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
565  syncer_->PollSyncShare(
566      GetEnabledAndUnthrottledTypes(),
567      session.get());
568
569  AdjustPolling(FORCE_RESET);
570
571  if (IsCurrentlyThrottled()) {
572    SDVLOG(2) << "Poll request got us throttled.";
573    // The OnSilencedUntil() call set up the WaitInterval for us.  All we need
574    // to do is start the timer.
575    RestartWaiting();
576  }
577}
578
// Records the current time as the latest local nudge for each type in
// |types| and, for types that have nudged before, reports the time elapsed
// since the previous nudge to a per-type "Sync.Freq" histogram.
void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types) {
  DCHECK(CalledOnValidThread());
  base::TimeTicks now = TimeTicks::Now();
  // Update timing information for how often datatypes are triggering nudges.
  for (ModelTypeSet::Iterator iter = types.First(); iter.Good(); iter.Inc()) {
    base::TimeTicks previous = last_local_nudges_by_model_type_[iter.Get()];
    last_local_nudges_by_model_type_[iter.Get()] = now;
    // First nudge seen for this type: there is no interval to report yet.
    if (previous.is_null())
      continue;

    // PER_DATA_TYPE_MACRO refers to the locals |now| and |previous| by name,
    // so they must not be renamed without updating the macro.
    // NOTE(review): SYNC_DATA_TYPE_HISTOGRAM presumably invokes
    // PER_DATA_TYPE_MACRO with the histogram suffix for the given type --
    // confirm in sync/util/data_type_histogram.h.
#define PER_DATA_TYPE_MACRO(type_str) \
    SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous);
    SYNC_DATA_TYPE_HISTOGRAM(iter.Get());
#undef PER_DATA_TYPE_MACRO
  }
}
595
596TimeDelta SyncSchedulerImpl::GetPollInterval() {
597  return (!session_context_->notifications_enabled() ||
598          !session_context_->ShouldFetchUpdatesBeforeCommit()) ?
599      syncer_short_poll_interval_seconds_ :
600      syncer_long_poll_interval_seconds_;
601}
602
603void SyncSchedulerImpl::AdjustPolling(PollAdjustType type) {
604  DCHECK(CalledOnValidThread());
605
606  TimeDelta poll = GetPollInterval();
607  bool rate_changed = !poll_timer_.IsRunning() ||
608                       poll != poll_timer_.GetCurrentDelay();
609
610  if (type == FORCE_RESET) {
611    last_poll_reset_ = base::TimeTicks::Now();
612    if (!rate_changed)
613      poll_timer_.Reset();
614  }
615
616  if (!rate_changed)
617    return;
618
619  // Adjust poll rate.
620  poll_timer_.Stop();
621  poll_timer_.Start(FROM_HERE, poll, this,
622                    &SyncSchedulerImpl::PollTimerCallback);
623}
624
625void SyncSchedulerImpl::RestartWaiting() {
626  CHECK(wait_interval_.get());
627  DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0));
628  NotifyRetryTime(base::Time::Now() + wait_interval_->length);
629  SDVLOG(2) << "Starting WaitInterval timer of length "
630      << wait_interval_->length.InMilliseconds() << "ms.";
631  if (wait_interval_->mode == WaitInterval::THROTTLED) {
632    pending_wakeup_timer_.Start(
633        FROM_HERE,
634        wait_interval_->length,
635        base::Bind(&SyncSchedulerImpl::Unthrottle,
636                   weak_ptr_factory_.GetWeakPtr()));
637  } else {
638    pending_wakeup_timer_.Start(
639        FROM_HERE,
640        wait_interval_->length,
641        base::Bind(&SyncSchedulerImpl::ExponentialBackoffRetry,
642                   weak_ptr_factory_.GetWeakPtr()));
643  }
644}
645
646void SyncSchedulerImpl::Stop() {
647  DCHECK(CalledOnValidThread());
648  SDVLOG(2) << "Stop called";
649
650  // Kill any in-flight method calls.
651  weak_ptr_factory_.InvalidateWeakPtrs();
652  wait_interval_.reset();
653  NotifyRetryTime(base::Time());
654  poll_timer_.Stop();
655  pending_wakeup_timer_.Stop();
656  pending_configure_params_.reset();
657  if (started_)
658    started_ = false;
659}
660
// This is the only place in this file where the next sync session job is
// given canary privileges.  Everyone else gets NORMAL_PRIORITY, which
// TrySyncSessionJobImpl() restores once it has consumed the priority.
void SyncSchedulerImpl::TryCanaryJob() {
  next_sync_session_job_priority_ = CANARY_PRIORITY;
  TrySyncSessionJob();
}
667
// Asynchronously attempts a sync session job.  The task is bound to a weak
// pointer, so it is dropped if Stop() invalidates the weak pointers before
// it runs.
void SyncSchedulerImpl::TrySyncSessionJob() {
  // Post call to TrySyncSessionJobImpl on current thread. Later request for
  // access token will be here.
  base::MessageLoop::current()->PostTask(FROM_HERE, base::Bind(
      &SyncSchedulerImpl::TrySyncSessionJobImpl,
      weak_ptr_factory_.GetWeakPtr()));
}
675
// Picks and runs the appropriate job, if any:
//  - in configuration mode, the pending configure job;
//  - otherwise, if a nudge job can run, a nudge cycle when sync is required,
//    or else a poll cycle when one is due.
// Afterwards, if we are still backing off and no wakeup is scheduled, the
// backoff interval is increased and a retry is scheduled.
void SyncSchedulerImpl::TrySyncSessionJobImpl() {
  // Consume the priority requested for this job and restore the default so
  // that canary privileges apply to exactly one attempt.
  JobPriority priority = next_sync_session_job_priority_;
  next_sync_session_job_priority_ = NORMAL_PRIORITY;

  nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now());

  DCHECK(CalledOnValidThread());
  if (mode_ == CONFIGURATION_MODE) {
    if (pending_configure_params_) {
      SDVLOG(2) << "Found pending configure job";
      DoConfigurationSyncSessionJob(priority);
    }
  } else if (CanRunNudgeJobNow(priority)) {
    if (nudge_tracker_.IsSyncRequired()) {
      SDVLOG(2) << "Found pending nudge job";
      DoNudgeSyncSessionJob(priority);
    } else if (do_poll_after_credentials_updated_ ||
        ((base::TimeTicks::Now() - last_poll_reset_) >= GetPollInterval())) {
      // A poll is due either because a previous poll failed on auth and the
      // credentials have since been updated, or because a full poll interval
      // has elapsed since the poll timer was last reset.
      DoPollSyncSessionJob();
      // Poll timer fires infrequently. Usually by this time access token is
      // already expired and poll job will fail with auth error. Set flag to
      // retry poll once ProfileSyncService gets new access token, TryCanaryJob
      // will be called after access token is retrieved.
      if (HttpResponse::SYNC_AUTH_ERROR ==
          session_context_->connection_manager()->server_status()) {
        do_poll_after_credentials_updated_ = true;
      }
    }
  }

  if (priority == CANARY_PRIORITY) {
    // If this is a canary job then, whatever the result was, don't run a poll
    // job till the next time the poll timer fires.
    do_poll_after_credentials_updated_ = false;
  }

  if (IsBackingOff() && !pending_wakeup_timer_.IsRunning()) {
    // If we succeeded, our wait interval would have been cleared.  If it hasn't
    // been cleared, then we should increase our backoff interval and schedule
    // another retry.
    TimeDelta length = delay_provider_->GetDelay(wait_interval_->length);
    wait_interval_.reset(
      new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
    SDVLOG(2) << "Sync cycle failed.  Will back off for "
        << wait_interval_->length.InMilliseconds() << "ms.";
    RestartWaiting();
  }
}
724
725void SyncSchedulerImpl::PollTimerCallback() {
726  DCHECK(CalledOnValidThread());
727  if (no_scheduling_allowed_) {
728    // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in
729    // functions that are called only on the sync thread.  This function is also
730    // called only on the sync thread, and only when it is posted by an expiring
731    // timer.  If we find that no_scheduling_allowed_ is set here, then
732    // something is very wrong.  Maybe someone mistakenly called us directly, or
733    // mishandled the book-keeping for no_scheduling_allowed_.
734    NOTREACHED() << "Illegal to schedule job while session in progress.";
735    return;
736  }
737
738  TrySyncSessionJob();
739}
740
// Invoked when |retry_timer_| (armed by OnReceivedGuRetryDelay()) fires;
// attempts a sync session job.
void SyncSchedulerImpl::RetryTimerCallback() {
  TrySyncSessionJob();
}
744
// Invoked by the wakeup timer when a THROTTLED wait interval expires (see
// RestartWaiting()).  Clears the throttled state, notifies observers and
// retries with canary priority.
void SyncSchedulerImpl::Unthrottle() {
  DCHECK(CalledOnValidThread());
  DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);

  // We're no longer throttled, so clear the wait interval.
  wait_interval_.reset();
  NotifyRetryTime(base::Time());
  NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());

  // We treat this as a 'canary' in the sense that it was originally scheduled
  // to run some time ago, failed, and we now want to retry, versus a job that
  // was just created (e.g via ScheduleNudgeImpl). The main implication is
  // that we're careful to update routing info (etc) with such potentially
  // stale canary jobs.
  TryCanaryJob();
}
761
762void SyncSchedulerImpl::TypeUnthrottle(base::TimeTicks unthrottle_time) {
763  DCHECK(CalledOnValidThread());
764  nudge_tracker_.UpdateTypeThrottlingState(unthrottle_time);
765  NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
766
767  if (nudge_tracker_.IsAnyTypeThrottled()) {
768    const base::TimeTicks now = base::TimeTicks::Now();
769    base::TimeDelta time_until_next_unthrottle =
770        nudge_tracker_.GetTimeUntilNextUnthrottle(now);
771    type_unthrottle_timer_.Start(
772        FROM_HERE,
773        time_until_next_unthrottle,
774        base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
775                   weak_ptr_factory_.GetWeakPtr(),
776                   now + time_until_next_unthrottle));
777  }
778
779  // Maybe this is a good time to run a nudge job.  Let's try it.
780  if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY))
781    TrySyncSessionJob();
782}
783
784void SyncSchedulerImpl::PerformDelayedNudge() {
785  // Circumstances may have changed since we scheduled this delayed nudge.
786  // We must check to see if it's OK to run the job before we do so.
787  if (CanRunNudgeJobNow(NORMAL_PRIORITY))
788    TrySyncSessionJob();
789
790  // We're not responsible for setting up any retries here.  The functions that
791  // first put us into a state that prevents successful sync cycles (eg. global
792  // throttling, type throttling, network errors, transient errors) will also
793  // setup the appropriate retry logic (eg. retry after timeout, exponential
794  // backoff, retry when the network changes).
795}
796
// Invoked by the wakeup timer when a non-throttled wait interval expires
// (see RestartWaiting()).  Retries with canary priority, which is the only
// priority CanRunJobNow() lets through during exponential backoff.
void SyncSchedulerImpl::ExponentialBackoffRetry() {
  TryCanaryJob();
}
800
// Tells every listener the time of the next scheduled retry.  Callers in
// this file pass a default-constructed base::Time when no retry is pending.
void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) {
  FOR_EACH_OBSERVER(SyncEngineEventListener,
                    *session_context_->listeners(),
                    OnRetryTimeChanged(retry_time));
}
806
// Tells every listener that the set of throttled types is now |types|.
void SyncSchedulerImpl::NotifyThrottledTypesChanged(ModelTypeSet types) {
  FOR_EACH_OBSERVER(SyncEngineEventListener,
                    *session_context_->listeners(),
                    OnThrottledTypesChanged(types));
}
812
813bool SyncSchedulerImpl::IsBackingOff() const {
814  DCHECK(CalledOnValidThread());
815  return wait_interval_.get() && wait_interval_->mode ==
816      WaitInterval::EXPONENTIAL_BACKOFF;
817}
818
// Puts the scheduler into the globally THROTTLED state for
// |throttle_duration| and notifies listeners of the retry time and that all
// types are throttled.  This only records the interval; the wakeup timer is
// started separately by RestartWaiting().
void SyncSchedulerImpl::OnThrottled(const base::TimeDelta& throttle_duration) {
  DCHECK(CalledOnValidThread());
  wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
                                        throttle_duration));
  NotifyRetryTime(base::Time::Now() + wait_interval_->length);
  NotifyThrottledTypesChanged(ModelTypeSet::All());
}
826
827void SyncSchedulerImpl::OnTypesThrottled(
828    ModelTypeSet types,
829    const base::TimeDelta& throttle_duration) {
830  base::TimeTicks now = base::TimeTicks::Now();
831
832  nudge_tracker_.SetTypesThrottledUntil(types, throttle_duration, now);
833  base::TimeDelta time_until_next_unthrottle =
834      nudge_tracker_.GetTimeUntilNextUnthrottle(now);
835  type_unthrottle_timer_.Start(
836      FROM_HERE,
837      time_until_next_unthrottle,
838      base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
839                 weak_ptr_factory_.GetWeakPtr(),
840                 now + time_until_next_unthrottle));
841  NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
842}
843
844bool SyncSchedulerImpl::IsCurrentlyThrottled() {
845  DCHECK(CalledOnValidThread());
846  return wait_interval_.get() && wait_interval_->mode ==
847      WaitInterval::THROTTLED;
848}
849
// Stores |new_interval| as the short poll interval.  The running poll timer
// is not touched here; the new value is read by GetPollInterval() the next
// time AdjustPolling() runs.
void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
    const base::TimeDelta& new_interval) {
  DCHECK(CalledOnValidThread());
  syncer_short_poll_interval_seconds_ = new_interval;
}
855
// Stores |new_interval| as the long poll interval.  The running poll timer
// is not touched here; the new value is read by GetPollInterval() the next
// time AdjustPolling() runs.
void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate(
    const base::TimeDelta& new_interval) {
  DCHECK(CalledOnValidThread());
  syncer_long_poll_interval_seconds_ = new_interval;
}
861
// Forwards the per-type nudge delays in |nudge_delays| to the nudge tracker.
void SyncSchedulerImpl::OnReceivedCustomNudgeDelays(
    const std::map<ModelType, base::TimeDelta>& nudge_delays) {
  DCHECK(CalledOnValidThread());
  nudge_tracker_.OnReceivedCustomNudgeDelays(nudge_delays);
}
867
868void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size) {
869  if (size > 0)
870    nudge_tracker_.SetHintBufferSize(size);
871  else
872    NOTREACHED() << "Hint buffer size should be > 0.";
873}
874
875void SyncSchedulerImpl::OnSyncProtocolError(
876    const SyncProtocolError& sync_protocol_error) {
877  DCHECK(CalledOnValidThread());
878  if (ShouldRequestEarlyExit(sync_protocol_error)) {
879    SDVLOG(2) << "Sync Scheduler requesting early exit.";
880    Stop();
881  }
882  if (IsActionableError(sync_protocol_error)) {
883    SDVLOG(2) << "OnActionableError";
884    FOR_EACH_OBSERVER(SyncEngineEventListener,
885                      *session_context_->listeners(),
886                      OnActionableError(sync_protocol_error));
887  }
888}
889
// Records |delay| from now as the nudge tracker's next retry time and arms
// |retry_timer_| to call RetryTimerCallback() after the same delay.
void SyncSchedulerImpl::OnReceivedGuRetryDelay(const base::TimeDelta& delay) {
  nudge_tracker_.SetNextRetryTime(TimeTicks::Now() + delay);
  retry_timer_.Start(FROM_HERE, delay, this,
                     &SyncSchedulerImpl::RetryTimerCallback);
}
895
896void SyncSchedulerImpl::OnReceivedMigrationRequest(ModelTypeSet types) {
897    FOR_EACH_OBSERVER(SyncEngineEventListener,
898                      *session_context_->listeners(),
899                      OnMigrationRequested(types));
900}
901
902void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) {
903  DCHECK(CalledOnValidThread());
904  session_context_->set_notifications_enabled(notifications_enabled);
905  if (notifications_enabled)
906    nudge_tracker_.OnInvalidationsEnabled();
907  else
908    nudge_tracker_.OnInvalidationsDisabled();
909}
910
// Remove the file-local helper macros defined earlier in this file so they
// do not leak into anything that follows in the same translation unit.
#undef SDVLOG_LOC

#undef SDVLOG

#undef SLOG

#undef ENUM_CASE
918
919}  // namespace syncer
920