1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/spdy/spdy_http_stream.h"
6
7#include <algorithm>
8#include <list>
9
10#include "base/bind.h"
11#include "base/logging.h"
12#include "base/message_loop/message_loop.h"
13#include "base/strings/stringprintf.h"
14#include "net/base/host_port_pair.h"
15#include "net/base/net_log.h"
16#include "net/base/net_util.h"
17#include "net/base/upload_data_stream.h"
18#include "net/http/http_request_headers.h"
19#include "net/http/http_request_info.h"
20#include "net/http/http_response_info.h"
21#include "net/spdy/spdy_header_block.h"
22#include "net/spdy/spdy_http_utils.h"
23#include "net/spdy/spdy_protocol.h"
24#include "net/spdy/spdy_session.h"
25
26namespace net {
27
28SpdyHttpStream::SpdyHttpStream(const base::WeakPtr<SpdySession>& spdy_session,
29                               bool direct)
30    : spdy_session_(spdy_session),
31      is_reused_(spdy_session_->IsReused()),
32      stream_closed_(false),
33      closed_stream_status_(ERR_FAILED),
34      closed_stream_id_(0),
35      closed_stream_received_bytes_(0),
36      request_info_(NULL),
37      response_info_(NULL),
38      response_headers_status_(RESPONSE_HEADERS_ARE_INCOMPLETE),
39      user_buffer_len_(0),
40      request_body_buf_size_(0),
41      buffered_read_callback_pending_(false),
42      more_read_data_pending_(false),
43      direct_(direct),
44      weak_factory_(this) {
45  DCHECK(spdy_session_.get());
46}
47
48SpdyHttpStream::~SpdyHttpStream() {
49  if (stream_.get()) {
50    stream_->DetachDelegate();
51    DCHECK(!stream_.get());
52  }
53}
54
55int SpdyHttpStream::InitializeStream(const HttpRequestInfo* request_info,
56                                     RequestPriority priority,
57                                     const BoundNetLog& stream_net_log,
58                                     const CompletionCallback& callback) {
59  DCHECK(!stream_);
60  if (!spdy_session_)
61    return ERR_CONNECTION_CLOSED;
62
63  request_info_ = request_info;
64  if (request_info_->method == "GET") {
65    int error = spdy_session_->GetPushStream(request_info_->url, &stream_,
66                                             stream_net_log);
67    if (error != OK)
68      return error;
69
70    // |stream_| may be NULL even if OK was returned.
71    if (stream_.get()) {
72      DCHECK_EQ(stream_->type(), SPDY_PUSH_STREAM);
73      stream_->SetDelegate(this);
74      return OK;
75    }
76  }
77
78  int rv = stream_request_.StartRequest(
79      SPDY_REQUEST_RESPONSE_STREAM, spdy_session_, request_info_->url,
80      priority, stream_net_log,
81      base::Bind(&SpdyHttpStream::OnStreamCreated,
82                 weak_factory_.GetWeakPtr(), callback));
83
84  if (rv == OK) {
85    stream_ = stream_request_.ReleaseStream();
86    stream_->SetDelegate(this);
87  }
88
89  return rv;
90}
91
92UploadProgress SpdyHttpStream::GetUploadProgress() const {
93  if (!request_info_ || !HasUploadData())
94    return UploadProgress();
95
96  return UploadProgress(request_info_->upload_data_stream->position(),
97                        request_info_->upload_data_stream->size());
98}
99
100int SpdyHttpStream::ReadResponseHeaders(const CompletionCallback& callback) {
101  CHECK(!callback.is_null());
102  if (stream_closed_)
103    return closed_stream_status_;
104
105  CHECK(stream_.get());
106
107  // Check if we already have the response headers. If so, return synchronously.
108  if (response_headers_status_ == RESPONSE_HEADERS_ARE_COMPLETE) {
109    CHECK(!stream_->IsIdle());
110    return OK;
111  }
112
113  // Still waiting for the response, return IO_PENDING.
114  CHECK(callback_.is_null());
115  callback_ = callback;
116  return ERR_IO_PENDING;
117}
118
119int SpdyHttpStream::ReadResponseBody(
120    IOBuffer* buf, int buf_len, const CompletionCallback& callback) {
121  if (stream_.get())
122    CHECK(!stream_->IsIdle());
123
124  CHECK(buf);
125  CHECK(buf_len);
126  CHECK(!callback.is_null());
127
128  // If we have data buffered, complete the IO immediately.
129  if (!response_body_queue_.IsEmpty()) {
130    return response_body_queue_.Dequeue(buf->data(), buf_len);
131  } else if (stream_closed_) {
132    return closed_stream_status_;
133  }
134
135  CHECK(callback_.is_null());
136  CHECK(!user_buffer_.get());
137  CHECK_EQ(0, user_buffer_len_);
138
139  callback_ = callback;
140  user_buffer_ = buf;
141  user_buffer_len_ = buf_len;
142  return ERR_IO_PENDING;
143}
144
145void SpdyHttpStream::Close(bool not_reusable) {
146  // Note: the not_reusable flag has no meaning for SPDY streams.
147
148  Cancel();
149  DCHECK(!stream_.get());
150}
151
152HttpStream* SpdyHttpStream::RenewStreamForAuth() {
153  return NULL;
154}
155
156bool SpdyHttpStream::IsResponseBodyComplete() const {
157  return stream_closed_;
158}
159
160bool SpdyHttpStream::CanFindEndOfResponse() const {
161  return true;
162}
163
164bool SpdyHttpStream::IsConnectionReused() const {
165  return is_reused_;
166}
167
168void SpdyHttpStream::SetConnectionReused() {
169  // SPDY doesn't need an indicator here.
170}
171
172bool SpdyHttpStream::IsConnectionReusable() const {
173  // SPDY streams aren't considered reusable.
174  return false;
175}
176
177int64 SpdyHttpStream::GetTotalReceivedBytes() const {
178  if (stream_closed_)
179    return closed_stream_received_bytes_;
180
181  if (!stream_)
182    return 0;
183
184  return stream_->raw_received_bytes();
185}
186
187bool SpdyHttpStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const {
188  if (stream_closed_) {
189    if (!closed_stream_has_load_timing_info_)
190      return false;
191    *load_timing_info = closed_stream_load_timing_info_;
192    return true;
193  }
194
195  // If |stream_| has yet to be created, or does not yet have an ID, fail.
196  // The reused flag can only be correctly set once a stream has an ID.  Streams
197  // get their IDs once the request has been successfully sent, so this does not
198  // behave that differently from other stream types.
199  if (!stream_ || stream_->stream_id() == 0)
200    return false;
201
202  return stream_->GetLoadTimingInfo(load_timing_info);
203}
204
205int SpdyHttpStream::SendRequest(const HttpRequestHeaders& request_headers,
206                                HttpResponseInfo* response,
207                                const CompletionCallback& callback) {
208  if (stream_closed_) {
209    return closed_stream_status_;
210  }
211
212  base::Time request_time = base::Time::Now();
213  CHECK(stream_.get());
214
215  stream_->SetRequestTime(request_time);
216  // This should only get called in the case of a request occurring
217  // during server push that has already begun but hasn't finished,
218  // so we set the response's request time to be the actual one
219  if (response_info_)
220    response_info_->request_time = request_time;
221
222  CHECK(!request_body_buf_.get());
223  if (HasUploadData()) {
224    // Use kMaxSpdyFrameChunkSize as the buffer size, since the request
225    // body data is written with this size at a time.
226    request_body_buf_ = new IOBufferWithSize(kMaxSpdyFrameChunkSize);
227    // The request body buffer is empty at first.
228    request_body_buf_size_ = 0;
229  }
230
231  CHECK(!callback.is_null());
232  CHECK(response);
233
234  // SendRequest can be called in two cases.
235  //
236  // a) A client initiated request. In this case, |response_info_| should be
237  //    NULL to start with.
238  // b) A client request which matches a response that the server has already
239  //    pushed.
240  if (push_response_info_.get()) {
241    *response = *(push_response_info_.get());
242    push_response_info_.reset();
243  } else {
244    DCHECK_EQ(static_cast<HttpResponseInfo*>(NULL), response_info_);
245  }
246
247  response_info_ = response;
248
249  // Put the peer's IP address and port into the response.
250  IPEndPoint address;
251  int result = stream_->GetPeerAddress(&address);
252  if (result != OK)
253    return result;
254  response_info_->socket_address = HostPortPair::FromIPEndPoint(address);
255
256  if (stream_->type() == SPDY_PUSH_STREAM) {
257    // Pushed streams do not send any data, and should always be
258    // idle. However, we still want to return ERR_IO_PENDING to mimic
259    // non-push behavior. The callback will be called when the
260    // response is received.
261    result = ERR_IO_PENDING;
262  } else {
263    scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
264    CreateSpdyHeadersFromHttpRequest(
265        *request_info_, request_headers,
266        stream_->GetProtocolVersion(), direct_,
267        headers.get());
268    stream_->net_log().AddEvent(
269        NetLog::TYPE_HTTP_TRANSACTION_SPDY_SEND_REQUEST_HEADERS,
270        base::Bind(&SpdyHeaderBlockNetLogCallback, headers.get()));
271    result =
272        stream_->SendRequestHeaders(
273            headers.Pass(),
274            HasUploadData() ? MORE_DATA_TO_SEND : NO_MORE_DATA_TO_SEND);
275  }
276
277  if (result == ERR_IO_PENDING) {
278    CHECK(callback_.is_null());
279    callback_ = callback;
280  }
281  return result;
282}
283
284void SpdyHttpStream::Cancel() {
285  callback_.Reset();
286  if (stream_.get()) {
287    stream_->Cancel();
288    DCHECK(!stream_.get());
289  }
290}
291
292void SpdyHttpStream::OnRequestHeadersSent() {
293  if (!callback_.is_null())
294    DoCallback(OK);
295
296  // TODO(akalin): Do this immediately after sending the request
297  // headers.
298  if (HasUploadData())
299    ReadAndSendRequestBodyData();
300}
301
302SpdyResponseHeadersStatus SpdyHttpStream::OnResponseHeadersUpdated(
303    const SpdyHeaderBlock& response_headers) {
304  CHECK_EQ(response_headers_status_, RESPONSE_HEADERS_ARE_INCOMPLETE);
305
306  if (!response_info_) {
307    DCHECK_EQ(stream_->type(), SPDY_PUSH_STREAM);
308    push_response_info_.reset(new HttpResponseInfo);
309    response_info_ = push_response_info_.get();
310  }
311
312  if (!SpdyHeadersToHttpResponse(
313          response_headers, stream_->GetProtocolVersion(), response_info_)) {
314    // We do not have complete headers yet.
315    return RESPONSE_HEADERS_ARE_INCOMPLETE;
316  }
317
318  response_info_->response_time = stream_->response_time();
319  response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE;
320  // Don't store the SSLInfo in the response here, HttpNetworkTransaction
321  // will take care of that part.
322  SSLInfo ssl_info;
323  NextProto protocol_negotiated = kProtoUnknown;
324  stream_->GetSSLInfo(&ssl_info,
325                      &response_info_->was_npn_negotiated,
326                      &protocol_negotiated);
327  response_info_->npn_negotiated_protocol =
328      SSLClientSocket::NextProtoToString(protocol_negotiated);
329  response_info_->request_time = stream_->GetRequestTime();
330  response_info_->connection_info =
331      HttpResponseInfo::ConnectionInfoFromNextProto(stream_->GetProtocol());
332  response_info_->vary_data
333      .Init(*request_info_, *response_info_->headers.get());
334
335  if (!callback_.is_null())
336    DoCallback(OK);
337
338  return RESPONSE_HEADERS_ARE_COMPLETE;
339}
340
341void SpdyHttpStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
342  CHECK_EQ(response_headers_status_, RESPONSE_HEADERS_ARE_COMPLETE);
343
344  // Note that data may be received for a SpdyStream prior to the user calling
345  // ReadResponseBody(), therefore user_buffer_ may be NULL.  This may often
346  // happen for server initiated streams.
347  DCHECK(stream_.get());
348  DCHECK(!stream_->IsClosed() || stream_->type() == SPDY_PUSH_STREAM);
349  if (buffer) {
350    response_body_queue_.Enqueue(buffer.Pass());
351
352    if (user_buffer_.get()) {
353      // Handing small chunks of data to the caller creates measurable overhead.
354      // We buffer data in short time-spans and send a single read notification.
355      ScheduleBufferedReadCallback();
356    }
357  }
358}
359
360void SpdyHttpStream::OnDataSent() {
361  request_body_buf_size_ = 0;
362  ReadAndSendRequestBodyData();
363}
364
365void SpdyHttpStream::OnClose(int status) {
366  if (stream_.get()) {
367    stream_closed_ = true;
368    closed_stream_status_ = status;
369    closed_stream_id_ = stream_->stream_id();
370    closed_stream_has_load_timing_info_ =
371        stream_->GetLoadTimingInfo(&closed_stream_load_timing_info_);
372    closed_stream_received_bytes_ = stream_->raw_received_bytes();
373  }
374  stream_.reset();
375  bool invoked_callback = false;
376  if (status == net::OK) {
377    // We need to complete any pending buffered read now.
378    invoked_callback = DoBufferedReadCallback();
379  }
380  if (!invoked_callback && !callback_.is_null())
381    DoCallback(status);
382}
383
384bool SpdyHttpStream::HasUploadData() const {
385  CHECK(request_info_);
386  return
387      request_info_->upload_data_stream &&
388      ((request_info_->upload_data_stream->size() > 0) ||
389       request_info_->upload_data_stream->is_chunked());
390}
391
392void SpdyHttpStream::OnStreamCreated(
393    const CompletionCallback& callback,
394    int rv) {
395  if (rv == OK) {
396    stream_ = stream_request_.ReleaseStream();
397    stream_->SetDelegate(this);
398  }
399  callback.Run(rv);
400}
401
402void SpdyHttpStream::ReadAndSendRequestBodyData() {
403  CHECK(HasUploadData());
404  CHECK_EQ(request_body_buf_size_, 0);
405
406  if (request_info_->upload_data_stream->IsEOF())
407    return;
408
409  // Read the data from the request body stream.
410  const int rv = request_info_->upload_data_stream
411      ->Read(request_body_buf_.get(),
412             request_body_buf_->size(),
413             base::Bind(&SpdyHttpStream::OnRequestBodyReadCompleted,
414                        weak_factory_.GetWeakPtr()));
415
416  if (rv != ERR_IO_PENDING) {
417    // ERR_IO_PENDING is the only possible error.
418    CHECK_GE(rv, 0);
419    OnRequestBodyReadCompleted(rv);
420  }
421}
422
423void SpdyHttpStream::OnRequestBodyReadCompleted(int status) {
424  CHECK_GE(status, 0);
425  request_body_buf_size_ = status;
426  const bool eof = request_info_->upload_data_stream->IsEOF();
427  // Only the final fame may have a length of 0.
428  if (eof) {
429    CHECK_GE(request_body_buf_size_, 0);
430  } else {
431    CHECK_GT(request_body_buf_size_, 0);
432  }
433  stream_->SendData(request_body_buf_.get(),
434                    request_body_buf_size_,
435                    eof ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
436}
437
438void SpdyHttpStream::ScheduleBufferedReadCallback() {
439  // If there is already a scheduled DoBufferedReadCallback, don't issue
440  // another one.  Mark that we have received more data and return.
441  if (buffered_read_callback_pending_) {
442    more_read_data_pending_ = true;
443    return;
444  }
445
446  more_read_data_pending_ = false;
447  buffered_read_callback_pending_ = true;
448  const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1);
449  base::MessageLoop::current()->PostDelayedTask(
450      FROM_HERE,
451      base::Bind(base::IgnoreResult(&SpdyHttpStream::DoBufferedReadCallback),
452                 weak_factory_.GetWeakPtr()),
453      kBufferTime);
454}
455
456// Checks to see if we should wait for more buffered data before notifying
457// the caller.  Returns true if we should wait, false otherwise.
458bool SpdyHttpStream::ShouldWaitForMoreBufferedData() const {
459  // If the response is complete, there is no point in waiting.
460  if (stream_closed_)
461    return false;
462
463  DCHECK_GT(user_buffer_len_, 0);
464  return response_body_queue_.GetTotalSize() <
465      static_cast<size_t>(user_buffer_len_);
466}
467
468bool SpdyHttpStream::DoBufferedReadCallback() {
469  buffered_read_callback_pending_ = false;
470
471  // If the transaction is cancelled or errored out, we don't need to complete
472  // the read.
473  if (!stream_.get() && !stream_closed_)
474    return false;
475
476  int stream_status =
477      stream_closed_ ? closed_stream_status_ : stream_->response_status();
478  if (stream_status != OK)
479    return false;
480
481  // When more_read_data_pending_ is true, it means that more data has
482  // arrived since we started waiting.  Wait a little longer and continue
483  // to buffer.
484  if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
485    ScheduleBufferedReadCallback();
486    return false;
487  }
488
489  int rv = 0;
490  if (user_buffer_.get()) {
491    rv = ReadResponseBody(user_buffer_.get(), user_buffer_len_, callback_);
492    CHECK_NE(rv, ERR_IO_PENDING);
493    user_buffer_ = NULL;
494    user_buffer_len_ = 0;
495    DoCallback(rv);
496    return true;
497  }
498  return false;
499}
500
501void SpdyHttpStream::DoCallback(int rv) {
502  CHECK_NE(rv, ERR_IO_PENDING);
503  CHECK(!callback_.is_null());
504
505  // Since Run may result in being called back, clear user_callback_ in advance.
506  CompletionCallback c = callback_;
507  callback_.Reset();
508  c.Run(rv);
509}
510
511void SpdyHttpStream::GetSSLInfo(SSLInfo* ssl_info) {
512  DCHECK(stream_.get());
513  bool using_npn;
514  NextProto protocol_negotiated = kProtoUnknown;
515  stream_->GetSSLInfo(ssl_info, &using_npn, &protocol_negotiated);
516}
517
518void SpdyHttpStream::GetSSLCertRequestInfo(
519    SSLCertRequestInfo* cert_request_info) {
520  DCHECK(stream_.get());
521  stream_->GetSSLCertRequestInfo(cert_request_info);
522}
523
524bool SpdyHttpStream::IsSpdyHttpStream() const {
525  return true;
526}
527
528void SpdyHttpStream::Drain(HttpNetworkSession* session) {
529  Close(false);
530  delete this;
531}
532
533void SpdyHttpStream::SetPriority(RequestPriority priority) {
534  // TODO(akalin): Plumb this through to |stream_request_| and
535  // |stream_|.
536}
537
538}  // namespace net
539