spdy_stream.cc revision bbcdd45c55eb7c4641ab97aef9889b0fc828e7d3
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/spdy/spdy_stream.h"
6
7#include "base/bind.h"
8#include "base/compiler_specific.h"
9#include "base/logging.h"
10#include "base/message_loop/message_loop.h"
11#include "base/strings/string_number_conversions.h"
12#include "base/strings/stringprintf.h"
13#include "base/values.h"
14#include "net/spdy/spdy_buffer_producer.h"
15#include "net/spdy/spdy_http_utils.h"
16#include "net/spdy/spdy_session.h"
17
18namespace net {
19
20namespace {
21
22base::Value* NetLogSpdyStreamErrorCallback(SpdyStreamId stream_id,
23                                           int status,
24                                           const std::string* description,
25                                           NetLog::LogLevel /* log_level */) {
26  base::DictionaryValue* dict = new base::DictionaryValue();
27  dict->SetInteger("stream_id", static_cast<int>(stream_id));
28  dict->SetInteger("status", status);
29  dict->SetString("description", *description);
30  return dict;
31}
32
33base::Value* NetLogSpdyStreamWindowUpdateCallback(
34    SpdyStreamId stream_id,
35    int32 delta,
36    int32 window_size,
37    NetLog::LogLevel /* log_level */) {
38  base::DictionaryValue* dict = new base::DictionaryValue();
39  dict->SetInteger("stream_id", stream_id);
40  dict->SetInteger("delta", delta);
41  dict->SetInteger("window_size", window_size);
42  return dict;
43}
44
45bool ContainsUppercaseAscii(const std::string& str) {
46  for (std::string::const_iterator i(str.begin()); i != str.end(); ++i) {
47    if (*i >= 'A' && *i <= 'Z') {
48      return true;
49    }
50  }
51  return false;
52}
53
54}  // namespace
55
56// A wrapper around a stream that calls into ProduceSynStreamFrame().
57class SpdyStream::SynStreamBufferProducer : public SpdyBufferProducer {
58 public:
59  SynStreamBufferProducer(const base::WeakPtr<SpdyStream>& stream)
60      : stream_(stream) {
61    DCHECK(stream_.get());
62  }
63
64  virtual ~SynStreamBufferProducer() {}
65
66  virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE {
67    if (!stream_.get()) {
68      NOTREACHED();
69      return scoped_ptr<SpdyBuffer>();
70    }
71    DCHECK_GT(stream_->stream_id(), 0u);
72    return scoped_ptr<SpdyBuffer>(
73        new SpdyBuffer(stream_->ProduceSynStreamFrame()));
74  }
75
76 private:
77  const base::WeakPtr<SpdyStream> stream_;
78};
79
80SpdyStream::SpdyStream(SpdyStreamType type,
81                       const base::WeakPtr<SpdySession>& session,
82                       const GURL& url,
83                       RequestPriority priority,
84                       int32 initial_send_window_size,
85                       int32 initial_recv_window_size,
86                       const BoundNetLog& net_log)
87    : type_(type),
88      weak_ptr_factory_(this),
89      in_do_loop_(false),
90      continue_buffering_data_(type_ == SPDY_PUSH_STREAM),
91      stream_id_(0),
92      url_(url),
93      priority_(priority),
94      slot_(0),
95      send_stalled_by_flow_control_(false),
96      send_window_size_(initial_send_window_size),
97      recv_window_size_(initial_recv_window_size),
98      unacked_recv_window_bytes_(0),
99      session_(session),
100      delegate_(NULL),
101      send_status_(
102          (type_ == SPDY_PUSH_STREAM) ?
103          NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND),
104      request_time_(base::Time::Now()),
105      response_headers_status_(RESPONSE_HEADERS_ARE_INCOMPLETE),
106      io_state_((type_ == SPDY_PUSH_STREAM) ? STATE_IDLE : STATE_NONE),
107      response_status_(OK),
108      net_log_(net_log),
109      send_bytes_(0),
110      recv_bytes_(0),
111      domain_bound_cert_type_(CLIENT_CERT_INVALID_TYPE),
112      just_completed_frame_type_(DATA),
113      just_completed_frame_size_(0) {
114  CHECK(type_ == SPDY_BIDIRECTIONAL_STREAM ||
115        type_ == SPDY_REQUEST_RESPONSE_STREAM ||
116        type_ == SPDY_PUSH_STREAM);
117}
118
119SpdyStream::~SpdyStream() {
120  CHECK(!in_do_loop_);
121  UpdateHistograms();
122}
123
124void SpdyStream::SetDelegate(Delegate* delegate) {
125  CHECK(!delegate_);
126  CHECK(delegate);
127  delegate_ = delegate;
128
129  if (type_ == SPDY_PUSH_STREAM) {
130    DCHECK(continue_buffering_data_);
131    base::MessageLoop::current()->PostTask(
132        FROM_HERE,
133        base::Bind(&SpdyStream::PushedStreamReplayData, GetWeakPtr()));
134  }
135}
136
137void SpdyStream::PushedStreamReplayData() {
138  DCHECK_EQ(type_, SPDY_PUSH_STREAM);
139  DCHECK_NE(stream_id_, 0u);
140  DCHECK(continue_buffering_data_);
141
142  continue_buffering_data_ = false;
143
144  // The delegate methods called below may delete |this|, so use
145  // |weak_this| to detect that.
146  base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
147
148  CHECK(delegate_);
149  SpdyResponseHeadersStatus status =
150      delegate_->OnResponseHeadersUpdated(response_headers_);
151  if (status == RESPONSE_HEADERS_ARE_INCOMPLETE) {
152    // Since RESPONSE_HEADERS_ARE_INCOMPLETE was returned, we must not
153    // have been closed. Since we don't have complete headers, assume
154    // we're waiting for another HEADERS frame, and we had better not
155    // have any pending data frames.
156    CHECK(weak_this);
157    if (!pending_buffers_.empty()) {
158      LogStreamError(ERR_SPDY_PROTOCOL_ERROR,
159                     "Data received with incomplete headers.");
160      session_->CloseActiveStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR);
161    }
162    return;
163  }
164
165  // OnResponseHeadersUpdated() may have closed |this|.
166  if (!weak_this)
167    return;
168
169  response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE;
170
171  while (!pending_buffers_.empty()) {
172    // Take ownership of the first element of |pending_buffers_|.
173    scoped_ptr<SpdyBuffer> buffer(pending_buffers_.front());
174    pending_buffers_.weak_erase(pending_buffers_.begin());
175
176    bool eof = (buffer == NULL);
177
178    CHECK(delegate_);
179    delegate_->OnDataReceived(buffer.Pass());
180
181    // OnDataReceived() may have closed |this|.
182    if (!weak_this)
183      return;
184
185    if (eof) {
186      DCHECK(pending_buffers_.empty());
187      session_->CloseActiveStream(stream_id_, OK);
188      DCHECK(!weak_this);
189      // |pending_buffers_| is invalid at this point.
190      break;
191    }
192  }
193}
194
195scoped_ptr<SpdyFrame> SpdyStream::ProduceSynStreamFrame() {
196  CHECK_EQ(io_state_, STATE_SEND_REQUEST_HEADERS_COMPLETE);
197  CHECK(request_headers_);
198  CHECK_GT(stream_id_, 0u);
199
200  SpdyControlFlags flags =
201      (send_status_ == NO_MORE_DATA_TO_SEND) ?
202      CONTROL_FLAG_FIN : CONTROL_FLAG_NONE;
203  scoped_ptr<SpdyFrame> frame(session_->CreateSynStream(
204      stream_id_, priority_, slot_, flags, *request_headers_));
205  send_time_ = base::TimeTicks::Now();
206  return frame.Pass();
207}
208
209void SpdyStream::DetachDelegate() {
210  CHECK(!in_do_loop_);
211  DCHECK(!IsClosed());
212  delegate_ = NULL;
213  Cancel();
214}
215
216void SpdyStream::AdjustSendWindowSize(int32 delta_window_size) {
217  DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
218
219  if (IsClosed())
220    return;
221
222  // Check for wraparound.
223  if (send_window_size_ > 0) {
224    DCHECK_LE(delta_window_size, kint32max - send_window_size_);
225  }
226  if (send_window_size_ < 0) {
227    DCHECK_GE(delta_window_size, kint32min - send_window_size_);
228  }
229  send_window_size_ += delta_window_size;
230  PossiblyResumeIfSendStalled();
231}
232
233void SpdyStream::OnWriteBufferConsumed(
234    size_t frame_payload_size,
235    size_t consume_size,
236    SpdyBuffer::ConsumeSource consume_source) {
237  DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
238  if (consume_source == SpdyBuffer::DISCARD) {
239    // If we're discarding a frame or part of it, increase the send
240    // window by the number of discarded bytes. (Although if we're
241    // discarding part of a frame, it's probably because of a write
242    // error and we'll be tearing down the stream soon.)
243    size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
244    DCHECK_GT(remaining_payload_bytes, 0u);
245    IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
246  }
247  // For consumed bytes, the send window is increased when we receive
248  // a WINDOW_UPDATE frame.
249}
250
251void SpdyStream::IncreaseSendWindowSize(int32 delta_window_size) {
252  DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
253  DCHECK_GE(delta_window_size, 1);
254
255  // Ignore late WINDOW_UPDATEs.
256  if (IsClosed())
257    return;
258
259  if (send_window_size_ > 0) {
260    // Check for overflow.
261    int32 max_delta_window_size = kint32max - send_window_size_;
262    if (delta_window_size > max_delta_window_size) {
263      std::string desc = base::StringPrintf(
264          "Received WINDOW_UPDATE [delta: %d] for stream %d overflows "
265          "send_window_size_ [current: %d]", delta_window_size, stream_id_,
266          send_window_size_);
267      session_->ResetStream(stream_id_, RST_STREAM_FLOW_CONTROL_ERROR, desc);
268      return;
269    }
270  }
271
272  send_window_size_ += delta_window_size;
273
274  net_log_.AddEvent(
275      NetLog::TYPE_SPDY_STREAM_UPDATE_SEND_WINDOW,
276      base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
277                 stream_id_, delta_window_size, send_window_size_));
278
279  PossiblyResumeIfSendStalled();
280}
281
282void SpdyStream::DecreaseSendWindowSize(int32 delta_window_size) {
283  DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
284
285  if (IsClosed())
286    return;
287
288  // We only call this method when sending a frame. Therefore,
289  // |delta_window_size| should be within the valid frame size range.
290  DCHECK_GE(delta_window_size, 1);
291  DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
292
293  // |send_window_size_| should have been at least |delta_window_size| for
294  // this call to happen.
295  DCHECK_GE(send_window_size_, delta_window_size);
296
297  send_window_size_ -= delta_window_size;
298
299  net_log_.AddEvent(
300      NetLog::TYPE_SPDY_STREAM_UPDATE_SEND_WINDOW,
301      base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
302                 stream_id_, -delta_window_size, send_window_size_));
303}
304
305void SpdyStream::OnReadBufferConsumed(
306    size_t consume_size,
307    SpdyBuffer::ConsumeSource consume_source) {
308  DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
309  DCHECK_GE(consume_size, 1u);
310  DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
311  IncreaseRecvWindowSize(static_cast<int32>(consume_size));
312}
313
314void SpdyStream::IncreaseRecvWindowSize(int32 delta_window_size) {
315  DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
316
317  // By the time a read is processed by the delegate, this stream may
318  // already be inactive.
319  if (!session_->IsStreamActive(stream_id_))
320    return;
321
322  DCHECK_GE(unacked_recv_window_bytes_, 0);
323  DCHECK_GE(recv_window_size_, unacked_recv_window_bytes_);
324  DCHECK_GE(delta_window_size, 1);
325  // Check for overflow.
326  DCHECK_LE(delta_window_size, kint32max - recv_window_size_);
327
328  recv_window_size_ += delta_window_size;
329  net_log_.AddEvent(
330      NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
331      base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
332                 stream_id_, delta_window_size, recv_window_size_));
333
334  unacked_recv_window_bytes_ += delta_window_size;
335  if (unacked_recv_window_bytes_ >
336      session_->stream_initial_recv_window_size() / 2) {
337    session_->SendStreamWindowUpdate(
338        stream_id_, static_cast<uint32>(unacked_recv_window_bytes_));
339    unacked_recv_window_bytes_ = 0;
340  }
341}
342
343void SpdyStream::DecreaseRecvWindowSize(int32 delta_window_size) {
344  DCHECK(session_->IsStreamActive(stream_id_));
345  DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
346  DCHECK_GE(delta_window_size, 1);
347
348  // Since we never decrease the initial receive window size,
349  // |delta_window_size| should never cause |recv_window_size_| to go
350  // negative. If we do, the receive window isn't being respected.
351  if (delta_window_size > recv_window_size_) {
352    session_->ResetStream(
353        stream_id_, RST_STREAM_PROTOCOL_ERROR,
354        "delta_window_size is " + base::IntToString(delta_window_size) +
355            " in DecreaseRecvWindowSize, which is larger than the receive " +
356            "window size of " + base::IntToString(recv_window_size_));
357    return;
358  }
359
360  recv_window_size_ -= delta_window_size;
361  net_log_.AddEvent(
362      NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
363      base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
364                 stream_id_, -delta_window_size, recv_window_size_));
365}
366
367int SpdyStream::GetPeerAddress(IPEndPoint* address) const {
368  return session_->GetPeerAddress(address);
369}
370
371int SpdyStream::GetLocalAddress(IPEndPoint* address) const {
372  return session_->GetLocalAddress(address);
373}
374
375bool SpdyStream::WasEverUsed() const {
376  return session_->WasEverUsed();
377}
378
379base::Time SpdyStream::GetRequestTime() const {
380  return request_time_;
381}
382
383void SpdyStream::SetRequestTime(base::Time t) {
384  request_time_ = t;
385}
386
387int SpdyStream::OnInitialResponseHeadersReceived(
388    const SpdyHeaderBlock& initial_response_headers,
389    base::Time response_time,
390    base::TimeTicks recv_first_byte_time) {
391  // SpdySession guarantees that this is called at most once.
392  CHECK(response_headers_.empty());
393
394  // Check to make sure that we don't receive the response headers
395  // before we're ready for it.
396  switch (type_) {
397    case SPDY_BIDIRECTIONAL_STREAM:
398      // For a bidirectional stream, we're ready for the response
399      // headers once we've finished sending the request headers.
400      if (io_state_ < STATE_IDLE) {
401        session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
402                              "Response received before request sent");
403        return ERR_SPDY_PROTOCOL_ERROR;
404      }
405      break;
406
407    case SPDY_REQUEST_RESPONSE_STREAM:
408      // For a request/response stream, we're ready for the response
409      // headers once we've finished sending the request headers and
410      // the request body (if we have one).
411      if ((io_state_ < STATE_IDLE) || (send_status_ == MORE_DATA_TO_SEND) ||
412          pending_send_data_.get()) {
413        session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
414                              "Response received before request sent");
415        return ERR_SPDY_PROTOCOL_ERROR;
416      }
417      break;
418
419    case SPDY_PUSH_STREAM:
420      // For a push stream, we're ready immediately.
421      DCHECK_EQ(send_status_, NO_MORE_DATA_TO_SEND);
422      DCHECK_EQ(io_state_, STATE_IDLE);
423      break;
424  }
425
426  metrics_.StartStream();
427
428  DCHECK_EQ(io_state_, STATE_IDLE);
429
430  response_time_ = response_time;
431  recv_first_byte_time_ = recv_first_byte_time;
432  return MergeWithResponseHeaders(initial_response_headers);
433}
434
435int SpdyStream::OnAdditionalResponseHeadersReceived(
436    const SpdyHeaderBlock& additional_response_headers) {
437  if (type_ == SPDY_REQUEST_RESPONSE_STREAM) {
438    session_->ResetStream(
439        stream_id_, RST_STREAM_PROTOCOL_ERROR,
440        "Additional headers received for request/response stream");
441    return ERR_SPDY_PROTOCOL_ERROR;
442  } else if (type_ == SPDY_PUSH_STREAM &&
443             response_headers_status_ == RESPONSE_HEADERS_ARE_COMPLETE) {
444    session_->ResetStream(
445        stream_id_, RST_STREAM_PROTOCOL_ERROR,
446        "Additional headers received for push stream");
447    return ERR_SPDY_PROTOCOL_ERROR;
448  }
449  return MergeWithResponseHeaders(additional_response_headers);
450}
451
452void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
453  DCHECK(session_->IsStreamActive(stream_id_));
454
455  // If we're still buffering data for a push stream, we will do the
456  // check for data received with incomplete headers in
457  // PushedStreamReplayData().
458  if (!delegate_ || continue_buffering_data_) {
459    DCHECK_EQ(type_, SPDY_PUSH_STREAM);
460    // It should be valid for this to happen in the server push case.
461    // We'll return received data when delegate gets attached to the stream.
462    if (buffer) {
463      pending_buffers_.push_back(buffer.release());
464    } else {
465      pending_buffers_.push_back(NULL);
466      metrics_.StopStream();
467      // Note: we leave the stream open in the session until the stream
468      //       is claimed.
469    }
470    return;
471  }
472
473  // If we have response headers but the delegate has indicated that
474  // it's still incomplete, then that's a protocol error.
475  if (response_headers_status_ == RESPONSE_HEADERS_ARE_INCOMPLETE) {
476    LogStreamError(ERR_SPDY_PROTOCOL_ERROR,
477                   "Data received with incomplete headers.");
478    session_->CloseActiveStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR);
479    return;
480  }
481
482  CHECK(!IsClosed());
483
484  if (!buffer) {
485    metrics_.StopStream();
486    // Deletes |this|.
487    session_->CloseActiveStream(stream_id_, OK);
488    return;
489  }
490
491  size_t length = buffer->GetRemainingSize();
492  DCHECK_LE(length, session_->GetDataFrameMaximumPayload());
493  if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) {
494    DecreaseRecvWindowSize(static_cast<int32>(length));
495    buffer->AddConsumeCallback(
496        base::Bind(&SpdyStream::OnReadBufferConsumed, GetWeakPtr()));
497  }
498
499  // Track our bandwidth.
500  metrics_.RecordBytes(length);
501  recv_bytes_ += length;
502  recv_last_byte_time_ = base::TimeTicks::Now();
503
504  // May close |this|.
505  delegate_->OnDataReceived(buffer.Pass());
506}
507
508void SpdyStream::OnFrameWriteComplete(SpdyFrameType frame_type,
509                                      size_t frame_size) {
510  if (frame_size < session_->GetFrameMinimumSize() ||
511      frame_size > session_->GetFrameMaximumSize()) {
512    NOTREACHED();
513    return;
514  }
515  if (IsClosed())
516    return;
517  just_completed_frame_type_ = frame_type;
518  just_completed_frame_size_ = frame_size;
519  DoLoop(OK);
520}
521
522int SpdyStream::GetProtocolVersion() const {
523  return session_->GetProtocolVersion();
524}
525
526void SpdyStream::LogStreamError(int status, const std::string& description) {
527  net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ERROR,
528                    base::Bind(&NetLogSpdyStreamErrorCallback,
529                               stream_id_, status, &description));
530}
531
532void SpdyStream::OnClose(int status) {
533  CHECK(!in_do_loop_);
534  io_state_ = STATE_CLOSED;
535  response_status_ = status;
536  Delegate* delegate = delegate_;
537  delegate_ = NULL;
538  if (delegate)
539    delegate->OnClose(status);
540  // Unset |stream_id_| last so that the delegate can look it up.
541  stream_id_ = 0;
542}
543
544void SpdyStream::Cancel() {
545  CHECK(!in_do_loop_);
546  // We may be called again from a delegate's OnClose().
547  if (io_state_ == STATE_CLOSED)
548    return;
549
550  if (stream_id_ != 0) {
551    session_->ResetStream(stream_id_, RST_STREAM_CANCEL, std::string());
552  } else {
553    session_->CloseCreatedStream(GetWeakPtr(), RST_STREAM_CANCEL);
554  }
555  // |this| is invalid at this point.
556}
557
558void SpdyStream::Close() {
559  CHECK(!in_do_loop_);
560  // We may be called again from a delegate's OnClose().
561  if (io_state_ == STATE_CLOSED)
562    return;
563
564  if (stream_id_ != 0) {
565    session_->CloseActiveStream(stream_id_, OK);
566  } else {
567    session_->CloseCreatedStream(GetWeakPtr(), OK);
568  }
569  // |this| is invalid at this point.
570}
571
572base::WeakPtr<SpdyStream> SpdyStream::GetWeakPtr() {
573  return weak_ptr_factory_.GetWeakPtr();
574}
575
576int SpdyStream::SendRequestHeaders(scoped_ptr<SpdyHeaderBlock> request_headers,
577                                   SpdySendStatus send_status) {
578  CHECK_NE(type_, SPDY_PUSH_STREAM);
579  CHECK_EQ(send_status_, MORE_DATA_TO_SEND);
580  CHECK(!request_headers_);
581  CHECK(!pending_send_data_.get());
582  CHECK_EQ(io_state_, STATE_NONE);
583  request_headers_ = request_headers.Pass();
584  send_status_ = send_status;
585  io_state_ = STATE_GET_DOMAIN_BOUND_CERT;
586  return DoLoop(OK);
587}
588
589void SpdyStream::SendData(IOBuffer* data,
590                          int length,
591                          SpdySendStatus send_status) {
592  CHECK_NE(type_, SPDY_PUSH_STREAM);
593  CHECK_EQ(send_status_, MORE_DATA_TO_SEND);
594  CHECK_GE(io_state_, STATE_SEND_REQUEST_HEADERS_COMPLETE);
595  CHECK(!pending_send_data_.get());
596  pending_send_data_ = new DrainableIOBuffer(data, length);
597  send_status_ = send_status;
598  QueueNextDataFrame();
599}
600
601bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info,
602                            bool* was_npn_negotiated,
603                            NextProto* protocol_negotiated) {
604  return session_->GetSSLInfo(
605      ssl_info, was_npn_negotiated, protocol_negotiated);
606}
607
608bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) {
609  return session_->GetSSLCertRequestInfo(cert_request_info);
610}
611
612void SpdyStream::PossiblyResumeIfSendStalled() {
613  DCHECK(!IsClosed());
614
615  if (send_stalled_by_flow_control_ && !session_->IsSendStalled() &&
616      send_window_size_ > 0) {
617    net_log_.AddEvent(
618        NetLog::TYPE_SPDY_STREAM_FLOW_CONTROL_UNSTALLED,
619        NetLog::IntegerCallback("stream_id", stream_id_));
620    send_stalled_by_flow_control_ = false;
621    QueueNextDataFrame();
622  }
623}
624
625bool SpdyStream::IsClosed() const {
626  return io_state_ == STATE_CLOSED;
627}
628
629bool SpdyStream::IsIdle() const {
630  return io_state_ == STATE_IDLE;
631}
632
633bool SpdyStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const {
634  if (stream_id_ == 0)
635    return false;
636
637  return session_->GetLoadTimingInfo(stream_id_, load_timing_info);
638}
639
640GURL SpdyStream::GetUrlFromHeaders() const {
641  if (type_ != SPDY_PUSH_STREAM && !request_headers_)
642    return GURL();
643
644  const SpdyHeaderBlock& headers =
645      (type_ == SPDY_PUSH_STREAM) ? response_headers_ : *request_headers_;
646  return GetUrlFromHeaderBlock(headers, GetProtocolVersion(),
647                               type_ == SPDY_PUSH_STREAM);
648}
649
650bool SpdyStream::HasUrlFromHeaders() const {
651  return !GetUrlFromHeaders().is_empty();
652}
653
654void SpdyStream::OnGetDomainBoundCertComplete(int result) {
655  DCHECK_EQ(io_state_, STATE_GET_DOMAIN_BOUND_CERT_COMPLETE);
656  DoLoop(result);
657}
658
659int SpdyStream::DoLoop(int result) {
660  CHECK(!in_do_loop_);
661  in_do_loop_ = true;
662
663  do {
664    State state = io_state_;
665    io_state_ = STATE_NONE;
666    switch (state) {
667      case STATE_GET_DOMAIN_BOUND_CERT:
668        CHECK_EQ(result, OK);
669        result = DoGetDomainBoundCert();
670        break;
671      case STATE_GET_DOMAIN_BOUND_CERT_COMPLETE:
672        result = DoGetDomainBoundCertComplete(result);
673        break;
674      case STATE_SEND_DOMAIN_BOUND_CERT:
675        CHECK_EQ(result, OK);
676        result = DoSendDomainBoundCert();
677        break;
678      case STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE:
679        result = DoSendDomainBoundCertComplete(result);
680        break;
681      case STATE_SEND_REQUEST_HEADERS:
682        CHECK_EQ(result, OK);
683        result = DoSendRequestHeaders();
684        break;
685      case STATE_SEND_REQUEST_HEADERS_COMPLETE:
686        CHECK_EQ(result, OK);
687        result = DoSendRequestHeadersComplete();
688        break;
689
690      // For request/response streams, no data is sent from the client
691      // while in the OPEN state, so OnFrameWriteComplete is never
692      // called here.  The HTTP body is handled in the OnDataReceived
693      // callback, which does not call into DoLoop.
694      //
695      // For bidirectional streams, we'll send and receive data once
696      // the connection is established.  Received data is handled in
697      // OnDataReceived.  Sent data is handled in
698      // OnFrameWriteComplete, which calls DoOpen().
699      case STATE_IDLE:
700        CHECK_EQ(result, OK);
701        result = DoOpen();
702        break;
703
704      case STATE_CLOSED:
705        DCHECK_NE(result, ERR_IO_PENDING);
706        break;
707      default:
708        NOTREACHED() << io_state_;
709        break;
710    }
711  } while (result != ERR_IO_PENDING && io_state_ != STATE_NONE &&
712           io_state_ != STATE_IDLE);
713
714  CHECK(in_do_loop_);
715  in_do_loop_ = false;
716
717  return result;
718}
719
720int SpdyStream::DoGetDomainBoundCert() {
721  CHECK(request_headers_);
722  DCHECK_NE(type_, SPDY_PUSH_STREAM);
723  GURL url = GetUrlFromHeaders();
724  if (!session_->NeedsCredentials() || !url.SchemeIs("https")) {
725    // Proceed directly to sending the request headers
726    io_state_ = STATE_SEND_REQUEST_HEADERS;
727    return OK;
728  }
729
730  slot_ = session_->credential_state()->FindCredentialSlot(GetUrlFromHeaders());
731  if (slot_ != SpdyCredentialState::kNoEntry) {
732    // Proceed directly to sending the request headers
733    io_state_ = STATE_SEND_REQUEST_HEADERS;
734    return OK;
735  }
736
737  io_state_ = STATE_GET_DOMAIN_BOUND_CERT_COMPLETE;
738  ServerBoundCertService* sbc_service = session_->GetServerBoundCertService();
739  DCHECK(sbc_service != NULL);
740  std::vector<uint8> requested_cert_types;
741  requested_cert_types.push_back(CLIENT_CERT_ECDSA_SIGN);
742  int rv = sbc_service->GetDomainBoundCert(
743      url.GetOrigin().host(), requested_cert_types,
744      &domain_bound_cert_type_, &domain_bound_private_key_, &domain_bound_cert_,
745      base::Bind(&SpdyStream::OnGetDomainBoundCertComplete, GetWeakPtr()),
746      &domain_bound_cert_request_handle_);
747  return rv;
748}
749
750int SpdyStream::DoGetDomainBoundCertComplete(int result) {
751  DCHECK_NE(type_, SPDY_PUSH_STREAM);
752  if (result != OK)
753    return result;
754
755  io_state_ = STATE_SEND_DOMAIN_BOUND_CERT;
756  slot_ =  session_->credential_state()->SetHasCredential(GetUrlFromHeaders());
757  return OK;
758}
759
760int SpdyStream::DoSendDomainBoundCert() {
761  CHECK(request_headers_);
762  DCHECK_NE(type_, SPDY_PUSH_STREAM);
763  io_state_ = STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE;
764
765  std::string origin = GetUrlFromHeaders().GetOrigin().spec();
766  DCHECK(origin[origin.length() - 1] == '/');
767  origin.erase(origin.length() - 1);  // Trim trailing slash.
768  scoped_ptr<SpdyFrame> frame;
769  int rv = session_->CreateCredentialFrame(
770      origin, domain_bound_cert_type_, domain_bound_private_key_,
771      domain_bound_cert_, priority_, &frame);
772  if (rv != OK) {
773    DCHECK_NE(rv, ERR_IO_PENDING);
774    return rv;
775  }
776
777  DCHECK(frame);
778  // TODO(akalin): Fix the following race condition:
779  //
780  // Since this is decoupled from sending the SYN_STREAM frame, it is
781  // possible that other domain-bound cert frames will clobber ours
782  // before our SYN_STREAM frame gets sent. This can be solved by
783  // immediately enqueueing the SYN_STREAM frame here and adjusting
784  // the state machine appropriately.
785  session_->EnqueueStreamWrite(
786      GetWeakPtr(), CREDENTIAL,
787      scoped_ptr<SpdyBufferProducer>(
788          new SimpleBufferProducer(
789              scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))));
790  return ERR_IO_PENDING;
791}
792
793int SpdyStream::DoSendDomainBoundCertComplete(int result) {
794  DCHECK_NE(type_, SPDY_PUSH_STREAM);
795  if (result != OK)
796    return result;
797
798  DCHECK_EQ(just_completed_frame_type_, CREDENTIAL);
799  io_state_ = STATE_SEND_REQUEST_HEADERS;
800  return OK;
801}
802
803int SpdyStream::DoSendRequestHeaders() {
804  DCHECK_NE(type_, SPDY_PUSH_STREAM);
805  io_state_ = STATE_SEND_REQUEST_HEADERS_COMPLETE;
806
807  session_->EnqueueStreamWrite(
808      GetWeakPtr(), SYN_STREAM,
809      scoped_ptr<SpdyBufferProducer>(
810          new SynStreamBufferProducer(GetWeakPtr())));
811  return ERR_IO_PENDING;
812}
813
814namespace {
815
816// Assuming we're in STATE_IDLE, maps the given type (which must not
817// be SPDY_PUSH_STREAM) and send status to a result to return from
818// DoSendRequestHeadersComplete() or DoOpen().
819int GetOpenStateResult(SpdyStreamType type, SpdySendStatus send_status) {
820  switch (type) {
821    case SPDY_BIDIRECTIONAL_STREAM:
822      // For bidirectional streams, there's nothing else to do.
823      DCHECK_EQ(send_status, MORE_DATA_TO_SEND);
824      return OK;
825
826    case SPDY_REQUEST_RESPONSE_STREAM:
827      // For request/response streams, wait for the delegate to send
828      // data if there's request data to send; we'll get called back
829      // when the send finishes.
830      if (send_status == MORE_DATA_TO_SEND)
831        return ERR_IO_PENDING;
832
833      return OK;
834
835    case SPDY_PUSH_STREAM:
836      // This should never be called for push streams.
837      break;
838  }
839
840  CHECK(false);
841  return ERR_UNEXPECTED;
842}
843
844}  // namespace
845
846int SpdyStream::DoSendRequestHeadersComplete() {
847  DCHECK_NE(type_, SPDY_PUSH_STREAM);
848  DCHECK_EQ(just_completed_frame_type_, SYN_STREAM);
849  DCHECK_NE(stream_id_, 0u);
850
851  io_state_ = STATE_IDLE;
852
853  CHECK(delegate_);
854  // Must not close |this|; if it does, it will trigger the |in_do_loop_|
855  // check in the destructor.
856  delegate_->OnRequestHeadersSent();
857
858  return GetOpenStateResult(type_, send_status_);
859}
860
861int SpdyStream::DoOpen() {
862  DCHECK_NE(type_, SPDY_PUSH_STREAM);
863
864  if (just_completed_frame_type_ != DATA) {
865    NOTREACHED();
866    return ERR_UNEXPECTED;
867  }
868
869  if (just_completed_frame_size_ < session_->GetDataFrameMinimumSize()) {
870    NOTREACHED();
871    return ERR_UNEXPECTED;
872  }
873
874  size_t frame_payload_size =
875      just_completed_frame_size_ - session_->GetDataFrameMinimumSize();
876  if (frame_payload_size > session_->GetDataFrameMaximumPayload()) {
877    NOTREACHED();
878    return ERR_UNEXPECTED;
879  }
880
881  // Set |io_state_| first as |delegate_| may check it.
882  io_state_ = STATE_IDLE;
883
884  send_bytes_ += frame_payload_size;
885
886  pending_send_data_->DidConsume(frame_payload_size);
887  if (pending_send_data_->BytesRemaining() > 0) {
888    QueueNextDataFrame();
889    return ERR_IO_PENDING;
890  }
891
892  pending_send_data_ = NULL;
893
894  CHECK(delegate_);
895  // Must not close |this|; if it does, it will trigger the
896  // |in_do_loop_| check in the destructor.
897  delegate_->OnDataSent();
898
899  return GetOpenStateResult(type_, send_status_);
900}
901
902void SpdyStream::UpdateHistograms() {
903  // We need at least the receive timers to be filled in, as otherwise
904  // metrics can be bogus.
905  if (recv_first_byte_time_.is_null() || recv_last_byte_time_.is_null())
906    return;
907
908  base::TimeTicks effective_send_time;
909  if (type_ == SPDY_PUSH_STREAM) {
910    // Push streams shouldn't have |send_time_| filled in.
911    DCHECK(send_time_.is_null());
912    effective_send_time = recv_first_byte_time_;
913  } else {
914    // For non-push streams, we also need |send_time_| to be filled
915    // in.
916    if (send_time_.is_null())
917      return;
918    effective_send_time = send_time_;
919  }
920
921  UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte",
922                      recv_first_byte_time_ - effective_send_time);
923  UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime",
924                      recv_last_byte_time_ - recv_first_byte_time_);
925  UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime",
926                      recv_last_byte_time_ - effective_send_time);
927
928  UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_);
929  UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_);
930}
931
932void SpdyStream::QueueNextDataFrame() {
933  // Until the request has been completely sent, we cannot be sure
934  // that our stream_id is correct.
935  DCHECK_GT(io_state_, STATE_SEND_REQUEST_HEADERS_COMPLETE);
936  CHECK_GT(stream_id_, 0u);
937  CHECK(pending_send_data_.get());
938  CHECK_GT(pending_send_data_->BytesRemaining(), 0);
939
940  SpdyDataFlags flags =
941      (send_status_ == NO_MORE_DATA_TO_SEND) ?
942      DATA_FLAG_FIN : DATA_FLAG_NONE;
943  scoped_ptr<SpdyBuffer> data_buffer(
944      session_->CreateDataBuffer(stream_id_,
945                                 pending_send_data_.get(),
946                                 pending_send_data_->BytesRemaining(),
947                                 flags));
948  // We'll get called again by PossiblyResumeIfSendStalled().
949  if (!data_buffer)
950    return;
951
952  if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) {
953    DCHECK_GE(data_buffer->GetRemainingSize(),
954              session_->GetDataFrameMinimumSize());
955    size_t payload_size =
956        data_buffer->GetRemainingSize() - session_->GetDataFrameMinimumSize();
957    DCHECK_LE(payload_size, session_->GetDataFrameMaximumPayload());
958    DecreaseSendWindowSize(static_cast<int32>(payload_size));
959    // This currently isn't strictly needed, since write frames are
960    // discarded only if the stream is about to be closed. But have it
961    // here anyway just in case this changes.
962    data_buffer->AddConsumeCallback(
963        base::Bind(&SpdyStream::OnWriteBufferConsumed,
964                   GetWeakPtr(), payload_size));
965  }
966
967  session_->EnqueueStreamWrite(
968      GetWeakPtr(), DATA,
969      scoped_ptr<SpdyBufferProducer>(
970          new SimpleBufferProducer(data_buffer.Pass())));
971}
972
973int SpdyStream::MergeWithResponseHeaders(
974    const SpdyHeaderBlock& new_response_headers) {
975  if (new_response_headers.find("transfer-encoding") !=
976      new_response_headers.end()) {
977    session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
978                         "Received transfer-encoding header");
979    return ERR_SPDY_PROTOCOL_ERROR;
980  }
981
982  for (SpdyHeaderBlock::const_iterator it = new_response_headers.begin();
983      it != new_response_headers.end(); ++it) {
984    // Disallow uppercase headers.
985    if (ContainsUppercaseAscii(it->first)) {
986      session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
987                            "Upper case characters in header: " + it->first);
988      return ERR_SPDY_PROTOCOL_ERROR;
989    }
990
991    SpdyHeaderBlock::iterator it2 = response_headers_.lower_bound(it->first);
992    // Disallow duplicate headers.  This is just to be conservative.
993    if (it2 != response_headers_.end() && it2->first == it->first) {
994      session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
995                            "Duplicate header: " + it->first);
996      return ERR_SPDY_PROTOCOL_ERROR;
997    }
998
999    response_headers_.insert(it2, *it);
1000  }
1001
1002  // If delegate_ is not yet attached, we'll call
1003  // OnResponseHeadersUpdated() after the delegate gets attached to
1004  // the stream.
1005  if (delegate_) {
1006    // The call to OnResponseHeadersUpdated() below may delete |this|,
1007    // so use |weak_this| to detect that.
1008    base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
1009
1010    SpdyResponseHeadersStatus status =
1011        delegate_->OnResponseHeadersUpdated(response_headers_);
1012    if (status == RESPONSE_HEADERS_ARE_INCOMPLETE) {
1013      // Since RESPONSE_HEADERS_ARE_INCOMPLETE was returned, we must not
1014      // have been closed.
1015      CHECK(weak_this);
1016      // Incomplete headers are OK only for push streams.
1017      if (type_ != SPDY_PUSH_STREAM) {
1018        session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
1019                              "Incomplete headers");
1020        return ERR_INCOMPLETE_SPDY_HEADERS;
1021      }
1022    } else if (weak_this) {
1023      response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE;
1024    }
1025  }
1026
1027  return OK;
1028}
1029
1030}  // namespace net
1031