1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/spdy/spdy_stream.h"
6
7#include "base/bind.h"
8#include "base/compiler_specific.h"
9#include "base/logging.h"
10#include "base/message_loop/message_loop.h"
11#include "base/strings/string_number_conversions.h"
12#include "base/strings/stringprintf.h"
13#include "base/values.h"
14#include "net/spdy/spdy_buffer_producer.h"
15#include "net/spdy/spdy_http_utils.h"
16#include "net/spdy/spdy_session.h"
17
18namespace net {
19
20namespace {
21
22base::Value* NetLogSpdyStreamErrorCallback(SpdyStreamId stream_id,
23                                           int status,
24                                           const std::string* description,
25                                           NetLog::LogLevel /* log_level */) {
26  base::DictionaryValue* dict = new base::DictionaryValue();
27  dict->SetInteger("stream_id", static_cast<int>(stream_id));
28  dict->SetInteger("status", status);
29  dict->SetString("description", *description);
30  return dict;
31}
32
33base::Value* NetLogSpdyStreamWindowUpdateCallback(
34    SpdyStreamId stream_id,
35    int32 delta,
36    int32 window_size,
37    NetLog::LogLevel /* log_level */) {
38  base::DictionaryValue* dict = new base::DictionaryValue();
39  dict->SetInteger("stream_id", stream_id);
40  dict->SetInteger("delta", delta);
41  dict->SetInteger("window_size", window_size);
42  return dict;
43}
44
// Returns true if |str| contains at least one character in the ASCII range
// 'A'..'Z', and false otherwise (including for the empty string).
bool ContainsUppercaseAscii(const std::string& str) {
  for (std::string::size_type pos = 0; pos < str.size(); ++pos) {
    const char c = str[pos];
    if ('A' <= c && c <= 'Z')
      return true;
  }
  return false;
}
53
54}  // namespace
55
56// A wrapper around a stream that calls into ProduceSynStreamFrame().
57class SpdyStream::SynStreamBufferProducer : public SpdyBufferProducer {
58 public:
59  SynStreamBufferProducer(const base::WeakPtr<SpdyStream>& stream)
60      : stream_(stream) {
61    DCHECK(stream_.get());
62  }
63
64  virtual ~SynStreamBufferProducer() {}
65
66  virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE {
67    if (!stream_.get()) {
68      NOTREACHED();
69      return scoped_ptr<SpdyBuffer>();
70    }
71    DCHECK_GT(stream_->stream_id(), 0u);
72    return scoped_ptr<SpdyBuffer>(
73        new SpdyBuffer(stream_->ProduceSynStreamFrame()));
74  }
75
76 private:
77  const base::WeakPtr<SpdyStream> stream_;
78};
79
// Constructs a stream of the given |type| that talks to |session|.
// |initial_send_window_size| and |initial_recv_window_size| seed the
// per-stream flow-control windows. |type| must be one of the three known
// stream types (enforced by the CHECK in the body).
SpdyStream::SpdyStream(SpdyStreamType type,
                       const base::WeakPtr<SpdySession>& session,
                       const GURL& url,
                       RequestPriority priority,
                       int32 initial_send_window_size,
                       int32 initial_recv_window_size,
                       const BoundNetLog& net_log)
    : type_(type),
      weak_ptr_factory_(this),
      in_do_loop_(false),
      // Push streams start out buffering incoming data; see
      // OnDataReceived() and PushedStreamReplayData().
      continue_buffering_data_(type_ == SPDY_PUSH_STREAM),
      // No stream id has been assigned yet.
      stream_id_(0),
      url_(url),
      priority_(priority),
      slot_(0),
      send_stalled_by_flow_control_(false),
      send_window_size_(initial_send_window_size),
      recv_window_size_(initial_recv_window_size),
      unacked_recv_window_bytes_(0),
      session_(session),
      delegate_(NULL),
      // Push streams never have data to send.
      send_status_(
          (type_ == SPDY_PUSH_STREAM) ?
          NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND),
      request_time_(base::Time::Now()),
      response_headers_status_(RESPONSE_HEADERS_ARE_INCOMPLETE),
      // Push streams skip the request-sending states and begin idle.
      io_state_((type_ == SPDY_PUSH_STREAM) ? STATE_IDLE : STATE_NONE),
      response_status_(OK),
      net_log_(net_log),
      send_bytes_(0),
      recv_bytes_(0),
      just_completed_frame_type_(DATA),
      just_completed_frame_size_(0) {
  CHECK(type_ == SPDY_BIDIRECTIONAL_STREAM ||
        type_ == SPDY_REQUEST_RESPONSE_STREAM ||
        type_ == SPDY_PUSH_STREAM);
}
117
// Destroys the stream. The stream must not be destroyed while DoLoop() is
// running. Records the stream's timing and byte-count histograms on the
// way out.
SpdyStream::~SpdyStream() {
  CHECK(!in_do_loop_);
  UpdateHistograms();
}
122
123void SpdyStream::SetDelegate(Delegate* delegate) {
124  CHECK(!delegate_);
125  CHECK(delegate);
126  delegate_ = delegate;
127
128  if (type_ == SPDY_PUSH_STREAM) {
129    DCHECK(continue_buffering_data_);
130    base::MessageLoop::current()->PostTask(
131        FROM_HERE,
132        base::Bind(&SpdyStream::PushedStreamReplayData, GetWeakPtr()));
133  }
134}
135
// Replays the response headers and any buffered data of a push stream to
// the delegate, and stops further buffering. Posted as a task from
// SetDelegate(). A NULL entry in |pending_buffers_| marks end-of-stream
// (see OnDataReceived()), after which the stream is closed.
void SpdyStream::PushedStreamReplayData() {
  DCHECK_EQ(type_, SPDY_PUSH_STREAM);
  DCHECK_NE(stream_id_, 0u);
  DCHECK(continue_buffering_data_);

  continue_buffering_data_ = false;

  // The delegate methods called below may delete |this|, so use
  // |weak_this| to detect that.
  base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();

  CHECK(delegate_);
  SpdyResponseHeadersStatus status =
      delegate_->OnResponseHeadersUpdated(response_headers_);
  if (status == RESPONSE_HEADERS_ARE_INCOMPLETE) {
    // Since RESPONSE_HEADERS_ARE_INCOMPLETE was returned, we must not
    // have been closed. Since we don't have complete headers, assume
    // we're waiting for another HEADERS frame, and we had better not
    // have any pending data frames.
    CHECK(weak_this);
    if (!pending_buffers_.empty()) {
      LogStreamError(ERR_SPDY_PROTOCOL_ERROR,
                     "Data received with incomplete headers.");
      session_->CloseActiveStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR);
    }
    return;
  }

  // OnResponseHeadersUpdated() may have closed |this|.
  if (!weak_this)
    return;

  response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE;

  // Deliver the buffered data in the order it was received.
  while (!pending_buffers_.empty()) {
    // Take ownership of the first element of |pending_buffers_|.
    scoped_ptr<SpdyBuffer> buffer(pending_buffers_.front());
    pending_buffers_.weak_erase(pending_buffers_.begin());

    // Remember whether this was the end-of-stream marker before |buffer|
    // is passed away.
    bool eof = (buffer == NULL);

    CHECK(delegate_);
    delegate_->OnDataReceived(buffer.Pass());

    // OnDataReceived() may have closed |this|.
    if (!weak_this)
      return;

    if (eof) {
      DCHECK(pending_buffers_.empty());
      session_->CloseActiveStream(stream_id_, OK);
      DCHECK(!weak_this);
      // |pending_buffers_| is invalid at this point.
      break;
    }
  }
}
193
194scoped_ptr<SpdyFrame> SpdyStream::ProduceSynStreamFrame() {
195  CHECK_EQ(io_state_, STATE_SEND_REQUEST_HEADERS_COMPLETE);
196  CHECK(request_headers_);
197  CHECK_GT(stream_id_, 0u);
198
199  SpdyControlFlags flags =
200      (send_status_ == NO_MORE_DATA_TO_SEND) ?
201      CONTROL_FLAG_FIN : CONTROL_FLAG_NONE;
202  scoped_ptr<SpdyFrame> frame(session_->CreateSynStream(
203      stream_id_, priority_, slot_, flags, *request_headers_));
204  send_time_ = base::TimeTicks::Now();
205  return frame.Pass();
206}
207
// Drops the delegate and cancels the stream. Must not be called from
// within DoLoop(), and the stream is expected not to be closed already.
void SpdyStream::DetachDelegate() {
  CHECK(!in_do_loop_);
  DCHECK(!IsClosed());
  // Clear the delegate first so it gets no callback from the cancellation.
  delegate_ = NULL;
  Cancel();
}
214
215void SpdyStream::AdjustSendWindowSize(int32 delta_window_size) {
216  DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
217
218  if (IsClosed())
219    return;
220
221  // Check for wraparound.
222  if (send_window_size_ > 0) {
223    DCHECK_LE(delta_window_size, kint32max - send_window_size_);
224  }
225  if (send_window_size_ < 0) {
226    DCHECK_GE(delta_window_size, kint32min - send_window_size_);
227  }
228  send_window_size_ += delta_window_size;
229  PossiblyResumeIfSendStalled();
230}
231
232void SpdyStream::OnWriteBufferConsumed(
233    size_t frame_payload_size,
234    size_t consume_size,
235    SpdyBuffer::ConsumeSource consume_source) {
236  DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
237  if (consume_source == SpdyBuffer::DISCARD) {
238    // If we're discarding a frame or part of it, increase the send
239    // window by the number of discarded bytes. (Although if we're
240    // discarding part of a frame, it's probably because of a write
241    // error and we'll be tearing down the stream soon.)
242    size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
243    DCHECK_GT(remaining_payload_bytes, 0u);
244    IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
245  }
246  // For consumed bytes, the send window is increased when we receive
247  // a WINDOW_UPDATE frame.
248}
249
250void SpdyStream::IncreaseSendWindowSize(int32 delta_window_size) {
251  DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
252  DCHECK_GE(delta_window_size, 1);
253
254  // Ignore late WINDOW_UPDATEs.
255  if (IsClosed())
256    return;
257
258  if (send_window_size_ > 0) {
259    // Check for overflow.
260    int32 max_delta_window_size = kint32max - send_window_size_;
261    if (delta_window_size > max_delta_window_size) {
262      std::string desc = base::StringPrintf(
263          "Received WINDOW_UPDATE [delta: %d] for stream %d overflows "
264          "send_window_size_ [current: %d]", delta_window_size, stream_id_,
265          send_window_size_);
266      session_->ResetStream(stream_id_, RST_STREAM_FLOW_CONTROL_ERROR, desc);
267      return;
268    }
269  }
270
271  send_window_size_ += delta_window_size;
272
273  net_log_.AddEvent(
274      NetLog::TYPE_SPDY_STREAM_UPDATE_SEND_WINDOW,
275      base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
276                 stream_id_, delta_window_size, send_window_size_));
277
278  PossiblyResumeIfSendStalled();
279}
280
281void SpdyStream::DecreaseSendWindowSize(int32 delta_window_size) {
282  DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
283
284  if (IsClosed())
285    return;
286
287  // We only call this method when sending a frame. Therefore,
288  // |delta_window_size| should be within the valid frame size range.
289  DCHECK_GE(delta_window_size, 1);
290  DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
291
292  // |send_window_size_| should have been at least |delta_window_size| for
293  // this call to happen.
294  DCHECK_GE(send_window_size_, delta_window_size);
295
296  send_window_size_ -= delta_window_size;
297
298  net_log_.AddEvent(
299      NetLog::TYPE_SPDY_STREAM_UPDATE_SEND_WINDOW,
300      base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
301                 stream_id_, -delta_window_size, send_window_size_));
302}
303
304void SpdyStream::OnReadBufferConsumed(
305    size_t consume_size,
306    SpdyBuffer::ConsumeSource consume_source) {
307  DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
308  DCHECK_GE(consume_size, 1u);
309  DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
310  IncreaseRecvWindowSize(static_cast<int32>(consume_size));
311}
312
313void SpdyStream::IncreaseRecvWindowSize(int32 delta_window_size) {
314  DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
315
316  // By the time a read is processed by the delegate, this stream may
317  // already be inactive.
318  if (!session_->IsStreamActive(stream_id_))
319    return;
320
321  DCHECK_GE(unacked_recv_window_bytes_, 0);
322  DCHECK_GE(recv_window_size_, unacked_recv_window_bytes_);
323  DCHECK_GE(delta_window_size, 1);
324  // Check for overflow.
325  DCHECK_LE(delta_window_size, kint32max - recv_window_size_);
326
327  recv_window_size_ += delta_window_size;
328  net_log_.AddEvent(
329      NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
330      base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
331                 stream_id_, delta_window_size, recv_window_size_));
332
333  unacked_recv_window_bytes_ += delta_window_size;
334  if (unacked_recv_window_bytes_ >
335      session_->stream_initial_recv_window_size() / 2) {
336    session_->SendStreamWindowUpdate(
337        stream_id_, static_cast<uint32>(unacked_recv_window_bytes_));
338    unacked_recv_window_bytes_ = 0;
339  }
340}
341
342void SpdyStream::DecreaseRecvWindowSize(int32 delta_window_size) {
343  DCHECK(session_->IsStreamActive(stream_id_));
344  DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
345  DCHECK_GE(delta_window_size, 1);
346
347  // Since we never decrease the initial receive window size,
348  // |delta_window_size| should never cause |recv_window_size_| to go
349  // negative. If we do, the receive window isn't being respected.
350  if (delta_window_size > recv_window_size_) {
351    session_->ResetStream(
352        stream_id_, RST_STREAM_PROTOCOL_ERROR,
353        "delta_window_size is " + base::IntToString(delta_window_size) +
354            " in DecreaseRecvWindowSize, which is larger than the receive " +
355            "window size of " + base::IntToString(recv_window_size_));
356    return;
357  }
358
359  recv_window_size_ -= delta_window_size;
360  net_log_.AddEvent(
361      NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
362      base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
363                 stream_id_, -delta_window_size, recv_window_size_));
364}
365
366int SpdyStream::GetPeerAddress(IPEndPoint* address) const {
367  return session_->GetPeerAddress(address);
368}
369
370int SpdyStream::GetLocalAddress(IPEndPoint* address) const {
371  return session_->GetLocalAddress(address);
372}
373
374bool SpdyStream::WasEverUsed() const {
375  return session_->WasEverUsed();
376}
377
// Returns the stored request time. It is initialized to the construction
// time of the stream and can be overridden with SetRequestTime().
base::Time SpdyStream::GetRequestTime() const {
  return request_time_;
}
381
// Overrides the stored request time with |t|.
void SpdyStream::SetRequestTime(base::Time t) {
  request_time_ = t;
}
385
386int SpdyStream::OnInitialResponseHeadersReceived(
387    const SpdyHeaderBlock& initial_response_headers,
388    base::Time response_time,
389    base::TimeTicks recv_first_byte_time) {
390  // SpdySession guarantees that this is called at most once.
391  CHECK(response_headers_.empty());
392
393  // Check to make sure that we don't receive the response headers
394  // before we're ready for it.
395  switch (type_) {
396    case SPDY_BIDIRECTIONAL_STREAM:
397      // For a bidirectional stream, we're ready for the response
398      // headers once we've finished sending the request headers.
399      if (io_state_ < STATE_IDLE) {
400        session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
401                              "Response received before request sent");
402        return ERR_SPDY_PROTOCOL_ERROR;
403      }
404      break;
405
406    case SPDY_REQUEST_RESPONSE_STREAM:
407      // For a request/response stream, we're ready for the response
408      // headers once we've finished sending the request headers and
409      // the request body (if we have one).
410      if ((io_state_ < STATE_IDLE) || (send_status_ == MORE_DATA_TO_SEND) ||
411          pending_send_data_.get()) {
412        session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
413                              "Response received before request sent");
414        return ERR_SPDY_PROTOCOL_ERROR;
415      }
416      break;
417
418    case SPDY_PUSH_STREAM:
419      // For a push stream, we're ready immediately.
420      DCHECK_EQ(send_status_, NO_MORE_DATA_TO_SEND);
421      DCHECK_EQ(io_state_, STATE_IDLE);
422      break;
423  }
424
425  metrics_.StartStream();
426
427  DCHECK_EQ(io_state_, STATE_IDLE);
428
429  response_time_ = response_time;
430  recv_first_byte_time_ = recv_first_byte_time;
431  return MergeWithResponseHeaders(initial_response_headers);
432}
433
434int SpdyStream::OnAdditionalResponseHeadersReceived(
435    const SpdyHeaderBlock& additional_response_headers) {
436  if (type_ == SPDY_REQUEST_RESPONSE_STREAM) {
437    session_->ResetStream(
438        stream_id_, RST_STREAM_PROTOCOL_ERROR,
439        "Additional headers received for request/response stream");
440    return ERR_SPDY_PROTOCOL_ERROR;
441  } else if (type_ == SPDY_PUSH_STREAM &&
442             response_headers_status_ == RESPONSE_HEADERS_ARE_COMPLETE) {
443    session_->ResetStream(
444        stream_id_, RST_STREAM_PROTOCOL_ERROR,
445        "Additional headers received for push stream");
446    return ERR_SPDY_PROTOCOL_ERROR;
447  }
448  return MergeWithResponseHeaders(additional_response_headers);
449}
450
451void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
452  DCHECK(session_->IsStreamActive(stream_id_));
453
454  // If we're still buffering data for a push stream, we will do the
455  // check for data received with incomplete headers in
456  // PushedStreamReplayData().
457  if (!delegate_ || continue_buffering_data_) {
458    DCHECK_EQ(type_, SPDY_PUSH_STREAM);
459    // It should be valid for this to happen in the server push case.
460    // We'll return received data when delegate gets attached to the stream.
461    if (buffer) {
462      pending_buffers_.push_back(buffer.release());
463    } else {
464      pending_buffers_.push_back(NULL);
465      metrics_.StopStream();
466      // Note: we leave the stream open in the session until the stream
467      //       is claimed.
468    }
469    return;
470  }
471
472  // If we have response headers but the delegate has indicated that
473  // it's still incomplete, then that's a protocol error.
474  if (response_headers_status_ == RESPONSE_HEADERS_ARE_INCOMPLETE) {
475    LogStreamError(ERR_SPDY_PROTOCOL_ERROR,
476                   "Data received with incomplete headers.");
477    session_->CloseActiveStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR);
478    return;
479  }
480
481  CHECK(!IsClosed());
482
483  if (!buffer) {
484    metrics_.StopStream();
485    // Deletes |this|.
486    session_->CloseActiveStream(stream_id_, OK);
487    return;
488  }
489
490  size_t length = buffer->GetRemainingSize();
491  DCHECK_LE(length, session_->GetDataFrameMaximumPayload());
492  if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) {
493    DecreaseRecvWindowSize(static_cast<int32>(length));
494    buffer->AddConsumeCallback(
495        base::Bind(&SpdyStream::OnReadBufferConsumed, GetWeakPtr()));
496  }
497
498  // Track our bandwidth.
499  metrics_.RecordBytes(length);
500  recv_bytes_ += length;
501  recv_last_byte_time_ = base::TimeTicks::Now();
502
503  // May close |this|.
504  delegate_->OnDataReceived(buffer.Pass());
505}
506
507void SpdyStream::OnFrameWriteComplete(SpdyFrameType frame_type,
508                                      size_t frame_size) {
509  if (frame_size < session_->GetFrameMinimumSize() ||
510      frame_size > session_->GetFrameMaximumSize()) {
511    NOTREACHED();
512    return;
513  }
514  if (IsClosed())
515    return;
516  just_completed_frame_type_ = frame_type;
517  just_completed_frame_size_ = frame_size;
518  DoLoop(OK);
519}
520
521int SpdyStream::GetProtocolVersion() const {
522  return session_->GetProtocolVersion();
523}
524
// Adds a TYPE_SPDY_STREAM_ERROR event to the stream's NetLog carrying the
// stream id, |status| and |description|.
void SpdyStream::LogStreamError(int status, const std::string& description) {
  // NOTE(review): |description| is bound by address, so this relies on the
  // bound callback not being run after this call returns -- confirm that
  // AddEvent() invokes it synchronously.
  net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ERROR,
                    base::Bind(&NetLogSpdyStreamErrorCallback,
                               stream_id_, status, &description));
}
530
// Marks the stream closed with |status|, detaches the delegate and
// notifies it (if one was attached). Must not be called from within
// DoLoop().
void SpdyStream::OnClose(int status) {
  CHECK(!in_do_loop_);
  io_state_ = STATE_CLOSED;
  response_status_ = status;
  // Clear |delegate_| before notifying, so the stream has no delegate
  // while the delegate's OnClose() runs.
  Delegate* delegate = delegate_;
  delegate_ = NULL;
  if (delegate)
    delegate->OnClose(status);
  // Unset |stream_id_| last so that the delegate can look it up.
  stream_id_ = 0;
}
542
543void SpdyStream::Cancel() {
544  CHECK(!in_do_loop_);
545  // We may be called again from a delegate's OnClose().
546  if (io_state_ == STATE_CLOSED)
547    return;
548
549  if (stream_id_ != 0) {
550    session_->ResetStream(stream_id_, RST_STREAM_CANCEL, std::string());
551  } else {
552    session_->CloseCreatedStream(GetWeakPtr(), RST_STREAM_CANCEL);
553  }
554  // |this| is invalid at this point.
555}
556
557void SpdyStream::Close() {
558  CHECK(!in_do_loop_);
559  // We may be called again from a delegate's OnClose().
560  if (io_state_ == STATE_CLOSED)
561    return;
562
563  if (stream_id_ != 0) {
564    session_->CloseActiveStream(stream_id_, OK);
565  } else {
566    session_->CloseCreatedStream(GetWeakPtr(), OK);
567  }
568  // |this| is invalid at this point.
569}
570
// Returns a weak pointer to this stream, obtained from
// |weak_ptr_factory_|.
base::WeakPtr<SpdyStream> SpdyStream::GetWeakPtr() {
  return weak_ptr_factory_.GetWeakPtr();
}
574
575int SpdyStream::SendRequestHeaders(scoped_ptr<SpdyHeaderBlock> request_headers,
576                                   SpdySendStatus send_status) {
577  CHECK_NE(type_, SPDY_PUSH_STREAM);
578  CHECK_EQ(send_status_, MORE_DATA_TO_SEND);
579  CHECK(!request_headers_);
580  CHECK(!pending_send_data_.get());
581  CHECK_EQ(io_state_, STATE_NONE);
582  request_headers_ = request_headers.Pass();
583  send_status_ = send_status;
584  io_state_ = STATE_GET_DOMAIN_BOUND_CERT;
585  return DoLoop(OK);
586}
587
// Queues |length| bytes of |data| for sending and records |send_status|.
// Only valid on a non-push stream that still has data to send, has reached
// at least STATE_SEND_REQUEST_HEADERS_COMPLETE, and has no send already
// pending.
void SpdyStream::SendData(IOBuffer* data,
                          int length,
                          SpdySendStatus send_status) {
  CHECK_NE(type_, SPDY_PUSH_STREAM);
  CHECK_EQ(send_status_, MORE_DATA_TO_SEND);
  CHECK_GE(io_state_, STATE_SEND_REQUEST_HEADERS_COMPLETE);
  CHECK(!pending_send_data_.get());
  // Wrap the data so that it can be drained one frame at a time (see
  // DoOpen()).
  pending_send_data_ = new DrainableIOBuffer(data, length);
  send_status_ = send_status;
  QueueNextDataFrame();
}
599
600bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info,
601                            bool* was_npn_negotiated,
602                            NextProto* protocol_negotiated) {
603  return session_->GetSSLInfo(
604      ssl_info, was_npn_negotiated, protocol_negotiated);
605}
606
607bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) {
608  return session_->GetSSLCertRequestInfo(cert_request_info);
609}
610
611void SpdyStream::PossiblyResumeIfSendStalled() {
612  DCHECK(!IsClosed());
613
614  if (send_stalled_by_flow_control_ && !session_->IsSendStalled() &&
615      send_window_size_ > 0) {
616    net_log_.AddEvent(
617        NetLog::TYPE_SPDY_STREAM_FLOW_CONTROL_UNSTALLED,
618        NetLog::IntegerCallback("stream_id", stream_id_));
619    send_stalled_by_flow_control_ = false;
620    QueueNextDataFrame();
621  }
622}
623
// Returns true if the stream is in STATE_CLOSED (set by OnClose()).
bool SpdyStream::IsClosed() const {
  return io_state_ == STATE_CLOSED;
}
627
// Returns true if the stream is in STATE_IDLE.
bool SpdyStream::IsIdle() const {
  return io_state_ == STATE_IDLE;
}
631
632NextProto SpdyStream::GetProtocol() const {
633  return session_->protocol();
634}
635
636bool SpdyStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const {
637  if (stream_id_ == 0)
638    return false;
639
640  return session_->GetLoadTimingInfo(stream_id_, load_timing_info);
641}
642
643GURL SpdyStream::GetUrlFromHeaders() const {
644  if (type_ != SPDY_PUSH_STREAM && !request_headers_)
645    return GURL();
646
647  const SpdyHeaderBlock& headers =
648      (type_ == SPDY_PUSH_STREAM) ? response_headers_ : *request_headers_;
649  return GetUrlFromHeaderBlock(headers, GetProtocolVersion(),
650                               type_ == SPDY_PUSH_STREAM);
651}
652
653bool SpdyStream::HasUrlFromHeaders() const {
654  return !GetUrlFromHeaders().is_empty();
655}
656
// Completion callback passed to GetDomainBoundCert() in
// DoGetDomainBoundCert(): resumes the state machine with |result|.
void SpdyStream::OnGetDomainBoundCertComplete(int result) {
  DCHECK_EQ(io_state_, STATE_GET_DOMAIN_BOUND_CERT_COMPLETE);
  DoLoop(result);
}
661
662int SpdyStream::DoLoop(int result) {
663  CHECK(!in_do_loop_);
664  in_do_loop_ = true;
665
666  do {
667    State state = io_state_;
668    io_state_ = STATE_NONE;
669    switch (state) {
670      case STATE_GET_DOMAIN_BOUND_CERT:
671        CHECK_EQ(result, OK);
672        result = DoGetDomainBoundCert();
673        break;
674      case STATE_GET_DOMAIN_BOUND_CERT_COMPLETE:
675        result = DoGetDomainBoundCertComplete(result);
676        break;
677      case STATE_SEND_DOMAIN_BOUND_CERT:
678        CHECK_EQ(result, OK);
679        result = DoSendDomainBoundCert();
680        break;
681      case STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE:
682        result = DoSendDomainBoundCertComplete(result);
683        break;
684      case STATE_SEND_REQUEST_HEADERS:
685        CHECK_EQ(result, OK);
686        result = DoSendRequestHeaders();
687        break;
688      case STATE_SEND_REQUEST_HEADERS_COMPLETE:
689        CHECK_EQ(result, OK);
690        result = DoSendRequestHeadersComplete();
691        break;
692
693      // For request/response streams, no data is sent from the client
694      // while in the OPEN state, so OnFrameWriteComplete is never
695      // called here.  The HTTP body is handled in the OnDataReceived
696      // callback, which does not call into DoLoop.
697      //
698      // For bidirectional streams, we'll send and receive data once
699      // the connection is established.  Received data is handled in
700      // OnDataReceived.  Sent data is handled in
701      // OnFrameWriteComplete, which calls DoOpen().
702      case STATE_IDLE:
703        CHECK_EQ(result, OK);
704        result = DoOpen();
705        break;
706
707      case STATE_CLOSED:
708        DCHECK_NE(result, ERR_IO_PENDING);
709        break;
710      default:
711        NOTREACHED() << io_state_;
712        break;
713    }
714  } while (result != ERR_IO_PENDING && io_state_ != STATE_NONE &&
715           io_state_ != STATE_IDLE);
716
717  CHECK(in_do_loop_);
718  in_do_loop_ = false;
719
720  return result;
721}
722
723int SpdyStream::DoGetDomainBoundCert() {
724  CHECK(request_headers_);
725  DCHECK_NE(type_, SPDY_PUSH_STREAM);
726  GURL url = GetUrlFromHeaders();
727  if (!session_->NeedsCredentials() || !url.SchemeIs("https")) {
728    // Proceed directly to sending the request headers
729    io_state_ = STATE_SEND_REQUEST_HEADERS;
730    return OK;
731  }
732
733  slot_ = session_->credential_state()->FindCredentialSlot(GetUrlFromHeaders());
734  if (slot_ != SpdyCredentialState::kNoEntry) {
735    // Proceed directly to sending the request headers
736    io_state_ = STATE_SEND_REQUEST_HEADERS;
737    return OK;
738  }
739
740  io_state_ = STATE_GET_DOMAIN_BOUND_CERT_COMPLETE;
741  ServerBoundCertService* sbc_service = session_->GetServerBoundCertService();
742  DCHECK(sbc_service != NULL);
743  int rv = sbc_service->GetDomainBoundCert(
744      url.GetOrigin().host(),
745      &domain_bound_private_key_,
746      &domain_bound_cert_,
747      base::Bind(&SpdyStream::OnGetDomainBoundCertComplete, GetWeakPtr()),
748      &domain_bound_cert_request_handle_);
749  return rv;
750}
751
752int SpdyStream::DoGetDomainBoundCertComplete(int result) {
753  DCHECK_NE(type_, SPDY_PUSH_STREAM);
754  if (result != OK)
755    return result;
756
757  io_state_ = STATE_SEND_DOMAIN_BOUND_CERT;
758  slot_ =  session_->credential_state()->SetHasCredential(GetUrlFromHeaders());
759  return OK;
760}
761
// State handler for STATE_SEND_DOMAIN_BOUND_CERT. Builds a CREDENTIAL
// frame for the origin of the request URL from the domain-bound key and
// cert, and enqueues it for writing. Returns ERR_IO_PENDING once the frame
// is enqueued, or the error from CreateCredentialFrame() on failure.
int SpdyStream::DoSendDomainBoundCert() {
  CHECK(request_headers_);
  DCHECK_NE(type_, SPDY_PUSH_STREAM);
  io_state_ = STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE;

  std::string origin = GetUrlFromHeaders().GetOrigin().spec();
  // NOTE(review): if |origin| were empty, origin.length() - 1 would wrap
  // around; presumably the request headers always yield a valid URL by
  // this point -- confirm.
  DCHECK(origin[origin.length() - 1] == '/');
  origin.erase(origin.length() - 1);  // Trim trailing slash.
  scoped_ptr<SpdyFrame> frame;
  int rv = session_->CreateCredentialFrame(
      origin,
      domain_bound_private_key_,
      domain_bound_cert_,
      priority_,
      &frame);
  if (rv != OK) {
    DCHECK_NE(rv, ERR_IO_PENDING);
    return rv;
  }

  DCHECK(frame);
  // TODO(akalin): Fix the following race condition:
  //
  // Since this is decoupled from sending the SYN_STREAM frame, it is
  // possible that other domain-bound cert frames will clobber ours
  // before our SYN_STREAM frame gets sent. This can be solved by
  // immediately enqueueing the SYN_STREAM frame here and adjusting
  // the state machine appropriately.
  session_->EnqueueStreamWrite(
      GetWeakPtr(), CREDENTIAL,
      scoped_ptr<SpdyBufferProducer>(
          new SimpleBufferProducer(
              scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))));
  return ERR_IO_PENDING;
}
797
798int SpdyStream::DoSendDomainBoundCertComplete(int result) {
799  DCHECK_NE(type_, SPDY_PUSH_STREAM);
800  if (result != OK)
801    return result;
802
803  DCHECK_EQ(just_completed_frame_type_, CREDENTIAL);
804  io_state_ = STATE_SEND_REQUEST_HEADERS;
805  return OK;
806}
807
808int SpdyStream::DoSendRequestHeaders() {
809  DCHECK_NE(type_, SPDY_PUSH_STREAM);
810  io_state_ = STATE_SEND_REQUEST_HEADERS_COMPLETE;
811
812  session_->EnqueueStreamWrite(
813      GetWeakPtr(), SYN_STREAM,
814      scoped_ptr<SpdyBufferProducer>(
815          new SynStreamBufferProducer(GetWeakPtr())));
816  return ERR_IO_PENDING;
817}
818
819namespace {
820
821// Assuming we're in STATE_IDLE, maps the given type (which must not
822// be SPDY_PUSH_STREAM) and send status to a result to return from
823// DoSendRequestHeadersComplete() or DoOpen().
824int GetOpenStateResult(SpdyStreamType type, SpdySendStatus send_status) {
825  switch (type) {
826    case SPDY_BIDIRECTIONAL_STREAM:
827      // For bidirectional streams, there's nothing else to do.
828      DCHECK_EQ(send_status, MORE_DATA_TO_SEND);
829      return OK;
830
831    case SPDY_REQUEST_RESPONSE_STREAM:
832      // For request/response streams, wait for the delegate to send
833      // data if there's request data to send; we'll get called back
834      // when the send finishes.
835      if (send_status == MORE_DATA_TO_SEND)
836        return ERR_IO_PENDING;
837
838      return OK;
839
840    case SPDY_PUSH_STREAM:
841      // This should never be called for push streams.
842      break;
843  }
844
845  CHECK(false);
846  return ERR_UNEXPECTED;
847}
848
849}  // namespace
850
// State handler for STATE_SEND_REQUEST_HEADERS_COMPLETE, run after the
// SYN_STREAM frame has been written. Moves the stream to STATE_IDLE and
// tells the delegate that the request headers were sent. Returns the
// result of GetOpenStateResult() for the stream's type and send status.
int SpdyStream::DoSendRequestHeadersComplete() {
  DCHECK_NE(type_, SPDY_PUSH_STREAM);
  DCHECK_EQ(just_completed_frame_type_, SYN_STREAM);
  DCHECK_NE(stream_id_, 0u);

  io_state_ = STATE_IDLE;

  CHECK(delegate_);
  // Must not close |this|; if it does, it will trigger the |in_do_loop_|
  // check in the destructor.
  delegate_->OnRequestHeadersSent();

  // |send_status_| is read only after the delegate callback has run.
  return GetOpenStateResult(type_, send_status_);
}
865
// State handler for STATE_IDLE, run after a DATA frame has been written.
// Accounts for the payload that was just sent. If data from the current
// send remains, queues the next DATA frame and returns ERR_IO_PENDING.
// Otherwise clears the pending data, notifies the delegate via
// OnDataSent(), and returns the result of GetOpenStateResult(). Returns
// ERR_UNEXPECTED if the completed frame is not a plausible DATA frame.
int SpdyStream::DoOpen() {
  DCHECK_NE(type_, SPDY_PUSH_STREAM);

  if (just_completed_frame_type_ != DATA) {
    NOTREACHED();
    return ERR_UNEXPECTED;
  }

  if (just_completed_frame_size_ < session_->GetDataFrameMinimumSize()) {
    NOTREACHED();
    return ERR_UNEXPECTED;
  }

  // The payload is whatever the frame holds beyond the minimum data frame
  // size.
  size_t frame_payload_size =
      just_completed_frame_size_ - session_->GetDataFrameMinimumSize();
  if (frame_payload_size > session_->GetDataFrameMaximumPayload()) {
    NOTREACHED();
    return ERR_UNEXPECTED;
  }

  // Set |io_state_| first as |delegate_| may check it.
  io_state_ = STATE_IDLE;

  send_bytes_ += frame_payload_size;

  pending_send_data_->DidConsume(frame_payload_size);
  if (pending_send_data_->BytesRemaining() > 0) {
    // More of the current send remains; keep going with the next frame.
    QueueNextDataFrame();
    return ERR_IO_PENDING;
  }

  pending_send_data_ = NULL;

  CHECK(delegate_);
  // Must not close |this|; if it does, it will trigger the
  // |in_do_loop_| check in the destructor.
  delegate_->OnDataSent();

  return GetOpenStateResult(type_, send_status_);
}
906
907void SpdyStream::UpdateHistograms() {
908  // We need at least the receive timers to be filled in, as otherwise
909  // metrics can be bogus.
910  if (recv_first_byte_time_.is_null() || recv_last_byte_time_.is_null())
911    return;
912
913  base::TimeTicks effective_send_time;
914  if (type_ == SPDY_PUSH_STREAM) {
915    // Push streams shouldn't have |send_time_| filled in.
916    DCHECK(send_time_.is_null());
917    effective_send_time = recv_first_byte_time_;
918  } else {
919    // For non-push streams, we also need |send_time_| to be filled
920    // in.
921    if (send_time_.is_null())
922      return;
923    effective_send_time = send_time_;
924  }
925
926  UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte",
927                      recv_first_byte_time_ - effective_send_time);
928  UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime",
929                      recv_last_byte_time_ - recv_first_byte_time_);
930  UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime",
931                      recv_last_byte_time_ - effective_send_time);
932
933  UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_);
934  UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_);
935}
936
937void SpdyStream::QueueNextDataFrame() {
938  // Until the request has been completely sent, we cannot be sure
939  // that our stream_id is correct.
940  DCHECK_GT(io_state_, STATE_SEND_REQUEST_HEADERS_COMPLETE);
941  CHECK_GT(stream_id_, 0u);
942  CHECK(pending_send_data_.get());
943  CHECK_GT(pending_send_data_->BytesRemaining(), 0);
944
945  SpdyDataFlags flags =
946      (send_status_ == NO_MORE_DATA_TO_SEND) ?
947      DATA_FLAG_FIN : DATA_FLAG_NONE;
948  scoped_ptr<SpdyBuffer> data_buffer(
949      session_->CreateDataBuffer(stream_id_,
950                                 pending_send_data_.get(),
951                                 pending_send_data_->BytesRemaining(),
952                                 flags));
953  // We'll get called again by PossiblyResumeIfSendStalled().
954  if (!data_buffer)
955    return;
956
957  if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) {
958    DCHECK_GE(data_buffer->GetRemainingSize(),
959              session_->GetDataFrameMinimumSize());
960    size_t payload_size =
961        data_buffer->GetRemainingSize() - session_->GetDataFrameMinimumSize();
962    DCHECK_LE(payload_size, session_->GetDataFrameMaximumPayload());
963    DecreaseSendWindowSize(static_cast<int32>(payload_size));
964    // This currently isn't strictly needed, since write frames are
965    // discarded only if the stream is about to be closed. But have it
966    // here anyway just in case this changes.
967    data_buffer->AddConsumeCallback(
968        base::Bind(&SpdyStream::OnWriteBufferConsumed,
969                   GetWeakPtr(), payload_size));
970  }
971
972  session_->EnqueueStreamWrite(
973      GetWeakPtr(), DATA,
974      scoped_ptr<SpdyBufferProducer>(
975          new SimpleBufferProducer(data_buffer.Pass())));
976}
977
978int SpdyStream::MergeWithResponseHeaders(
979    const SpdyHeaderBlock& new_response_headers) {
980  if (new_response_headers.find("transfer-encoding") !=
981      new_response_headers.end()) {
982    session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
983                         "Received transfer-encoding header");
984    return ERR_SPDY_PROTOCOL_ERROR;
985  }
986
987  for (SpdyHeaderBlock::const_iterator it = new_response_headers.begin();
988      it != new_response_headers.end(); ++it) {
989    // Disallow uppercase headers.
990    if (ContainsUppercaseAscii(it->first)) {
991      session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
992                            "Upper case characters in header: " + it->first);
993      return ERR_SPDY_PROTOCOL_ERROR;
994    }
995
996    SpdyHeaderBlock::iterator it2 = response_headers_.lower_bound(it->first);
997    // Disallow duplicate headers.  This is just to be conservative.
998    if (it2 != response_headers_.end() && it2->first == it->first) {
999      session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
1000                            "Duplicate header: " + it->first);
1001      return ERR_SPDY_PROTOCOL_ERROR;
1002    }
1003
1004    response_headers_.insert(it2, *it);
1005  }
1006
1007  // If delegate_ is not yet attached, we'll call
1008  // OnResponseHeadersUpdated() after the delegate gets attached to
1009  // the stream.
1010  if (delegate_) {
1011    // The call to OnResponseHeadersUpdated() below may delete |this|,
1012    // so use |weak_this| to detect that.
1013    base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
1014
1015    SpdyResponseHeadersStatus status =
1016        delegate_->OnResponseHeadersUpdated(response_headers_);
1017    if (status == RESPONSE_HEADERS_ARE_INCOMPLETE) {
1018      // Since RESPONSE_HEADERS_ARE_INCOMPLETE was returned, we must not
1019      // have been closed.
1020      CHECK(weak_this);
1021      // Incomplete headers are OK only for push streams.
1022      if (type_ != SPDY_PUSH_STREAM) {
1023        session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
1024                              "Incomplete headers");
1025        return ERR_INCOMPLETE_SPDY_HEADERS;
1026      }
1027    } else if (weak_this) {
1028      response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE;
1029    }
1030  }
1031
1032  return OK;
1033}
1034
1035}  // namespace net
1036