1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/chromeos/drive/change_list_loader.h"
6
7#include <set>
8
9#include "base/callback.h"
10#include "base/callback_helpers.h"
11#include "base/metrics/histogram.h"
12#include "base/strings/string_number_conversions.h"
13#include "base/time/time.h"
14#include "chrome/browser/chromeos/drive/change_list_loader_observer.h"
15#include "chrome/browser/chromeos/drive/change_list_processor.h"
16#include "chrome/browser/chromeos/drive/file_system_util.h"
17#include "chrome/browser/chromeos/drive/job_scheduler.h"
18#include "chrome/browser/chromeos/drive/resource_metadata.h"
19#include "chrome/browser/drive/event_logger.h"
20#include "content/public/browser/browser_thread.h"
21#include "google_apis/drive/drive_api_parser.h"
22#include "url/gurl.h"
23
24using content::BrowserThread;
25
26namespace drive {
27namespace internal {
28
29typedef base::Callback<void(FileError, ScopedVector<ChangeList>)>
30    FeedFetcherCallback;
31
32class ChangeListLoader::FeedFetcher {
33 public:
34  virtual ~FeedFetcher() {}
35  virtual void Run(const FeedFetcherCallback& callback) = 0;
36};
37
38namespace {
39
40// Fetches all the (currently available) resource entries from the server.
41class FullFeedFetcher : public ChangeListLoader::FeedFetcher {
42 public:
43  explicit FullFeedFetcher(JobScheduler* scheduler)
44      : scheduler_(scheduler),
45        weak_ptr_factory_(this) {
46  }
47
48  virtual ~FullFeedFetcher() {
49  }
50
51  virtual void Run(const FeedFetcherCallback& callback) OVERRIDE {
52    DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
53    DCHECK(!callback.is_null());
54
55    // Remember the time stamp for usage stats.
56    start_time_ = base::TimeTicks::Now();
57
58    // This is full resource list fetch.
59    scheduler_->GetAllFileList(
60        base::Bind(&FullFeedFetcher::OnFileListFetched,
61                   weak_ptr_factory_.GetWeakPtr(), callback));
62  }
63
64 private:
65  void OnFileListFetched(const FeedFetcherCallback& callback,
66                         google_apis::GDataErrorCode status,
67                         scoped_ptr<google_apis::FileList> file_list) {
68    DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
69    DCHECK(!callback.is_null());
70
71    FileError error = GDataToFileError(status);
72    if (error != FILE_ERROR_OK) {
73      callback.Run(error, ScopedVector<ChangeList>());
74      return;
75    }
76
77    DCHECK(file_list);
78    change_lists_.push_back(new ChangeList(*file_list));
79
80    if (!file_list->next_link().is_empty()) {
81      // There is the remaining result so fetch it.
82      scheduler_->GetRemainingFileList(
83          file_list->next_link(),
84          base::Bind(&FullFeedFetcher::OnFileListFetched,
85                     weak_ptr_factory_.GetWeakPtr(), callback));
86      return;
87    }
88
89    UMA_HISTOGRAM_LONG_TIMES("Drive.FullFeedLoadTime",
90                             base::TimeTicks::Now() - start_time_);
91
92    // Note: The fetcher is managed by ChangeListLoader, and the instance
93    // will be deleted in the callback. Do not touch the fields after this
94    // invocation.
95    callback.Run(FILE_ERROR_OK, change_lists_.Pass());
96  }
97
98  JobScheduler* scheduler_;
99  ScopedVector<ChangeList> change_lists_;
100  base::TimeTicks start_time_;
101  base::WeakPtrFactory<FullFeedFetcher> weak_ptr_factory_;
102  DISALLOW_COPY_AND_ASSIGN(FullFeedFetcher);
103};
104
105// Fetches the delta changes since |start_change_id|.
106class DeltaFeedFetcher : public ChangeListLoader::FeedFetcher {
107 public:
108  DeltaFeedFetcher(JobScheduler* scheduler, int64 start_change_id)
109      : scheduler_(scheduler),
110        start_change_id_(start_change_id),
111        weak_ptr_factory_(this) {
112  }
113
114  virtual ~DeltaFeedFetcher() {
115  }
116
117  virtual void Run(const FeedFetcherCallback& callback) OVERRIDE {
118    DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
119    DCHECK(!callback.is_null());
120
121    scheduler_->GetChangeList(
122        start_change_id_,
123        base::Bind(&DeltaFeedFetcher::OnChangeListFetched,
124                   weak_ptr_factory_.GetWeakPtr(), callback));
125  }
126
127 private:
128  void OnChangeListFetched(const FeedFetcherCallback& callback,
129                           google_apis::GDataErrorCode status,
130                           scoped_ptr<google_apis::ChangeList> change_list) {
131    DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
132    DCHECK(!callback.is_null());
133
134    FileError error = GDataToFileError(status);
135    if (error != FILE_ERROR_OK) {
136      callback.Run(error, ScopedVector<ChangeList>());
137      return;
138    }
139
140    DCHECK(change_list);
141    change_lists_.push_back(new ChangeList(*change_list));
142
143    if (!change_list->next_link().is_empty()) {
144      // There is the remaining result so fetch it.
145      scheduler_->GetRemainingChangeList(
146          change_list->next_link(),
147          base::Bind(&DeltaFeedFetcher::OnChangeListFetched,
148                     weak_ptr_factory_.GetWeakPtr(), callback));
149      return;
150    }
151
152    // Note: The fetcher is managed by ChangeListLoader, and the instance
153    // will be deleted in the callback. Do not touch the fields after this
154    // invocation.
155    callback.Run(FILE_ERROR_OK, change_lists_.Pass());
156  }
157
158  JobScheduler* scheduler_;
159  int64 start_change_id_;
160  ScopedVector<ChangeList> change_lists_;
161  base::WeakPtrFactory<DeltaFeedFetcher> weak_ptr_factory_;
162  DISALLOW_COPY_AND_ASSIGN(DeltaFeedFetcher);
163};
164
165}  // namespace
166
167LoaderController::LoaderController()
168    : lock_count_(0),
169      weak_ptr_factory_(this) {
170  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
171}
172
173LoaderController::~LoaderController() {
174  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
175}
176
177scoped_ptr<base::ScopedClosureRunner> LoaderController::GetLock() {
178  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
179
180  ++lock_count_;
181  return make_scoped_ptr(new base::ScopedClosureRunner(
182      base::Bind(&LoaderController::Unlock,
183                 weak_ptr_factory_.GetWeakPtr())));
184}
185
186void LoaderController::ScheduleRun(const base::Closure& task) {
187  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
188  DCHECK(!task.is_null());
189
190  if (lock_count_ > 0) {
191    pending_tasks_.push_back(task);
192  } else {
193    task.Run();
194  }
195}
196
197void LoaderController::Unlock() {
198  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
199  DCHECK_LT(0, lock_count_);
200
201  if (--lock_count_ > 0)
202    return;
203
204  std::vector<base::Closure> tasks;
205  tasks.swap(pending_tasks_);
206  for (size_t i = 0; i < tasks.size(); ++i)
207    tasks[i].Run();
208}
209
210AboutResourceLoader::AboutResourceLoader(JobScheduler* scheduler)
211    : scheduler_(scheduler),
212      current_update_task_id_(-1),
213      weak_ptr_factory_(this) {
214}
215
216AboutResourceLoader::~AboutResourceLoader() {}
217
218void AboutResourceLoader::GetAboutResource(
219    const google_apis::AboutResourceCallback& callback) {
220  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
221  DCHECK(!callback.is_null());
222
223  // If the latest UpdateAboutResource task is still running. Wait for it,
224  if (pending_callbacks_.count(current_update_task_id_)) {
225    pending_callbacks_[current_update_task_id_].push_back(callback);
226    return;
227  }
228
229  if (cached_about_resource_) {
230    base::MessageLoopProxy::current()->PostTask(
231        FROM_HERE,
232        base::Bind(
233            callback,
234            google_apis::HTTP_NO_CONTENT,
235            base::Passed(scoped_ptr<google_apis::AboutResource>(
236                new google_apis::AboutResource(*cached_about_resource_)))));
237  } else {
238    UpdateAboutResource(callback);
239  }
240}
241
242void AboutResourceLoader::UpdateAboutResource(
243    const google_apis::AboutResourceCallback& callback) {
244  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
245  DCHECK(!callback.is_null());
246
247  ++current_update_task_id_;
248  pending_callbacks_[current_update_task_id_].push_back(callback);
249
250  scheduler_->GetAboutResource(
251      base::Bind(&AboutResourceLoader::UpdateAboutResourceAfterGetAbout,
252                 weak_ptr_factory_.GetWeakPtr(),
253                 current_update_task_id_));
254}
255
256void AboutResourceLoader::UpdateAboutResourceAfterGetAbout(
257    int task_id,
258    google_apis::GDataErrorCode status,
259    scoped_ptr<google_apis::AboutResource> about_resource) {
260  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
261  FileError error = GDataToFileError(status);
262
263  const std::vector<google_apis::AboutResourceCallback> callbacks =
264      pending_callbacks_[task_id];
265  pending_callbacks_.erase(task_id);
266
267  if (error != FILE_ERROR_OK) {
268    for (size_t i = 0; i < callbacks.size(); ++i)
269      callbacks[i].Run(status, scoped_ptr<google_apis::AboutResource>());
270    return;
271  }
272
273  // Updates the cache when the resource is successfully obtained.
274  if (cached_about_resource_ &&
275      cached_about_resource_->largest_change_id() >
276      about_resource->largest_change_id()) {
277    LOG(WARNING) << "Local cached about resource is fresher than server, "
278                 << "local = " << cached_about_resource_->largest_change_id()
279                 << ", server = " << about_resource->largest_change_id();
280  }
281  cached_about_resource_.reset(new google_apis::AboutResource(*about_resource));
282
283  for (size_t i = 0; i < callbacks.size(); ++i) {
284    callbacks[i].Run(
285        status,
286        make_scoped_ptr(new google_apis::AboutResource(*about_resource)));
287  }
288}
289
290ChangeListLoader::ChangeListLoader(
291    EventLogger* logger,
292    base::SequencedTaskRunner* blocking_task_runner,
293    ResourceMetadata* resource_metadata,
294    JobScheduler* scheduler,
295    AboutResourceLoader* about_resource_loader,
296    LoaderController* loader_controller)
297    : logger_(logger),
298      blocking_task_runner_(blocking_task_runner),
299      resource_metadata_(resource_metadata),
300      scheduler_(scheduler),
301      about_resource_loader_(about_resource_loader),
302      loader_controller_(loader_controller),
303      loaded_(false),
304      weak_ptr_factory_(this) {
305}
306
307ChangeListLoader::~ChangeListLoader() {
308}
309
310bool ChangeListLoader::IsRefreshing() const {
311  // Callback for change list loading is stored in pending_load_callback_.
312  // It is non-empty if and only if there is an in-flight loading operation.
313  return !pending_load_callback_.empty();
314}
315
316void ChangeListLoader::AddObserver(ChangeListLoaderObserver* observer) {
317  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
318  observers_.AddObserver(observer);
319}
320
321void ChangeListLoader::RemoveObserver(ChangeListLoaderObserver* observer) {
322  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
323  observers_.RemoveObserver(observer);
324}
325
326void ChangeListLoader::CheckForUpdates(const FileOperationCallback& callback) {
327  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
328  DCHECK(!callback.is_null());
329
330  // We only start to check for updates iff the load is done.
331  // I.e., we ignore checking updates if not loaded to avoid starting the
332  // load without user's explicit interaction (such as opening Drive).
333  if (!loaded_ && !IsRefreshing())
334    return;
335
336  // For each CheckForUpdates() request, always refresh the changestamp info.
337  about_resource_loader_->UpdateAboutResource(
338      base::Bind(&ChangeListLoader::OnAboutResourceUpdated,
339                 weak_ptr_factory_.GetWeakPtr()));
340
341  if (IsRefreshing()) {
342    // There is in-flight loading. So keep the callback here, and check for
343    // updates when the in-flight loading is completed.
344    pending_update_check_callback_ = callback;
345    return;
346  }
347
348  DCHECK(loaded_);
349  logger_->Log(logging::LOG_INFO, "Checking for updates");
350  Load(callback);
351}
352
353void ChangeListLoader::LoadIfNeeded(const FileOperationCallback& callback) {
354  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
355  DCHECK(!callback.is_null());
356
357  // If the metadata is not yet loaded, start loading.
358  if (!loaded_ && !IsRefreshing())
359    Load(callback);
360}
361
362void ChangeListLoader::Load(const FileOperationCallback& callback) {
363  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
364  DCHECK(!callback.is_null());
365
366  // Check if this is the first time this ChangeListLoader do loading.
367  // Note: IsRefreshing() depends on pending_load_callback_ so check in advance.
368  const bool is_initial_load = (!loaded_ && !IsRefreshing());
369
370  // Register the callback function to be called when it is loaded.
371  pending_load_callback_.push_back(callback);
372
373  // If loading task is already running, do nothing.
374  if (pending_load_callback_.size() > 1)
375    return;
376
377  // Check the current status of local metadata, and start loading if needed.
378  int64* local_changestamp = new int64(0);
379  base::PostTaskAndReplyWithResult(
380      blocking_task_runner_.get(),
381      FROM_HERE,
382      base::Bind(&ResourceMetadata::GetLargestChangestamp,
383                 base::Unretained(resource_metadata_),
384                 local_changestamp),
385      base::Bind(&ChangeListLoader::LoadAfterGetLargestChangestamp,
386                 weak_ptr_factory_.GetWeakPtr(),
387                 is_initial_load,
388                 base::Owned(local_changestamp)));
389}
390
391void ChangeListLoader::LoadAfterGetLargestChangestamp(
392    bool is_initial_load,
393    const int64* local_changestamp,
394    FileError error) {
395  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
396
397  if (error != FILE_ERROR_OK) {
398    OnChangeListLoadComplete(error);
399    return;
400  }
401
402  if (is_initial_load && *local_changestamp > 0) {
403    // The local data is usable. Flush callbacks to tell loading was successful.
404    OnChangeListLoadComplete(FILE_ERROR_OK);
405
406    // Continues to load from server in background.
407    // Put dummy callbacks to indicate that fetching is still continuing.
408    pending_load_callback_.push_back(
409        base::Bind(&util::EmptyFileOperationCallback));
410  }
411
412  about_resource_loader_->GetAboutResource(
413      base::Bind(&ChangeListLoader::LoadAfterGetAboutResource,
414                 weak_ptr_factory_.GetWeakPtr(),
415                 *local_changestamp));
416}
417
418void ChangeListLoader::LoadAfterGetAboutResource(
419    int64 local_changestamp,
420    google_apis::GDataErrorCode status,
421    scoped_ptr<google_apis::AboutResource> about_resource) {
422  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
423
424  FileError error = GDataToFileError(status);
425  if (error != FILE_ERROR_OK) {
426    OnChangeListLoadComplete(error);
427    return;
428  }
429
430  DCHECK(about_resource);
431
432  int64 remote_changestamp = about_resource->largest_change_id();
433  int64 start_changestamp = local_changestamp > 0 ? local_changestamp + 1 : 0;
434  if (local_changestamp >= remote_changestamp) {
435    if (local_changestamp > remote_changestamp) {
436      LOG(WARNING) << "Local resource metadata is fresher than server, "
437                   << "local = " << local_changestamp
438                   << ", server = " << remote_changestamp;
439    }
440
441    // No changes detected, tell the client that the loading was successful.
442    OnChangeListLoadComplete(FILE_ERROR_OK);
443  } else {
444    // Start loading the change list.
445    LoadChangeListFromServer(start_changestamp);
446  }
447}
448
449void ChangeListLoader::OnChangeListLoadComplete(FileError error) {
450  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
451
452  if (!loaded_ && error == FILE_ERROR_OK) {
453    loaded_ = true;
454    FOR_EACH_OBSERVER(ChangeListLoaderObserver,
455                      observers_,
456                      OnInitialLoadComplete());
457  }
458
459  for (size_t i = 0; i < pending_load_callback_.size(); ++i) {
460    base::MessageLoopProxy::current()->PostTask(
461        FROM_HERE,
462        base::Bind(pending_load_callback_[i], error));
463  }
464  pending_load_callback_.clear();
465
466  // If there is pending update check, try to load the change from the server
467  // again, because there may exist an update during the completed loading.
468  if (!pending_update_check_callback_.is_null()) {
469    Load(base::ResetAndReturn(&pending_update_check_callback_));
470  }
471}
472
473void ChangeListLoader::OnAboutResourceUpdated(
474    google_apis::GDataErrorCode error,
475    scoped_ptr<google_apis::AboutResource> resource) {
476  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
477
478  if (drive::GDataToFileError(error) != drive::FILE_ERROR_OK) {
479    logger_->Log(logging::LOG_ERROR,
480                 "Failed to update the about resource: %s",
481                 google_apis::GDataErrorCodeToString(error).c_str());
482    return;
483  }
484  logger_->Log(logging::LOG_INFO,
485               "About resource updated to: %s",
486               base::Int64ToString(resource->largest_change_id()).c_str());
487}
488
489void ChangeListLoader::LoadChangeListFromServer(int64 start_changestamp) {
490  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
491  DCHECK(!change_feed_fetcher_);
492  DCHECK(about_resource_loader_->cached_about_resource());
493
494  bool is_delta_update = start_changestamp != 0;
495
496  // Set up feed fetcher.
497  if (is_delta_update) {
498    change_feed_fetcher_.reset(
499        new DeltaFeedFetcher(scheduler_, start_changestamp));
500  } else {
501    change_feed_fetcher_.reset(new FullFeedFetcher(scheduler_));
502  }
503
504  // Make a copy of cached_about_resource_ to remember at which changestamp we
505  // are fetching change list.
506  change_feed_fetcher_->Run(
507      base::Bind(&ChangeListLoader::LoadChangeListFromServerAfterLoadChangeList,
508                 weak_ptr_factory_.GetWeakPtr(),
509                 base::Passed(make_scoped_ptr(new google_apis::AboutResource(
510                     *about_resource_loader_->cached_about_resource()))),
511                 is_delta_update));
512}
513
514void ChangeListLoader::LoadChangeListFromServerAfterLoadChangeList(
515    scoped_ptr<google_apis::AboutResource> about_resource,
516    bool is_delta_update,
517    FileError error,
518    ScopedVector<ChangeList> change_lists) {
519  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
520  DCHECK(about_resource);
521
522  // Delete the fetcher first.
523  change_feed_fetcher_.reset();
524
525  if (error != FILE_ERROR_OK) {
526    OnChangeListLoadComplete(error);
527    return;
528  }
529
530  ChangeListProcessor* change_list_processor =
531      new ChangeListProcessor(resource_metadata_);
532  // Don't send directory content change notification while performing
533  // the initial content retrieval.
534  const bool should_notify_changed_directories = is_delta_update;
535
536  logger_->Log(logging::LOG_INFO,
537               "Apply change lists (is delta: %d)",
538               is_delta_update);
539  loader_controller_->ScheduleRun(base::Bind(
540      base::IgnoreResult(
541          &base::PostTaskAndReplyWithResult<FileError, FileError>),
542      blocking_task_runner_,
543      FROM_HERE,
544      base::Bind(&ChangeListProcessor::Apply,
545                 base::Unretained(change_list_processor),
546                 base::Passed(&about_resource),
547                 base::Passed(&change_lists),
548                 is_delta_update),
549      base::Bind(&ChangeListLoader::LoadChangeListFromServerAfterUpdate,
550                 weak_ptr_factory_.GetWeakPtr(),
551                 base::Owned(change_list_processor),
552                 should_notify_changed_directories,
553                 base::Time::Now())));
554}
555
556void ChangeListLoader::LoadChangeListFromServerAfterUpdate(
557    ChangeListProcessor* change_list_processor,
558    bool should_notify_changed_directories,
559    const base::Time& start_time,
560    FileError error) {
561  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
562
563  const base::TimeDelta elapsed = base::Time::Now() - start_time;
564  logger_->Log(logging::LOG_INFO,
565               "Change lists applied (elapsed time: %sms)",
566               base::Int64ToString(elapsed.InMilliseconds()).c_str());
567
568  if (should_notify_changed_directories) {
569    FOR_EACH_OBSERVER(ChangeListLoaderObserver,
570                      observers_,
571                      OnFileChanged(change_list_processor->changed_files()));
572  }
573
574  OnChangeListLoadComplete(error);
575
576  FOR_EACH_OBSERVER(ChangeListLoaderObserver,
577                    observers_,
578                    OnLoadFromServerComplete());
579}
580
581}  // namespace internal
582}  // namespace drive
583