spdy_stream.cc revision 5d1f7b1de12d16ceb2c938c56701a3e8bfa558f7
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/spdy/spdy_stream.h"
6
7#include "base/bind.h"
8#include "base/compiler_specific.h"
9#include "base/logging.h"
10#include "base/message_loop/message_loop.h"
11#include "base/strings/string_number_conversions.h"
12#include "base/strings/stringprintf.h"
13#include "base/values.h"
14#include "net/spdy/spdy_buffer_producer.h"
15#include "net/spdy/spdy_http_utils.h"
16#include "net/spdy/spdy_session.h"
17
18namespace net {
19
20namespace {
21
22base::Value* NetLogSpdyStreamErrorCallback(SpdyStreamId stream_id,
23                                           int status,
24                                           const std::string* description,
25                                           NetLog::LogLevel /* log_level */) {
26  base::DictionaryValue* dict = new base::DictionaryValue();
27  dict->SetInteger("stream_id", static_cast<int>(stream_id));
28  dict->SetInteger("status", status);
29  dict->SetString("description", *description);
30  return dict;
31}
32
33base::Value* NetLogSpdyStreamWindowUpdateCallback(
34    SpdyStreamId stream_id,
35    int32 delta,
36    int32 window_size,
37    NetLog::LogLevel /* log_level */) {
38  base::DictionaryValue* dict = new base::DictionaryValue();
39  dict->SetInteger("stream_id", stream_id);
40  dict->SetInteger("delta", delta);
41  dict->SetInteger("window_size", window_size);
42  return dict;
43}
44
45bool ContainsUppercaseAscii(const std::string& str) {
46  for (std::string::const_iterator i(str.begin()); i != str.end(); ++i) {
47    if (*i >= 'A' && *i <= 'Z') {
48      return true;
49    }
50  }
51  return false;
52}
53
54}  // namespace
55
56// A wrapper around a stream that calls into ProduceSynStreamFrame().
57class SpdyStream::SynStreamBufferProducer : public SpdyBufferProducer {
58 public:
59  SynStreamBufferProducer(const base::WeakPtr<SpdyStream>& stream)
60      : stream_(stream) {
61    DCHECK(stream_.get());
62  }
63
64  virtual ~SynStreamBufferProducer() {}
65
66  virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE {
67    if (!stream_.get()) {
68      NOTREACHED();
69      return scoped_ptr<SpdyBuffer>();
70    }
71    DCHECK_GT(stream_->stream_id(), 0u);
72    return scoped_ptr<SpdyBuffer>(
73        new SpdyBuffer(stream_->ProduceSynStreamFrame()));
74  }
75
76 private:
77  const base::WeakPtr<SpdyStream> stream_;
78};
79
80SpdyStream::SpdyStream(SpdyStreamType type,
81                       const base::WeakPtr<SpdySession>& session,
82                       const GURL& url,
83                       RequestPriority priority,
84                       int32 initial_send_window_size,
85                       int32 initial_recv_window_size,
86                       const BoundNetLog& net_log)
87    : type_(type),
88      weak_ptr_factory_(this),
89      stream_id_(0),
90      url_(url),
91      priority_(priority),
92      send_stalled_by_flow_control_(false),
93      send_window_size_(initial_send_window_size),
94      recv_window_size_(initial_recv_window_size),
95      unacked_recv_window_bytes_(0),
96      session_(session),
97      delegate_(NULL),
98      pending_send_status_(MORE_DATA_TO_SEND),
99      request_time_(base::Time::Now()),
100      response_headers_status_(RESPONSE_HEADERS_ARE_INCOMPLETE),
101      io_state_(STATE_IDLE),
102      response_status_(OK),
103      net_log_(net_log),
104      raw_received_bytes_(0),
105      send_bytes_(0),
106      recv_bytes_(0) {
107  CHECK(type_ == SPDY_BIDIRECTIONAL_STREAM ||
108        type_ == SPDY_REQUEST_RESPONSE_STREAM ||
109        type_ == SPDY_PUSH_STREAM);
110  CHECK_GE(priority_, MINIMUM_PRIORITY);
111  CHECK_LE(priority_, MAXIMUM_PRIORITY);
112}
113
114SpdyStream::~SpdyStream() {
115  UpdateHistograms();
116}
117
118void SpdyStream::SetDelegate(Delegate* delegate) {
119  CHECK(!delegate_);
120  CHECK(delegate);
121  delegate_ = delegate;
122
123  CHECK(io_state_ == STATE_IDLE ||
124        io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED);
125
126  if (io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED) {
127    DCHECK_EQ(type_, SPDY_PUSH_STREAM);
128    base::MessageLoop::current()->PostTask(
129        FROM_HERE,
130        base::Bind(&SpdyStream::PushedStreamReplay, GetWeakPtr()));
131  }
132}
133
134void SpdyStream::PushedStreamReplay() {
135  DCHECK_EQ(type_, SPDY_PUSH_STREAM);
136  DCHECK_NE(stream_id_, 0u);
137  CHECK_EQ(stream_id_ % 2, 0u);
138
139  CHECK_EQ(io_state_, STATE_HALF_CLOSED_LOCAL_UNCLAIMED);
140  io_state_ = STATE_HALF_CLOSED_LOCAL;
141
142  // The delegate methods called below may delete |this|, so use
143  // |weak_this| to detect that.
144  base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
145
146  CHECK(delegate_);
147  SpdyResponseHeadersStatus status =
148      delegate_->OnResponseHeadersUpdated(response_headers_);
149  if (status == RESPONSE_HEADERS_ARE_INCOMPLETE) {
150    // Since RESPONSE_HEADERS_ARE_INCOMPLETE was returned, we must not
151    // have been closed. Since we don't have complete headers, assume
152    // we're waiting for another HEADERS frame, and we had better not
153    // have any pending data frames.
154    CHECK(weak_this);
155    if (!pending_recv_data_.empty()) {
156      LogStreamError(ERR_SPDY_PROTOCOL_ERROR,
157                     "Data received with incomplete headers.");
158      session_->CloseActiveStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR);
159    }
160    return;
161  }
162
163  // OnResponseHeadersUpdated() may have closed |this|.
164  if (!weak_this)
165    return;
166
167  response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE;
168
169  while (!pending_recv_data_.empty()) {
170    // Take ownership of the first element of |pending_recv_data_|.
171    scoped_ptr<SpdyBuffer> buffer(pending_recv_data_.front());
172    pending_recv_data_.weak_erase(pending_recv_data_.begin());
173
174    bool eof = (buffer == NULL);
175
176    CHECK(delegate_);
177    delegate_->OnDataReceived(buffer.Pass());
178
179    // OnDataReceived() may have closed |this|.
180    if (!weak_this)
181      return;
182
183    if (eof) {
184      DCHECK(pending_recv_data_.empty());
185      session_->CloseActiveStream(stream_id_, OK);
186      DCHECK(!weak_this);
187      // |pending_recv_data_| is invalid at this point.
188      break;
189    }
190  }
191}
192
193scoped_ptr<SpdyFrame> SpdyStream::ProduceSynStreamFrame() {
194  CHECK_EQ(io_state_, STATE_IDLE);
195  CHECK(request_headers_);
196  CHECK_GT(stream_id_, 0u);
197
198  SpdyControlFlags flags =
199      (pending_send_status_ == NO_MORE_DATA_TO_SEND) ?
200      CONTROL_FLAG_FIN : CONTROL_FLAG_NONE;
201  scoped_ptr<SpdyFrame> frame(session_->CreateSynStream(
202      stream_id_, priority_, flags, *request_headers_));
203  send_time_ = base::TimeTicks::Now();
204  return frame.Pass();
205}
206
207void SpdyStream::DetachDelegate() {
208  DCHECK(!IsClosed());
209  delegate_ = NULL;
210  Cancel();
211}
212
213void SpdyStream::AdjustSendWindowSize(int32 delta_window_size) {
214  DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
215
216  if (IsClosed())
217    return;
218
219  // Check for wraparound.
220  if (send_window_size_ > 0) {
221    DCHECK_LE(delta_window_size, kint32max - send_window_size_);
222  }
223  if (send_window_size_ < 0) {
224    DCHECK_GE(delta_window_size, kint32min - send_window_size_);
225  }
226  send_window_size_ += delta_window_size;
227  PossiblyResumeIfSendStalled();
228}
229
230void SpdyStream::OnWriteBufferConsumed(
231    size_t frame_payload_size,
232    size_t consume_size,
233    SpdyBuffer::ConsumeSource consume_source) {
234  DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
235  if (consume_source == SpdyBuffer::DISCARD) {
236    // If we're discarding a frame or part of it, increase the send
237    // window by the number of discarded bytes. (Although if we're
238    // discarding part of a frame, it's probably because of a write
239    // error and we'll be tearing down the stream soon.)
240    size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
241    DCHECK_GT(remaining_payload_bytes, 0u);
242    IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
243  }
244  // For consumed bytes, the send window is increased when we receive
245  // a WINDOW_UPDATE frame.
246}
247
248void SpdyStream::IncreaseSendWindowSize(int32 delta_window_size) {
249  DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
250  DCHECK_GE(delta_window_size, 1);
251
252  // Ignore late WINDOW_UPDATEs.
253  if (IsClosed())
254    return;
255
256  if (send_window_size_ > 0) {
257    // Check for overflow.
258    int32 max_delta_window_size = kint32max - send_window_size_;
259    if (delta_window_size > max_delta_window_size) {
260      std::string desc = base::StringPrintf(
261          "Received WINDOW_UPDATE [delta: %d] for stream %d overflows "
262          "send_window_size_ [current: %d]", delta_window_size, stream_id_,
263          send_window_size_);
264      session_->ResetStream(stream_id_, RST_STREAM_FLOW_CONTROL_ERROR, desc);
265      return;
266    }
267  }
268
269  send_window_size_ += delta_window_size;
270
271  net_log_.AddEvent(
272      NetLog::TYPE_SPDY_STREAM_UPDATE_SEND_WINDOW,
273      base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
274                 stream_id_, delta_window_size, send_window_size_));
275
276  PossiblyResumeIfSendStalled();
277}
278
279void SpdyStream::DecreaseSendWindowSize(int32 delta_window_size) {
280  DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
281
282  if (IsClosed())
283    return;
284
285  // We only call this method when sending a frame. Therefore,
286  // |delta_window_size| should be within the valid frame size range.
287  DCHECK_GE(delta_window_size, 1);
288  DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
289
290  // |send_window_size_| should have been at least |delta_window_size| for
291  // this call to happen.
292  DCHECK_GE(send_window_size_, delta_window_size);
293
294  send_window_size_ -= delta_window_size;
295
296  net_log_.AddEvent(
297      NetLog::TYPE_SPDY_STREAM_UPDATE_SEND_WINDOW,
298      base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
299                 stream_id_, -delta_window_size, send_window_size_));
300}
301
302void SpdyStream::OnReadBufferConsumed(
303    size_t consume_size,
304    SpdyBuffer::ConsumeSource consume_source) {
305  DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
306  DCHECK_GE(consume_size, 1u);
307  DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
308  IncreaseRecvWindowSize(static_cast<int32>(consume_size));
309}
310
311void SpdyStream::IncreaseRecvWindowSize(int32 delta_window_size) {
312  DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
313
314  // By the time a read is processed by the delegate, this stream may
315  // already be inactive.
316  if (!session_->IsStreamActive(stream_id_))
317    return;
318
319  DCHECK_GE(unacked_recv_window_bytes_, 0);
320  DCHECK_GE(recv_window_size_, unacked_recv_window_bytes_);
321  DCHECK_GE(delta_window_size, 1);
322  // Check for overflow.
323  DCHECK_LE(delta_window_size, kint32max - recv_window_size_);
324
325  recv_window_size_ += delta_window_size;
326  net_log_.AddEvent(
327      NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
328      base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
329                 stream_id_, delta_window_size, recv_window_size_));
330
331  unacked_recv_window_bytes_ += delta_window_size;
332  if (unacked_recv_window_bytes_ >
333      session_->stream_initial_recv_window_size() / 2) {
334    session_->SendStreamWindowUpdate(
335        stream_id_, static_cast<uint32>(unacked_recv_window_bytes_));
336    unacked_recv_window_bytes_ = 0;
337  }
338}
339
340void SpdyStream::DecreaseRecvWindowSize(int32 delta_window_size) {
341  DCHECK(session_->IsStreamActive(stream_id_));
342  DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
343  DCHECK_GE(delta_window_size, 1);
344
345  // Since we never decrease the initial receive window size,
346  // |delta_window_size| should never cause |recv_window_size_| to go
347  // negative. If we do, the receive window isn't being respected.
348  if (delta_window_size > recv_window_size_) {
349    session_->ResetStream(
350        stream_id_, RST_STREAM_PROTOCOL_ERROR,
351        "delta_window_size is " + base::IntToString(delta_window_size) +
352            " in DecreaseRecvWindowSize, which is larger than the receive " +
353            "window size of " + base::IntToString(recv_window_size_));
354    return;
355  }
356
357  recv_window_size_ -= delta_window_size;
358  net_log_.AddEvent(
359      NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
360      base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
361                 stream_id_, -delta_window_size, recv_window_size_));
362}
363
364int SpdyStream::GetPeerAddress(IPEndPoint* address) const {
365  return session_->GetPeerAddress(address);
366}
367
368int SpdyStream::GetLocalAddress(IPEndPoint* address) const {
369  return session_->GetLocalAddress(address);
370}
371
372bool SpdyStream::WasEverUsed() const {
373  return session_->WasEverUsed();
374}
375
376base::Time SpdyStream::GetRequestTime() const {
377  return request_time_;
378}
379
380void SpdyStream::SetRequestTime(base::Time t) {
381  request_time_ = t;
382}
383
384int SpdyStream::OnInitialResponseHeadersReceived(
385    const SpdyHeaderBlock& initial_response_headers,
386    base::Time response_time,
387    base::TimeTicks recv_first_byte_time) {
388  // SpdySession guarantees that this is called at most once.
389  CHECK(response_headers_.empty());
390
391  // Check to make sure that we don't receive the response headers
392  // before we're ready for it.
393  switch (type_) {
394    case SPDY_BIDIRECTIONAL_STREAM:
395      // For a bidirectional stream, we're ready for the response
396      // headers once we've finished sending the request headers.
397      if (io_state_ == STATE_IDLE) {
398        session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
399                              "Response received before request sent");
400        return ERR_SPDY_PROTOCOL_ERROR;
401      }
402      break;
403
404    case SPDY_REQUEST_RESPONSE_STREAM:
405      // For a request/response stream, we're ready for the response
406      // headers once we've finished sending the request headers.
407      if (io_state_ == STATE_IDLE) {
408        session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
409                              "Response received before request sent");
410        return ERR_SPDY_PROTOCOL_ERROR;
411      }
412      break;
413
414    case SPDY_PUSH_STREAM:
415      // Push streams transition to a locally half-closed state upon headers.
416      // We must continue to buffer data while waiting for a call to
417      // SetDelegate() (which may not ever happen).
418      // TODO(jgraettinger): When PUSH_PROMISE is added, Handle RESERVED_REMOTE
419      // cases here depending on whether the delegate is already set.
420      CHECK_EQ(io_state_, STATE_IDLE);
421      DCHECK(!delegate_);
422      io_state_ = STATE_HALF_CLOSED_LOCAL_UNCLAIMED;
423      break;
424  }
425
426  metrics_.StartStream();
427
428  DCHECK_NE(io_state_, STATE_IDLE);
429
430  response_time_ = response_time;
431  recv_first_byte_time_ = recv_first_byte_time;
432  return MergeWithResponseHeaders(initial_response_headers);
433}
434
435int SpdyStream::OnAdditionalResponseHeadersReceived(
436    const SpdyHeaderBlock& additional_response_headers) {
437  if (type_ == SPDY_REQUEST_RESPONSE_STREAM) {
438    session_->ResetStream(
439        stream_id_, RST_STREAM_PROTOCOL_ERROR,
440        "Additional headers received for request/response stream");
441    return ERR_SPDY_PROTOCOL_ERROR;
442  } else if (type_ == SPDY_PUSH_STREAM &&
443             response_headers_status_ == RESPONSE_HEADERS_ARE_COMPLETE) {
444    session_->ResetStream(
445        stream_id_, RST_STREAM_PROTOCOL_ERROR,
446        "Additional headers received for push stream");
447    return ERR_SPDY_PROTOCOL_ERROR;
448  }
449  return MergeWithResponseHeaders(additional_response_headers);
450}
451
452void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
453  DCHECK(session_->IsStreamActive(stream_id_));
454
455  // If we're still buffering data for a push stream, we will do the
456  // check for data received with incomplete headers in
457  // PushedStreamReplayData().
458  if (io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED) {
459    DCHECK_EQ(type_, SPDY_PUSH_STREAM);
460    CHECK(!delegate_);
461    // It should be valid for this to happen in the server push case.
462    // We'll return received data when delegate gets attached to the stream.
463    if (buffer) {
464      pending_recv_data_.push_back(buffer.release());
465    } else {
466      pending_recv_data_.push_back(NULL);
467      metrics_.StopStream();
468      // Note: we leave the stream open in the session until the stream
469      //       is claimed.
470    }
471    return;
472  }
473
474  // If we have response headers but the delegate has indicated that
475  // it's still incomplete, then that's a protocol error.
476  if (response_headers_status_ == RESPONSE_HEADERS_ARE_INCOMPLETE) {
477    LogStreamError(ERR_SPDY_PROTOCOL_ERROR,
478                   "Data received with incomplete headers.");
479    session_->CloseActiveStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR);
480    return;
481  }
482
483  CHECK(!IsClosed());
484
485  if (!buffer) {
486    metrics_.StopStream();
487    if (io_state_ == STATE_OPEN) {
488      io_state_ = STATE_HALF_CLOSED_REMOTE;
489    } else if (io_state_ == STATE_HALF_CLOSED_LOCAL) {
490      io_state_ = STATE_CLOSED;
491      // Deletes |this|.
492      session_->CloseActiveStream(stream_id_, OK);
493    } else {
494      NOTREACHED() << io_state_;
495    }
496    return;
497  }
498
499  size_t length = buffer->GetRemainingSize();
500  DCHECK_LE(length, session_->GetDataFrameMaximumPayload());
501  if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) {
502    DecreaseRecvWindowSize(static_cast<int32>(length));
503    buffer->AddConsumeCallback(
504        base::Bind(&SpdyStream::OnReadBufferConsumed, GetWeakPtr()));
505  }
506
507  // Track our bandwidth.
508  metrics_.RecordBytes(length);
509  recv_bytes_ += length;
510  recv_last_byte_time_ = base::TimeTicks::Now();
511
512  // May close |this|.
513  delegate_->OnDataReceived(buffer.Pass());
514}
515
516void SpdyStream::OnFrameWriteComplete(SpdyFrameType frame_type,
517                                      size_t frame_size) {
518  DCHECK_NE(type_, SPDY_PUSH_STREAM);
519
520  if (frame_size < session_->GetFrameMinimumSize() ||
521      frame_size > session_->GetFrameMaximumSize()) {
522    NOTREACHED();
523    return;
524  }
525  CHECK(frame_type == SYN_STREAM ||
526        frame_type == DATA) << frame_type;
527
528  int result = (frame_type == SYN_STREAM) ?
529      OnRequestHeadersSent() : OnDataSent(frame_size);
530  if (result == ERR_IO_PENDING) {
531    // The write operation hasn't completed yet.
532    return;
533  }
534
535  if (pending_send_status_ == NO_MORE_DATA_TO_SEND) {
536    if(io_state_ == STATE_OPEN) {
537      io_state_ = STATE_HALF_CLOSED_LOCAL;
538    } else if(io_state_ == STATE_HALF_CLOSED_REMOTE) {
539      io_state_ = STATE_CLOSED;
540    } else {
541      NOTREACHED() << io_state_;
542    }
543  }
544  // Notify delegate of write completion. Must not destroy |this|.
545  CHECK(delegate_);
546  {
547    base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
548    if (frame_type == SYN_STREAM) {
549      delegate_->OnRequestHeadersSent();
550    } else {
551      delegate_->OnDataSent();
552    }
553    CHECK(weak_this);
554  }
555
556  if (io_state_ == STATE_CLOSED) {
557    // Deletes |this|.
558    session_->CloseActiveStream(stream_id_, OK);
559  }
560}
561
562int SpdyStream::OnRequestHeadersSent() {
563  CHECK_EQ(io_state_, STATE_IDLE);
564  CHECK_NE(stream_id_, 0u);
565
566  io_state_ = STATE_OPEN;
567  return OK;
568}
569
570int SpdyStream::OnDataSent(size_t frame_size) {
571  CHECK(io_state_ == STATE_OPEN ||
572        io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_;
573
574  size_t frame_payload_size = frame_size - session_->GetDataFrameMinimumSize();
575
576  CHECK_GE(frame_size, session_->GetDataFrameMinimumSize());
577  CHECK_LE(frame_payload_size, session_->GetDataFrameMaximumPayload());
578
579  send_bytes_ += frame_payload_size;
580
581  // If more data is available to send, dispatch it and
582  // return that the write operation is still ongoing.
583  pending_send_data_->DidConsume(frame_payload_size);
584  if (pending_send_data_->BytesRemaining() > 0) {
585    QueueNextDataFrame();
586    return ERR_IO_PENDING;
587  } else {
588    pending_send_data_ = NULL;
589    return OK;
590  }
591}
592
593SpdyMajorVersion SpdyStream::GetProtocolVersion() const {
594  return session_->GetProtocolVersion();
595}
596
597void SpdyStream::LogStreamError(int status, const std::string& description) {
598  net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ERROR,
599                    base::Bind(&NetLogSpdyStreamErrorCallback,
600                               stream_id_, status, &description));
601}
602
603void SpdyStream::OnClose(int status) {
604  // In most cases, the stream should already be CLOSED. The exception is when a
605  // SpdySession is shutting down while the stream is in an intermediate state.
606  io_state_ = STATE_CLOSED;
607  response_status_ = status;
608  Delegate* delegate = delegate_;
609  delegate_ = NULL;
610  if (delegate)
611    delegate->OnClose(status);
612  // Unset |stream_id_| last so that the delegate can look it up.
613  stream_id_ = 0;
614}
615
616void SpdyStream::Cancel() {
617  // We may be called again from a delegate's OnClose().
618  if (io_state_ == STATE_CLOSED)
619    return;
620
621  if (stream_id_ != 0) {
622    session_->ResetStream(stream_id_, RST_STREAM_CANCEL, std::string());
623  } else {
624    session_->CloseCreatedStream(GetWeakPtr(), RST_STREAM_CANCEL);
625  }
626  // |this| is invalid at this point.
627}
628
629void SpdyStream::Close() {
630  // We may be called again from a delegate's OnClose().
631  if (io_state_ == STATE_CLOSED)
632    return;
633
634  if (stream_id_ != 0) {
635    session_->CloseActiveStream(stream_id_, OK);
636  } else {
637    session_->CloseCreatedStream(GetWeakPtr(), OK);
638  }
639  // |this| is invalid at this point.
640}
641
642base::WeakPtr<SpdyStream> SpdyStream::GetWeakPtr() {
643  return weak_ptr_factory_.GetWeakPtr();
644}
645
646int SpdyStream::SendRequestHeaders(scoped_ptr<SpdyHeaderBlock> request_headers,
647                                   SpdySendStatus send_status) {
648  CHECK_NE(type_, SPDY_PUSH_STREAM);
649  CHECK_EQ(pending_send_status_, MORE_DATA_TO_SEND);
650  CHECK(!request_headers_);
651  CHECK(!pending_send_data_.get());
652  CHECK_EQ(io_state_, STATE_IDLE);
653  request_headers_ = request_headers.Pass();
654  pending_send_status_ = send_status;
655  session_->EnqueueStreamWrite(
656      GetWeakPtr(), SYN_STREAM,
657      scoped_ptr<SpdyBufferProducer>(
658          new SynStreamBufferProducer(GetWeakPtr())));
659  return ERR_IO_PENDING;
660}
661
662void SpdyStream::SendData(IOBuffer* data,
663                          int length,
664                          SpdySendStatus send_status) {
665  CHECK_NE(type_, SPDY_PUSH_STREAM);
666  CHECK_EQ(pending_send_status_, MORE_DATA_TO_SEND);
667  CHECK(io_state_ == STATE_OPEN ||
668        io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_;
669  CHECK(!pending_send_data_.get());
670  pending_send_data_ = new DrainableIOBuffer(data, length);
671  pending_send_status_ = send_status;
672  QueueNextDataFrame();
673}
674
675bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info,
676                            bool* was_npn_negotiated,
677                            NextProto* protocol_negotiated) {
678  return session_->GetSSLInfo(
679      ssl_info, was_npn_negotiated, protocol_negotiated);
680}
681
682bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) {
683  return session_->GetSSLCertRequestInfo(cert_request_info);
684}
685
686void SpdyStream::PossiblyResumeIfSendStalled() {
687  if (IsLocallyClosed()) {
688    return;
689  }
690  if (send_stalled_by_flow_control_ && !session_->IsSendStalled() &&
691      send_window_size_ > 0) {
692    net_log_.AddEvent(
693        NetLog::TYPE_SPDY_STREAM_FLOW_CONTROL_UNSTALLED,
694        NetLog::IntegerCallback("stream_id", stream_id_));
695    send_stalled_by_flow_control_ = false;
696    QueueNextDataFrame();
697  }
698}
699
700bool SpdyStream::IsClosed() const {
701  return io_state_ == STATE_CLOSED;
702}
703
704bool SpdyStream::IsLocallyClosed() const {
705  return io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED ||
706      io_state_ == STATE_HALF_CLOSED_LOCAL ||
707      io_state_ == STATE_CLOSED;
708}
709
710bool SpdyStream::IsIdle() const {
711  return io_state_ == STATE_IDLE;
712}
713
714bool SpdyStream::IsOpen() const {
715  return io_state_ == STATE_OPEN;
716}
717
718NextProto SpdyStream::GetProtocol() const {
719  return session_->protocol();
720}
721
722bool SpdyStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const {
723  if (stream_id_ == 0)
724    return false;
725
726  return session_->GetLoadTimingInfo(stream_id_, load_timing_info);
727}
728
729GURL SpdyStream::GetUrlFromHeaders() const {
730  if (type_ != SPDY_PUSH_STREAM && !request_headers_)
731    return GURL();
732
733  const SpdyHeaderBlock& headers =
734      (type_ == SPDY_PUSH_STREAM) ? response_headers_ : *request_headers_;
735  return GetUrlFromHeaderBlock(headers, GetProtocolVersion(),
736                               type_ == SPDY_PUSH_STREAM);
737}
738
739bool SpdyStream::HasUrlFromHeaders() const {
740  return !GetUrlFromHeaders().is_empty();
741}
742
743void SpdyStream::UpdateHistograms() {
744  // We need at least the receive timers to be filled in, as otherwise
745  // metrics can be bogus.
746  if (recv_first_byte_time_.is_null() || recv_last_byte_time_.is_null())
747    return;
748
749  base::TimeTicks effective_send_time;
750  if (type_ == SPDY_PUSH_STREAM) {
751    // Push streams shouldn't have |send_time_| filled in.
752    DCHECK(send_time_.is_null());
753    effective_send_time = recv_first_byte_time_;
754  } else {
755    // For non-push streams, we also need |send_time_| to be filled
756    // in.
757    if (send_time_.is_null())
758      return;
759    effective_send_time = send_time_;
760  }
761
762  UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte",
763                      recv_first_byte_time_ - effective_send_time);
764  UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime",
765                      recv_last_byte_time_ - recv_first_byte_time_);
766  UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime",
767                      recv_last_byte_time_ - effective_send_time);
768
769  UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_);
770  UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_);
771}
772
773void SpdyStream::QueueNextDataFrame() {
774  // Until the request has been completely sent, we cannot be sure
775  // that our stream_id is correct.
776  CHECK(io_state_ == STATE_OPEN ||
777        io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_;
778  CHECK_GT(stream_id_, 0u);
779  CHECK(pending_send_data_.get());
780  CHECK_GT(pending_send_data_->BytesRemaining(), 0);
781
782  SpdyDataFlags flags =
783      (pending_send_status_ == NO_MORE_DATA_TO_SEND) ?
784      DATA_FLAG_FIN : DATA_FLAG_NONE;
785  scoped_ptr<SpdyBuffer> data_buffer(
786      session_->CreateDataBuffer(stream_id_,
787                                 pending_send_data_.get(),
788                                 pending_send_data_->BytesRemaining(),
789                                 flags));
790  // We'll get called again by PossiblyResumeIfSendStalled().
791  if (!data_buffer)
792    return;
793
794  if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) {
795    DCHECK_GE(data_buffer->GetRemainingSize(),
796              session_->GetDataFrameMinimumSize());
797    size_t payload_size =
798        data_buffer->GetRemainingSize() - session_->GetDataFrameMinimumSize();
799    DCHECK_LE(payload_size, session_->GetDataFrameMaximumPayload());
800    DecreaseSendWindowSize(static_cast<int32>(payload_size));
801    // This currently isn't strictly needed, since write frames are
802    // discarded only if the stream is about to be closed. But have it
803    // here anyway just in case this changes.
804    data_buffer->AddConsumeCallback(
805        base::Bind(&SpdyStream::OnWriteBufferConsumed,
806                   GetWeakPtr(), payload_size));
807  }
808
809  session_->EnqueueStreamWrite(
810      GetWeakPtr(), DATA,
811      scoped_ptr<SpdyBufferProducer>(
812          new SimpleBufferProducer(data_buffer.Pass())));
813}
814
815int SpdyStream::MergeWithResponseHeaders(
816    const SpdyHeaderBlock& new_response_headers) {
817  if (new_response_headers.find("transfer-encoding") !=
818      new_response_headers.end()) {
819    session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
820                         "Received transfer-encoding header");
821    return ERR_SPDY_PROTOCOL_ERROR;
822  }
823
824  for (SpdyHeaderBlock::const_iterator it = new_response_headers.begin();
825      it != new_response_headers.end(); ++it) {
826    // Disallow uppercase headers.
827    if (ContainsUppercaseAscii(it->first)) {
828      session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
829                            "Upper case characters in header: " + it->first);
830      return ERR_SPDY_PROTOCOL_ERROR;
831    }
832
833    SpdyHeaderBlock::iterator it2 = response_headers_.lower_bound(it->first);
834    // Disallow duplicate headers.  This is just to be conservative.
835    if (it2 != response_headers_.end() && it2->first == it->first) {
836      session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
837                            "Duplicate header: " + it->first);
838      return ERR_SPDY_PROTOCOL_ERROR;
839    }
840
841    response_headers_.insert(it2, *it);
842  }
843
844  // If delegate_ is not yet attached, we'll call
845  // OnResponseHeadersUpdated() after the delegate gets attached to
846  // the stream.
847  if (delegate_) {
848    // The call to OnResponseHeadersUpdated() below may delete |this|,
849    // so use |weak_this| to detect that.
850    base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
851
852    SpdyResponseHeadersStatus status =
853        delegate_->OnResponseHeadersUpdated(response_headers_);
854    if (status == RESPONSE_HEADERS_ARE_INCOMPLETE) {
855      // Since RESPONSE_HEADERS_ARE_INCOMPLETE was returned, we must not
856      // have been closed.
857      CHECK(weak_this);
858      // Incomplete headers are OK only for push streams.
859      if (type_ != SPDY_PUSH_STREAM) {
860        session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
861                              "Incomplete headers");
862        return ERR_INCOMPLETE_SPDY_HEADERS;
863      }
864    } else if (weak_this) {
865      response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE;
866    }
867  }
868
869  return OK;
870}
871
872#define STATE_CASE(s) \
873  case s: \
874    description = base::StringPrintf("%s (0x%08X)", #s, s); \
875    break
876
877std::string SpdyStream::DescribeState(State state) {
878  std::string description;
879  switch (state) {
880    STATE_CASE(STATE_IDLE);
881    STATE_CASE(STATE_OPEN);
882    STATE_CASE(STATE_HALF_CLOSED_LOCAL_UNCLAIMED);
883    STATE_CASE(STATE_HALF_CLOSED_LOCAL);
884    STATE_CASE(STATE_CLOSED);
885    default:
886      description = base::StringPrintf("Unknown state 0x%08X (%u)", state,
887                                       state);
888      break;
889  }
890  return description;
891}
892
893#undef STATE_CASE
894
895}  // namespace net
896