1// Copyright 2014 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "sync/internal_api/public/attachments/attachment_uploader_impl.h" 6 7#include "base/bind.h" 8#include "base/memory/weak_ptr.h" 9#include "base/message_loop/message_loop.h" 10#include "base/threading/non_thread_safe.h" 11#include "google_apis/gaia/gaia_constants.h" 12#include "net/base/load_flags.h" 13#include "net/http/http_status_code.h" 14#include "net/url_request/url_fetcher.h" 15#include "net/url_request/url_fetcher_delegate.h" 16#include "sync/api/attachments/attachment.h" 17#include "sync/protocol/sync.pb.h" 18 19namespace { 20 21const char kContentType[] = "application/octet-stream"; 22const char kAttachments[] = "attachments/"; 23 24} // namespace 25 26namespace syncer { 27 28// Encapsulates all the state associated with a single upload. 29class AttachmentUploaderImpl::UploadState : public net::URLFetcherDelegate, 30 public OAuth2TokenService::Consumer, 31 public base::NonThreadSafe { 32 public: 33 // Construct an UploadState. 34 // 35 // UploadState encapsulates the state associated with a single upload. When 36 // the upload completes, the UploadState object becomes "stopped". 37 // 38 // |owner| is a pointer to the object that owns this UploadState. Upon 39 // completion this object will PostTask to owner's OnUploadStateStopped 40 // method. 41 UploadState( 42 const GURL& upload_url, 43 const scoped_refptr<net::URLRequestContextGetter>& 44 url_request_context_getter, 45 const Attachment& attachment, 46 const UploadCallback& user_callback, 47 const std::string& account_id, 48 const OAuth2TokenService::ScopeSet& scopes, 49 OAuth2TokenServiceRequest::TokenServiceProvider* token_service_provider, 50 const base::WeakPtr<AttachmentUploaderImpl>& owner); 51 52 virtual ~UploadState(); 53 54 // Returns true if this object is stopped. Once stopped, this object is 55 // effectively dead and can be destroyed. 56 bool IsStopped() const; 57 58 // Add |user_callback| to the list of callbacks to be invoked when this upload 59 // completed. 60 // 61 // It is an error to call |AddUserCallback| on a stopped UploadState (see 62 // |IsStopped|). 63 void AddUserCallback(const UploadCallback& user_callback); 64 65 // Return the Attachment this object is uploading. 66 const Attachment& GetAttachment(); 67 68 // URLFetcher implementation. 69 virtual void OnURLFetchComplete(const net::URLFetcher* source) OVERRIDE; 70 71 // OAuth2TokenService::Consumer. 72 virtual void OnGetTokenSuccess(const OAuth2TokenService::Request* request, 73 const std::string& access_token, 74 const base::Time& expiration_time) OVERRIDE; 75 virtual void OnGetTokenFailure(const OAuth2TokenService::Request* request, 76 const GoogleServiceAuthError& error) OVERRIDE; 77 78 private: 79 typedef std::vector<UploadCallback> UploadCallbackList; 80 81 void GetToken(); 82 83 void StopAndReportResult(const UploadResult& result, 84 const AttachmentId& attachment_id); 85 86 bool is_stopped_; 87 GURL upload_url_; 88 const scoped_refptr<net::URLRequestContextGetter>& 89 url_request_context_getter_; 90 Attachment attachment_; 91 UploadCallbackList user_callbacks_; 92 scoped_ptr<net::URLFetcher> fetcher_; 93 std::string account_id_; 94 OAuth2TokenService::ScopeSet scopes_; 95 std::string access_token_; 96 OAuth2TokenServiceRequest::TokenServiceProvider* token_service_provider_; 97 // Pointer to the AttachmentUploaderImpl that owns this object. 98 base::WeakPtr<AttachmentUploaderImpl> owner_; 99 scoped_ptr<OAuth2TokenServiceRequest> access_token_request_; 100 101 DISALLOW_COPY_AND_ASSIGN(UploadState); 102}; 103 104AttachmentUploaderImpl::UploadState::UploadState( 105 const GURL& upload_url, 106 const scoped_refptr<net::URLRequestContextGetter>& 107 url_request_context_getter, 108 const Attachment& attachment, 109 const UploadCallback& user_callback, 110 const std::string& account_id, 111 const OAuth2TokenService::ScopeSet& scopes, 112 OAuth2TokenServiceRequest::TokenServiceProvider* token_service_provider, 113 const base::WeakPtr<AttachmentUploaderImpl>& owner) 114 : OAuth2TokenService::Consumer("attachment-uploader-impl"), 115 is_stopped_(false), 116 upload_url_(upload_url), 117 url_request_context_getter_(url_request_context_getter), 118 attachment_(attachment), 119 user_callbacks_(1, user_callback), 120 account_id_(account_id), 121 scopes_(scopes), 122 token_service_provider_(token_service_provider), 123 owner_(owner) { 124 DCHECK(upload_url_.is_valid()); 125 DCHECK(url_request_context_getter_.get()); 126 DCHECK(!account_id_.empty()); 127 DCHECK(!scopes_.empty()); 128 DCHECK(token_service_provider_); 129 GetToken(); 130} 131 132AttachmentUploaderImpl::UploadState::~UploadState() { 133} 134 135bool AttachmentUploaderImpl::UploadState::IsStopped() const { 136 DCHECK(CalledOnValidThread()); 137 return is_stopped_; 138} 139 140void AttachmentUploaderImpl::UploadState::AddUserCallback( 141 const UploadCallback& user_callback) { 142 DCHECK(CalledOnValidThread()); 143 DCHECK(!is_stopped_); 144 user_callbacks_.push_back(user_callback); 145} 146 147const Attachment& AttachmentUploaderImpl::UploadState::GetAttachment() { 148 DCHECK(CalledOnValidThread()); 149 return attachment_; 150} 151 152void AttachmentUploaderImpl::UploadState::OnURLFetchComplete( 153 const net::URLFetcher* source) { 154 DCHECK(CalledOnValidThread()); 155 if (is_stopped_) { 156 return; 157 } 158 159 UploadResult result = UPLOAD_TRANSIENT_ERROR; 160 AttachmentId attachment_id = attachment_.GetId(); 161 const int response_code = source->GetResponseCode(); 162 if (response_code == net::HTTP_OK) { 163 result = UPLOAD_SUCCESS; 164 } else if (response_code == net::HTTP_UNAUTHORIZED) { 165 // Server tells us we've got a bad token so invalidate it. 166 OAuth2TokenServiceRequest::InvalidateToken( 167 token_service_provider_, account_id_, scopes_, access_token_); 168 // Fail the request, but indicate that it may be successful if retried. 169 result = UPLOAD_TRANSIENT_ERROR; 170 } else if (response_code == net::HTTP_FORBIDDEN) { 171 // User is not allowed to use attachments. Retrying won't help. 172 result = UPLOAD_UNSPECIFIED_ERROR; 173 } else if (response_code == net::URLFetcher::RESPONSE_CODE_INVALID) { 174 result = UPLOAD_TRANSIENT_ERROR; 175 } 176 StopAndReportResult(result, attachment_id); 177} 178 179void AttachmentUploaderImpl::UploadState::OnGetTokenSuccess( 180 const OAuth2TokenService::Request* request, 181 const std::string& access_token, 182 const base::Time& expiration_time) { 183 DCHECK(CalledOnValidThread()); 184 if (is_stopped_) { 185 return; 186 } 187 188 DCHECK_EQ(access_token_request_.get(), request); 189 access_token_request_.reset(); 190 access_token_ = access_token; 191 fetcher_.reset( 192 net::URLFetcher::Create(upload_url_, net::URLFetcher::POST, this)); 193 fetcher_->SetAutomaticallyRetryOn5xx(false); 194 fetcher_->SetRequestContext(url_request_context_getter_.get()); 195 // TODO(maniscalco): Is there a better way? Copying the attachment data into 196 // a string feels wrong given how large attachments may be (several MBs). If 197 // we may end up switching from URLFetcher to URLRequest, this copy won't be 198 // necessary. 199 scoped_refptr<base::RefCountedMemory> memory = attachment_.GetData(); 200 const std::string upload_content(memory->front_as<char>(), memory->size()); 201 fetcher_->SetUploadData(kContentType, upload_content); 202 const std::string auth_header("Authorization: Bearer " + access_token_); 203 fetcher_->AddExtraRequestHeader(auth_header); 204 fetcher_->SetLoadFlags(net::LOAD_DO_NOT_SAVE_COOKIES | 205 net::LOAD_DO_NOT_SEND_COOKIES | 206 net::LOAD_DISABLE_CACHE); 207 // TODO(maniscalco): Set an appropriate headers (User-Agent, Content-type, and 208 // Content-length) on the request and include the content's MD5, 209 // AttachmentId's unique_id and the "sync birthday" (bug 371521). 210 fetcher_->Start(); 211} 212 213void AttachmentUploaderImpl::UploadState::OnGetTokenFailure( 214 const OAuth2TokenService::Request* request, 215 const GoogleServiceAuthError& error) { 216 DCHECK(CalledOnValidThread()); 217 if (is_stopped_) { 218 return; 219 } 220 221 DCHECK_EQ(access_token_request_.get(), request); 222 access_token_request_.reset(); 223 // TODO(maniscalco): We treat this as a transient error, but it may in fact be 224 // a very long lived error and require user action. Consider differentiating 225 // between the causes of GetToken failure and act accordingly. Think about 226 // the causes of GetToken failure. Are there (bug 412802). 227 StopAndReportResult(UPLOAD_TRANSIENT_ERROR, attachment_.GetId()); 228} 229 230void AttachmentUploaderImpl::UploadState::GetToken() { 231 access_token_request_ = OAuth2TokenServiceRequest::CreateAndStart( 232 token_service_provider_, account_id_, scopes_, this); 233} 234 235void AttachmentUploaderImpl::UploadState::StopAndReportResult( 236 const UploadResult& result, 237 const AttachmentId& attachment_id) { 238 DCHECK(!is_stopped_); 239 is_stopped_ = true; 240 UploadCallbackList::const_iterator iter = user_callbacks_.begin(); 241 UploadCallbackList::const_iterator end = user_callbacks_.end(); 242 for (; iter != end; ++iter) { 243 base::MessageLoop::current()->PostTask( 244 FROM_HERE, base::Bind(*iter, result, attachment_id)); 245 } 246 base::MessageLoop::current()->PostTask( 247 FROM_HERE, 248 base::Bind(&AttachmentUploaderImpl::OnUploadStateStopped, 249 owner_, 250 attachment_id.GetProto().unique_id())); 251} 252 253AttachmentUploaderImpl::AttachmentUploaderImpl( 254 const GURL& sync_service_url, 255 const scoped_refptr<net::URLRequestContextGetter>& 256 url_request_context_getter, 257 const std::string& account_id, 258 const OAuth2TokenService::ScopeSet& scopes, 259 const scoped_refptr<OAuth2TokenServiceRequest::TokenServiceProvider>& 260 token_service_provider) 261 : sync_service_url_(sync_service_url), 262 url_request_context_getter_(url_request_context_getter), 263 account_id_(account_id), 264 scopes_(scopes), 265 token_service_provider_(token_service_provider), 266 weak_ptr_factory_(this) { 267 DCHECK(CalledOnValidThread()); 268 DCHECK(!account_id.empty()); 269 DCHECK(!scopes.empty()); 270 DCHECK(token_service_provider_.get()); 271} 272 273AttachmentUploaderImpl::~AttachmentUploaderImpl() { 274 DCHECK(CalledOnValidThread()); 275} 276 277void AttachmentUploaderImpl::UploadAttachment(const Attachment& attachment, 278 const UploadCallback& callback) { 279 DCHECK(CalledOnValidThread()); 280 const AttachmentId attachment_id = attachment.GetId(); 281 const std::string unique_id = attachment_id.GetProto().unique_id(); 282 DCHECK(!unique_id.empty()); 283 StateMap::iterator iter = state_map_.find(unique_id); 284 if (iter != state_map_.end()) { 285 // We have an old upload request for this attachment... 286 if (!iter->second->IsStopped()) { 287 // "join" to it. 288 DCHECK(attachment.GetData() 289 ->Equals(iter->second->GetAttachment().GetData())); 290 iter->second->AddUserCallback(callback); 291 return; 292 } else { 293 // It's stopped so we can't use it. Delete it. 294 state_map_.erase(iter); 295 } 296 } 297 298 const GURL url = GetURLForAttachmentId(sync_service_url_, attachment_id); 299 scoped_ptr<UploadState> upload_state( 300 new UploadState(url, 301 url_request_context_getter_, 302 attachment, 303 callback, 304 account_id_, 305 scopes_, 306 token_service_provider_.get(), 307 weak_ptr_factory_.GetWeakPtr())); 308 state_map_.add(unique_id, upload_state.Pass()); 309} 310 311// Static. 312GURL AttachmentUploaderImpl::GetURLForAttachmentId( 313 const GURL& sync_service_url, 314 const AttachmentId& attachment_id) { 315 std::string path = sync_service_url.path(); 316 if (path.empty() || *path.rbegin() != '/') { 317 path += '/'; 318 } 319 path += kAttachments; 320 path += attachment_id.GetProto().unique_id(); 321 GURL::Replacements replacements; 322 replacements.SetPathStr(path); 323 return sync_service_url.ReplaceComponents(replacements); 324} 325 326void AttachmentUploaderImpl::OnUploadStateStopped(const UniqueId& unique_id) { 327 StateMap::iterator iter = state_map_.find(unique_id); 328 // Only erase if stopped. Because this method is called asynchronously, it's 329 // possible that a new request for this same id arrived after the UploadState 330 // stopped, but before this method was invoked. In that case the UploadState 331 // in the map might be a new one. 332 if (iter != state_map_.end() && iter->second->IsStopped()) { 333 state_map_.erase(iter); 334 } 335} 336 337} // namespace syncer 338