1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/spdy/spdy_http_stream.h"
6
7#include <algorithm>
8#include <list>
9
10#include "base/bind.h"
11#include "base/logging.h"
12#include "base/message_loop/message_loop.h"
13#include "base/strings/stringprintf.h"
14#include "net/base/host_port_pair.h"
15#include "net/base/net_log.h"
16#include "net/base/net_util.h"
17#include "net/base/upload_data_stream.h"
18#include "net/http/http_request_headers.h"
19#include "net/http/http_request_info.h"
20#include "net/http/http_response_info.h"
21#include "net/spdy/spdy_header_block.h"
22#include "net/spdy/spdy_http_utils.h"
23#include "net/spdy/spdy_protocol.h"
24#include "net/spdy/spdy_session.h"
25
26namespace net {
27
28SpdyHttpStream::SpdyHttpStream(const base::WeakPtr<SpdySession>& spdy_session,
29                               bool direct)
30    : weak_factory_(this),
31      spdy_session_(spdy_session),
32      is_reused_(spdy_session_->IsReused()),
33      stream_closed_(false),
34      closed_stream_status_(ERR_FAILED),
35      closed_stream_id_(0),
36      request_info_(NULL),
37      response_info_(NULL),
38      response_headers_status_(RESPONSE_HEADERS_ARE_INCOMPLETE),
39      user_buffer_len_(0),
40      request_body_buf_size_(0),
41      buffered_read_callback_pending_(false),
42      more_read_data_pending_(false),
43      direct_(direct) {
44  DCHECK(spdy_session_.get());
45}
46
47SpdyHttpStream::~SpdyHttpStream() {
48  if (stream_.get()) {
49    stream_->DetachDelegate();
50    DCHECK(!stream_.get());
51  }
52}
53
54int SpdyHttpStream::InitializeStream(const HttpRequestInfo* request_info,
55                                     RequestPriority priority,
56                                     const BoundNetLog& stream_net_log,
57                                     const CompletionCallback& callback) {
58  DCHECK(!stream_);
59  if (!spdy_session_)
60    return ERR_CONNECTION_CLOSED;
61
62  request_info_ = request_info;
63  if (request_info_->method == "GET") {
64    int error = spdy_session_->GetPushStream(request_info_->url, &stream_,
65                                             stream_net_log);
66    if (error != OK)
67      return error;
68
69    // |stream_| may be NULL even if OK was returned.
70    if (stream_.get()) {
71      DCHECK_EQ(stream_->type(), SPDY_PUSH_STREAM);
72      stream_->SetDelegate(this);
73      return OK;
74    }
75  }
76
77  int rv = stream_request_.StartRequest(
78      SPDY_REQUEST_RESPONSE_STREAM, spdy_session_, request_info_->url,
79      priority, stream_net_log,
80      base::Bind(&SpdyHttpStream::OnStreamCreated,
81                 weak_factory_.GetWeakPtr(), callback));
82
83  if (rv == OK) {
84    stream_ = stream_request_.ReleaseStream();
85    stream_->SetDelegate(this);
86  }
87
88  return rv;
89}
90
91const HttpResponseInfo* SpdyHttpStream::GetResponseInfo() const {
92  return response_info_;
93}
94
95UploadProgress SpdyHttpStream::GetUploadProgress() const {
96  if (!request_info_ || !HasUploadData())
97    return UploadProgress();
98
99  return UploadProgress(request_info_->upload_data_stream->position(),
100                        request_info_->upload_data_stream->size());
101}
102
103int SpdyHttpStream::ReadResponseHeaders(const CompletionCallback& callback) {
104  CHECK(!callback.is_null());
105  if (stream_closed_)
106    return closed_stream_status_;
107
108  CHECK(stream_.get());
109
110  // Check if we already have the response headers. If so, return synchronously.
111  if (response_headers_status_ == RESPONSE_HEADERS_ARE_COMPLETE) {
112    CHECK(stream_->IsIdle());
113    return OK;
114  }
115
116  // Still waiting for the response, return IO_PENDING.
117  CHECK(callback_.is_null());
118  callback_ = callback;
119  return ERR_IO_PENDING;
120}
121
122int SpdyHttpStream::ReadResponseBody(
123    IOBuffer* buf, int buf_len, const CompletionCallback& callback) {
124  if (stream_.get())
125    CHECK(stream_->IsIdle());
126
127  CHECK(buf);
128  CHECK(buf_len);
129  CHECK(!callback.is_null());
130
131  // If we have data buffered, complete the IO immediately.
132  if (!response_body_queue_.IsEmpty()) {
133    return response_body_queue_.Dequeue(buf->data(), buf_len);
134  } else if (stream_closed_) {
135    return closed_stream_status_;
136  }
137
138  CHECK(callback_.is_null());
139  CHECK(!user_buffer_.get());
140  CHECK_EQ(0, user_buffer_len_);
141
142  callback_ = callback;
143  user_buffer_ = buf;
144  user_buffer_len_ = buf_len;
145  return ERR_IO_PENDING;
146}
147
148void SpdyHttpStream::Close(bool not_reusable) {
149  // Note: the not_reusable flag has no meaning for SPDY streams.
150
151  Cancel();
152  DCHECK(!stream_.get());
153}
154
155HttpStream* SpdyHttpStream::RenewStreamForAuth() {
156  return NULL;
157}
158
159bool SpdyHttpStream::IsResponseBodyComplete() const {
160  return stream_closed_;
161}
162
163bool SpdyHttpStream::CanFindEndOfResponse() const {
164  return true;
165}
166
167bool SpdyHttpStream::IsConnectionReused() const {
168  return is_reused_;
169}
170
171void SpdyHttpStream::SetConnectionReused() {
172  // SPDY doesn't need an indicator here.
173}
174
175bool SpdyHttpStream::IsConnectionReusable() const {
176  // SPDY streams aren't considered reusable.
177  return false;
178}
179
180bool SpdyHttpStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const {
181  if (stream_closed_) {
182    if (!closed_stream_has_load_timing_info_)
183      return false;
184    *load_timing_info = closed_stream_load_timing_info_;
185    return true;
186  }
187
188  // If |stream_| has yet to be created, or does not yet have an ID, fail.
189  // The reused flag can only be correctly set once a stream has an ID.  Streams
190  // get their IDs once the request has been successfully sent, so this does not
191  // behave that differently from other stream types.
192  if (!stream_ || stream_->stream_id() == 0)
193    return false;
194
195  return stream_->GetLoadTimingInfo(load_timing_info);
196}
197
198int SpdyHttpStream::SendRequest(const HttpRequestHeaders& request_headers,
199                                HttpResponseInfo* response,
200                                const CompletionCallback& callback) {
201  if (stream_closed_) {
202    if (stream_->type() == SPDY_PUSH_STREAM)
203      return closed_stream_status_;
204
205    return (closed_stream_status_ == OK) ? ERR_FAILED : closed_stream_status_;
206  }
207
208  base::Time request_time = base::Time::Now();
209  CHECK(stream_.get());
210
211  stream_->SetRequestTime(request_time);
212  // This should only get called in the case of a request occurring
213  // during server push that has already begun but hasn't finished,
214  // so we set the response's request time to be the actual one
215  if (response_info_)
216    response_info_->request_time = request_time;
217
218  CHECK(!request_body_buf_.get());
219  if (HasUploadData()) {
220    // Use kMaxSpdyFrameChunkSize as the buffer size, since the request
221    // body data is written with this size at a time.
222    request_body_buf_ = new IOBufferWithSize(kMaxSpdyFrameChunkSize);
223    // The request body buffer is empty at first.
224    request_body_buf_size_ = 0;
225  }
226
227  CHECK(!callback.is_null());
228  CHECK(response);
229
230  // SendRequest can be called in two cases.
231  //
232  // a) A client initiated request. In this case, |response_info_| should be
233  //    NULL to start with.
234  // b) A client request which matches a response that the server has already
235  //    pushed.
236  if (push_response_info_.get()) {
237    *response = *(push_response_info_.get());
238    push_response_info_.reset();
239  } else {
240    DCHECK_EQ(static_cast<HttpResponseInfo*>(NULL), response_info_);
241  }
242
243  response_info_ = response;
244
245  // Put the peer's IP address and port into the response.
246  IPEndPoint address;
247  int result = stream_->GetPeerAddress(&address);
248  if (result != OK)
249    return result;
250  response_info_->socket_address = HostPortPair::FromIPEndPoint(address);
251
252  if (stream_->type() == SPDY_PUSH_STREAM) {
253    // Pushed streams do not send any data, and should always be
254    // idle. However, we still want to return ERR_IO_PENDING to mimic
255    // non-push behavior. The callback will be called when the
256    // response is received.
257    result = ERR_IO_PENDING;
258  } else {
259    scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
260    CreateSpdyHeadersFromHttpRequest(
261        *request_info_, request_headers,
262        headers.get(), stream_->GetProtocolVersion(),
263        direct_);
264    stream_->net_log().AddEvent(
265        NetLog::TYPE_HTTP_TRANSACTION_SPDY_SEND_REQUEST_HEADERS,
266        base::Bind(&SpdyHeaderBlockNetLogCallback, headers.get()));
267    result =
268        stream_->SendRequestHeaders(
269            headers.Pass(),
270            HasUploadData() ? MORE_DATA_TO_SEND : NO_MORE_DATA_TO_SEND);
271  }
272
273  if (result == ERR_IO_PENDING) {
274    CHECK(callback_.is_null());
275    callback_ = callback;
276  }
277  return result;
278}
279
280void SpdyHttpStream::Cancel() {
281  callback_.Reset();
282  if (stream_.get()) {
283    stream_->Cancel();
284    DCHECK(!stream_.get());
285  }
286}
287
288void SpdyHttpStream::OnRequestHeadersSent() {
289  if (!callback_.is_null())
290    DoCallback(OK);
291
292  // TODO(akalin): Do this immediately after sending the request
293  // headers.
294  if (HasUploadData())
295    ReadAndSendRequestBodyData();
296}
297
298SpdyResponseHeadersStatus SpdyHttpStream::OnResponseHeadersUpdated(
299    const SpdyHeaderBlock& response_headers) {
300  CHECK_EQ(response_headers_status_, RESPONSE_HEADERS_ARE_INCOMPLETE);
301
302  if (!response_info_) {
303    DCHECK_EQ(stream_->type(), SPDY_PUSH_STREAM);
304    push_response_info_.reset(new HttpResponseInfo);
305    response_info_ = push_response_info_.get();
306  }
307
308  if (!SpdyHeadersToHttpResponse(
309          response_headers, stream_->GetProtocolVersion(), response_info_)) {
310    // We do not have complete headers yet.
311    return RESPONSE_HEADERS_ARE_INCOMPLETE;
312  }
313
314  response_info_->response_time = stream_->response_time();
315  response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE;
316  // Don't store the SSLInfo in the response here, HttpNetworkTransaction
317  // will take care of that part.
318  SSLInfo ssl_info;
319  NextProto protocol_negotiated = kProtoUnknown;
320  stream_->GetSSLInfo(&ssl_info,
321                      &response_info_->was_npn_negotiated,
322                      &protocol_negotiated);
323  response_info_->npn_negotiated_protocol =
324      SSLClientSocket::NextProtoToString(protocol_negotiated);
325  response_info_->request_time = stream_->GetRequestTime();
326  response_info_->connection_info =
327      HttpResponseInfo::ConnectionInfoFromNextProto(stream_->GetProtocol());
328  response_info_->vary_data
329      .Init(*request_info_, *response_info_->headers.get());
330
331  if (!callback_.is_null())
332    DoCallback(OK);
333
334  return RESPONSE_HEADERS_ARE_COMPLETE;
335}
336
337void SpdyHttpStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
338  CHECK_EQ(response_headers_status_, RESPONSE_HEADERS_ARE_COMPLETE);
339
340  // Note that data may be received for a SpdyStream prior to the user calling
341  // ReadResponseBody(), therefore user_buffer_ may be NULL.  This may often
342  // happen for server initiated streams.
343  DCHECK(stream_.get());
344  DCHECK(!stream_->IsClosed() || stream_->type() == SPDY_PUSH_STREAM);
345  if (buffer) {
346    response_body_queue_.Enqueue(buffer.Pass());
347
348    if (user_buffer_.get()) {
349      // Handing small chunks of data to the caller creates measurable overhead.
350      // We buffer data in short time-spans and send a single read notification.
351      ScheduleBufferedReadCallback();
352    }
353  }
354}
355
356void SpdyHttpStream::OnDataSent() {
357  request_body_buf_size_ = 0;
358  ReadAndSendRequestBodyData();
359}
360
361void SpdyHttpStream::OnClose(int status) {
362  if (stream_.get()) {
363    stream_closed_ = true;
364    closed_stream_status_ = status;
365    closed_stream_id_ = stream_->stream_id();
366    closed_stream_has_load_timing_info_ =
367        stream_->GetLoadTimingInfo(&closed_stream_load_timing_info_);
368  }
369  stream_.reset();
370  bool invoked_callback = false;
371  if (status == net::OK) {
372    // We need to complete any pending buffered read now.
373    invoked_callback = DoBufferedReadCallback();
374  }
375  if (!invoked_callback && !callback_.is_null())
376    DoCallback(status);
377}
378
379bool SpdyHttpStream::HasUploadData() const {
380  CHECK(request_info_);
381  return
382      request_info_->upload_data_stream &&
383      ((request_info_->upload_data_stream->size() > 0) ||
384       request_info_->upload_data_stream->is_chunked());
385}
386
387void SpdyHttpStream::OnStreamCreated(
388    const CompletionCallback& callback,
389    int rv) {
390  if (rv == OK) {
391    stream_ = stream_request_.ReleaseStream();
392    stream_->SetDelegate(this);
393  }
394  callback.Run(rv);
395}
396
397void SpdyHttpStream::ReadAndSendRequestBodyData() {
398  CHECK(HasUploadData());
399  CHECK_EQ(request_body_buf_size_, 0);
400
401  if (request_info_->upload_data_stream->IsEOF())
402    return;
403
404  // Read the data from the request body stream.
405  const int rv = request_info_->upload_data_stream
406      ->Read(request_body_buf_.get(),
407             request_body_buf_->size(),
408             base::Bind(&SpdyHttpStream::OnRequestBodyReadCompleted,
409                        weak_factory_.GetWeakPtr()));
410
411  if (rv != ERR_IO_PENDING) {
412    // ERR_IO_PENDING is the only possible error.
413    CHECK_GE(rv, 0);
414    OnRequestBodyReadCompleted(rv);
415  }
416}
417
418void SpdyHttpStream::OnRequestBodyReadCompleted(int status) {
419  CHECK_GE(status, 0);
420  request_body_buf_size_ = status;
421  const bool eof = request_info_->upload_data_stream->IsEOF();
422  if (eof) {
423    CHECK_GE(request_body_buf_size_, 0);
424  } else {
425    CHECK_GT(request_body_buf_size_, 0);
426  }
427  stream_->SendData(request_body_buf_.get(),
428                    request_body_buf_size_,
429                    eof ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
430}
431
432void SpdyHttpStream::ScheduleBufferedReadCallback() {
433  // If there is already a scheduled DoBufferedReadCallback, don't issue
434  // another one.  Mark that we have received more data and return.
435  if (buffered_read_callback_pending_) {
436    more_read_data_pending_ = true;
437    return;
438  }
439
440  more_read_data_pending_ = false;
441  buffered_read_callback_pending_ = true;
442  const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1);
443  base::MessageLoop::current()->PostDelayedTask(
444      FROM_HERE,
445      base::Bind(base::IgnoreResult(&SpdyHttpStream::DoBufferedReadCallback),
446                 weak_factory_.GetWeakPtr()),
447      kBufferTime);
448}
449
450// Checks to see if we should wait for more buffered data before notifying
451// the caller.  Returns true if we should wait, false otherwise.
452bool SpdyHttpStream::ShouldWaitForMoreBufferedData() const {
453  // If the response is complete, there is no point in waiting.
454  if (stream_closed_)
455    return false;
456
457  DCHECK_GT(user_buffer_len_, 0);
458  return response_body_queue_.GetTotalSize() <
459      static_cast<size_t>(user_buffer_len_);
460}
461
462bool SpdyHttpStream::DoBufferedReadCallback() {
463  buffered_read_callback_pending_ = false;
464
465  // If the transaction is cancelled or errored out, we don't need to complete
466  // the read.
467  if (!stream_.get() && !stream_closed_)
468    return false;
469
470  int stream_status =
471      stream_closed_ ? closed_stream_status_ : stream_->response_status();
472  if (stream_status != OK)
473    return false;
474
475  // When more_read_data_pending_ is true, it means that more data has
476  // arrived since we started waiting.  Wait a little longer and continue
477  // to buffer.
478  if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
479    ScheduleBufferedReadCallback();
480    return false;
481  }
482
483  int rv = 0;
484  if (user_buffer_.get()) {
485    rv = ReadResponseBody(user_buffer_.get(), user_buffer_len_, callback_);
486    CHECK_NE(rv, ERR_IO_PENDING);
487    user_buffer_ = NULL;
488    user_buffer_len_ = 0;
489    DoCallback(rv);
490    return true;
491  }
492  return false;
493}
494
495void SpdyHttpStream::DoCallback(int rv) {
496  CHECK_NE(rv, ERR_IO_PENDING);
497  CHECK(!callback_.is_null());
498
499  // Since Run may result in being called back, clear user_callback_ in advance.
500  CompletionCallback c = callback_;
501  callback_.Reset();
502  c.Run(rv);
503}
504
505void SpdyHttpStream::GetSSLInfo(SSLInfo* ssl_info) {
506  DCHECK(stream_.get());
507  bool using_npn;
508  NextProto protocol_negotiated = kProtoUnknown;
509  stream_->GetSSLInfo(ssl_info, &using_npn, &protocol_negotiated);
510}
511
512void SpdyHttpStream::GetSSLCertRequestInfo(
513    SSLCertRequestInfo* cert_request_info) {
514  DCHECK(stream_.get());
515  stream_->GetSSLCertRequestInfo(cert_request_info);
516}
517
518bool SpdyHttpStream::IsSpdyHttpStream() const {
519  return true;
520}
521
522void SpdyHttpStream::Drain(HttpNetworkSession* session) {
523  Close(false);
524  delete this;
525}
526
527}  // namespace net
528