spdy_http_stream.cc revision ddb351dbec246cf1fab5ec20d2d5520909041de1
1// Copyright (c) 2011 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/spdy/spdy_http_stream.h"
6
7#include <algorithm>
8#include <list>
9#include <string>
10
11#include "base/logging.h"
12#include "base/message_loop.h"
13#include "net/base/address_list.h"
14#include "net/base/host_port_pair.h"
15#include "net/base/load_flags.h"
16#include "net/base/net_util.h"
17#include "net/http/http_request_headers.h"
18#include "net/http/http_request_info.h"
19#include "net/http/http_response_info.h"
20#include "net/http/http_util.h"
21#include "net/spdy/spdy_http_utils.h"
22#include "net/spdy/spdy_session.h"
23
24namespace net {
25
26SpdyHttpStream::SpdyHttpStream(SpdySession* spdy_session,
27                               bool direct)
28    : ALLOW_THIS_IN_INITIALIZER_LIST(read_callback_factory_(this)),
29      stream_(NULL),
30      spdy_session_(spdy_session),
31      response_info_(NULL),
32      download_finished_(false),
33      response_headers_received_(false),
34      user_callback_(NULL),
35      user_buffer_len_(0),
36      buffered_read_callback_pending_(false),
37      more_read_data_pending_(false),
38      direct_(direct) { }
39
40void SpdyHttpStream::InitializeWithExistingStream(SpdyStream* spdy_stream) {
41  stream_ = spdy_stream;
42  stream_->SetDelegate(this);
43  response_headers_received_ = true;
44}
45
46SpdyHttpStream::~SpdyHttpStream() {
47  if (stream_)
48    stream_->DetachDelegate();
49}
50
51int SpdyHttpStream::InitializeStream(const HttpRequestInfo* request_info,
52                                     const BoundNetLog& stream_net_log,
53                                     CompletionCallback* callback) {
54  DCHECK(!stream_.get());
55  if (spdy_session_->IsClosed())
56   return ERR_CONNECTION_CLOSED;
57
58  request_info_ = request_info;
59  if (request_info_->method == "GET") {
60    int error = spdy_session_->GetPushStream(request_info_->url, &stream_,
61                                             stream_net_log);
62    if (error != OK)
63      return error;
64  }
65
66  if (stream_.get())
67    return OK;
68
69  return spdy_session_->CreateStream(request_info_->url,
70                                     request_info_->priority, &stream_,
71                                     stream_net_log, callback);
72}
73
74const HttpResponseInfo* SpdyHttpStream::GetResponseInfo() const {
75  return response_info_;
76}
77
78uint64 SpdyHttpStream::GetUploadProgress() const {
79  if (!request_body_stream_.get())
80    return 0;
81
82  return request_body_stream_->position();
83}
84
85int SpdyHttpStream::ReadResponseHeaders(CompletionCallback* callback) {
86  CHECK(callback);
87  CHECK(!stream_->cancelled());
88
89  if (stream_->closed())
90    return stream_->response_status();
91
92  // Check if we already have the response headers. If so, return synchronously.
93  if(stream_->response_received()) {
94    CHECK(stream_->is_idle());
95    return OK;
96  }
97
98  // Still waiting for the response, return IO_PENDING.
99  CHECK(!user_callback_);
100  user_callback_ = callback;
101  return ERR_IO_PENDING;
102}
103
104int SpdyHttpStream::ReadResponseBody(
105    IOBuffer* buf, int buf_len, CompletionCallback* callback) {
106  CHECK(stream_->is_idle());
107  CHECK(buf);
108  CHECK(buf_len);
109  CHECK(callback);
110
111  // If we have data buffered, complete the IO immediately.
112  if (!response_body_.empty()) {
113    int bytes_read = 0;
114    while (!response_body_.empty() && buf_len > 0) {
115      scoped_refptr<IOBufferWithSize> data = response_body_.front();
116      const int bytes_to_copy = std::min(buf_len, data->size());
117      memcpy(&(buf->data()[bytes_read]), data->data(), bytes_to_copy);
118      buf_len -= bytes_to_copy;
119      if (bytes_to_copy == data->size()) {
120        response_body_.pop_front();
121      } else {
122        const int bytes_remaining = data->size() - bytes_to_copy;
123        IOBufferWithSize* new_buffer = new IOBufferWithSize(bytes_remaining);
124        memcpy(new_buffer->data(), &(data->data()[bytes_to_copy]),
125               bytes_remaining);
126        response_body_.pop_front();
127        response_body_.push_front(make_scoped_refptr(new_buffer));
128      }
129      bytes_read += bytes_to_copy;
130    }
131    if (SpdySession::flow_control())
132      stream_->IncreaseRecvWindowSize(bytes_read);
133    return bytes_read;
134  } else if (stream_->closed()) {
135    return stream_->response_status();
136  }
137
138  CHECK(!user_callback_);
139  CHECK(!user_buffer_);
140  CHECK_EQ(0, user_buffer_len_);
141
142  user_callback_ = callback;
143  user_buffer_ = buf;
144  user_buffer_len_ = buf_len;
145  return ERR_IO_PENDING;
146}
147
148void SpdyHttpStream::Close(bool not_reusable) {
149  // Note: the not_reusable flag has no meaning for SPDY streams.
150
151  Cancel();
152}
153
154HttpStream* SpdyHttpStream::RenewStreamForAuth() {
155  return NULL;
156}
157
158bool SpdyHttpStream::IsResponseBodyComplete() const {
159  if (!stream_)
160    return false;
161  return stream_->closed();
162}
163
164bool SpdyHttpStream::CanFindEndOfResponse() const {
165  return true;
166}
167
168bool SpdyHttpStream::IsMoreDataBuffered() const {
169  return false;
170}
171
172bool SpdyHttpStream::IsConnectionReused() const {
173  return spdy_session_->IsReused();
174}
175
176void SpdyHttpStream::SetConnectionReused() {
177  // SPDY doesn't need an indicator here.
178}
179
180bool SpdyHttpStream::IsConnectionReusable() const {
181  // SPDY streams aren't considered reusable.
182  return false;
183}
184
185void SpdyHttpStream::set_chunk_callback(ChunkCallback* callback) {
186  if (request_body_stream_ != NULL)
187    request_body_stream_->set_chunk_callback(callback);
188}
189
190int SpdyHttpStream::SendRequest(const HttpRequestHeaders& request_headers,
191                                UploadDataStream* request_body,
192                                HttpResponseInfo* response,
193                                CompletionCallback* callback) {
194  base::Time request_time = base::Time::Now();
195  CHECK(stream_.get());
196
197  stream_->SetDelegate(this);
198
199  linked_ptr<spdy::SpdyHeaderBlock> headers(new spdy::SpdyHeaderBlock);
200  CreateSpdyHeadersFromHttpRequest(*request_info_, request_headers,
201                                   headers.get(), direct_);
202  stream_->set_spdy_headers(headers);
203
204  stream_->SetRequestTime(request_time);
205  // This should only get called in the case of a request occurring
206  // during server push that has already begun but hasn't finished,
207  // so we set the response's request time to be the actual one
208  if (response_info_)
209    response_info_->request_time = request_time;
210
211  CHECK(!request_body_stream_.get());
212  if (request_body) {
213    if (request_body->size() || request_body->is_chunked())
214      request_body_stream_.reset(request_body);
215    else
216      delete request_body;
217  }
218
219  CHECK(callback);
220  CHECK(!stream_->cancelled());
221  CHECK(response);
222
223  if (!stream_->pushed() && stream_->closed()) {
224    if (stream_->response_status() == OK)
225      return ERR_FAILED;
226    else
227      return stream_->response_status();
228  }
229
230  // SendRequest can be called in two cases.
231  //
232  // a) A client initiated request. In this case, |response_info_| should be
233  //    NULL to start with.
234  // b) A client request which matches a response that the server has already
235  //    pushed.
236  if (push_response_info_.get()) {
237    *response = *(push_response_info_.get());
238    push_response_info_.reset();
239  }
240  else
241    DCHECK_EQ(static_cast<HttpResponseInfo*>(NULL), response_info_);
242
243  response_info_ = response;
244
245  // Put the peer's IP address and port into the response.
246  AddressList address;
247  int result = stream_->GetPeerAddress(&address);
248  if (result != OK)
249    return result;
250  response_info_->socket_address = HostPortPair::FromAddrInfo(address.head());
251
252  bool has_upload_data = request_body_stream_.get() != NULL;
253  result = stream_->SendRequest(has_upload_data);
254  if (result == ERR_IO_PENDING) {
255    CHECK(!user_callback_);
256    user_callback_ = callback;
257  }
258  return result;
259}
260
261void SpdyHttpStream::Cancel() {
262  if (spdy_session_)
263    spdy_session_->CancelPendingCreateStreams(&stream_);
264  user_callback_ = NULL;
265  if (stream_)
266    stream_->Cancel();
267}
268
269bool SpdyHttpStream::OnSendHeadersComplete(int status) {
270  if (user_callback_)
271    DoCallback(status);
272  return request_body_stream_.get() == NULL;
273}
274
275int SpdyHttpStream::OnSendBody() {
276  CHECK(request_body_stream_.get());
277
278  int buf_len = static_cast<int>(request_body_stream_->buf_len());
279  if (!buf_len)
280    return OK;
281  bool is_chunked = request_body_stream_->is_chunked();
282  // TODO(satish): For non-chunked POST data, we set DATA_FLAG_FIN for all
283  // blocks of data written out. This is wrong if the POST data was larger than
284  // UploadDataStream::kBufSize as that is the largest buffer that
285  // UploadDataStream returns at a time and we'll be setting the FIN flag for
286  // each block of data written out.
287  bool eof = !is_chunked || request_body_stream_->IsOnLastChunk();
288  return stream_->WriteStreamData(
289      request_body_stream_->buf(), buf_len,
290      eof ? spdy::DATA_FLAG_FIN : spdy::DATA_FLAG_NONE);
291}
292
293int SpdyHttpStream::OnSendBodyComplete(int status, bool* eof) {
294  CHECK(request_body_stream_.get());
295
296  request_body_stream_->MarkConsumedAndFillBuffer(status);
297  *eof = request_body_stream_->eof();
298  if (!*eof &&
299      request_body_stream_->is_chunked() &&
300      !request_body_stream_->buf_len())
301    return ERR_IO_PENDING;
302
303  return OK;
304}
305
306int SpdyHttpStream::OnResponseReceived(const spdy::SpdyHeaderBlock& response,
307                                       base::Time response_time,
308                                       int status) {
309  if (!response_info_) {
310    DCHECK(stream_->pushed());
311    push_response_info_.reset(new HttpResponseInfo);
312    response_info_ = push_response_info_.get();
313  }
314
315  // If the response is already received, these headers are too late.
316  if (response_headers_received_) {
317    LOG(WARNING) << "SpdyHttpStream headers received after response started.";
318    return OK;
319  }
320
321  // TODO(mbelshe): This is the time of all headers received, not just time
322  // to first byte.
323  response_info_->response_time = base::Time::Now();
324
325  if (!SpdyHeadersToHttpResponse(response, response_info_)) {
326    // We might not have complete headers yet.
327    return ERR_INCOMPLETE_SPDY_HEADERS;
328  }
329
330  response_headers_received_ = true;
331  // Don't store the SSLInfo in the response here, HttpNetworkTransaction
332  // will take care of that part.
333  SSLInfo ssl_info;
334  stream_->GetSSLInfo(&ssl_info,
335                      &response_info_->was_npn_negotiated);
336  response_info_->request_time = stream_->GetRequestTime();
337  response_info_->vary_data.Init(*request_info_, *response_info_->headers);
338  // TODO(ahendrickson): This is recorded after the entire SYN_STREAM control
339  // frame has been received and processed.  Move to framer?
340  response_info_->response_time = response_time;
341
342  if (user_callback_)
343    DoCallback(status);
344  return status;
345}
346
347void SpdyHttpStream::OnDataReceived(const char* data, int length) {
348  // SpdyStream won't call us with data if the header block didn't contain a
349  // valid set of headers.  So we don't expect to not have headers received
350  // here.
351  DCHECK(response_headers_received_);
352
353  // Note that data may be received for a SpdyStream prior to the user calling
354  // ReadResponseBody(), therefore user_buffer_ may be NULL.  This may often
355  // happen for server initiated streams.
356  DCHECK(!stream_->closed() || stream_->pushed());
357  if (length > 0) {
358    // Save the received data.
359    IOBufferWithSize* io_buffer = new IOBufferWithSize(length);
360    memcpy(io_buffer->data(), data, length);
361    response_body_.push_back(make_scoped_refptr(io_buffer));
362
363    if (user_buffer_) {
364      // Handing small chunks of data to the caller creates measurable overhead.
365      // We buffer data in short time-spans and send a single read notification.
366      ScheduleBufferedReadCallback();
367    }
368  }
369}
370
371void SpdyHttpStream::OnDataSent(int length) {
372  // For HTTP streams, no data is sent from the client while in the OPEN state,
373  // so it is never called.
374  NOTREACHED();
375}
376
377void SpdyHttpStream::OnClose(int status) {
378  bool invoked_callback = false;
379  if (status == net::OK) {
380    // We need to complete any pending buffered read now.
381    invoked_callback = DoBufferedReadCallback();
382  }
383  if (!invoked_callback && user_callback_)
384    DoCallback(status);
385}
386
387void SpdyHttpStream::ScheduleBufferedReadCallback() {
388  // If there is already a scheduled DoBufferedReadCallback, don't issue
389  // another one.  Mark that we have received more data and return.
390  if (buffered_read_callback_pending_) {
391    more_read_data_pending_ = true;
392    return;
393  }
394
395  more_read_data_pending_ = false;
396  buffered_read_callback_pending_ = true;
397  const int kBufferTimeMs = 1;
398  MessageLoop::current()->PostDelayedTask(FROM_HERE, read_callback_factory_.
399      NewRunnableMethod(&SpdyHttpStream::DoBufferedReadCallback),
400      kBufferTimeMs);
401}
402
403// Checks to see if we should wait for more buffered data before notifying
404// the caller.  Returns true if we should wait, false otherwise.
405bool SpdyHttpStream::ShouldWaitForMoreBufferedData() const {
406  // If the response is complete, there is no point in waiting.
407  if (stream_->closed())
408    return false;
409
410  int bytes_buffered = 0;
411  std::list<scoped_refptr<IOBufferWithSize> >::const_iterator it;
412  for (it = response_body_.begin();
413       it != response_body_.end() && bytes_buffered < user_buffer_len_;
414       ++it)
415    bytes_buffered += (*it)->size();
416
417  return bytes_buffered < user_buffer_len_;
418}
419
420bool SpdyHttpStream::DoBufferedReadCallback() {
421  read_callback_factory_.RevokeAll();
422  buffered_read_callback_pending_ = false;
423
424  // If the transaction is cancelled or errored out, we don't need to complete
425  // the read.
426  if (!stream_ || stream_->response_status() != OK || stream_->cancelled())
427    return false;
428
429  // When more_read_data_pending_ is true, it means that more data has
430  // arrived since we started waiting.  Wait a little longer and continue
431  // to buffer.
432  if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
433    ScheduleBufferedReadCallback();
434    return false;
435  }
436
437  int rv = 0;
438  if (user_buffer_) {
439    rv = ReadResponseBody(user_buffer_, user_buffer_len_, user_callback_);
440    CHECK_NE(rv, ERR_IO_PENDING);
441    user_buffer_ = NULL;
442    user_buffer_len_ = 0;
443    DoCallback(rv);
444    return true;
445  }
446  return false;
447}
448
449void SpdyHttpStream::DoCallback(int rv) {
450  CHECK_NE(rv, ERR_IO_PENDING);
451  CHECK(user_callback_);
452
453  // Since Run may result in being called back, clear user_callback_ in advance.
454  CompletionCallback* c = user_callback_;
455  user_callback_ = NULL;
456  c->Run(rv);
457}
458
459void SpdyHttpStream::GetSSLInfo(SSLInfo* ssl_info) {
460  DCHECK(stream_);
461  bool using_npn;
462  stream_->GetSSLInfo(ssl_info, &using_npn);
463}
464
465void SpdyHttpStream::GetSSLCertRequestInfo(
466    SSLCertRequestInfo* cert_request_info) {
467  DCHECK(stream_);
468  stream_->GetSSLCertRequestInfo(cert_request_info);
469}
470
471bool SpdyHttpStream::IsSpdyHttpStream() const {
472  return true;
473}
474
475}  // namespace net
476