spdy_stream.cc revision 5d1f7b1de12d16ceb2c938c56701a3e8bfa558f7
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/spdy/spdy_stream.h" 6 7#include "base/bind.h" 8#include "base/compiler_specific.h" 9#include "base/logging.h" 10#include "base/message_loop/message_loop.h" 11#include "base/strings/string_number_conversions.h" 12#include "base/strings/stringprintf.h" 13#include "base/values.h" 14#include "net/spdy/spdy_buffer_producer.h" 15#include "net/spdy/spdy_http_utils.h" 16#include "net/spdy/spdy_session.h" 17 18namespace net { 19 20namespace { 21 22base::Value* NetLogSpdyStreamErrorCallback(SpdyStreamId stream_id, 23 int status, 24 const std::string* description, 25 NetLog::LogLevel /* log_level */) { 26 base::DictionaryValue* dict = new base::DictionaryValue(); 27 dict->SetInteger("stream_id", static_cast<int>(stream_id)); 28 dict->SetInteger("status", status); 29 dict->SetString("description", *description); 30 return dict; 31} 32 33base::Value* NetLogSpdyStreamWindowUpdateCallback( 34 SpdyStreamId stream_id, 35 int32 delta, 36 int32 window_size, 37 NetLog::LogLevel /* log_level */) { 38 base::DictionaryValue* dict = new base::DictionaryValue(); 39 dict->SetInteger("stream_id", stream_id); 40 dict->SetInteger("delta", delta); 41 dict->SetInteger("window_size", window_size); 42 return dict; 43} 44 45bool ContainsUppercaseAscii(const std::string& str) { 46 for (std::string::const_iterator i(str.begin()); i != str.end(); ++i) { 47 if (*i >= 'A' && *i <= 'Z') { 48 return true; 49 } 50 } 51 return false; 52} 53 54} // namespace 55 56// A wrapper around a stream that calls into ProduceSynStreamFrame(). 57class SpdyStream::SynStreamBufferProducer : public SpdyBufferProducer { 58 public: 59 SynStreamBufferProducer(const base::WeakPtr<SpdyStream>& stream) 60 : stream_(stream) { 61 DCHECK(stream_.get()); 62 } 63 64 virtual ~SynStreamBufferProducer() {} 65 66 virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE { 67 if (!stream_.get()) { 68 NOTREACHED(); 69 return scoped_ptr<SpdyBuffer>(); 70 } 71 DCHECK_GT(stream_->stream_id(), 0u); 72 return scoped_ptr<SpdyBuffer>( 73 new SpdyBuffer(stream_->ProduceSynStreamFrame())); 74 } 75 76 private: 77 const base::WeakPtr<SpdyStream> stream_; 78}; 79 80SpdyStream::SpdyStream(SpdyStreamType type, 81 const base::WeakPtr<SpdySession>& session, 82 const GURL& url, 83 RequestPriority priority, 84 int32 initial_send_window_size, 85 int32 initial_recv_window_size, 86 const BoundNetLog& net_log) 87 : type_(type), 88 weak_ptr_factory_(this), 89 stream_id_(0), 90 url_(url), 91 priority_(priority), 92 send_stalled_by_flow_control_(false), 93 send_window_size_(initial_send_window_size), 94 recv_window_size_(initial_recv_window_size), 95 unacked_recv_window_bytes_(0), 96 session_(session), 97 delegate_(NULL), 98 pending_send_status_(MORE_DATA_TO_SEND), 99 request_time_(base::Time::Now()), 100 response_headers_status_(RESPONSE_HEADERS_ARE_INCOMPLETE), 101 io_state_(STATE_IDLE), 102 response_status_(OK), 103 net_log_(net_log), 104 raw_received_bytes_(0), 105 send_bytes_(0), 106 recv_bytes_(0) { 107 CHECK(type_ == SPDY_BIDIRECTIONAL_STREAM || 108 type_ == SPDY_REQUEST_RESPONSE_STREAM || 109 type_ == SPDY_PUSH_STREAM); 110 CHECK_GE(priority_, MINIMUM_PRIORITY); 111 CHECK_LE(priority_, MAXIMUM_PRIORITY); 112} 113 114SpdyStream::~SpdyStream() { 115 UpdateHistograms(); 116} 117 118void SpdyStream::SetDelegate(Delegate* delegate) { 119 CHECK(!delegate_); 120 CHECK(delegate); 121 delegate_ = delegate; 122 123 CHECK(io_state_ == STATE_IDLE || 124 io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED); 125 126 if (io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED) { 127 DCHECK_EQ(type_, SPDY_PUSH_STREAM); 128 base::MessageLoop::current()->PostTask( 129 FROM_HERE, 130 base::Bind(&SpdyStream::PushedStreamReplay, GetWeakPtr())); 131 } 132} 133 134void SpdyStream::PushedStreamReplay() { 135 DCHECK_EQ(type_, SPDY_PUSH_STREAM); 136 DCHECK_NE(stream_id_, 0u); 137 CHECK_EQ(stream_id_ % 2, 0u); 138 139 CHECK_EQ(io_state_, STATE_HALF_CLOSED_LOCAL_UNCLAIMED); 140 io_state_ = STATE_HALF_CLOSED_LOCAL; 141 142 // The delegate methods called below may delete |this|, so use 143 // |weak_this| to detect that. 144 base::WeakPtr<SpdyStream> weak_this = GetWeakPtr(); 145 146 CHECK(delegate_); 147 SpdyResponseHeadersStatus status = 148 delegate_->OnResponseHeadersUpdated(response_headers_); 149 if (status == RESPONSE_HEADERS_ARE_INCOMPLETE) { 150 // Since RESPONSE_HEADERS_ARE_INCOMPLETE was returned, we must not 151 // have been closed. Since we don't have complete headers, assume 152 // we're waiting for another HEADERS frame, and we had better not 153 // have any pending data frames. 154 CHECK(weak_this); 155 if (!pending_recv_data_.empty()) { 156 LogStreamError(ERR_SPDY_PROTOCOL_ERROR, 157 "Data received with incomplete headers."); 158 session_->CloseActiveStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR); 159 } 160 return; 161 } 162 163 // OnResponseHeadersUpdated() may have closed |this|. 164 if (!weak_this) 165 return; 166 167 response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE; 168 169 while (!pending_recv_data_.empty()) { 170 // Take ownership of the first element of |pending_recv_data_|. 171 scoped_ptr<SpdyBuffer> buffer(pending_recv_data_.front()); 172 pending_recv_data_.weak_erase(pending_recv_data_.begin()); 173 174 bool eof = (buffer == NULL); 175 176 CHECK(delegate_); 177 delegate_->OnDataReceived(buffer.Pass()); 178 179 // OnDataReceived() may have closed |this|. 180 if (!weak_this) 181 return; 182 183 if (eof) { 184 DCHECK(pending_recv_data_.empty()); 185 session_->CloseActiveStream(stream_id_, OK); 186 DCHECK(!weak_this); 187 // |pending_recv_data_| is invalid at this point. 188 break; 189 } 190 } 191} 192 193scoped_ptr<SpdyFrame> SpdyStream::ProduceSynStreamFrame() { 194 CHECK_EQ(io_state_, STATE_IDLE); 195 CHECK(request_headers_); 196 CHECK_GT(stream_id_, 0u); 197 198 SpdyControlFlags flags = 199 (pending_send_status_ == NO_MORE_DATA_TO_SEND) ? 200 CONTROL_FLAG_FIN : CONTROL_FLAG_NONE; 201 scoped_ptr<SpdyFrame> frame(session_->CreateSynStream( 202 stream_id_, priority_, flags, *request_headers_)); 203 send_time_ = base::TimeTicks::Now(); 204 return frame.Pass(); 205} 206 207void SpdyStream::DetachDelegate() { 208 DCHECK(!IsClosed()); 209 delegate_ = NULL; 210 Cancel(); 211} 212 213void SpdyStream::AdjustSendWindowSize(int32 delta_window_size) { 214 DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); 215 216 if (IsClosed()) 217 return; 218 219 // Check for wraparound. 220 if (send_window_size_ > 0) { 221 DCHECK_LE(delta_window_size, kint32max - send_window_size_); 222 } 223 if (send_window_size_ < 0) { 224 DCHECK_GE(delta_window_size, kint32min - send_window_size_); 225 } 226 send_window_size_ += delta_window_size; 227 PossiblyResumeIfSendStalled(); 228} 229 230void SpdyStream::OnWriteBufferConsumed( 231 size_t frame_payload_size, 232 size_t consume_size, 233 SpdyBuffer::ConsumeSource consume_source) { 234 DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); 235 if (consume_source == SpdyBuffer::DISCARD) { 236 // If we're discarding a frame or part of it, increase the send 237 // window by the number of discarded bytes. (Although if we're 238 // discarding part of a frame, it's probably because of a write 239 // error and we'll be tearing down the stream soon.) 240 size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size); 241 DCHECK_GT(remaining_payload_bytes, 0u); 242 IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes)); 243 } 244 // For consumed bytes, the send window is increased when we receive 245 // a WINDOW_UPDATE frame. 246} 247 248void SpdyStream::IncreaseSendWindowSize(int32 delta_window_size) { 249 DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); 250 DCHECK_GE(delta_window_size, 1); 251 252 // Ignore late WINDOW_UPDATEs. 253 if (IsClosed()) 254 return; 255 256 if (send_window_size_ > 0) { 257 // Check for overflow. 258 int32 max_delta_window_size = kint32max - send_window_size_; 259 if (delta_window_size > max_delta_window_size) { 260 std::string desc = base::StringPrintf( 261 "Received WINDOW_UPDATE [delta: %d] for stream %d overflows " 262 "send_window_size_ [current: %d]", delta_window_size, stream_id_, 263 send_window_size_); 264 session_->ResetStream(stream_id_, RST_STREAM_FLOW_CONTROL_ERROR, desc); 265 return; 266 } 267 } 268 269 send_window_size_ += delta_window_size; 270 271 net_log_.AddEvent( 272 NetLog::TYPE_SPDY_STREAM_UPDATE_SEND_WINDOW, 273 base::Bind(&NetLogSpdyStreamWindowUpdateCallback, 274 stream_id_, delta_window_size, send_window_size_)); 275 276 PossiblyResumeIfSendStalled(); 277} 278 279void SpdyStream::DecreaseSendWindowSize(int32 delta_window_size) { 280 DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); 281 282 if (IsClosed()) 283 return; 284 285 // We only call this method when sending a frame. Therefore, 286 // |delta_window_size| should be within the valid frame size range. 287 DCHECK_GE(delta_window_size, 1); 288 DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize); 289 290 // |send_window_size_| should have been at least |delta_window_size| for 291 // this call to happen. 292 DCHECK_GE(send_window_size_, delta_window_size); 293 294 send_window_size_ -= delta_window_size; 295 296 net_log_.AddEvent( 297 NetLog::TYPE_SPDY_STREAM_UPDATE_SEND_WINDOW, 298 base::Bind(&NetLogSpdyStreamWindowUpdateCallback, 299 stream_id_, -delta_window_size, send_window_size_)); 300} 301 302void SpdyStream::OnReadBufferConsumed( 303 size_t consume_size, 304 SpdyBuffer::ConsumeSource consume_source) { 305 DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); 306 DCHECK_GE(consume_size, 1u); 307 DCHECK_LE(consume_size, static_cast<size_t>(kint32max)); 308 IncreaseRecvWindowSize(static_cast<int32>(consume_size)); 309} 310 311void SpdyStream::IncreaseRecvWindowSize(int32 delta_window_size) { 312 DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); 313 314 // By the time a read is processed by the delegate, this stream may 315 // already be inactive. 316 if (!session_->IsStreamActive(stream_id_)) 317 return; 318 319 DCHECK_GE(unacked_recv_window_bytes_, 0); 320 DCHECK_GE(recv_window_size_, unacked_recv_window_bytes_); 321 DCHECK_GE(delta_window_size, 1); 322 // Check for overflow. 323 DCHECK_LE(delta_window_size, kint32max - recv_window_size_); 324 325 recv_window_size_ += delta_window_size; 326 net_log_.AddEvent( 327 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, 328 base::Bind(&NetLogSpdyStreamWindowUpdateCallback, 329 stream_id_, delta_window_size, recv_window_size_)); 330 331 unacked_recv_window_bytes_ += delta_window_size; 332 if (unacked_recv_window_bytes_ > 333 session_->stream_initial_recv_window_size() / 2) { 334 session_->SendStreamWindowUpdate( 335 stream_id_, static_cast<uint32>(unacked_recv_window_bytes_)); 336 unacked_recv_window_bytes_ = 0; 337 } 338} 339 340void SpdyStream::DecreaseRecvWindowSize(int32 delta_window_size) { 341 DCHECK(session_->IsStreamActive(stream_id_)); 342 DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); 343 DCHECK_GE(delta_window_size, 1); 344 345 // Since we never decrease the initial receive window size, 346 // |delta_window_size| should never cause |recv_window_size_| to go 347 // negative. If we do, the receive window isn't being respected. 348 if (delta_window_size > recv_window_size_) { 349 session_->ResetStream( 350 stream_id_, RST_STREAM_PROTOCOL_ERROR, 351 "delta_window_size is " + base::IntToString(delta_window_size) + 352 " in DecreaseRecvWindowSize, which is larger than the receive " + 353 "window size of " + base::IntToString(recv_window_size_)); 354 return; 355 } 356 357 recv_window_size_ -= delta_window_size; 358 net_log_.AddEvent( 359 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, 360 base::Bind(&NetLogSpdyStreamWindowUpdateCallback, 361 stream_id_, -delta_window_size, recv_window_size_)); 362} 363 364int SpdyStream::GetPeerAddress(IPEndPoint* address) const { 365 return session_->GetPeerAddress(address); 366} 367 368int SpdyStream::GetLocalAddress(IPEndPoint* address) const { 369 return session_->GetLocalAddress(address); 370} 371 372bool SpdyStream::WasEverUsed() const { 373 return session_->WasEverUsed(); 374} 375 376base::Time SpdyStream::GetRequestTime() const { 377 return request_time_; 378} 379 380void SpdyStream::SetRequestTime(base::Time t) { 381 request_time_ = t; 382} 383 384int SpdyStream::OnInitialResponseHeadersReceived( 385 const SpdyHeaderBlock& initial_response_headers, 386 base::Time response_time, 387 base::TimeTicks recv_first_byte_time) { 388 // SpdySession guarantees that this is called at most once. 389 CHECK(response_headers_.empty()); 390 391 // Check to make sure that we don't receive the response headers 392 // before we're ready for it. 393 switch (type_) { 394 case SPDY_BIDIRECTIONAL_STREAM: 395 // For a bidirectional stream, we're ready for the response 396 // headers once we've finished sending the request headers. 397 if (io_state_ == STATE_IDLE) { 398 session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR, 399 "Response received before request sent"); 400 return ERR_SPDY_PROTOCOL_ERROR; 401 } 402 break; 403 404 case SPDY_REQUEST_RESPONSE_STREAM: 405 // For a request/response stream, we're ready for the response 406 // headers once we've finished sending the request headers. 407 if (io_state_ == STATE_IDLE) { 408 session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR, 409 "Response received before request sent"); 410 return ERR_SPDY_PROTOCOL_ERROR; 411 } 412 break; 413 414 case SPDY_PUSH_STREAM: 415 // Push streams transition to a locally half-closed state upon headers. 416 // We must continue to buffer data while waiting for a call to 417 // SetDelegate() (which may not ever happen). 418 // TODO(jgraettinger): When PUSH_PROMISE is added, Handle RESERVED_REMOTE 419 // cases here depending on whether the delegate is already set. 420 CHECK_EQ(io_state_, STATE_IDLE); 421 DCHECK(!delegate_); 422 io_state_ = STATE_HALF_CLOSED_LOCAL_UNCLAIMED; 423 break; 424 } 425 426 metrics_.StartStream(); 427 428 DCHECK_NE(io_state_, STATE_IDLE); 429 430 response_time_ = response_time; 431 recv_first_byte_time_ = recv_first_byte_time; 432 return MergeWithResponseHeaders(initial_response_headers); 433} 434 435int SpdyStream::OnAdditionalResponseHeadersReceived( 436 const SpdyHeaderBlock& additional_response_headers) { 437 if (type_ == SPDY_REQUEST_RESPONSE_STREAM) { 438 session_->ResetStream( 439 stream_id_, RST_STREAM_PROTOCOL_ERROR, 440 "Additional headers received for request/response stream"); 441 return ERR_SPDY_PROTOCOL_ERROR; 442 } else if (type_ == SPDY_PUSH_STREAM && 443 response_headers_status_ == RESPONSE_HEADERS_ARE_COMPLETE) { 444 session_->ResetStream( 445 stream_id_, RST_STREAM_PROTOCOL_ERROR, 446 "Additional headers received for push stream"); 447 return ERR_SPDY_PROTOCOL_ERROR; 448 } 449 return MergeWithResponseHeaders(additional_response_headers); 450} 451 452void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { 453 DCHECK(session_->IsStreamActive(stream_id_)); 454 455 // If we're still buffering data for a push stream, we will do the 456 // check for data received with incomplete headers in 457 // PushedStreamReplayData(). 458 if (io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED) { 459 DCHECK_EQ(type_, SPDY_PUSH_STREAM); 460 CHECK(!delegate_); 461 // It should be valid for this to happen in the server push case. 462 // We'll return received data when delegate gets attached to the stream. 463 if (buffer) { 464 pending_recv_data_.push_back(buffer.release()); 465 } else { 466 pending_recv_data_.push_back(NULL); 467 metrics_.StopStream(); 468 // Note: we leave the stream open in the session until the stream 469 // is claimed. 470 } 471 return; 472 } 473 474 // If we have response headers but the delegate has indicated that 475 // it's still incomplete, then that's a protocol error. 476 if (response_headers_status_ == RESPONSE_HEADERS_ARE_INCOMPLETE) { 477 LogStreamError(ERR_SPDY_PROTOCOL_ERROR, 478 "Data received with incomplete headers."); 479 session_->CloseActiveStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR); 480 return; 481 } 482 483 CHECK(!IsClosed()); 484 485 if (!buffer) { 486 metrics_.StopStream(); 487 if (io_state_ == STATE_OPEN) { 488 io_state_ = STATE_HALF_CLOSED_REMOTE; 489 } else if (io_state_ == STATE_HALF_CLOSED_LOCAL) { 490 io_state_ = STATE_CLOSED; 491 // Deletes |this|. 492 session_->CloseActiveStream(stream_id_, OK); 493 } else { 494 NOTREACHED() << io_state_; 495 } 496 return; 497 } 498 499 size_t length = buffer->GetRemainingSize(); 500 DCHECK_LE(length, session_->GetDataFrameMaximumPayload()); 501 if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) { 502 DecreaseRecvWindowSize(static_cast<int32>(length)); 503 buffer->AddConsumeCallback( 504 base::Bind(&SpdyStream::OnReadBufferConsumed, GetWeakPtr())); 505 } 506 507 // Track our bandwidth. 508 metrics_.RecordBytes(length); 509 recv_bytes_ += length; 510 recv_last_byte_time_ = base::TimeTicks::Now(); 511 512 // May close |this|. 513 delegate_->OnDataReceived(buffer.Pass()); 514} 515 516void SpdyStream::OnFrameWriteComplete(SpdyFrameType frame_type, 517 size_t frame_size) { 518 DCHECK_NE(type_, SPDY_PUSH_STREAM); 519 520 if (frame_size < session_->GetFrameMinimumSize() || 521 frame_size > session_->GetFrameMaximumSize()) { 522 NOTREACHED(); 523 return; 524 } 525 CHECK(frame_type == SYN_STREAM || 526 frame_type == DATA) << frame_type; 527 528 int result = (frame_type == SYN_STREAM) ? 529 OnRequestHeadersSent() : OnDataSent(frame_size); 530 if (result == ERR_IO_PENDING) { 531 // The write operation hasn't completed yet. 532 return; 533 } 534 535 if (pending_send_status_ == NO_MORE_DATA_TO_SEND) { 536 if(io_state_ == STATE_OPEN) { 537 io_state_ = STATE_HALF_CLOSED_LOCAL; 538 } else if(io_state_ == STATE_HALF_CLOSED_REMOTE) { 539 io_state_ = STATE_CLOSED; 540 } else { 541 NOTREACHED() << io_state_; 542 } 543 } 544 // Notify delegate of write completion. Must not destroy |this|. 545 CHECK(delegate_); 546 { 547 base::WeakPtr<SpdyStream> weak_this = GetWeakPtr(); 548 if (frame_type == SYN_STREAM) { 549 delegate_->OnRequestHeadersSent(); 550 } else { 551 delegate_->OnDataSent(); 552 } 553 CHECK(weak_this); 554 } 555 556 if (io_state_ == STATE_CLOSED) { 557 // Deletes |this|. 558 session_->CloseActiveStream(stream_id_, OK); 559 } 560} 561 562int SpdyStream::OnRequestHeadersSent() { 563 CHECK_EQ(io_state_, STATE_IDLE); 564 CHECK_NE(stream_id_, 0u); 565 566 io_state_ = STATE_OPEN; 567 return OK; 568} 569 570int SpdyStream::OnDataSent(size_t frame_size) { 571 CHECK(io_state_ == STATE_OPEN || 572 io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_; 573 574 size_t frame_payload_size = frame_size - session_->GetDataFrameMinimumSize(); 575 576 CHECK_GE(frame_size, session_->GetDataFrameMinimumSize()); 577 CHECK_LE(frame_payload_size, session_->GetDataFrameMaximumPayload()); 578 579 send_bytes_ += frame_payload_size; 580 581 // If more data is available to send, dispatch it and 582 // return that the write operation is still ongoing. 583 pending_send_data_->DidConsume(frame_payload_size); 584 if (pending_send_data_->BytesRemaining() > 0) { 585 QueueNextDataFrame(); 586 return ERR_IO_PENDING; 587 } else { 588 pending_send_data_ = NULL; 589 return OK; 590 } 591} 592 593SpdyMajorVersion SpdyStream::GetProtocolVersion() const { 594 return session_->GetProtocolVersion(); 595} 596 597void SpdyStream::LogStreamError(int status, const std::string& description) { 598 net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ERROR, 599 base::Bind(&NetLogSpdyStreamErrorCallback, 600 stream_id_, status, &description)); 601} 602 603void SpdyStream::OnClose(int status) { 604 // In most cases, the stream should already be CLOSED. The exception is when a 605 // SpdySession is shutting down while the stream is in an intermediate state. 606 io_state_ = STATE_CLOSED; 607 response_status_ = status; 608 Delegate* delegate = delegate_; 609 delegate_ = NULL; 610 if (delegate) 611 delegate->OnClose(status); 612 // Unset |stream_id_| last so that the delegate can look it up. 613 stream_id_ = 0; 614} 615 616void SpdyStream::Cancel() { 617 // We may be called again from a delegate's OnClose(). 618 if (io_state_ == STATE_CLOSED) 619 return; 620 621 if (stream_id_ != 0) { 622 session_->ResetStream(stream_id_, RST_STREAM_CANCEL, std::string()); 623 } else { 624 session_->CloseCreatedStream(GetWeakPtr(), RST_STREAM_CANCEL); 625 } 626 // |this| is invalid at this point. 627} 628 629void SpdyStream::Close() { 630 // We may be called again from a delegate's OnClose(). 631 if (io_state_ == STATE_CLOSED) 632 return; 633 634 if (stream_id_ != 0) { 635 session_->CloseActiveStream(stream_id_, OK); 636 } else { 637 session_->CloseCreatedStream(GetWeakPtr(), OK); 638 } 639 // |this| is invalid at this point. 640} 641 642base::WeakPtr<SpdyStream> SpdyStream::GetWeakPtr() { 643 return weak_ptr_factory_.GetWeakPtr(); 644} 645 646int SpdyStream::SendRequestHeaders(scoped_ptr<SpdyHeaderBlock> request_headers, 647 SpdySendStatus send_status) { 648 CHECK_NE(type_, SPDY_PUSH_STREAM); 649 CHECK_EQ(pending_send_status_, MORE_DATA_TO_SEND); 650 CHECK(!request_headers_); 651 CHECK(!pending_send_data_.get()); 652 CHECK_EQ(io_state_, STATE_IDLE); 653 request_headers_ = request_headers.Pass(); 654 pending_send_status_ = send_status; 655 session_->EnqueueStreamWrite( 656 GetWeakPtr(), SYN_STREAM, 657 scoped_ptr<SpdyBufferProducer>( 658 new SynStreamBufferProducer(GetWeakPtr()))); 659 return ERR_IO_PENDING; 660} 661 662void SpdyStream::SendData(IOBuffer* data, 663 int length, 664 SpdySendStatus send_status) { 665 CHECK_NE(type_, SPDY_PUSH_STREAM); 666 CHECK_EQ(pending_send_status_, MORE_DATA_TO_SEND); 667 CHECK(io_state_ == STATE_OPEN || 668 io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_; 669 CHECK(!pending_send_data_.get()); 670 pending_send_data_ = new DrainableIOBuffer(data, length); 671 pending_send_status_ = send_status; 672 QueueNextDataFrame(); 673} 674 675bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info, 676 bool* was_npn_negotiated, 677 NextProto* protocol_negotiated) { 678 return session_->GetSSLInfo( 679 ssl_info, was_npn_negotiated, protocol_negotiated); 680} 681 682bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) { 683 return session_->GetSSLCertRequestInfo(cert_request_info); 684} 685 686void SpdyStream::PossiblyResumeIfSendStalled() { 687 if (IsLocallyClosed()) { 688 return; 689 } 690 if (send_stalled_by_flow_control_ && !session_->IsSendStalled() && 691 send_window_size_ > 0) { 692 net_log_.AddEvent( 693 NetLog::TYPE_SPDY_STREAM_FLOW_CONTROL_UNSTALLED, 694 NetLog::IntegerCallback("stream_id", stream_id_)); 695 send_stalled_by_flow_control_ = false; 696 QueueNextDataFrame(); 697 } 698} 699 700bool SpdyStream::IsClosed() const { 701 return io_state_ == STATE_CLOSED; 702} 703 704bool SpdyStream::IsLocallyClosed() const { 705 return io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED || 706 io_state_ == STATE_HALF_CLOSED_LOCAL || 707 io_state_ == STATE_CLOSED; 708} 709 710bool SpdyStream::IsIdle() const { 711 return io_state_ == STATE_IDLE; 712} 713 714bool SpdyStream::IsOpen() const { 715 return io_state_ == STATE_OPEN; 716} 717 718NextProto SpdyStream::GetProtocol() const { 719 return session_->protocol(); 720} 721 722bool SpdyStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const { 723 if (stream_id_ == 0) 724 return false; 725 726 return session_->GetLoadTimingInfo(stream_id_, load_timing_info); 727} 728 729GURL SpdyStream::GetUrlFromHeaders() const { 730 if (type_ != SPDY_PUSH_STREAM && !request_headers_) 731 return GURL(); 732 733 const SpdyHeaderBlock& headers = 734 (type_ == SPDY_PUSH_STREAM) ? response_headers_ : *request_headers_; 735 return GetUrlFromHeaderBlock(headers, GetProtocolVersion(), 736 type_ == SPDY_PUSH_STREAM); 737} 738 739bool SpdyStream::HasUrlFromHeaders() const { 740 return !GetUrlFromHeaders().is_empty(); 741} 742 743void SpdyStream::UpdateHistograms() { 744 // We need at least the receive timers to be filled in, as otherwise 745 // metrics can be bogus. 746 if (recv_first_byte_time_.is_null() || recv_last_byte_time_.is_null()) 747 return; 748 749 base::TimeTicks effective_send_time; 750 if (type_ == SPDY_PUSH_STREAM) { 751 // Push streams shouldn't have |send_time_| filled in. 752 DCHECK(send_time_.is_null()); 753 effective_send_time = recv_first_byte_time_; 754 } else { 755 // For non-push streams, we also need |send_time_| to be filled 756 // in. 757 if (send_time_.is_null()) 758 return; 759 effective_send_time = send_time_; 760 } 761 762 UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte", 763 recv_first_byte_time_ - effective_send_time); 764 UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime", 765 recv_last_byte_time_ - recv_first_byte_time_); 766 UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime", 767 recv_last_byte_time_ - effective_send_time); 768 769 UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_); 770 UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_); 771} 772 773void SpdyStream::QueueNextDataFrame() { 774 // Until the request has been completely sent, we cannot be sure 775 // that our stream_id is correct. 776 CHECK(io_state_ == STATE_OPEN || 777 io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_; 778 CHECK_GT(stream_id_, 0u); 779 CHECK(pending_send_data_.get()); 780 CHECK_GT(pending_send_data_->BytesRemaining(), 0); 781 782 SpdyDataFlags flags = 783 (pending_send_status_ == NO_MORE_DATA_TO_SEND) ? 784 DATA_FLAG_FIN : DATA_FLAG_NONE; 785 scoped_ptr<SpdyBuffer> data_buffer( 786 session_->CreateDataBuffer(stream_id_, 787 pending_send_data_.get(), 788 pending_send_data_->BytesRemaining(), 789 flags)); 790 // We'll get called again by PossiblyResumeIfSendStalled(). 791 if (!data_buffer) 792 return; 793 794 if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) { 795 DCHECK_GE(data_buffer->GetRemainingSize(), 796 session_->GetDataFrameMinimumSize()); 797 size_t payload_size = 798 data_buffer->GetRemainingSize() - session_->GetDataFrameMinimumSize(); 799 DCHECK_LE(payload_size, session_->GetDataFrameMaximumPayload()); 800 DecreaseSendWindowSize(static_cast<int32>(payload_size)); 801 // This currently isn't strictly needed, since write frames are 802 // discarded only if the stream is about to be closed. But have it 803 // here anyway just in case this changes. 804 data_buffer->AddConsumeCallback( 805 base::Bind(&SpdyStream::OnWriteBufferConsumed, 806 GetWeakPtr(), payload_size)); 807 } 808 809 session_->EnqueueStreamWrite( 810 GetWeakPtr(), DATA, 811 scoped_ptr<SpdyBufferProducer>( 812 new SimpleBufferProducer(data_buffer.Pass()))); 813} 814 815int SpdyStream::MergeWithResponseHeaders( 816 const SpdyHeaderBlock& new_response_headers) { 817 if (new_response_headers.find("transfer-encoding") != 818 new_response_headers.end()) { 819 session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR, 820 "Received transfer-encoding header"); 821 return ERR_SPDY_PROTOCOL_ERROR; 822 } 823 824 for (SpdyHeaderBlock::const_iterator it = new_response_headers.begin(); 825 it != new_response_headers.end(); ++it) { 826 // Disallow uppercase headers. 827 if (ContainsUppercaseAscii(it->first)) { 828 session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR, 829 "Upper case characters in header: " + it->first); 830 return ERR_SPDY_PROTOCOL_ERROR; 831 } 832 833 SpdyHeaderBlock::iterator it2 = response_headers_.lower_bound(it->first); 834 // Disallow duplicate headers. This is just to be conservative. 835 if (it2 != response_headers_.end() && it2->first == it->first) { 836 session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR, 837 "Duplicate header: " + it->first); 838 return ERR_SPDY_PROTOCOL_ERROR; 839 } 840 841 response_headers_.insert(it2, *it); 842 } 843 844 // If delegate_ is not yet attached, we'll call 845 // OnResponseHeadersUpdated() after the delegate gets attached to 846 // the stream. 847 if (delegate_) { 848 // The call to OnResponseHeadersUpdated() below may delete |this|, 849 // so use |weak_this| to detect that. 850 base::WeakPtr<SpdyStream> weak_this = GetWeakPtr(); 851 852 SpdyResponseHeadersStatus status = 853 delegate_->OnResponseHeadersUpdated(response_headers_); 854 if (status == RESPONSE_HEADERS_ARE_INCOMPLETE) { 855 // Since RESPONSE_HEADERS_ARE_INCOMPLETE was returned, we must not 856 // have been closed. 857 CHECK(weak_this); 858 // Incomplete headers are OK only for push streams. 859 if (type_ != SPDY_PUSH_STREAM) { 860 session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR, 861 "Incomplete headers"); 862 return ERR_INCOMPLETE_SPDY_HEADERS; 863 } 864 } else if (weak_this) { 865 response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE; 866 } 867 } 868 869 return OK; 870} 871 872#define STATE_CASE(s) \ 873 case s: \ 874 description = base::StringPrintf("%s (0x%08X)", #s, s); \ 875 break 876 877std::string SpdyStream::DescribeState(State state) { 878 std::string description; 879 switch (state) { 880 STATE_CASE(STATE_IDLE); 881 STATE_CASE(STATE_OPEN); 882 STATE_CASE(STATE_HALF_CLOSED_LOCAL_UNCLAIMED); 883 STATE_CASE(STATE_HALF_CLOSED_LOCAL); 884 STATE_CASE(STATE_CLOSED); 885 default: 886 description = base::StringPrintf("Unknown state 0x%08X (%u)", state, 887 state); 888 break; 889 } 890 return description; 891} 892 893#undef STATE_CASE 894 895} // namespace net 896