1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/base/upload_data_stream.h"
6
7#include "base/logging.h"
8#include "net/base/io_buffer.h"
9#include "net/base/net_errors.h"
10#include "net/base/upload_bytes_element_reader.h"
11#include "net/base/upload_element_reader.h"
12
13namespace net {
14
15UploadDataStream::UploadDataStream(
16    ScopedVector<UploadElementReader> element_readers,
17    int64 identifier)
18    : element_readers_(element_readers.Pass()),
19      element_index_(0),
20      total_size_(0),
21      current_position_(0),
22      identifier_(identifier),
23      is_chunked_(false),
24      last_chunk_appended_(false),
25      read_failed_(false),
26      initialized_successfully_(false),
27      weak_ptr_factory_(this) {
28}
29
30UploadDataStream::UploadDataStream(Chunked /*chunked*/, int64 identifier)
31    : element_index_(0),
32      total_size_(0),
33      current_position_(0),
34      identifier_(identifier),
35      is_chunked_(true),
36      last_chunk_appended_(false),
37      read_failed_(false),
38      initialized_successfully_(false),
39      weak_ptr_factory_(this) {
40}
41
42UploadDataStream::~UploadDataStream() {
43}
44
45UploadDataStream* UploadDataStream::CreateWithReader(
46    scoped_ptr<UploadElementReader> reader,
47    int64 identifier) {
48  ScopedVector<UploadElementReader> readers;
49  readers.push_back(reader.release());
50  return new UploadDataStream(readers.Pass(), identifier);
51}
52
53int UploadDataStream::Init(const CompletionCallback& callback) {
54  Reset();
55  return InitInternal(0, callback);
56}
57
58int UploadDataStream::Read(IOBuffer* buf,
59                           int buf_len,
60                           const CompletionCallback& callback) {
61  DCHECK(initialized_successfully_);
62  DCHECK_GT(buf_len, 0);
63  return ReadInternal(new DrainableIOBuffer(buf, buf_len), callback);
64}
65
66bool UploadDataStream::IsEOF() const {
67  DCHECK(initialized_successfully_);
68  if (!is_chunked_)
69    return current_position_ == total_size_;
70
71  // If the upload data is chunked, check if the last chunk is appended and all
72  // elements are consumed.
73  return element_index_ == element_readers_.size() && last_chunk_appended_;
74}
75
76bool UploadDataStream::IsInMemory() const {
77  // Chunks are in memory, but UploadData does not have all the chunks at
78  // once. Chunks are provided progressively with AppendChunk() as chunks
79  // are ready. Check is_chunked_ here, rather than relying on the loop
80  // below, as there is a case that is_chunked_ is set to true, but the
81  // first chunk is not yet delivered.
82  if (is_chunked_)
83    return false;
84
85  for (size_t i = 0; i < element_readers_.size(); ++i) {
86    if (!element_readers_[i]->IsInMemory())
87      return false;
88  }
89  return true;
90}
91
92void UploadDataStream::AppendChunk(const char* bytes,
93                                   int bytes_len,
94                                   bool is_last_chunk) {
95  DCHECK(is_chunked_);
96  DCHECK(!last_chunk_appended_);
97  last_chunk_appended_ = is_last_chunk;
98
99  // Initialize a reader for the newly appended chunk. We leave |total_size_| at
100  // zero, since for chunked uploads, we may not know the total size.
101  std::vector<char> data(bytes, bytes + bytes_len);
102  UploadElementReader* reader = new UploadOwnedBytesElementReader(&data);
103  const int rv = reader->Init(net::CompletionCallback());
104  DCHECK_EQ(OK, rv);
105  element_readers_.push_back(reader);
106
107  // Resume pending read.
108  if (!pending_chunked_read_callback_.is_null()) {
109    base::Closure callback = pending_chunked_read_callback_;
110    pending_chunked_read_callback_.Reset();
111    callback.Run();
112  }
113}
114
115void UploadDataStream::Reset() {
116  weak_ptr_factory_.InvalidateWeakPtrs();
117  pending_chunked_read_callback_.Reset();
118  initialized_successfully_ = false;
119  read_failed_ = false;
120  current_position_ = 0;
121  total_size_ = 0;
122  element_index_ = 0;
123}
124
125int UploadDataStream::InitInternal(int start_index,
126                                   const CompletionCallback& callback) {
127  DCHECK(!initialized_successfully_);
128
129  // Call Init() for all elements.
130  for (size_t i = start_index; i < element_readers_.size(); ++i) {
131    UploadElementReader* reader = element_readers_[i];
132    // When new_result is ERR_IO_PENDING, InitInternal() will be called
133    // with start_index == i + 1 when reader->Init() finishes.
134    const int result = reader->Init(
135        base::Bind(&UploadDataStream::ResumePendingInit,
136                   weak_ptr_factory_.GetWeakPtr(),
137                   i + 1,
138                   callback));
139    if (result != OK) {
140      DCHECK(result != ERR_IO_PENDING || !callback.is_null());
141      return result;
142    }
143  }
144
145  // Finalize initialization.
146  if (!is_chunked_) {
147    uint64 total_size = 0;
148    for (size_t i = 0; i < element_readers_.size(); ++i) {
149      UploadElementReader* reader = element_readers_[i];
150      total_size += reader->GetContentLength();
151    }
152    total_size_ = total_size;
153  }
154  initialized_successfully_ = true;
155  return OK;
156}
157
158void UploadDataStream::ResumePendingInit(int start_index,
159                                         const CompletionCallback& callback,
160                                         int previous_result) {
161  DCHECK(!initialized_successfully_);
162  DCHECK(!callback.is_null());
163  DCHECK_NE(ERR_IO_PENDING, previous_result);
164
165  // Check the last result.
166  if (previous_result != OK) {
167    callback.Run(previous_result);
168    return;
169  }
170
171  const int result = InitInternal(start_index, callback);
172  if (result != ERR_IO_PENDING)
173    callback.Run(result);
174}
175
176int UploadDataStream::ReadInternal(scoped_refptr<DrainableIOBuffer> buf,
177                                   const CompletionCallback& callback) {
178  DCHECK(initialized_successfully_);
179
180  while (!read_failed_ && element_index_ < element_readers_.size()) {
181    UploadElementReader* reader = element_readers_[element_index_];
182
183    if (reader->BytesRemaining() == 0) {
184      ++element_index_;
185      continue;
186    }
187
188    if (buf->BytesRemaining() == 0)
189      break;
190
191    int result = reader->Read(
192        buf.get(),
193        buf->BytesRemaining(),
194        base::Bind(base::IgnoreResult(&UploadDataStream::ResumePendingRead),
195                   weak_ptr_factory_.GetWeakPtr(),
196                   buf,
197                   callback));
198    if (result == ERR_IO_PENDING) {
199      DCHECK(!callback.is_null());
200      return ERR_IO_PENDING;
201    }
202    ProcessReadResult(buf, result);
203  }
204
205  if (read_failed_) {
206    // Chunked transfers may only contain byte readers, so cannot have read
207    // failures.
208    DCHECK(!is_chunked_);
209
210    // If an error occured during read operation, then pad with zero.
211    // Otherwise the server will hang waiting for the rest of the data.
212    const int num_bytes_to_fill =
213        std::min(static_cast<uint64>(buf->BytesRemaining()),
214                 size() - position() - buf->BytesConsumed());
215    DCHECK_LE(0, num_bytes_to_fill);
216    memset(buf->data(), 0, num_bytes_to_fill);
217    buf->DidConsume(num_bytes_to_fill);
218  }
219
220  const int bytes_copied = buf->BytesConsumed();
221  current_position_ += bytes_copied;
222  DCHECK(is_chunked_ || total_size_ >= current_position_);
223
224  if (is_chunked_ && !IsEOF() && bytes_copied == 0) {
225    DCHECK(!callback.is_null());
226    DCHECK(pending_chunked_read_callback_.is_null());
227    pending_chunked_read_callback_ =
228        base::Bind(&UploadDataStream::ResumePendingRead,
229                   weak_ptr_factory_.GetWeakPtr(),
230                   buf,
231                   callback,
232                   OK);
233    return ERR_IO_PENDING;
234  }
235
236  // Returning 0 is allowed only when IsEOF() == true.
237  DCHECK(bytes_copied != 0 || IsEOF());
238  return bytes_copied;
239}
240
241void UploadDataStream::ResumePendingRead(scoped_refptr<DrainableIOBuffer> buf,
242                                         const CompletionCallback& callback,
243                                         int previous_result) {
244  DCHECK(!callback.is_null());
245
246  ProcessReadResult(buf, previous_result);
247
248  const int result = ReadInternal(buf, callback);
249  if (result != ERR_IO_PENDING)
250    callback.Run(result);
251}
252
253void UploadDataStream::ProcessReadResult(scoped_refptr<DrainableIOBuffer> buf,
254                                         int result) {
255  DCHECK_NE(ERR_IO_PENDING, result);
256  DCHECK(!read_failed_);
257
258  if (result >= 0)
259    buf->DidConsume(result);
260  else
261    read_failed_ = true;
262}
263
264}  // namespace net
265