1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "sync/internal_api/public/attachments/attachment_uploader_impl.h"
6
7#include "base/bind.h"
8#include "base/memory/weak_ptr.h"
9#include "base/message_loop/message_loop.h"
10#include "base/threading/non_thread_safe.h"
11#include "google_apis/gaia/gaia_constants.h"
12#include "net/base/load_flags.h"
13#include "net/http/http_status_code.h"
14#include "net/url_request/url_fetcher.h"
15#include "net/url_request/url_fetcher_delegate.h"
16#include "sync/api/attachments/attachment.h"
17#include "sync/protocol/sync.pb.h"
18
namespace {

// Path segment appended to the sync service URL; the attachment's unique id
// follows it to form the upload URL.
const char kAttachments[] = "attachments/";

// Content type passed along with the attachment bytes when they are handed to
// the URLFetcher as upload data.
const char kContentType[] = "application/octet-stream";

}  // namespace
25
26namespace syncer {
27
28// Encapsulates all the state associated with a single upload.
29class AttachmentUploaderImpl::UploadState : public net::URLFetcherDelegate,
30                                            public OAuth2TokenService::Consumer,
31                                            public base::NonThreadSafe {
32 public:
33  // Construct an UploadState.
34  //
35  // UploadState encapsulates the state associated with a single upload.  When
36  // the upload completes, the UploadState object becomes "stopped".
37  //
38  // |owner| is a pointer to the object that owns this UploadState.  Upon
39  // completion this object will PostTask to owner's OnUploadStateStopped
40  // method.
41  UploadState(
42      const GURL& upload_url,
43      const scoped_refptr<net::URLRequestContextGetter>&
44          url_request_context_getter,
45      const Attachment& attachment,
46      const UploadCallback& user_callback,
47      const std::string& account_id,
48      const OAuth2TokenService::ScopeSet& scopes,
49      OAuth2TokenServiceRequest::TokenServiceProvider* token_service_provider,
50      const base::WeakPtr<AttachmentUploaderImpl>& owner);
51
52  virtual ~UploadState();
53
54  // Returns true if this object is stopped.  Once stopped, this object is
55  // effectively dead and can be destroyed.
56  bool IsStopped() const;
57
58  // Add |user_callback| to the list of callbacks to be invoked when this upload
59  // completed.
60  //
61  // It is an error to call |AddUserCallback| on a stopped UploadState (see
62  // |IsStopped|).
63  void AddUserCallback(const UploadCallback& user_callback);
64
65  // Return the Attachment this object is uploading.
66  const Attachment& GetAttachment();
67
68  // URLFetcher implementation.
69  virtual void OnURLFetchComplete(const net::URLFetcher* source) OVERRIDE;
70
71  // OAuth2TokenService::Consumer.
72  virtual void OnGetTokenSuccess(const OAuth2TokenService::Request* request,
73                                 const std::string& access_token,
74                                 const base::Time& expiration_time) OVERRIDE;
75  virtual void OnGetTokenFailure(const OAuth2TokenService::Request* request,
76                                 const GoogleServiceAuthError& error) OVERRIDE;
77
78 private:
79  typedef std::vector<UploadCallback> UploadCallbackList;
80
81  void GetToken();
82
83  void StopAndReportResult(const UploadResult& result,
84                           const AttachmentId& attachment_id);
85
86  bool is_stopped_;
87  GURL upload_url_;
88  const scoped_refptr<net::URLRequestContextGetter>&
89      url_request_context_getter_;
90  Attachment attachment_;
91  UploadCallbackList user_callbacks_;
92  scoped_ptr<net::URLFetcher> fetcher_;
93  std::string account_id_;
94  OAuth2TokenService::ScopeSet scopes_;
95  std::string access_token_;
96  OAuth2TokenServiceRequest::TokenServiceProvider* token_service_provider_;
97  // Pointer to the AttachmentUploaderImpl that owns this object.
98  base::WeakPtr<AttachmentUploaderImpl> owner_;
99  scoped_ptr<OAuth2TokenServiceRequest> access_token_request_;
100
101  DISALLOW_COPY_AND_ASSIGN(UploadState);
102};
103
104AttachmentUploaderImpl::UploadState::UploadState(
105    const GURL& upload_url,
106    const scoped_refptr<net::URLRequestContextGetter>&
107        url_request_context_getter,
108    const Attachment& attachment,
109    const UploadCallback& user_callback,
110    const std::string& account_id,
111    const OAuth2TokenService::ScopeSet& scopes,
112    OAuth2TokenServiceRequest::TokenServiceProvider* token_service_provider,
113    const base::WeakPtr<AttachmentUploaderImpl>& owner)
114    : OAuth2TokenService::Consumer("attachment-uploader-impl"),
115      is_stopped_(false),
116      upload_url_(upload_url),
117      url_request_context_getter_(url_request_context_getter),
118      attachment_(attachment),
119      user_callbacks_(1, user_callback),
120      account_id_(account_id),
121      scopes_(scopes),
122      token_service_provider_(token_service_provider),
123      owner_(owner) {
124  DCHECK(upload_url_.is_valid());
125  DCHECK(url_request_context_getter_.get());
126  DCHECK(!account_id_.empty());
127  DCHECK(!scopes_.empty());
128  DCHECK(token_service_provider_);
129  GetToken();
130}
131
132AttachmentUploaderImpl::UploadState::~UploadState() {
133}
134
135bool AttachmentUploaderImpl::UploadState::IsStopped() const {
136  DCHECK(CalledOnValidThread());
137  return is_stopped_;
138}
139
140void AttachmentUploaderImpl::UploadState::AddUserCallback(
141    const UploadCallback& user_callback) {
142  DCHECK(CalledOnValidThread());
143  DCHECK(!is_stopped_);
144  user_callbacks_.push_back(user_callback);
145}
146
147const Attachment& AttachmentUploaderImpl::UploadState::GetAttachment() {
148  DCHECK(CalledOnValidThread());
149  return attachment_;
150}
151
152void AttachmentUploaderImpl::UploadState::OnURLFetchComplete(
153    const net::URLFetcher* source) {
154  DCHECK(CalledOnValidThread());
155  if (is_stopped_) {
156    return;
157  }
158
159  UploadResult result = UPLOAD_TRANSIENT_ERROR;
160  AttachmentId attachment_id = attachment_.GetId();
161  const int response_code = source->GetResponseCode();
162  if (response_code == net::HTTP_OK) {
163    result = UPLOAD_SUCCESS;
164  } else if (response_code == net::HTTP_UNAUTHORIZED) {
165    // Server tells us we've got a bad token so invalidate it.
166    OAuth2TokenServiceRequest::InvalidateToken(
167        token_service_provider_, account_id_, scopes_, access_token_);
168    // Fail the request, but indicate that it may be successful if retried.
169    result = UPLOAD_TRANSIENT_ERROR;
170  } else if (response_code == net::HTTP_FORBIDDEN) {
171    // User is not allowed to use attachments.  Retrying won't help.
172    result = UPLOAD_UNSPECIFIED_ERROR;
173  } else if (response_code == net::URLFetcher::RESPONSE_CODE_INVALID) {
174    result = UPLOAD_TRANSIENT_ERROR;
175  }
176  StopAndReportResult(result, attachment_id);
177}
178
179void AttachmentUploaderImpl::UploadState::OnGetTokenSuccess(
180    const OAuth2TokenService::Request* request,
181    const std::string& access_token,
182    const base::Time& expiration_time) {
183  DCHECK(CalledOnValidThread());
184  if (is_stopped_) {
185    return;
186  }
187
188  DCHECK_EQ(access_token_request_.get(), request);
189  access_token_request_.reset();
190  access_token_ = access_token;
191  fetcher_.reset(
192      net::URLFetcher::Create(upload_url_, net::URLFetcher::POST, this));
193  fetcher_->SetAutomaticallyRetryOn5xx(false);
194  fetcher_->SetRequestContext(url_request_context_getter_.get());
195  // TODO(maniscalco): Is there a better way?  Copying the attachment data into
196  // a string feels wrong given how large attachments may be (several MBs).  If
197  // we may end up switching from URLFetcher to URLRequest, this copy won't be
198  // necessary.
199  scoped_refptr<base::RefCountedMemory> memory = attachment_.GetData();
200  const std::string upload_content(memory->front_as<char>(), memory->size());
201  fetcher_->SetUploadData(kContentType, upload_content);
202  const std::string auth_header("Authorization: Bearer " + access_token_);
203  fetcher_->AddExtraRequestHeader(auth_header);
204  fetcher_->SetLoadFlags(net::LOAD_DO_NOT_SAVE_COOKIES |
205                         net::LOAD_DO_NOT_SEND_COOKIES |
206                         net::LOAD_DISABLE_CACHE);
207  // TODO(maniscalco): Set an appropriate headers (User-Agent, Content-type, and
208  // Content-length) on the request and include the content's MD5,
209  // AttachmentId's unique_id and the "sync birthday" (bug 371521).
210  fetcher_->Start();
211}
212
213void AttachmentUploaderImpl::UploadState::OnGetTokenFailure(
214    const OAuth2TokenService::Request* request,
215    const GoogleServiceAuthError& error) {
216  DCHECK(CalledOnValidThread());
217  if (is_stopped_) {
218    return;
219  }
220
221  DCHECK_EQ(access_token_request_.get(), request);
222  access_token_request_.reset();
223  // TODO(maniscalco): We treat this as a transient error, but it may in fact be
224  // a very long lived error and require user action.  Consider differentiating
225  // between the causes of GetToken failure and act accordingly.  Think about
226  // the causes of GetToken failure. Are there (bug 412802).
227  StopAndReportResult(UPLOAD_TRANSIENT_ERROR, attachment_.GetId());
228}
229
230void AttachmentUploaderImpl::UploadState::GetToken() {
231  access_token_request_ = OAuth2TokenServiceRequest::CreateAndStart(
232      token_service_provider_, account_id_, scopes_, this);
233}
234
235void AttachmentUploaderImpl::UploadState::StopAndReportResult(
236    const UploadResult& result,
237    const AttachmentId& attachment_id) {
238  DCHECK(!is_stopped_);
239  is_stopped_ = true;
240  UploadCallbackList::const_iterator iter = user_callbacks_.begin();
241  UploadCallbackList::const_iterator end = user_callbacks_.end();
242  for (; iter != end; ++iter) {
243    base::MessageLoop::current()->PostTask(
244        FROM_HERE, base::Bind(*iter, result, attachment_id));
245  }
246  base::MessageLoop::current()->PostTask(
247      FROM_HERE,
248      base::Bind(&AttachmentUploaderImpl::OnUploadStateStopped,
249                 owner_,
250                 attachment_id.GetProto().unique_id()));
251}
252
253AttachmentUploaderImpl::AttachmentUploaderImpl(
254    const GURL& sync_service_url,
255    const scoped_refptr<net::URLRequestContextGetter>&
256        url_request_context_getter,
257    const std::string& account_id,
258    const OAuth2TokenService::ScopeSet& scopes,
259    const scoped_refptr<OAuth2TokenServiceRequest::TokenServiceProvider>&
260        token_service_provider)
261    : sync_service_url_(sync_service_url),
262      url_request_context_getter_(url_request_context_getter),
263      account_id_(account_id),
264      scopes_(scopes),
265      token_service_provider_(token_service_provider),
266      weak_ptr_factory_(this) {
267  DCHECK(CalledOnValidThread());
268  DCHECK(!account_id.empty());
269  DCHECK(!scopes.empty());
270  DCHECK(token_service_provider_.get());
271}
272
273AttachmentUploaderImpl::~AttachmentUploaderImpl() {
274  DCHECK(CalledOnValidThread());
275}
276
277void AttachmentUploaderImpl::UploadAttachment(const Attachment& attachment,
278                                              const UploadCallback& callback) {
279  DCHECK(CalledOnValidThread());
280  const AttachmentId attachment_id = attachment.GetId();
281  const std::string unique_id = attachment_id.GetProto().unique_id();
282  DCHECK(!unique_id.empty());
283  StateMap::iterator iter = state_map_.find(unique_id);
284  if (iter != state_map_.end()) {
285    // We have an old upload request for this attachment...
286    if (!iter->second->IsStopped()) {
287      // "join" to it.
288      DCHECK(attachment.GetData()
289                 ->Equals(iter->second->GetAttachment().GetData()));
290      iter->second->AddUserCallback(callback);
291      return;
292    } else {
293      // It's stopped so we can't use it.  Delete it.
294      state_map_.erase(iter);
295    }
296  }
297
298  const GURL url = GetURLForAttachmentId(sync_service_url_, attachment_id);
299  scoped_ptr<UploadState> upload_state(
300      new UploadState(url,
301                      url_request_context_getter_,
302                      attachment,
303                      callback,
304                      account_id_,
305                      scopes_,
306                      token_service_provider_.get(),
307                      weak_ptr_factory_.GetWeakPtr()));
308  state_map_.add(unique_id, upload_state.Pass());
309}
310
311// Static.
312GURL AttachmentUploaderImpl::GetURLForAttachmentId(
313    const GURL& sync_service_url,
314    const AttachmentId& attachment_id) {
315  std::string path = sync_service_url.path();
316  if (path.empty() || *path.rbegin() != '/') {
317    path += '/';
318  }
319  path += kAttachments;
320  path += attachment_id.GetProto().unique_id();
321  GURL::Replacements replacements;
322  replacements.SetPathStr(path);
323  return sync_service_url.ReplaceComponents(replacements);
324}
325
326void AttachmentUploaderImpl::OnUploadStateStopped(const UniqueId& unique_id) {
327  StateMap::iterator iter = state_map_.find(unique_id);
328  // Only erase if stopped.  Because this method is called asynchronously, it's
329  // possible that a new request for this same id arrived after the UploadState
330  // stopped, but before this method was invoked.  In that case the UploadState
331  // in the map might be a new one.
332  if (iter != state_map_.end() && iter->second->IsStopped()) {
333    state_map_.erase(iter);
334  }
335}
336
337}  // namespace syncer
338