// spdy_stream.cc revision bbcdd45c55eb7c4641ab97aef9889b0fc828e7d3
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/spdy/spdy_stream.h" 6 7#include "base/bind.h" 8#include "base/compiler_specific.h" 9#include "base/logging.h" 10#include "base/message_loop/message_loop.h" 11#include "base/strings/string_number_conversions.h" 12#include "base/strings/stringprintf.h" 13#include "base/values.h" 14#include "net/spdy/spdy_buffer_producer.h" 15#include "net/spdy/spdy_http_utils.h" 16#include "net/spdy/spdy_session.h" 17 18namespace net { 19 20namespace { 21 22base::Value* NetLogSpdyStreamErrorCallback(SpdyStreamId stream_id, 23 int status, 24 const std::string* description, 25 NetLog::LogLevel /* log_level */) { 26 base::DictionaryValue* dict = new base::DictionaryValue(); 27 dict->SetInteger("stream_id", static_cast<int>(stream_id)); 28 dict->SetInteger("status", status); 29 dict->SetString("description", *description); 30 return dict; 31} 32 33base::Value* NetLogSpdyStreamWindowUpdateCallback( 34 SpdyStreamId stream_id, 35 int32 delta, 36 int32 window_size, 37 NetLog::LogLevel /* log_level */) { 38 base::DictionaryValue* dict = new base::DictionaryValue(); 39 dict->SetInteger("stream_id", stream_id); 40 dict->SetInteger("delta", delta); 41 dict->SetInteger("window_size", window_size); 42 return dict; 43} 44 45bool ContainsUppercaseAscii(const std::string& str) { 46 for (std::string::const_iterator i(str.begin()); i != str.end(); ++i) { 47 if (*i >= 'A' && *i <= 'Z') { 48 return true; 49 } 50 } 51 return false; 52} 53 54} // namespace 55 56// A wrapper around a stream that calls into ProduceSynStreamFrame(). 
57class SpdyStream::SynStreamBufferProducer : public SpdyBufferProducer { 58 public: 59 SynStreamBufferProducer(const base::WeakPtr<SpdyStream>& stream) 60 : stream_(stream) { 61 DCHECK(stream_.get()); 62 } 63 64 virtual ~SynStreamBufferProducer() {} 65 66 virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE { 67 if (!stream_.get()) { 68 NOTREACHED(); 69 return scoped_ptr<SpdyBuffer>(); 70 } 71 DCHECK_GT(stream_->stream_id(), 0u); 72 return scoped_ptr<SpdyBuffer>( 73 new SpdyBuffer(stream_->ProduceSynStreamFrame())); 74 } 75 76 private: 77 const base::WeakPtr<SpdyStream> stream_; 78}; 79 80SpdyStream::SpdyStream(SpdyStreamType type, 81 const base::WeakPtr<SpdySession>& session, 82 const GURL& url, 83 RequestPriority priority, 84 int32 initial_send_window_size, 85 int32 initial_recv_window_size, 86 const BoundNetLog& net_log) 87 : type_(type), 88 weak_ptr_factory_(this), 89 in_do_loop_(false), 90 continue_buffering_data_(type_ == SPDY_PUSH_STREAM), 91 stream_id_(0), 92 url_(url), 93 priority_(priority), 94 slot_(0), 95 send_stalled_by_flow_control_(false), 96 send_window_size_(initial_send_window_size), 97 recv_window_size_(initial_recv_window_size), 98 unacked_recv_window_bytes_(0), 99 session_(session), 100 delegate_(NULL), 101 send_status_( 102 (type_ == SPDY_PUSH_STREAM) ? 103 NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND), 104 request_time_(base::Time::Now()), 105 response_headers_status_(RESPONSE_HEADERS_ARE_INCOMPLETE), 106 io_state_((type_ == SPDY_PUSH_STREAM) ? 
STATE_IDLE : STATE_NONE), 107 response_status_(OK), 108 net_log_(net_log), 109 send_bytes_(0), 110 recv_bytes_(0), 111 domain_bound_cert_type_(CLIENT_CERT_INVALID_TYPE), 112 just_completed_frame_type_(DATA), 113 just_completed_frame_size_(0) { 114 CHECK(type_ == SPDY_BIDIRECTIONAL_STREAM || 115 type_ == SPDY_REQUEST_RESPONSE_STREAM || 116 type_ == SPDY_PUSH_STREAM); 117} 118 119SpdyStream::~SpdyStream() { 120 CHECK(!in_do_loop_); 121 UpdateHistograms(); 122} 123 124void SpdyStream::SetDelegate(Delegate* delegate) { 125 CHECK(!delegate_); 126 CHECK(delegate); 127 delegate_ = delegate; 128 129 if (type_ == SPDY_PUSH_STREAM) { 130 DCHECK(continue_buffering_data_); 131 base::MessageLoop::current()->PostTask( 132 FROM_HERE, 133 base::Bind(&SpdyStream::PushedStreamReplayData, GetWeakPtr())); 134 } 135} 136 137void SpdyStream::PushedStreamReplayData() { 138 DCHECK_EQ(type_, SPDY_PUSH_STREAM); 139 DCHECK_NE(stream_id_, 0u); 140 DCHECK(continue_buffering_data_); 141 142 continue_buffering_data_ = false; 143 144 // The delegate methods called below may delete |this|, so use 145 // |weak_this| to detect that. 146 base::WeakPtr<SpdyStream> weak_this = GetWeakPtr(); 147 148 CHECK(delegate_); 149 SpdyResponseHeadersStatus status = 150 delegate_->OnResponseHeadersUpdated(response_headers_); 151 if (status == RESPONSE_HEADERS_ARE_INCOMPLETE) { 152 // Since RESPONSE_HEADERS_ARE_INCOMPLETE was returned, we must not 153 // have been closed. Since we don't have complete headers, assume 154 // we're waiting for another HEADERS frame, and we had better not 155 // have any pending data frames. 156 CHECK(weak_this); 157 if (!pending_buffers_.empty()) { 158 LogStreamError(ERR_SPDY_PROTOCOL_ERROR, 159 "Data received with incomplete headers."); 160 session_->CloseActiveStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR); 161 } 162 return; 163 } 164 165 // OnResponseHeadersUpdated() may have closed |this|. 
166 if (!weak_this) 167 return; 168 169 response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE; 170 171 while (!pending_buffers_.empty()) { 172 // Take ownership of the first element of |pending_buffers_|. 173 scoped_ptr<SpdyBuffer> buffer(pending_buffers_.front()); 174 pending_buffers_.weak_erase(pending_buffers_.begin()); 175 176 bool eof = (buffer == NULL); 177 178 CHECK(delegate_); 179 delegate_->OnDataReceived(buffer.Pass()); 180 181 // OnDataReceived() may have closed |this|. 182 if (!weak_this) 183 return; 184 185 if (eof) { 186 DCHECK(pending_buffers_.empty()); 187 session_->CloseActiveStream(stream_id_, OK); 188 DCHECK(!weak_this); 189 // |pending_buffers_| is invalid at this point. 190 break; 191 } 192 } 193} 194 195scoped_ptr<SpdyFrame> SpdyStream::ProduceSynStreamFrame() { 196 CHECK_EQ(io_state_, STATE_SEND_REQUEST_HEADERS_COMPLETE); 197 CHECK(request_headers_); 198 CHECK_GT(stream_id_, 0u); 199 200 SpdyControlFlags flags = 201 (send_status_ == NO_MORE_DATA_TO_SEND) ? 202 CONTROL_FLAG_FIN : CONTROL_FLAG_NONE; 203 scoped_ptr<SpdyFrame> frame(session_->CreateSynStream( 204 stream_id_, priority_, slot_, flags, *request_headers_)); 205 send_time_ = base::TimeTicks::Now(); 206 return frame.Pass(); 207} 208 209void SpdyStream::DetachDelegate() { 210 CHECK(!in_do_loop_); 211 DCHECK(!IsClosed()); 212 delegate_ = NULL; 213 Cancel(); 214} 215 216void SpdyStream::AdjustSendWindowSize(int32 delta_window_size) { 217 DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); 218 219 if (IsClosed()) 220 return; 221 222 // Check for wraparound. 
223 if (send_window_size_ > 0) { 224 DCHECK_LE(delta_window_size, kint32max - send_window_size_); 225 } 226 if (send_window_size_ < 0) { 227 DCHECK_GE(delta_window_size, kint32min - send_window_size_); 228 } 229 send_window_size_ += delta_window_size; 230 PossiblyResumeIfSendStalled(); 231} 232 233void SpdyStream::OnWriteBufferConsumed( 234 size_t frame_payload_size, 235 size_t consume_size, 236 SpdyBuffer::ConsumeSource consume_source) { 237 DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); 238 if (consume_source == SpdyBuffer::DISCARD) { 239 // If we're discarding a frame or part of it, increase the send 240 // window by the number of discarded bytes. (Although if we're 241 // discarding part of a frame, it's probably because of a write 242 // error and we'll be tearing down the stream soon.) 243 size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size); 244 DCHECK_GT(remaining_payload_bytes, 0u); 245 IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes)); 246 } 247 // For consumed bytes, the send window is increased when we receive 248 // a WINDOW_UPDATE frame. 249} 250 251void SpdyStream::IncreaseSendWindowSize(int32 delta_window_size) { 252 DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); 253 DCHECK_GE(delta_window_size, 1); 254 255 // Ignore late WINDOW_UPDATEs. 256 if (IsClosed()) 257 return; 258 259 if (send_window_size_ > 0) { 260 // Check for overflow. 
261 int32 max_delta_window_size = kint32max - send_window_size_; 262 if (delta_window_size > max_delta_window_size) { 263 std::string desc = base::StringPrintf( 264 "Received WINDOW_UPDATE [delta: %d] for stream %d overflows " 265 "send_window_size_ [current: %d]", delta_window_size, stream_id_, 266 send_window_size_); 267 session_->ResetStream(stream_id_, RST_STREAM_FLOW_CONTROL_ERROR, desc); 268 return; 269 } 270 } 271 272 send_window_size_ += delta_window_size; 273 274 net_log_.AddEvent( 275 NetLog::TYPE_SPDY_STREAM_UPDATE_SEND_WINDOW, 276 base::Bind(&NetLogSpdyStreamWindowUpdateCallback, 277 stream_id_, delta_window_size, send_window_size_)); 278 279 PossiblyResumeIfSendStalled(); 280} 281 282void SpdyStream::DecreaseSendWindowSize(int32 delta_window_size) { 283 DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); 284 285 if (IsClosed()) 286 return; 287 288 // We only call this method when sending a frame. Therefore, 289 // |delta_window_size| should be within the valid frame size range. 290 DCHECK_GE(delta_window_size, 1); 291 DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize); 292 293 // |send_window_size_| should have been at least |delta_window_size| for 294 // this call to happen. 
295 DCHECK_GE(send_window_size_, delta_window_size); 296 297 send_window_size_ -= delta_window_size; 298 299 net_log_.AddEvent( 300 NetLog::TYPE_SPDY_STREAM_UPDATE_SEND_WINDOW, 301 base::Bind(&NetLogSpdyStreamWindowUpdateCallback, 302 stream_id_, -delta_window_size, send_window_size_)); 303} 304 305void SpdyStream::OnReadBufferConsumed( 306 size_t consume_size, 307 SpdyBuffer::ConsumeSource consume_source) { 308 DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); 309 DCHECK_GE(consume_size, 1u); 310 DCHECK_LE(consume_size, static_cast<size_t>(kint32max)); 311 IncreaseRecvWindowSize(static_cast<int32>(consume_size)); 312} 313 314void SpdyStream::IncreaseRecvWindowSize(int32 delta_window_size) { 315 DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); 316 317 // By the time a read is processed by the delegate, this stream may 318 // already be inactive. 319 if (!session_->IsStreamActive(stream_id_)) 320 return; 321 322 DCHECK_GE(unacked_recv_window_bytes_, 0); 323 DCHECK_GE(recv_window_size_, unacked_recv_window_bytes_); 324 DCHECK_GE(delta_window_size, 1); 325 // Check for overflow. 
326 DCHECK_LE(delta_window_size, kint32max - recv_window_size_); 327 328 recv_window_size_ += delta_window_size; 329 net_log_.AddEvent( 330 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, 331 base::Bind(&NetLogSpdyStreamWindowUpdateCallback, 332 stream_id_, delta_window_size, recv_window_size_)); 333 334 unacked_recv_window_bytes_ += delta_window_size; 335 if (unacked_recv_window_bytes_ > 336 session_->stream_initial_recv_window_size() / 2) { 337 session_->SendStreamWindowUpdate( 338 stream_id_, static_cast<uint32>(unacked_recv_window_bytes_)); 339 unacked_recv_window_bytes_ = 0; 340 } 341} 342 343void SpdyStream::DecreaseRecvWindowSize(int32 delta_window_size) { 344 DCHECK(session_->IsStreamActive(stream_id_)); 345 DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); 346 DCHECK_GE(delta_window_size, 1); 347 348 // Since we never decrease the initial receive window size, 349 // |delta_window_size| should never cause |recv_window_size_| to go 350 // negative. If we do, the receive window isn't being respected. 
351 if (delta_window_size > recv_window_size_) { 352 session_->ResetStream( 353 stream_id_, RST_STREAM_PROTOCOL_ERROR, 354 "delta_window_size is " + base::IntToString(delta_window_size) + 355 " in DecreaseRecvWindowSize, which is larger than the receive " + 356 "window size of " + base::IntToString(recv_window_size_)); 357 return; 358 } 359 360 recv_window_size_ -= delta_window_size; 361 net_log_.AddEvent( 362 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, 363 base::Bind(&NetLogSpdyStreamWindowUpdateCallback, 364 stream_id_, -delta_window_size, recv_window_size_)); 365} 366 367int SpdyStream::GetPeerAddress(IPEndPoint* address) const { 368 return session_->GetPeerAddress(address); 369} 370 371int SpdyStream::GetLocalAddress(IPEndPoint* address) const { 372 return session_->GetLocalAddress(address); 373} 374 375bool SpdyStream::WasEverUsed() const { 376 return session_->WasEverUsed(); 377} 378 379base::Time SpdyStream::GetRequestTime() const { 380 return request_time_; 381} 382 383void SpdyStream::SetRequestTime(base::Time t) { 384 request_time_ = t; 385} 386 387int SpdyStream::OnInitialResponseHeadersReceived( 388 const SpdyHeaderBlock& initial_response_headers, 389 base::Time response_time, 390 base::TimeTicks recv_first_byte_time) { 391 // SpdySession guarantees that this is called at most once. 392 CHECK(response_headers_.empty()); 393 394 // Check to make sure that we don't receive the response headers 395 // before we're ready for it. 396 switch (type_) { 397 case SPDY_BIDIRECTIONAL_STREAM: 398 // For a bidirectional stream, we're ready for the response 399 // headers once we've finished sending the request headers. 
400 if (io_state_ < STATE_IDLE) { 401 session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR, 402 "Response received before request sent"); 403 return ERR_SPDY_PROTOCOL_ERROR; 404 } 405 break; 406 407 case SPDY_REQUEST_RESPONSE_STREAM: 408 // For a request/response stream, we're ready for the response 409 // headers once we've finished sending the request headers and 410 // the request body (if we have one). 411 if ((io_state_ < STATE_IDLE) || (send_status_ == MORE_DATA_TO_SEND) || 412 pending_send_data_.get()) { 413 session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR, 414 "Response received before request sent"); 415 return ERR_SPDY_PROTOCOL_ERROR; 416 } 417 break; 418 419 case SPDY_PUSH_STREAM: 420 // For a push stream, we're ready immediately. 421 DCHECK_EQ(send_status_, NO_MORE_DATA_TO_SEND); 422 DCHECK_EQ(io_state_, STATE_IDLE); 423 break; 424 } 425 426 metrics_.StartStream(); 427 428 DCHECK_EQ(io_state_, STATE_IDLE); 429 430 response_time_ = response_time; 431 recv_first_byte_time_ = recv_first_byte_time; 432 return MergeWithResponseHeaders(initial_response_headers); 433} 434 435int SpdyStream::OnAdditionalResponseHeadersReceived( 436 const SpdyHeaderBlock& additional_response_headers) { 437 if (type_ == SPDY_REQUEST_RESPONSE_STREAM) { 438 session_->ResetStream( 439 stream_id_, RST_STREAM_PROTOCOL_ERROR, 440 "Additional headers received for request/response stream"); 441 return ERR_SPDY_PROTOCOL_ERROR; 442 } else if (type_ == SPDY_PUSH_STREAM && 443 response_headers_status_ == RESPONSE_HEADERS_ARE_COMPLETE) { 444 session_->ResetStream( 445 stream_id_, RST_STREAM_PROTOCOL_ERROR, 446 "Additional headers received for push stream"); 447 return ERR_SPDY_PROTOCOL_ERROR; 448 } 449 return MergeWithResponseHeaders(additional_response_headers); 450} 451 452void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { 453 DCHECK(session_->IsStreamActive(stream_id_)); 454 455 // If we're still buffering data for a push stream, we will do the 456 // check 
for data received with incomplete headers in 457 // PushedStreamReplayData(). 458 if (!delegate_ || continue_buffering_data_) { 459 DCHECK_EQ(type_, SPDY_PUSH_STREAM); 460 // It should be valid for this to happen in the server push case. 461 // We'll return received data when delegate gets attached to the stream. 462 if (buffer) { 463 pending_buffers_.push_back(buffer.release()); 464 } else { 465 pending_buffers_.push_back(NULL); 466 metrics_.StopStream(); 467 // Note: we leave the stream open in the session until the stream 468 // is claimed. 469 } 470 return; 471 } 472 473 // If we have response headers but the delegate has indicated that 474 // it's still incomplete, then that's a protocol error. 475 if (response_headers_status_ == RESPONSE_HEADERS_ARE_INCOMPLETE) { 476 LogStreamError(ERR_SPDY_PROTOCOL_ERROR, 477 "Data received with incomplete headers."); 478 session_->CloseActiveStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR); 479 return; 480 } 481 482 CHECK(!IsClosed()); 483 484 if (!buffer) { 485 metrics_.StopStream(); 486 // Deletes |this|. 487 session_->CloseActiveStream(stream_id_, OK); 488 return; 489 } 490 491 size_t length = buffer->GetRemainingSize(); 492 DCHECK_LE(length, session_->GetDataFrameMaximumPayload()); 493 if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) { 494 DecreaseRecvWindowSize(static_cast<int32>(length)); 495 buffer->AddConsumeCallback( 496 base::Bind(&SpdyStream::OnReadBufferConsumed, GetWeakPtr())); 497 } 498 499 // Track our bandwidth. 500 metrics_.RecordBytes(length); 501 recv_bytes_ += length; 502 recv_last_byte_time_ = base::TimeTicks::Now(); 503 504 // May close |this|. 
505 delegate_->OnDataReceived(buffer.Pass()); 506} 507 508void SpdyStream::OnFrameWriteComplete(SpdyFrameType frame_type, 509 size_t frame_size) { 510 if (frame_size < session_->GetFrameMinimumSize() || 511 frame_size > session_->GetFrameMaximumSize()) { 512 NOTREACHED(); 513 return; 514 } 515 if (IsClosed()) 516 return; 517 just_completed_frame_type_ = frame_type; 518 just_completed_frame_size_ = frame_size; 519 DoLoop(OK); 520} 521 522int SpdyStream::GetProtocolVersion() const { 523 return session_->GetProtocolVersion(); 524} 525 526void SpdyStream::LogStreamError(int status, const std::string& description) { 527 net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ERROR, 528 base::Bind(&NetLogSpdyStreamErrorCallback, 529 stream_id_, status, &description)); 530} 531 532void SpdyStream::OnClose(int status) { 533 CHECK(!in_do_loop_); 534 io_state_ = STATE_CLOSED; 535 response_status_ = status; 536 Delegate* delegate = delegate_; 537 delegate_ = NULL; 538 if (delegate) 539 delegate->OnClose(status); 540 // Unset |stream_id_| last so that the delegate can look it up. 541 stream_id_ = 0; 542} 543 544void SpdyStream::Cancel() { 545 CHECK(!in_do_loop_); 546 // We may be called again from a delegate's OnClose(). 547 if (io_state_ == STATE_CLOSED) 548 return; 549 550 if (stream_id_ != 0) { 551 session_->ResetStream(stream_id_, RST_STREAM_CANCEL, std::string()); 552 } else { 553 session_->CloseCreatedStream(GetWeakPtr(), RST_STREAM_CANCEL); 554 } 555 // |this| is invalid at this point. 556} 557 558void SpdyStream::Close() { 559 CHECK(!in_do_loop_); 560 // We may be called again from a delegate's OnClose(). 561 if (io_state_ == STATE_CLOSED) 562 return; 563 564 if (stream_id_ != 0) { 565 session_->CloseActiveStream(stream_id_, OK); 566 } else { 567 session_->CloseCreatedStream(GetWeakPtr(), OK); 568 } 569 // |this| is invalid at this point. 
570} 571 572base::WeakPtr<SpdyStream> SpdyStream::GetWeakPtr() { 573 return weak_ptr_factory_.GetWeakPtr(); 574} 575 576int SpdyStream::SendRequestHeaders(scoped_ptr<SpdyHeaderBlock> request_headers, 577 SpdySendStatus send_status) { 578 CHECK_NE(type_, SPDY_PUSH_STREAM); 579 CHECK_EQ(send_status_, MORE_DATA_TO_SEND); 580 CHECK(!request_headers_); 581 CHECK(!pending_send_data_.get()); 582 CHECK_EQ(io_state_, STATE_NONE); 583 request_headers_ = request_headers.Pass(); 584 send_status_ = send_status; 585 io_state_ = STATE_GET_DOMAIN_BOUND_CERT; 586 return DoLoop(OK); 587} 588 589void SpdyStream::SendData(IOBuffer* data, 590 int length, 591 SpdySendStatus send_status) { 592 CHECK_NE(type_, SPDY_PUSH_STREAM); 593 CHECK_EQ(send_status_, MORE_DATA_TO_SEND); 594 CHECK_GE(io_state_, STATE_SEND_REQUEST_HEADERS_COMPLETE); 595 CHECK(!pending_send_data_.get()); 596 pending_send_data_ = new DrainableIOBuffer(data, length); 597 send_status_ = send_status; 598 QueueNextDataFrame(); 599} 600 601bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info, 602 bool* was_npn_negotiated, 603 NextProto* protocol_negotiated) { 604 return session_->GetSSLInfo( 605 ssl_info, was_npn_negotiated, protocol_negotiated); 606} 607 608bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) { 609 return session_->GetSSLCertRequestInfo(cert_request_info); 610} 611 612void SpdyStream::PossiblyResumeIfSendStalled() { 613 DCHECK(!IsClosed()); 614 615 if (send_stalled_by_flow_control_ && !session_->IsSendStalled() && 616 send_window_size_ > 0) { 617 net_log_.AddEvent( 618 NetLog::TYPE_SPDY_STREAM_FLOW_CONTROL_UNSTALLED, 619 NetLog::IntegerCallback("stream_id", stream_id_)); 620 send_stalled_by_flow_control_ = false; 621 QueueNextDataFrame(); 622 } 623} 624 625bool SpdyStream::IsClosed() const { 626 return io_state_ == STATE_CLOSED; 627} 628 629bool SpdyStream::IsIdle() const { 630 return io_state_ == STATE_IDLE; 631} 632 633bool SpdyStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) 
const { 634 if (stream_id_ == 0) 635 return false; 636 637 return session_->GetLoadTimingInfo(stream_id_, load_timing_info); 638} 639 640GURL SpdyStream::GetUrlFromHeaders() const { 641 if (type_ != SPDY_PUSH_STREAM && !request_headers_) 642 return GURL(); 643 644 const SpdyHeaderBlock& headers = 645 (type_ == SPDY_PUSH_STREAM) ? response_headers_ : *request_headers_; 646 return GetUrlFromHeaderBlock(headers, GetProtocolVersion(), 647 type_ == SPDY_PUSH_STREAM); 648} 649 650bool SpdyStream::HasUrlFromHeaders() const { 651 return !GetUrlFromHeaders().is_empty(); 652} 653 654void SpdyStream::OnGetDomainBoundCertComplete(int result) { 655 DCHECK_EQ(io_state_, STATE_GET_DOMAIN_BOUND_CERT_COMPLETE); 656 DoLoop(result); 657} 658 659int SpdyStream::DoLoop(int result) { 660 CHECK(!in_do_loop_); 661 in_do_loop_ = true; 662 663 do { 664 State state = io_state_; 665 io_state_ = STATE_NONE; 666 switch (state) { 667 case STATE_GET_DOMAIN_BOUND_CERT: 668 CHECK_EQ(result, OK); 669 result = DoGetDomainBoundCert(); 670 break; 671 case STATE_GET_DOMAIN_BOUND_CERT_COMPLETE: 672 result = DoGetDomainBoundCertComplete(result); 673 break; 674 case STATE_SEND_DOMAIN_BOUND_CERT: 675 CHECK_EQ(result, OK); 676 result = DoSendDomainBoundCert(); 677 break; 678 case STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE: 679 result = DoSendDomainBoundCertComplete(result); 680 break; 681 case STATE_SEND_REQUEST_HEADERS: 682 CHECK_EQ(result, OK); 683 result = DoSendRequestHeaders(); 684 break; 685 case STATE_SEND_REQUEST_HEADERS_COMPLETE: 686 CHECK_EQ(result, OK); 687 result = DoSendRequestHeadersComplete(); 688 break; 689 690 // For request/response streams, no data is sent from the client 691 // while in the OPEN state, so OnFrameWriteComplete is never 692 // called here. The HTTP body is handled in the OnDataReceived 693 // callback, which does not call into DoLoop. 694 // 695 // For bidirectional streams, we'll send and receive data once 696 // the connection is established. 
Received data is handled in 697 // OnDataReceived. Sent data is handled in 698 // OnFrameWriteComplete, which calls DoOpen(). 699 case STATE_IDLE: 700 CHECK_EQ(result, OK); 701 result = DoOpen(); 702 break; 703 704 case STATE_CLOSED: 705 DCHECK_NE(result, ERR_IO_PENDING); 706 break; 707 default: 708 NOTREACHED() << io_state_; 709 break; 710 } 711 } while (result != ERR_IO_PENDING && io_state_ != STATE_NONE && 712 io_state_ != STATE_IDLE); 713 714 CHECK(in_do_loop_); 715 in_do_loop_ = false; 716 717 return result; 718} 719 720int SpdyStream::DoGetDomainBoundCert() { 721 CHECK(request_headers_); 722 DCHECK_NE(type_, SPDY_PUSH_STREAM); 723 GURL url = GetUrlFromHeaders(); 724 if (!session_->NeedsCredentials() || !url.SchemeIs("https")) { 725 // Proceed directly to sending the request headers 726 io_state_ = STATE_SEND_REQUEST_HEADERS; 727 return OK; 728 } 729 730 slot_ = session_->credential_state()->FindCredentialSlot(GetUrlFromHeaders()); 731 if (slot_ != SpdyCredentialState::kNoEntry) { 732 // Proceed directly to sending the request headers 733 io_state_ = STATE_SEND_REQUEST_HEADERS; 734 return OK; 735 } 736 737 io_state_ = STATE_GET_DOMAIN_BOUND_CERT_COMPLETE; 738 ServerBoundCertService* sbc_service = session_->GetServerBoundCertService(); 739 DCHECK(sbc_service != NULL); 740 std::vector<uint8> requested_cert_types; 741 requested_cert_types.push_back(CLIENT_CERT_ECDSA_SIGN); 742 int rv = sbc_service->GetDomainBoundCert( 743 url.GetOrigin().host(), requested_cert_types, 744 &domain_bound_cert_type_, &domain_bound_private_key_, &domain_bound_cert_, 745 base::Bind(&SpdyStream::OnGetDomainBoundCertComplete, GetWeakPtr()), 746 &domain_bound_cert_request_handle_); 747 return rv; 748} 749 750int SpdyStream::DoGetDomainBoundCertComplete(int result) { 751 DCHECK_NE(type_, SPDY_PUSH_STREAM); 752 if (result != OK) 753 return result; 754 755 io_state_ = STATE_SEND_DOMAIN_BOUND_CERT; 756 slot_ = session_->credential_state()->SetHasCredential(GetUrlFromHeaders()); 757 return OK; 
758} 759 760int SpdyStream::DoSendDomainBoundCert() { 761 CHECK(request_headers_); 762 DCHECK_NE(type_, SPDY_PUSH_STREAM); 763 io_state_ = STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE; 764 765 std::string origin = GetUrlFromHeaders().GetOrigin().spec(); 766 DCHECK(origin[origin.length() - 1] == '/'); 767 origin.erase(origin.length() - 1); // Trim trailing slash. 768 scoped_ptr<SpdyFrame> frame; 769 int rv = session_->CreateCredentialFrame( 770 origin, domain_bound_cert_type_, domain_bound_private_key_, 771 domain_bound_cert_, priority_, &frame); 772 if (rv != OK) { 773 DCHECK_NE(rv, ERR_IO_PENDING); 774 return rv; 775 } 776 777 DCHECK(frame); 778 // TODO(akalin): Fix the following race condition: 779 // 780 // Since this is decoupled from sending the SYN_STREAM frame, it is 781 // possible that other domain-bound cert frames will clobber ours 782 // before our SYN_STREAM frame gets sent. This can be solved by 783 // immediately enqueueing the SYN_STREAM frame here and adjusting 784 // the state machine appropriately. 
785 session_->EnqueueStreamWrite( 786 GetWeakPtr(), CREDENTIAL, 787 scoped_ptr<SpdyBufferProducer>( 788 new SimpleBufferProducer( 789 scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass()))))); 790 return ERR_IO_PENDING; 791} 792 793int SpdyStream::DoSendDomainBoundCertComplete(int result) { 794 DCHECK_NE(type_, SPDY_PUSH_STREAM); 795 if (result != OK) 796 return result; 797 798 DCHECK_EQ(just_completed_frame_type_, CREDENTIAL); 799 io_state_ = STATE_SEND_REQUEST_HEADERS; 800 return OK; 801} 802 803int SpdyStream::DoSendRequestHeaders() { 804 DCHECK_NE(type_, SPDY_PUSH_STREAM); 805 io_state_ = STATE_SEND_REQUEST_HEADERS_COMPLETE; 806 807 session_->EnqueueStreamWrite( 808 GetWeakPtr(), SYN_STREAM, 809 scoped_ptr<SpdyBufferProducer>( 810 new SynStreamBufferProducer(GetWeakPtr()))); 811 return ERR_IO_PENDING; 812} 813 814namespace { 815 816// Assuming we're in STATE_IDLE, maps the given type (which must not 817// be SPDY_PUSH_STREAM) and send status to a result to return from 818// DoSendRequestHeadersComplete() or DoOpen(). 819int GetOpenStateResult(SpdyStreamType type, SpdySendStatus send_status) { 820 switch (type) { 821 case SPDY_BIDIRECTIONAL_STREAM: 822 // For bidirectional streams, there's nothing else to do. 823 DCHECK_EQ(send_status, MORE_DATA_TO_SEND); 824 return OK; 825 826 case SPDY_REQUEST_RESPONSE_STREAM: 827 // For request/response streams, wait for the delegate to send 828 // data if there's request data to send; we'll get called back 829 // when the send finishes. 830 if (send_status == MORE_DATA_TO_SEND) 831 return ERR_IO_PENDING; 832 833 return OK; 834 835 case SPDY_PUSH_STREAM: 836 // This should never be called for push streams. 
837 break; 838 } 839 840 CHECK(false); 841 return ERR_UNEXPECTED; 842} 843 844} // namespace 845 846int SpdyStream::DoSendRequestHeadersComplete() { 847 DCHECK_NE(type_, SPDY_PUSH_STREAM); 848 DCHECK_EQ(just_completed_frame_type_, SYN_STREAM); 849 DCHECK_NE(stream_id_, 0u); 850 851 io_state_ = STATE_IDLE; 852 853 CHECK(delegate_); 854 // Must not close |this|; if it does, it will trigger the |in_do_loop_| 855 // check in the destructor. 856 delegate_->OnRequestHeadersSent(); 857 858 return GetOpenStateResult(type_, send_status_); 859} 860 861int SpdyStream::DoOpen() { 862 DCHECK_NE(type_, SPDY_PUSH_STREAM); 863 864 if (just_completed_frame_type_ != DATA) { 865 NOTREACHED(); 866 return ERR_UNEXPECTED; 867 } 868 869 if (just_completed_frame_size_ < session_->GetDataFrameMinimumSize()) { 870 NOTREACHED(); 871 return ERR_UNEXPECTED; 872 } 873 874 size_t frame_payload_size = 875 just_completed_frame_size_ - session_->GetDataFrameMinimumSize(); 876 if (frame_payload_size > session_->GetDataFrameMaximumPayload()) { 877 NOTREACHED(); 878 return ERR_UNEXPECTED; 879 } 880 881 // Set |io_state_| first as |delegate_| may check it. 882 io_state_ = STATE_IDLE; 883 884 send_bytes_ += frame_payload_size; 885 886 pending_send_data_->DidConsume(frame_payload_size); 887 if (pending_send_data_->BytesRemaining() > 0) { 888 QueueNextDataFrame(); 889 return ERR_IO_PENDING; 890 } 891 892 pending_send_data_ = NULL; 893 894 CHECK(delegate_); 895 // Must not close |this|; if it does, it will trigger the 896 // |in_do_loop_| check in the destructor. 897 delegate_->OnDataSent(); 898 899 return GetOpenStateResult(type_, send_status_); 900} 901 902void SpdyStream::UpdateHistograms() { 903 // We need at least the receive timers to be filled in, as otherwise 904 // metrics can be bogus. 
905 if (recv_first_byte_time_.is_null() || recv_last_byte_time_.is_null()) 906 return; 907 908 base::TimeTicks effective_send_time; 909 if (type_ == SPDY_PUSH_STREAM) { 910 // Push streams shouldn't have |send_time_| filled in. 911 DCHECK(send_time_.is_null()); 912 effective_send_time = recv_first_byte_time_; 913 } else { 914 // For non-push streams, we also need |send_time_| to be filled 915 // in. 916 if (send_time_.is_null()) 917 return; 918 effective_send_time = send_time_; 919 } 920 921 UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte", 922 recv_first_byte_time_ - effective_send_time); 923 UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime", 924 recv_last_byte_time_ - recv_first_byte_time_); 925 UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime", 926 recv_last_byte_time_ - effective_send_time); 927 928 UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_); 929 UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_); 930} 931 932void SpdyStream::QueueNextDataFrame() { 933 // Until the request has been completely sent, we cannot be sure 934 // that our stream_id is correct. 935 DCHECK_GT(io_state_, STATE_SEND_REQUEST_HEADERS_COMPLETE); 936 CHECK_GT(stream_id_, 0u); 937 CHECK(pending_send_data_.get()); 938 CHECK_GT(pending_send_data_->BytesRemaining(), 0); 939 940 SpdyDataFlags flags = 941 (send_status_ == NO_MORE_DATA_TO_SEND) ? 942 DATA_FLAG_FIN : DATA_FLAG_NONE; 943 scoped_ptr<SpdyBuffer> data_buffer( 944 session_->CreateDataBuffer(stream_id_, 945 pending_send_data_.get(), 946 pending_send_data_->BytesRemaining(), 947 flags)); 948 // We'll get called again by PossiblyResumeIfSendStalled(). 
949 if (!data_buffer) 950 return; 951 952 if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) { 953 DCHECK_GE(data_buffer->GetRemainingSize(), 954 session_->GetDataFrameMinimumSize()); 955 size_t payload_size = 956 data_buffer->GetRemainingSize() - session_->GetDataFrameMinimumSize(); 957 DCHECK_LE(payload_size, session_->GetDataFrameMaximumPayload()); 958 DecreaseSendWindowSize(static_cast<int32>(payload_size)); 959 // This currently isn't strictly needed, since write frames are 960 // discarded only if the stream is about to be closed. But have it 961 // here anyway just in case this changes. 962 data_buffer->AddConsumeCallback( 963 base::Bind(&SpdyStream::OnWriteBufferConsumed, 964 GetWeakPtr(), payload_size)); 965 } 966 967 session_->EnqueueStreamWrite( 968 GetWeakPtr(), DATA, 969 scoped_ptr<SpdyBufferProducer>( 970 new SimpleBufferProducer(data_buffer.Pass()))); 971} 972 973int SpdyStream::MergeWithResponseHeaders( 974 const SpdyHeaderBlock& new_response_headers) { 975 if (new_response_headers.find("transfer-encoding") != 976 new_response_headers.end()) { 977 session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR, 978 "Received transfer-encoding header"); 979 return ERR_SPDY_PROTOCOL_ERROR; 980 } 981 982 for (SpdyHeaderBlock::const_iterator it = new_response_headers.begin(); 983 it != new_response_headers.end(); ++it) { 984 // Disallow uppercase headers. 985 if (ContainsUppercaseAscii(it->first)) { 986 session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR, 987 "Upper case characters in header: " + it->first); 988 return ERR_SPDY_PROTOCOL_ERROR; 989 } 990 991 SpdyHeaderBlock::iterator it2 = response_headers_.lower_bound(it->first); 992 // Disallow duplicate headers. This is just to be conservative. 
993 if (it2 != response_headers_.end() && it2->first == it->first) { 994 session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR, 995 "Duplicate header: " + it->first); 996 return ERR_SPDY_PROTOCOL_ERROR; 997 } 998 999 response_headers_.insert(it2, *it); 1000 } 1001 1002 // If delegate_ is not yet attached, we'll call 1003 // OnResponseHeadersUpdated() after the delegate gets attached to 1004 // the stream. 1005 if (delegate_) { 1006 // The call to OnResponseHeadersUpdated() below may delete |this|, 1007 // so use |weak_this| to detect that. 1008 base::WeakPtr<SpdyStream> weak_this = GetWeakPtr(); 1009 1010 SpdyResponseHeadersStatus status = 1011 delegate_->OnResponseHeadersUpdated(response_headers_); 1012 if (status == RESPONSE_HEADERS_ARE_INCOMPLETE) { 1013 // Since RESPONSE_HEADERS_ARE_INCOMPLETE was returned, we must not 1014 // have been closed. 1015 CHECK(weak_this); 1016 // Incomplete headers are OK only for push streams. 1017 if (type_ != SPDY_PUSH_STREAM) { 1018 session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR, 1019 "Incomplete headers"); 1020 return ERR_INCOMPLETE_SPDY_HEADERS; 1021 } 1022 } else if (weak_this) { 1023 response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE; 1024 } 1025 } 1026 1027 return OK; 1028} 1029 1030} // namespace net 1031