quic_http_stream.cc revision d0247b1b59f9c528cb6df88b4f2b9afaf80d181e
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/quic/quic_http_stream.h"
6
7#include "base/callback_helpers.h"
8#include "base/strings/stringprintf.h"
9#include "net/base/io_buffer.h"
10#include "net/base/net_errors.h"
11#include "net/http/http_response_headers.h"
12#include "net/http/http_util.h"
13#include "net/quic/quic_client_session.h"
14#include "net/quic/quic_http_utils.h"
15#include "net/quic/quic_reliable_client_stream.h"
16#include "net/quic/quic_utils.h"
17#include "net/socket/next_proto.h"
18#include "net/spdy/spdy_frame_builder.h"
19#include "net/spdy/spdy_framer.h"
20#include "net/spdy/spdy_http_utils.h"
21#include "net/ssl/ssl_info.h"
22
23namespace net {
24
25static const size_t kHeaderBufInitialSize = 4096;
26
27QuicHttpStream::QuicHttpStream(const base::WeakPtr<QuicClientSession> session)
28    : next_state_(STATE_NONE),
29      session_(session),
30      stream_(NULL),
31      request_info_(NULL),
32      request_body_stream_(NULL),
33      priority_(MINIMUM_PRIORITY),
34      response_info_(NULL),
35      response_status_(OK),
36      response_headers_received_(false),
37      read_buf_(new GrowableIOBuffer()),
38      user_buffer_len_(0),
39      weak_factory_(this) {
40  DCHECK(session_);
41}
42
43QuicHttpStream::~QuicHttpStream() {
44  Close(false);
45}
46
47int QuicHttpStream::InitializeStream(const HttpRequestInfo* request_info,
48                                     RequestPriority priority,
49                                     const BoundNetLog& stream_net_log,
50                                     const CompletionCallback& callback) {
51  DCHECK(!stream_);
52  if (!session_)
53    return ERR_CONNECTION_CLOSED;
54
55  stream_net_log_ = stream_net_log;
56  request_info_ = request_info;
57  priority_ = priority;
58
59  int rv = stream_request_.StartRequest(
60      session_, &stream_, base::Bind(&QuicHttpStream::OnStreamReady,
61                                     weak_factory_.GetWeakPtr()));
62  if (rv == ERR_IO_PENDING)
63    callback_ = callback;
64
65  if (rv == OK)
66    stream_->SetDelegate(this);
67
68  return rv;
69}
70
71void QuicHttpStream::OnStreamReady(int rv) {
72  DCHECK(rv == OK || !stream_);
73  if (rv == OK)
74    stream_->SetDelegate(this);
75
76  ResetAndReturn(&callback_).Run(rv);
77}
78
79int QuicHttpStream::SendRequest(const HttpRequestHeaders& request_headers,
80                                HttpResponseInfo* response,
81                                const CompletionCallback& callback) {
82  CHECK(stream_);
83  CHECK(!request_body_stream_);
84  CHECK(!response_info_);
85  CHECK(!callback.is_null());
86  CHECK(response);
87
88  QuicPriority priority = ConvertRequestPriorityToQuicPriority(priority_);
89  stream_->set_priority(priority);
90  // Store the serialized request headers.
91  SpdyHeaderBlock headers;
92  CreateSpdyHeadersFromHttpRequest(*request_info_, request_headers,
93                                   &headers, 3, /*direct=*/true);
94  if (session_->connection()->version() < QUIC_VERSION_9) {
95    request_ = stream_->compressor()->CompressHeaders(headers);
96  } else {
97    request_ = stream_->compressor()->CompressHeadersWithPriority(priority,
98                                                                  headers);
99  }
100  // Log the actual request with the URL Request's net log.
101  stream_net_log_.AddEvent(
102      NetLog::TYPE_HTTP_TRANSACTION_SPDY_SEND_REQUEST_HEADERS,
103      base::Bind(&SpdyHeaderBlockNetLogCallback, &headers));
104  // Also log to the QuicSession's net log.
105  stream_->net_log().AddEvent(
106      NetLog::TYPE_QUIC_HTTP_STREAM_SEND_REQUEST_HEADERS,
107      base::Bind(&SpdyHeaderBlockNetLogCallback, &headers));
108
109  // Store the request body.
110  request_body_stream_ = request_info_->upload_data_stream;
111  if (request_body_stream_) {
112    // TODO(rch): Can we be more precise about when to allocate
113    // raw_request_body_buf_. Removed the following check. DoReadRequestBody()
114    // was being called even if we didn't yet allocate raw_request_body_buf_.
115    //   && (request_body_stream_->size() ||
116    //       request_body_stream_->is_chunked()))
117    //
118    // Use kMaxPacketSize as the buffer size, since the request
119    // body data is written with this size at a time.
120    // TODO(rch): use a smarter value since we can't write an entire
121    // packet due to overhead.
122    raw_request_body_buf_ = new IOBufferWithSize(kMaxPacketSize);
123    // The request body buffer is empty at first.
124    request_body_buf_ = new DrainableIOBuffer(raw_request_body_buf_.get(), 0);
125  }
126
127  // Store the response info.
128  response_info_ = response;
129
130  next_state_ = STATE_SEND_HEADERS;
131  int rv = DoLoop(OK);
132  if (rv == ERR_IO_PENDING)
133    callback_ = callback;
134
135  return rv > 0 ? OK : rv;
136}
137
138UploadProgress QuicHttpStream::GetUploadProgress() const {
139  if (!request_body_stream_)
140    return UploadProgress();
141
142  return UploadProgress(request_body_stream_->position(),
143                        request_body_stream_->size());
144}
145
146int QuicHttpStream::ReadResponseHeaders(const CompletionCallback& callback) {
147  CHECK(!callback.is_null());
148
149  if (stream_ == NULL)
150    return response_status_;
151
152  // Check if we already have the response headers. If so, return synchronously.
153  if (response_headers_received_)
154    return OK;
155
156  // Still waiting for the response, return IO_PENDING.
157  CHECK(callback_.is_null());
158  callback_ = callback;
159  return ERR_IO_PENDING;
160}
161
162const HttpResponseInfo* QuicHttpStream::GetResponseInfo() const {
163  return response_info_;
164}
165
166int QuicHttpStream::ReadResponseBody(
167    IOBuffer* buf, int buf_len, const CompletionCallback& callback) {
168  CHECK(buf);
169  CHECK(buf_len);
170  CHECK(!callback.is_null());
171
172  // If we have data buffered, complete the IO immediately.
173  if (!response_body_.empty()) {
174    int bytes_read = 0;
175    while (!response_body_.empty() && buf_len > 0) {
176      scoped_refptr<IOBufferWithSize> data = response_body_.front();
177      const int bytes_to_copy = std::min(buf_len, data->size());
178      memcpy(&(buf->data()[bytes_read]), data->data(), bytes_to_copy);
179      buf_len -= bytes_to_copy;
180      if (bytes_to_copy == data->size()) {
181        response_body_.pop_front();
182      } else {
183        const int bytes_remaining = data->size() - bytes_to_copy;
184        IOBufferWithSize* new_buffer = new IOBufferWithSize(bytes_remaining);
185        memcpy(new_buffer->data(), &(data->data()[bytes_to_copy]),
186               bytes_remaining);
187        response_body_.pop_front();
188        response_body_.push_front(make_scoped_refptr(new_buffer));
189      }
190      bytes_read += bytes_to_copy;
191    }
192    return bytes_read;
193  }
194
195  if (!stream_) {
196    // If the stream is already closed, there is no body to read.
197    return response_status_;
198  }
199
200  CHECK(callback_.is_null());
201  CHECK(!user_buffer_.get());
202  CHECK_EQ(0, user_buffer_len_);
203
204  callback_ = callback;
205  user_buffer_ = buf;
206  user_buffer_len_ = buf_len;
207  return ERR_IO_PENDING;
208}
209
210void QuicHttpStream::Close(bool not_reusable) {
211  // Note: the not_reusable flag has no meaning for SPDY streams.
212  if (stream_) {
213    stream_->SetDelegate(NULL);
214    // TODO(rch): use new CANCELLED error code here once quic 11
215    // is everywhere.
216    stream_->Close(QUIC_ERROR_PROCESSING_STREAM);
217    stream_ = NULL;
218  }
219}
220
221HttpStream* QuicHttpStream::RenewStreamForAuth() {
222  return NULL;
223}
224
225bool QuicHttpStream::IsResponseBodyComplete() const {
226  return next_state_ == STATE_OPEN && !stream_;
227}
228
229bool QuicHttpStream::CanFindEndOfResponse() const {
230  return true;
231}
232
233bool QuicHttpStream::IsConnectionReused() const {
234  // TODO(rch): do something smarter here.
235  return stream_ && stream_->id() > 1;
236}
237
238void QuicHttpStream::SetConnectionReused() {
239  // QUIC doesn't need an indicator here.
240}
241
242bool QuicHttpStream::IsConnectionReusable() const {
243  // QUIC streams aren't considered reusable.
244  return false;
245}
246
247bool QuicHttpStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const {
248  // TODO(mmenke):  Figure out what to do here.
249  return true;
250}
251
252void QuicHttpStream::GetSSLInfo(SSLInfo* ssl_info) {
253  DCHECK(stream_);
254  stream_->GetSSLInfo(ssl_info);
255}
256
257void QuicHttpStream::GetSSLCertRequestInfo(
258    SSLCertRequestInfo* cert_request_info) {
259  DCHECK(stream_);
260  NOTIMPLEMENTED();
261}
262
263bool QuicHttpStream::IsSpdyHttpStream() const {
264  return false;
265}
266
267void QuicHttpStream::Drain(HttpNetworkSession* session) {
268  Close(false);
269  delete this;
270}
271
272void QuicHttpStream::SetPriority(RequestPriority priority) {
273  priority_ = priority;
274}
275
276int QuicHttpStream::OnSendData() {
277  // TODO(rch): Change QUIC IO to provide notifications to the streams.
278  NOTREACHED();
279  return OK;
280}
281
282int QuicHttpStream::OnSendDataComplete(int status, bool* eof) {
283  // TODO(rch): Change QUIC IO to provide notifications to the streams.
284  NOTREACHED();
285  return OK;
286}
287
288int QuicHttpStream::OnDataReceived(const char* data, int length) {
289  DCHECK_NE(0, length);
290  // Are we still reading the response headers.
291  if (!response_headers_received_) {
292    // Grow the read buffer if necessary.
293    if (read_buf_->RemainingCapacity() < length) {
294      size_t additional_capacity = length - read_buf_->RemainingCapacity();
295      if (additional_capacity < kHeaderBufInitialSize)
296        additional_capacity = kHeaderBufInitialSize;
297      read_buf_->SetCapacity(read_buf_->capacity() + additional_capacity);
298    }
299    memcpy(read_buf_->data(), data, length);
300    read_buf_->set_offset(read_buf_->offset() + length);
301    int rv = ParseResponseHeaders();
302    if (rv != ERR_IO_PENDING && !callback_.is_null()) {
303      DoCallback(rv);
304    }
305    return OK;
306  }
307
308  if (callback_.is_null()) {
309    BufferResponseBody(data, length);
310    return OK;
311  }
312
313  if (length <= user_buffer_len_) {
314    memcpy(user_buffer_->data(), data, length);
315  } else {
316    memcpy(user_buffer_->data(), data, user_buffer_len_);
317    int delta = length - user_buffer_len_;
318    BufferResponseBody(data + user_buffer_len_, delta);
319  }
320  user_buffer_ = NULL;
321  user_buffer_len_ = 0;
322  DoCallback(length);
323  return OK;
324}
325
326void QuicHttpStream::OnClose(QuicErrorCode error) {
327  if (error != QUIC_NO_ERROR) {
328    response_status_ = ERR_QUIC_PROTOCOL_ERROR;
329  } else if (!response_headers_received_) {
330    response_status_ = ERR_ABORTED;
331  }
332
333  stream_ = NULL;
334  if (!callback_.is_null())
335    DoCallback(response_status_);
336}
337
338void QuicHttpStream::OnError(int error) {
339  stream_ = NULL;
340  response_status_ = error;
341  if (!callback_.is_null())
342    DoCallback(response_status_);
343}
344
345bool QuicHttpStream::HasSendHeadersComplete() {
346  return next_state_ > STATE_SEND_HEADERS_COMPLETE;
347}
348
349void QuicHttpStream::OnIOComplete(int rv) {
350  rv = DoLoop(rv);
351
352  if (rv != ERR_IO_PENDING && !callback_.is_null()) {
353    DoCallback(rv);
354  }
355}
356
357void QuicHttpStream::DoCallback(int rv) {
358  CHECK_NE(rv, ERR_IO_PENDING);
359  CHECK(!callback_.is_null());
360
361  // The client callback can do anything, including destroying this class,
362  // so any pending callback must be issued after everything else is done.
363  base::ResetAndReturn(&callback_).Run(rv);
364}
365
366int QuicHttpStream::DoLoop(int rv) {
367  do {
368    State state = next_state_;
369    next_state_ = STATE_NONE;
370    switch (state) {
371      case STATE_SEND_HEADERS:
372        CHECK_EQ(OK, rv);
373        rv = DoSendHeaders();
374        break;
375      case STATE_SEND_HEADERS_COMPLETE:
376        rv = DoSendHeadersComplete(rv);
377        break;
378      case STATE_READ_REQUEST_BODY:
379        CHECK_EQ(OK, rv);
380        rv = DoReadRequestBody();
381        break;
382      case STATE_READ_REQUEST_BODY_COMPLETE:
383        rv = DoReadRequestBodyComplete(rv);
384        break;
385      case STATE_SEND_BODY:
386        CHECK_EQ(OK, rv);
387        rv = DoSendBody();
388        break;
389      case STATE_SEND_BODY_COMPLETE:
390        rv = DoSendBodyComplete(rv);
391        break;
392      case STATE_OPEN:
393        CHECK_EQ(OK, rv);
394        break;
395      default:
396        NOTREACHED() << "next_state_: " << next_state_;
397        break;
398    }
399  } while (next_state_ != STATE_NONE && next_state_ != STATE_OPEN &&
400           rv != ERR_IO_PENDING);
401
402  return rv;
403}
404
405int QuicHttpStream::DoSendHeaders() {
406  if (!stream_)
407    return ERR_UNEXPECTED;
408
409  bool has_upload_data = request_body_stream_ != NULL;
410
411  next_state_ = STATE_SEND_HEADERS_COMPLETE;
412  return stream_->WriteStreamData(
413      request_, !has_upload_data,
414      base::Bind(&QuicHttpStream::OnIOComplete, weak_factory_.GetWeakPtr()));
415}
416
417int QuicHttpStream::DoSendHeadersComplete(int rv) {
418  if (rv < 0)
419    return rv;
420
421  next_state_ = request_body_stream_ ?
422      STATE_READ_REQUEST_BODY : STATE_OPEN;
423
424  return OK;
425}
426
427int QuicHttpStream::DoReadRequestBody() {
428  next_state_ = STATE_READ_REQUEST_BODY_COMPLETE;
429  return request_body_stream_->Read(
430      raw_request_body_buf_.get(),
431      raw_request_body_buf_->size(),
432      base::Bind(&QuicHttpStream::OnIOComplete, weak_factory_.GetWeakPtr()));
433}
434
435int QuicHttpStream::DoReadRequestBodyComplete(int rv) {
436  // |rv| is the result of read from the request body from the last call to
437  // DoSendBody().
438  if (rv < 0)
439    return rv;
440
441  request_body_buf_ = new DrainableIOBuffer(raw_request_body_buf_.get(), rv);
442  if (rv == 0) {  // Reached the end.
443    DCHECK(request_body_stream_->IsEOF());
444  }
445
446  next_state_ = STATE_SEND_BODY;
447  return OK;
448}
449
450int QuicHttpStream::DoSendBody() {
451  if (!stream_)
452    return ERR_UNEXPECTED;
453
454  CHECK(request_body_stream_);
455  CHECK(request_body_buf_.get());
456  const bool eof = request_body_stream_->IsEOF();
457  int len = request_body_buf_->BytesRemaining();
458  if (len > 0 || eof) {
459    next_state_ = STATE_SEND_BODY_COMPLETE;
460    base::StringPiece data(request_body_buf_->data(), len);
461    return stream_->WriteStreamData(
462        data, eof,
463        base::Bind(&QuicHttpStream::OnIOComplete, weak_factory_.GetWeakPtr()));
464  }
465
466  next_state_ = STATE_OPEN;
467  return OK;
468}
469
470int QuicHttpStream::DoSendBodyComplete(int rv) {
471  if (rv < 0)
472    return rv;
473
474  request_body_buf_->DidConsume(request_body_buf_->BytesRemaining());
475
476  if (!request_body_stream_->IsEOF()) {
477    next_state_ = STATE_READ_REQUEST_BODY;
478    return OK;
479  }
480
481  next_state_ = STATE_OPEN;
482  return OK;
483}
484
485int QuicHttpStream::ParseResponseHeaders() {
486  size_t read_buf_len = static_cast<size_t>(read_buf_->offset());
487  SpdyFramer framer(SPDY3);
488  SpdyHeaderBlock headers;
489  char* data = read_buf_->StartOfBuffer();
490  size_t len = framer.ParseHeaderBlockInBuffer(data, read_buf_->offset(),
491                                               &headers);
492
493  if (len == 0) {
494    return ERR_IO_PENDING;
495  }
496
497  // Save the remaining received data.
498  size_t delta = read_buf_len - len;
499  if (delta > 0) {
500    BufferResponseBody(data + len, delta);
501  }
502
503  // The URLRequest logs these headers, so only log to the QuicSession's
504  // net log.
505  stream_->net_log().AddEvent(
506      NetLog::TYPE_QUIC_HTTP_STREAM_READ_RESPONSE_HEADERS,
507      base::Bind(&SpdyHeaderBlockNetLogCallback, &headers));
508
509  if (!SpdyHeadersToHttpResponse(headers, 3, response_info_)) {
510    DLOG(WARNING) << "Invalid headers";
511    return ERR_QUIC_PROTOCOL_ERROR;
512  }
513  // Put the peer's IP address and port into the response.
514  IPEndPoint address = stream_->GetPeerAddress();
515  response_info_->socket_address = HostPortPair::FromIPEndPoint(address);
516  response_info_->connection_info =
517      HttpResponseInfo::CONNECTION_INFO_QUIC1_SPDY3;
518  response_info_->vary_data
519      .Init(*request_info_, *response_info_->headers.get());
520  response_info_->was_npn_negotiated = true;
521  response_info_->npn_negotiated_protocol = "quic/1+spdy/3";
522  response_headers_received_ = true;
523
524  return OK;
525}
526
527void QuicHttpStream::BufferResponseBody(const char* data, int length) {
528  if (length == 0)
529    return;
530  IOBufferWithSize* io_buffer = new IOBufferWithSize(length);
531  memcpy(io_buffer->data(), data, length);
532  response_body_.push_back(make_scoped_refptr(io_buffer));
533}
534
535}  // namespace net
536