1// Copyright (c) 2010 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
#include "net/spdy/spdy_stream.h"

#include <limits>

#include "base/logging.h"
#include "base/message_loop.h"
#include "base/values.h"
#include "net/spdy/spdy_session.h"
11
12namespace net {
13
14namespace {
15
16class NetLogSpdyStreamWindowUpdateParameter : public NetLog::EventParameters {
17 public:
18  NetLogSpdyStreamWindowUpdateParameter(spdy::SpdyStreamId stream_id,
19                                        int delta,
20                                        int window_size)
21      : stream_id_(stream_id), delta_(delta), window_size_(window_size) {}
22  virtual Value* ToValue() const {
23    DictionaryValue* dict = new DictionaryValue();
24    dict->SetInteger("id", static_cast<int>(stream_id_));
25    dict->SetInteger("delta", delta_);
26    dict->SetInteger("window_size", window_size_);
27    return dict;
28  }
29 private:
30  const spdy::SpdyStreamId stream_id_;
31  const int delta_;
32  const int window_size_;
33  DISALLOW_COPY_AND_ASSIGN(NetLogSpdyStreamWindowUpdateParameter);
34};
35
36}
37
// Creates a stream with id |stream_id| that belongs to |session|.  |pushed|
// records whether this is a pushed stream; SetDelegate() and SendRequest()
// treat pushed streams differently.  |net_log| receives the window-update
// events emitted by this stream.
//
// A new stream starts in STATE_NONE with no delegate, data buffering enabled,
// and both flow-control windows set to spdy::kSpdyStreamInitialWindowSize.
SpdyStream::SpdyStream(SpdySession* session,
                       spdy::SpdyStreamId stream_id,
                       bool pushed,
                       const BoundNetLog& net_log)
    : continue_buffering_data_(true),
      stream_id_(stream_id),
      priority_(0),
      stalled_by_flow_control_(false),
      send_window_size_(spdy::kSpdyStreamInitialWindowSize),
      recv_window_size_(spdy::kSpdyStreamInitialWindowSize),
      pushed_(pushed),
      response_received_(false),
      session_(session),
      delegate_(NULL),
      // The request time defaults to construction time; SetRequestTime() can
      // overwrite it later.
      request_time_(base::Time::Now()),
      // Start with an empty header block that OnResponseReceived() fills in.
      response_(new spdy::SpdyHeaderBlock),
      io_state_(STATE_NONE),
      response_status_(OK),
      cancelled_(false),
      has_upload_data_(false),
      net_log_(net_log),
      send_bytes_(0),
      recv_bytes_(0) {
}
62
63SpdyStream::~SpdyStream() {
64  UpdateHistograms();
65}
66
67void SpdyStream::SetDelegate(Delegate* delegate) {
68  CHECK(delegate);
69  delegate_ = delegate;
70
71  if (pushed_) {
72    CHECK(response_received());
73    MessageLoop::current()->PostTask(
74        FROM_HERE, NewRunnableMethod(this,
75                                     &SpdyStream::PushedStreamReplayData));
76  } else {
77    continue_buffering_data_ = false;
78  }
79}
80
81void SpdyStream::PushedStreamReplayData() {
82  if (cancelled_ || !delegate_)
83    return;
84
85  continue_buffering_data_ = false;
86
87  int rv = delegate_->OnResponseReceived(*response_, response_time_, OK);
88  if (rv == ERR_INCOMPLETE_SPDY_HEADERS) {
89    // We don't have complete headers.  Assume we're waiting for another
90    // HEADERS frame.  Since we don't have headers, we had better not have
91    // any pending data frames.
92    DCHECK_EQ(0U, pending_buffers_.size());
93    return;
94  }
95
96  std::vector<scoped_refptr<IOBufferWithSize> > buffers;
97  buffers.swap(pending_buffers_);
98  for (size_t i = 0; i < buffers.size(); ++i) {
99    // It is always possible that a callback to the delegate results in
100    // the delegate no longer being available.
101    if (!delegate_)
102      break;
103    if (buffers[i]) {
104      delegate_->OnDataReceived(buffers[i]->data(), buffers[i]->size());
105    } else {
106      delegate_->OnDataReceived(NULL, 0);
107      session_->CloseStream(stream_id_, net::OK);
108      // Note: |this| may be deleted after calling CloseStream.
109      DCHECK_EQ(buffers.size() - 1, i);
110    }
111  }
112}
113
114void SpdyStream::DetachDelegate() {
115  if (delegate_)
116    delegate_->set_chunk_callback(NULL);
117  delegate_ = NULL;
118  if (!closed())
119    Cancel();
120}
121
122const linked_ptr<spdy::SpdyHeaderBlock>& SpdyStream::spdy_headers() const {
123  return request_;
124}
125
126void SpdyStream::set_spdy_headers(
127    const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
128  request_ = headers;
129}
130
131void SpdyStream::IncreaseSendWindowSize(int delta_window_size) {
132  DCHECK_GE(delta_window_size, 1);
133  int new_window_size = send_window_size_ + delta_window_size;
134
135  // We should ignore WINDOW_UPDATEs received before or after this state,
136  // since before means we've not written SYN_STREAM yet (i.e. it's too
137  // early) and after means we've written a DATA frame with FIN bit.
138  if (io_state_ != STATE_SEND_BODY_COMPLETE)
139    return;
140
141  // it's valid for send_window_size_ to become negative (via an incoming
142  // SETTINGS), in which case incoming WINDOW_UPDATEs will eventually make
143  // it positive; however, if send_window_size_ is positive and incoming
144  // WINDOW_UPDATE makes it negative, we have an overflow.
145  if (send_window_size_ > 0 && new_window_size < 0) {
146    LOG(WARNING) << "Received WINDOW_UPDATE [delta:" << delta_window_size
147                 << "] for stream " << stream_id_
148                 << " overflows send_window_size_ [current:"
149                 << send_window_size_ << "]";
150    session_->ResetStream(stream_id_, spdy::FLOW_CONTROL_ERROR);
151    return;
152  }
153
154  send_window_size_ = new_window_size;
155
156  net_log_.AddEvent(
157      NetLog::TYPE_SPDY_STREAM_SEND_WINDOW_UPDATE,
158      make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter(
159          stream_id_, delta_window_size, send_window_size_)));
160  if (stalled_by_flow_control_) {
161    stalled_by_flow_control_ = false;
162    io_state_ = STATE_SEND_BODY;
163    DoLoop(OK);
164  }
165}
166
167void SpdyStream::DecreaseSendWindowSize(int delta_window_size) {
168  // we only call this method when sending a frame, therefore
169  // |delta_window_size| should be within the valid frame size range.
170  DCHECK_GE(delta_window_size, 1);
171  DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
172
173  // |send_window_size_| should have been at least |delta_window_size| for
174  // this call to happen.
175  DCHECK_GE(send_window_size_, delta_window_size);
176
177  send_window_size_ -= delta_window_size;
178
179  net_log_.AddEvent(
180      NetLog::TYPE_SPDY_STREAM_SEND_WINDOW_UPDATE,
181      make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter(
182          stream_id_, -delta_window_size, send_window_size_)));
183}
184
185void SpdyStream::IncreaseRecvWindowSize(int delta_window_size) {
186  DCHECK_GE(delta_window_size, 1);
187  // By the time a read is isued, stream may become inactive.
188  if (!session_->IsStreamActive(stream_id_))
189    return;
190  int new_window_size = recv_window_size_ + delta_window_size;
191  if (recv_window_size_ > 0)
192    DCHECK(new_window_size > 0);
193
194  recv_window_size_ = new_window_size;
195  net_log_.AddEvent(
196      NetLog::TYPE_SPDY_STREAM_RECV_WINDOW_UPDATE,
197      make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter(
198          stream_id_, delta_window_size, recv_window_size_)));
199  session_->SendWindowUpdate(stream_id_, delta_window_size);
200}
201
202void SpdyStream::DecreaseRecvWindowSize(int delta_window_size) {
203  DCHECK_GE(delta_window_size, 1);
204
205  recv_window_size_ -= delta_window_size;
206  net_log_.AddEvent(
207      NetLog::TYPE_SPDY_STREAM_RECV_WINDOW_UPDATE,
208      make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter(
209          stream_id_, -delta_window_size, recv_window_size_)));
210
211  // Since we never decrease the initial window size, we should never hit
212  // a negative |recv_window_size_|, if we do, it's a flow-control violation.
213  if (recv_window_size_ < 0)
214    session_->ResetStream(stream_id_, spdy::FLOW_CONTROL_ERROR);
215}
216
217int SpdyStream::GetPeerAddress(AddressList* address) const {
218  return session_->GetPeerAddress(address);
219}
220
221int SpdyStream::GetLocalAddress(IPEndPoint* address) const {
222  return session_->GetLocalAddress(address);
223}
224
225bool SpdyStream::WasEverUsed() const {
226  return session_->WasEverUsed();
227}
228
229base::Time SpdyStream::GetRequestTime() const {
230  return request_time_;
231}
232
233void SpdyStream::SetRequestTime(base::Time t) {
234  request_time_ = t;
235}
236
237int SpdyStream::OnResponseReceived(const spdy::SpdyHeaderBlock& response) {
238  int rv = OK;
239
240  metrics_.StartStream();
241
242  DCHECK(response_->empty());
243  *response_ = response;  // TODO(ukai): avoid copy.
244
245  recv_first_byte_time_ = base::TimeTicks::Now();
246  response_time_ = base::Time::Now();
247
248  // If we receive a response before we are in STATE_WAITING_FOR_RESPONSE, then
249  // the server has sent the SYN_REPLY too early.
250  if (!pushed_ && io_state_ != STATE_WAITING_FOR_RESPONSE)
251    return ERR_SPDY_PROTOCOL_ERROR;
252  if (pushed_)
253    CHECK(io_state_ == STATE_NONE);
254  io_state_ = STATE_OPEN;
255
256  if (delegate_)
257    rv = delegate_->OnResponseReceived(*response_, response_time_, rv);
258  // If delegate_ is not yet attached, we'll call OnResponseReceived after the
259  // delegate gets attached to the stream.
260
261  return rv;
262}
263
264int SpdyStream::OnHeaders(const spdy::SpdyHeaderBlock& headers) {
265  DCHECK(!response_->empty());
266
267  // Append all the headers into the response header block.
268  for (spdy::SpdyHeaderBlock::const_iterator it = headers.begin();
269      it != headers.end(); ++it) {
270    // Disallow duplicate headers.  This is just to be conservative.
271    if ((*response_).find(it->first) != (*response_).end()) {
272      LOG(WARNING) << "HEADERS duplicate header";
273      response_status_ = ERR_SPDY_PROTOCOL_ERROR;
274      return ERR_SPDY_PROTOCOL_ERROR;
275    }
276
277    (*response_)[it->first] = it->second;
278  }
279
280  int rv = OK;
281  if (delegate_) {
282    rv = delegate_->OnResponseReceived(*response_, response_time_, rv);
283    // ERR_INCOMPLETE_SPDY_HEADERS means that we are waiting for more
284    // headers before the response header block is complete.
285    if (rv == ERR_INCOMPLETE_SPDY_HEADERS)
286      rv = OK;
287  }
288  return rv;
289}
290
291void SpdyStream::OnDataReceived(const char* data, int length) {
292  DCHECK_GE(length, 0);
293
294  // If we don't have a response, then the SYN_REPLY did not come through.
295  // We cannot pass data up to the caller unless the reply headers have been
296  // received.
297  if (!response_received()) {
298    session_->CloseStream(stream_id_, ERR_SYN_REPLY_NOT_RECEIVED);
299    return;
300  }
301
302  if (!delegate_ || continue_buffering_data_) {
303    // It should be valid for this to happen in the server push case.
304    // We'll return received data when delegate gets attached to the stream.
305    if (length > 0) {
306      IOBufferWithSize* buf = new IOBufferWithSize(length);
307      memcpy(buf->data(), data, length);
308      pending_buffers_.push_back(make_scoped_refptr(buf));
309    } else {
310      pending_buffers_.push_back(NULL);
311      metrics_.StopStream();
312      // Note: we leave the stream open in the session until the stream
313      //       is claimed.
314    }
315    return;
316  }
317
318  CHECK(!closed());
319
320  // A zero-length read means that the stream is being closed.
321  if (!length) {
322    metrics_.StopStream();
323    session_->CloseStream(stream_id_, net::OK);
324    // Note: |this| may be deleted after calling CloseStream.
325    return;
326  }
327
328  if (session_->flow_control())
329    DecreaseRecvWindowSize(length);
330
331  // Track our bandwidth.
332  metrics_.RecordBytes(length);
333  recv_bytes_ += length;
334  recv_last_byte_time_ = base::TimeTicks::Now();
335
336  if (!delegate_) {
337    // It should be valid for this to happen in the server push case.
338    // We'll return received data when delegate gets attached to the stream.
339    IOBufferWithSize* buf = new IOBufferWithSize(length);
340    memcpy(buf->data(), data, length);
341    pending_buffers_.push_back(make_scoped_refptr(buf));
342    return;
343  }
344
345  delegate_->OnDataReceived(data, length);
346}
347
348// This function is only called when an entire frame is written.
349void SpdyStream::OnWriteComplete(int bytes) {
350  DCHECK_LE(0, bytes);
351  send_bytes_ += bytes;
352  if (cancelled() || closed())
353    return;
354  DoLoop(bytes);
355}
356
357void SpdyStream::OnChunkAvailable() {
358  DCHECK(io_state_ == STATE_SEND_HEADERS || io_state_ == STATE_SEND_BODY ||
359         io_state_ == STATE_SEND_BODY_COMPLETE);
360  if (io_state_ == STATE_SEND_BODY)
361    OnWriteComplete(0);
362}
363
364void SpdyStream::OnClose(int status) {
365  io_state_ = STATE_DONE;
366  response_status_ = status;
367  Delegate* delegate = delegate_;
368  delegate_ = NULL;
369  if (delegate) {
370    delegate->set_chunk_callback(NULL);
371    delegate->OnClose(status);
372  }
373}
374
375void SpdyStream::Cancel() {
376  if (cancelled())
377    return;
378
379  cancelled_ = true;
380  if (session_->IsStreamActive(stream_id_))
381    session_->ResetStream(stream_id_, spdy::CANCEL);
382}
383
384int SpdyStream::SendRequest(bool has_upload_data) {
385  if (delegate_)
386    delegate_->set_chunk_callback(this);
387
388  // Pushed streams do not send any data, and should always be in STATE_OPEN or
389  // STATE_DONE. However, we still want to return IO_PENDING to mimic non-push
390  // behavior.
391  has_upload_data_ = has_upload_data;
392  if (pushed_) {
393    send_time_ = base::TimeTicks::Now();
394    DCHECK(!has_upload_data_);
395    DCHECK(response_received());
396    return ERR_IO_PENDING;
397  }
398  CHECK_EQ(STATE_NONE, io_state_);
399  io_state_ = STATE_SEND_HEADERS;
400  return DoLoop(OK);
401}
402
403int SpdyStream::WriteStreamData(IOBuffer* data, int length,
404                                spdy::SpdyDataFlags flags) {
405  return session_->WriteStreamData(stream_id_, data, length, flags);
406}
407
408bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info, bool* was_npn_negotiated) {
409  return session_->GetSSLInfo(ssl_info, was_npn_negotiated);
410}
411
412bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) {
413  return session_->GetSSLCertRequestInfo(cert_request_info);
414}
415
416bool SpdyStream::HasUrl() const {
417  if (pushed_)
418    return response_received();
419  return request_.get() != NULL;
420}
421
422GURL SpdyStream::GetUrl() const {
423  DCHECK(HasUrl());
424
425  if (pushed_) {
426    // assemble from the response
427    std::string url;
428    spdy::SpdyHeaderBlock::const_iterator it;
429    it = response_->find("url");
430    if (it != (*response_).end())
431      url = it->second;
432    return GURL(url);
433  }
434
435  // assemble from the request
436  std::string scheme;
437  std::string host_port;
438  std::string path;
439  spdy::SpdyHeaderBlock::const_iterator it;
440  it = request_->find("scheme");
441  if (it != (*request_).end())
442    scheme = it->second;
443  it = request_->find("host");
444  if (it != (*request_).end())
445    host_port = it->second;
446  it = request_->find("path");
447  if (it != (*request_).end())
448    path = it->second;
449  std::string url = scheme + "://" + host_port + path;
450  return GURL(url);
451}
452
453int SpdyStream::DoLoop(int result) {
454  do {
455    State state = io_state_;
456    io_state_ = STATE_NONE;
457    switch (state) {
458      // State machine 1: Send headers and body.
459      case STATE_SEND_HEADERS:
460        CHECK_EQ(OK, result);
461        result = DoSendHeaders();
462        break;
463      case STATE_SEND_HEADERS_COMPLETE:
464        result = DoSendHeadersComplete(result);
465        break;
466      case STATE_SEND_BODY:
467        CHECK_EQ(OK, result);
468        result = DoSendBody();
469        break;
470      case STATE_SEND_BODY_COMPLETE:
471        result = DoSendBodyComplete(result);
472        break;
473      // This is an intermediary waiting state. This state is reached when all
474      // data has been sent, but no data has been received.
475      case STATE_WAITING_FOR_RESPONSE:
476        io_state_ = STATE_WAITING_FOR_RESPONSE;
477        result = ERR_IO_PENDING;
478        break;
479      // State machine 2: connection is established.
480      // In STATE_OPEN, OnResponseReceived has already been called.
481      // OnDataReceived, OnClose and OnWriteCompelte can be called.
482      // Only OnWriteCompletee calls DoLoop(().
483      //
484      // For HTTP streams, no data is sent from the client while in the OPEN
485      // state, so OnWriteComplete is never called here.  The HTTP body is
486      // handled in the OnDataReceived callback, which does not call into
487      // DoLoop.
488      //
489      // For WebSocket streams, which are bi-directional, we'll send and
490      // receive data once the connection is established.  Received data is
491      // handled in OnDataReceived.  Sent data is handled in OnWriteComplete,
492      // which calls DoOpen().
493      case STATE_OPEN:
494        result = DoOpen(result);
495        break;
496
497      case STATE_DONE:
498        DCHECK(result != ERR_IO_PENDING);
499        break;
500      default:
501        NOTREACHED() << io_state_;
502        break;
503    }
504  } while (result != ERR_IO_PENDING && io_state_ != STATE_NONE &&
505           io_state_ != STATE_OPEN);
506
507  return result;
508}
509
510int SpdyStream::DoSendHeaders() {
511  CHECK(!cancelled_);
512
513  spdy::SpdyControlFlags flags = spdy::CONTROL_FLAG_NONE;
514  if (!has_upload_data_)
515    flags = spdy::CONTROL_FLAG_FIN;
516
517  CHECK(request_.get());
518  int result = session_->WriteSynStream(
519      stream_id_, static_cast<RequestPriority>(priority_), flags,
520      request_);
521  if (result != ERR_IO_PENDING)
522    return result;
523
524  send_time_ = base::TimeTicks::Now();
525  io_state_ = STATE_SEND_HEADERS_COMPLETE;
526  return ERR_IO_PENDING;
527}
528
529int SpdyStream::DoSendHeadersComplete(int result) {
530  if (result < 0)
531    return result;
532
533  CHECK_GT(result, 0);
534
535  if (!delegate_)
536    return ERR_UNEXPECTED;
537
538  // There is no body, skip that state.
539  if (delegate_->OnSendHeadersComplete(result)) {
540    io_state_ = STATE_WAITING_FOR_RESPONSE;
541    return OK;
542  }
543
544  io_state_ = STATE_SEND_BODY;
545  return OK;
546}
547
548// DoSendBody is called to send the optional body for the request.  This call
549// will also be called as each write of a chunk of the body completes.
550int SpdyStream::DoSendBody() {
551  // If we're already in the STATE_SENDING_BODY state, then we've already
552  // sent a portion of the body.  In that case, we need to first consume
553  // the bytes written in the body stream.  Note that the bytes written is
554  // the number of bytes in the frame that were written, only consume the
555  // data portion, of course.
556  io_state_ = STATE_SEND_BODY_COMPLETE;
557  if (!delegate_)
558    return ERR_UNEXPECTED;
559  return delegate_->OnSendBody();
560}
561
562int SpdyStream::DoSendBodyComplete(int result) {
563  if (result < 0)
564    return result;
565
566  if (!delegate_)
567    return ERR_UNEXPECTED;
568
569  bool eof = false;
570  result = delegate_->OnSendBodyComplete(result, &eof);
571  if (!eof)
572    io_state_ = STATE_SEND_BODY;
573  else
574    io_state_ = STATE_WAITING_FOR_RESPONSE;
575
576  return result;
577}
578
579int SpdyStream::DoOpen(int result) {
580  if (delegate_)
581    delegate_->OnDataSent(result);
582  io_state_ = STATE_OPEN;
583  return result;
584}
585
586void SpdyStream::UpdateHistograms() {
587  // We need all timers to be filled in, otherwise metrics can be bogus.
588  if (send_time_.is_null() || recv_first_byte_time_.is_null() ||
589      recv_last_byte_time_.is_null())
590    return;
591
592  UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte",
593      recv_first_byte_time_ - send_time_);
594  UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime",
595      recv_last_byte_time_ - recv_first_byte_time_);
596  UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime",
597      recv_last_byte_time_ - send_time_);
598
599  UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_);
600  UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_);
601}
602
603}  // namespace net
604