1// Copyright 2014 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "sync/internal_api/public/attachments/attachment_service_impl.h" 6 7#include <iterator> 8 9#include "base/bind.h" 10#include "base/message_loop/message_loop.h" 11#include "base/thread_task_runner_handle.h" 12#include "base/time/time.h" 13#include "sync/api/attachments/attachment.h" 14#include "sync/api/attachments/fake_attachment_store.h" 15#include "sync/internal_api/public/attachments/fake_attachment_downloader.h" 16#include "sync/internal_api/public/attachments/fake_attachment_uploader.h" 17 18namespace syncer { 19 20// GetOrDownloadAttachments starts multiple parallel DownloadAttachment calls. 21// GetOrDownloadState tracks completion of these calls and posts callback for 22// consumer once all attachments are either retrieved or reported unavailable. 23class AttachmentServiceImpl::GetOrDownloadState 24 : public base::RefCounted<GetOrDownloadState>, 25 public base::NonThreadSafe { 26 public: 27 // GetOrDownloadState gets parameter from values passed to 28 // AttachmentService::GetOrDownloadAttachments. 29 // |attachment_ids| is a list of attachmens to retrieve. 30 // |callback| will be posted on current thread when all attachments retrieved 31 // or confirmed unavailable. 32 GetOrDownloadState(const AttachmentIdList& attachment_ids, 33 const GetOrDownloadCallback& callback); 34 35 // Attachment was just retrieved. Add it to retrieved attachments. 36 void AddAttachment(const Attachment& attachment); 37 38 // Both reading from local store and downloading attachment failed. 39 // Add it to unavailable set. 40 void AddUnavailableAttachmentId(const AttachmentId& attachment_id); 41 42 private: 43 friend class base::RefCounted<GetOrDownloadState>; 44 virtual ~GetOrDownloadState(); 45 46 // If all attachment requests completed then post callback to consumer with 47 // results. 48 void PostResultIfAllRequestsCompleted(); 49 50 GetOrDownloadCallback callback_; 51 52 // Requests for these attachments are still in progress. 53 AttachmentIdSet in_progress_attachments_; 54 55 AttachmentIdSet unavailable_attachments_; 56 scoped_ptr<AttachmentMap> retrieved_attachments_; 57 58 DISALLOW_COPY_AND_ASSIGN(GetOrDownloadState); 59}; 60 61AttachmentServiceImpl::GetOrDownloadState::GetOrDownloadState( 62 const AttachmentIdList& attachment_ids, 63 const GetOrDownloadCallback& callback) 64 : callback_(callback), retrieved_attachments_(new AttachmentMap()) { 65 std::copy( 66 attachment_ids.begin(), 67 attachment_ids.end(), 68 std::inserter(in_progress_attachments_, in_progress_attachments_.end())); 69 PostResultIfAllRequestsCompleted(); 70} 71 72AttachmentServiceImpl::GetOrDownloadState::~GetOrDownloadState() { 73 DCHECK(CalledOnValidThread()); 74} 75 76void AttachmentServiceImpl::GetOrDownloadState::AddAttachment( 77 const Attachment& attachment) { 78 DCHECK(CalledOnValidThread()); 79 DCHECK(retrieved_attachments_->find(attachment.GetId()) == 80 retrieved_attachments_->end()); 81 retrieved_attachments_->insert( 82 std::make_pair(attachment.GetId(), attachment)); 83 DCHECK(in_progress_attachments_.find(attachment.GetId()) != 84 in_progress_attachments_.end()); 85 in_progress_attachments_.erase(attachment.GetId()); 86 PostResultIfAllRequestsCompleted(); 87} 88 89void AttachmentServiceImpl::GetOrDownloadState::AddUnavailableAttachmentId( 90 const AttachmentId& attachment_id) { 91 DCHECK(CalledOnValidThread()); 92 DCHECK(unavailable_attachments_.find(attachment_id) == 93 unavailable_attachments_.end()); 94 unavailable_attachments_.insert(attachment_id); 95 DCHECK(in_progress_attachments_.find(attachment_id) != 96 in_progress_attachments_.end()); 97 in_progress_attachments_.erase(attachment_id); 98 PostResultIfAllRequestsCompleted(); 99} 100 101void 102AttachmentServiceImpl::GetOrDownloadState::PostResultIfAllRequestsCompleted() { 103 if (in_progress_attachments_.empty()) { 104 // All requests completed. Let's notify consumer. 105 GetOrDownloadResult result = 106 unavailable_attachments_.empty() ? GET_SUCCESS : GET_UNSPECIFIED_ERROR; 107 base::MessageLoop::current()->PostTask( 108 FROM_HERE, 109 base::Bind(callback_, result, base::Passed(&retrieved_attachments_))); 110 } 111} 112 113AttachmentServiceImpl::AttachmentServiceImpl( 114 scoped_refptr<AttachmentStore> attachment_store, 115 scoped_ptr<AttachmentUploader> attachment_uploader, 116 scoped_ptr<AttachmentDownloader> attachment_downloader, 117 Delegate* delegate, 118 const base::TimeDelta& initial_backoff_delay, 119 const base::TimeDelta& max_backoff_delay) 120 : attachment_store_(attachment_store), 121 attachment_uploader_(attachment_uploader.Pass()), 122 attachment_downloader_(attachment_downloader.Pass()), 123 delegate_(delegate), 124 weak_ptr_factory_(this) { 125 DCHECK(CalledOnValidThread()); 126 DCHECK(attachment_store_.get()); 127 128 // TODO(maniscalco): Observe network connectivity change events. When the 129 // network becomes disconnected, consider suspending queue dispatch. When 130 // connectivity is restored, consider clearing any dispatch backoff (bug 131 // 411981). 132 upload_task_queue_.reset(new TaskQueue<AttachmentId>( 133 base::Bind(&AttachmentServiceImpl::BeginUpload, 134 weak_ptr_factory_.GetWeakPtr()), 135 initial_backoff_delay, 136 max_backoff_delay)); 137 138 net::NetworkChangeNotifier::AddNetworkChangeObserver(this); 139} 140 141AttachmentServiceImpl::~AttachmentServiceImpl() { 142 DCHECK(CalledOnValidThread()); 143 net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this); 144} 145 146// Static. 147scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() { 148 scoped_refptr<syncer::AttachmentStore> attachment_store( 149 new syncer::FakeAttachmentStore(base::ThreadTaskRunnerHandle::Get())); 150 scoped_ptr<AttachmentUploader> attachment_uploader( 151 new FakeAttachmentUploader); 152 scoped_ptr<AttachmentDownloader> attachment_downloader( 153 new FakeAttachmentDownloader()); 154 scoped_ptr<syncer::AttachmentService> attachment_service( 155 new syncer::AttachmentServiceImpl(attachment_store, 156 attachment_uploader.Pass(), 157 attachment_downloader.Pass(), 158 NULL, 159 base::TimeDelta(), 160 base::TimeDelta())); 161 return attachment_service.Pass(); 162} 163 164AttachmentStore* AttachmentServiceImpl::GetStore() { 165 return attachment_store_.get(); 166} 167 168void AttachmentServiceImpl::GetOrDownloadAttachments( 169 const AttachmentIdList& attachment_ids, 170 const GetOrDownloadCallback& callback) { 171 DCHECK(CalledOnValidThread()); 172 scoped_refptr<GetOrDownloadState> state( 173 new GetOrDownloadState(attachment_ids, callback)); 174 attachment_store_->Read(attachment_ids, 175 base::Bind(&AttachmentServiceImpl::ReadDone, 176 weak_ptr_factory_.GetWeakPtr(), 177 state)); 178} 179 180void AttachmentServiceImpl::DropAttachments( 181 const AttachmentIdList& attachment_ids, 182 const DropCallback& callback) { 183 DCHECK(CalledOnValidThread()); 184 attachment_store_->Drop(attachment_ids, 185 base::Bind(&AttachmentServiceImpl::DropDone, 186 weak_ptr_factory_.GetWeakPtr(), 187 callback)); 188} 189 190void AttachmentServiceImpl::ReadDone( 191 const scoped_refptr<GetOrDownloadState>& state, 192 const AttachmentStore::Result& result, 193 scoped_ptr<AttachmentMap> attachments, 194 scoped_ptr<AttachmentIdList> unavailable_attachment_ids) { 195 // Add read attachments to result. 196 for (AttachmentMap::const_iterator iter = attachments->begin(); 197 iter != attachments->end(); 198 ++iter) { 199 state->AddAttachment(iter->second); 200 } 201 202 AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin(); 203 AttachmentIdList::const_iterator end = unavailable_attachment_ids->end(); 204 if (attachment_downloader_.get()) { 205 // Try to download locally unavailable attachments. 206 for (; iter != end; ++iter) { 207 attachment_downloader_->DownloadAttachment( 208 *iter, 209 base::Bind(&AttachmentServiceImpl::DownloadDone, 210 weak_ptr_factory_.GetWeakPtr(), 211 state, 212 *iter)); 213 } 214 } else { 215 // No downloader so all locally unavailable attachments are unavailable. 216 for (; iter != end; ++iter) { 217 state->AddUnavailableAttachmentId(*iter); 218 } 219 } 220} 221 222void AttachmentServiceImpl::DropDone(const DropCallback& callback, 223 const AttachmentStore::Result& result) { 224 AttachmentService::DropResult drop_result = 225 AttachmentService::DROP_UNSPECIFIED_ERROR; 226 if (result == AttachmentStore::SUCCESS) { 227 drop_result = AttachmentService::DROP_SUCCESS; 228 } 229 // TODO(maniscalco): Deal with case where an error occurred (bug 361251). 230 base::MessageLoop::current()->PostTask(FROM_HERE, 231 base::Bind(callback, drop_result)); 232} 233 234void AttachmentServiceImpl::UploadDone( 235 const AttachmentUploader::UploadResult& result, 236 const AttachmentId& attachment_id) { 237 DCHECK(CalledOnValidThread()); 238 switch (result) { 239 case AttachmentUploader::UPLOAD_SUCCESS: 240 upload_task_queue_->MarkAsSucceeded(attachment_id); 241 if (delegate_) { 242 delegate_->OnAttachmentUploaded(attachment_id); 243 } 244 break; 245 case AttachmentUploader::UPLOAD_TRANSIENT_ERROR: 246 upload_task_queue_->MarkAsFailed(attachment_id); 247 upload_task_queue_->AddToQueue(attachment_id); 248 break; 249 case AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR: 250 // TODO(pavely): crbug/372622: Deal with UploadAttachment failures. 251 upload_task_queue_->MarkAsFailed(attachment_id); 252 break; 253 } 254} 255 256void AttachmentServiceImpl::DownloadDone( 257 const scoped_refptr<GetOrDownloadState>& state, 258 const AttachmentId& attachment_id, 259 const AttachmentDownloader::DownloadResult& result, 260 scoped_ptr<Attachment> attachment) { 261 switch (result) { 262 case AttachmentDownloader::DOWNLOAD_SUCCESS: 263 state->AddAttachment(*attachment.get()); 264 break; 265 case AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR: 266 case AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR: 267 state->AddUnavailableAttachmentId(attachment_id); 268 break; 269 } 270} 271 272void AttachmentServiceImpl::BeginUpload(const AttachmentId& attachment_id) { 273 DCHECK(CalledOnValidThread()); 274 AttachmentIdList attachment_ids; 275 attachment_ids.push_back(attachment_id); 276 attachment_store_->Read(attachment_ids, 277 base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload, 278 weak_ptr_factory_.GetWeakPtr())); 279} 280 281void AttachmentServiceImpl::UploadAttachments( 282 const AttachmentIdSet& attachment_ids) { 283 DCHECK(CalledOnValidThread()); 284 if (!attachment_uploader_.get()) { 285 return; 286 } 287 AttachmentIdSet::const_iterator iter = attachment_ids.begin(); 288 AttachmentIdSet::const_iterator end = attachment_ids.end(); 289 for (; iter != end; ++iter) { 290 upload_task_queue_->AddToQueue(*iter); 291 } 292} 293 294void AttachmentServiceImpl::OnNetworkChanged( 295 net::NetworkChangeNotifier::ConnectionType type) { 296 if (type != net::NetworkChangeNotifier::CONNECTION_NONE) { 297 upload_task_queue_->ResetBackoff(); 298 } 299} 300 301void AttachmentServiceImpl::ReadDoneNowUpload( 302 const AttachmentStore::Result& result, 303 scoped_ptr<AttachmentMap> attachments, 304 scoped_ptr<AttachmentIdList> unavailable_attachment_ids) { 305 DCHECK(CalledOnValidThread()); 306 if (!unavailable_attachment_ids->empty()) { 307 // TODO(maniscalco): We failed to read some attachments. What should we do 308 // now? 309 AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin(); 310 AttachmentIdList::const_iterator end = unavailable_attachment_ids->end(); 311 for (; iter != end; ++iter) { 312 upload_task_queue_->Cancel(*iter); 313 } 314 } 315 316 AttachmentMap::const_iterator iter = attachments->begin(); 317 AttachmentMap::const_iterator end = attachments->end(); 318 for (; iter != end; ++iter) { 319 attachment_uploader_->UploadAttachment( 320 iter->second, 321 base::Bind(&AttachmentServiceImpl::UploadDone, 322 weak_ptr_factory_.GetWeakPtr())); 323 } 324} 325 326void AttachmentServiceImpl::SetTimerForTest(scoped_ptr<base::Timer> timer) { 327 upload_task_queue_->SetTimerForTest(timer.Pass()); 328} 329 330} // namespace syncer 331