spdy_session.cc revision ca12bfac764ba476d6cd062bf1dde12cc64c3f40
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/spdy/spdy_session.h"

#include <algorithm>
#include <map>

#include "base/basictypes.h"
#include "base/bind.h"
#include "base/compiler_specific.h"
#include "base/logging.h"
#include "base/message_loop/message_loop.h"
#include "base/metrics/field_trial.h"
#include "base/metrics/histogram.h"
#include "base/metrics/sparse_histogram.h"
#include "base/metrics/stats_counters.h"
#include "base/stl_util.h"
#include "base/strings/string_number_conversions.h"
#include "base/strings/string_util.h"
#include "base/strings/stringprintf.h"
#include "base/strings/utf_string_conversions.h"
#include "base/time/time.h"
#include "base/values.h"
#include "crypto/ec_private_key.h"
#include "crypto/ec_signature_creator.h"
#include "net/base/connection_type_histograms.h"
#include "net/base/net_log.h"
#include "net/base/net_util.h"
#include "net/cert/asn1_util.h"
#include "net/http/http_network_session.h"
#include "net/http/http_server_properties.h"
#include "net/spdy/spdy_buffer_producer.h"
#include "net/spdy/spdy_credential_builder.h"
#include "net/spdy/spdy_frame_builder.h"
#include "net/spdy/spdy_http_utils.h"
#include "net/spdy/spdy_protocol.h"
#include "net/spdy/spdy_session_pool.h"
#include "net/spdy/spdy_stream.h"
#include "net/ssl/server_bound_cert_service.h"

namespace net {

namespace {

// Size, in bytes, of the IOBuffer that the SpdySession constructor
// allocates for |read_buffer_|.
const int kReadBufferSize = 8 * 1024;
// Number of seconds the SpdySession constructor uses to initialize
// |connection_at_risk_of_loss_time_|.
const int kDefaultConnectionAtRiskOfLossSeconds = 10;
// Number of seconds the SpdySession constructor uses to initialize
// |hung_interval_|.
const int kHungIntervalSeconds = 10;

// Always start at 1 for the first stream id.
const SpdyStreamId kFirstStreamId = 1;

// Minimum seconds that unclaimed pushed streams will be kept in memory.
55const int kMinPushedStreamLifetimeSeconds = 300; 56 57SpdyMajorVersion NPNToSpdyVersion(NextProto next_proto) { 58 switch (next_proto) { 59 case kProtoSPDY2: 60 case kProtoSPDY21: 61 return SPDY2; 62 case kProtoSPDY3: 63 case kProtoSPDY31: 64 return SPDY3; 65 case kProtoSPDY4a2: 66 return SPDY4; 67 default: 68 NOTREACHED(); 69 } 70 return SPDY2; 71} 72 73base::Value* NetLogSpdySynCallback(const SpdyHeaderBlock* headers, 74 bool fin, 75 bool unidirectional, 76 SpdyStreamId stream_id, 77 SpdyStreamId associated_stream, 78 NetLog::LogLevel /* log_level */) { 79 base::DictionaryValue* dict = new base::DictionaryValue(); 80 base::ListValue* headers_list = new base::ListValue(); 81 for (SpdyHeaderBlock::const_iterator it = headers->begin(); 82 it != headers->end(); ++it) { 83 headers_list->Append(new base::StringValue(base::StringPrintf( 84 "%s: %s", it->first.c_str(), 85 (ShouldShowHttpHeaderValue( 86 it->first) ? it->second : "[elided]").c_str()))); 87 } 88 dict->SetBoolean("fin", fin); 89 dict->SetBoolean("unidirectional", unidirectional); 90 dict->Set("headers", headers_list); 91 dict->SetInteger("stream_id", stream_id); 92 if (associated_stream) 93 dict->SetInteger("associated_stream", associated_stream); 94 return dict; 95} 96 97base::Value* NetLogSpdyCredentialCallback(size_t slot, 98 const std::string* origin, 99 NetLog::LogLevel /* log_level */) { 100 base::DictionaryValue* dict = new base::DictionaryValue(); 101 dict->SetInteger("slot", slot); 102 dict->SetString("origin", *origin); 103 return dict; 104} 105 106base::Value* NetLogSpdySessionCloseCallback(int net_error, 107 const std::string* description, 108 NetLog::LogLevel /* log_level */) { 109 base::DictionaryValue* dict = new base::DictionaryValue(); 110 dict->SetInteger("net_error", net_error); 111 dict->SetString("description", *description); 112 return dict; 113} 114 115base::Value* NetLogSpdySessionCallback(const HostPortProxyPair* host_pair, 116 NetLog::LogLevel /* log_level */) { 117 
base::DictionaryValue* dict = new base::DictionaryValue(); 118 dict->SetString("host", host_pair->first.ToString()); 119 dict->SetString("proxy", host_pair->second.ToPacString()); 120 return dict; 121} 122 123base::Value* NetLogSpdySettingsCallback(const HostPortPair& host_port_pair, 124 bool clear_persisted, 125 NetLog::LogLevel /* log_level */) { 126 base::DictionaryValue* dict = new base::DictionaryValue(); 127 dict->SetString("host", host_port_pair.ToString()); 128 dict->SetBoolean("clear_persisted", clear_persisted); 129 return dict; 130} 131 132base::Value* NetLogSpdySettingCallback(SpdySettingsIds id, 133 SpdySettingsFlags flags, 134 uint32 value, 135 NetLog::LogLevel /* log_level */) { 136 base::DictionaryValue* dict = new base::DictionaryValue(); 137 dict->SetInteger("id", id); 138 dict->SetInteger("flags", flags); 139 dict->SetInteger("value", value); 140 return dict; 141} 142 143base::Value* NetLogSpdySendSettingsCallback(const SettingsMap* settings, 144 NetLog::LogLevel /* log_level */) { 145 base::DictionaryValue* dict = new base::DictionaryValue(); 146 base::ListValue* settings_list = new base::ListValue(); 147 for (SettingsMap::const_iterator it = settings->begin(); 148 it != settings->end(); ++it) { 149 const SpdySettingsIds id = it->first; 150 const SpdySettingsFlags flags = it->second.first; 151 const uint32 value = it->second.second; 152 settings_list->Append(new base::StringValue( 153 base::StringPrintf("[id:%u flags:%u value:%u]", id, flags, value))); 154 } 155 dict->Set("settings", settings_list); 156 return dict; 157} 158 159base::Value* NetLogSpdyWindowUpdateFrameCallback( 160 SpdyStreamId stream_id, 161 uint32 delta, 162 NetLog::LogLevel /* log_level */) { 163 base::DictionaryValue* dict = new base::DictionaryValue(); 164 dict->SetInteger("stream_id", static_cast<int>(stream_id)); 165 dict->SetInteger("delta", delta); 166 return dict; 167} 168 169base::Value* NetLogSpdySessionWindowUpdateCallback( 170 int32 delta, 171 int32 window_size, 172 
NetLog::LogLevel /* log_level */) { 173 base::DictionaryValue* dict = new base::DictionaryValue(); 174 dict->SetInteger("delta", delta); 175 dict->SetInteger("window_size", window_size); 176 return dict; 177} 178 179base::Value* NetLogSpdyDataCallback(SpdyStreamId stream_id, 180 int size, 181 bool fin, 182 NetLog::LogLevel /* log_level */) { 183 base::DictionaryValue* dict = new base::DictionaryValue(); 184 dict->SetInteger("stream_id", static_cast<int>(stream_id)); 185 dict->SetInteger("size", size); 186 dict->SetBoolean("fin", fin); 187 return dict; 188} 189 190base::Value* NetLogSpdyRstCallback(SpdyStreamId stream_id, 191 int status, 192 const std::string* description, 193 NetLog::LogLevel /* log_level */) { 194 base::DictionaryValue* dict = new base::DictionaryValue(); 195 dict->SetInteger("stream_id", static_cast<int>(stream_id)); 196 dict->SetInteger("status", status); 197 dict->SetString("description", *description); 198 return dict; 199} 200 201base::Value* NetLogSpdyPingCallback(uint32 unique_id, 202 const char* type, 203 NetLog::LogLevel /* log_level */) { 204 base::DictionaryValue* dict = new base::DictionaryValue(); 205 dict->SetInteger("unique_id", unique_id); 206 dict->SetString("type", type); 207 return dict; 208} 209 210base::Value* NetLogSpdyGoAwayCallback(SpdyStreamId last_stream_id, 211 int active_streams, 212 int unclaimed_streams, 213 SpdyGoAwayStatus status, 214 NetLog::LogLevel /* log_level */) { 215 base::DictionaryValue* dict = new base::DictionaryValue(); 216 dict->SetInteger("last_accepted_stream_id", 217 static_cast<int>(last_stream_id)); 218 dict->SetInteger("active_streams", active_streams); 219 dict->SetInteger("unclaimed_streams", unclaimed_streams); 220 dict->SetInteger("status", static_cast<int>(status)); 221 return dict; 222} 223 224// Maximum number of concurrent streams we will create, unless the server 225// sends a SETTINGS frame with a different value. 
226const size_t kInitialMaxConcurrentStreams = 100; 227// The maximum number of concurrent streams we will ever create. Even if 228// the server permits more, we will never exceed this limit. 229const size_t kMaxConcurrentStreamLimit = 256; 230 231} // namespace 232 233SpdyStreamRequest::SpdyStreamRequest() { 234 Reset(); 235} 236 237SpdyStreamRequest::~SpdyStreamRequest() { 238 CancelRequest(); 239} 240 241int SpdyStreamRequest::StartRequest( 242 SpdyStreamType type, 243 const base::WeakPtr<SpdySession>& session, 244 const GURL& url, 245 RequestPriority priority, 246 const BoundNetLog& net_log, 247 const CompletionCallback& callback) { 248 DCHECK(session.get()); 249 DCHECK(!session_.get()); 250 DCHECK(!stream_.get()); 251 DCHECK(callback_.is_null()); 252 253 type_ = type; 254 session_ = session; 255 url_ = url; 256 priority_ = priority; 257 net_log_ = net_log; 258 callback_ = callback; 259 260 base::WeakPtr<SpdyStream> stream; 261 int rv = session->TryCreateStream(this, &stream); 262 if (rv == OK) { 263 Reset(); 264 stream_ = stream; 265 } 266 return rv; 267} 268 269void SpdyStreamRequest::CancelRequest() { 270 if (session_.get()) 271 session_->CancelStreamRequest(this); 272 Reset(); 273} 274 275base::WeakPtr<SpdyStream> SpdyStreamRequest::ReleaseStream() { 276 DCHECK(!session_.get()); 277 base::WeakPtr<SpdyStream> stream = stream_; 278 DCHECK(stream.get()); 279 Reset(); 280 return stream; 281} 282 283void SpdyStreamRequest::OnRequestCompleteSuccess( 284 base::WeakPtr<SpdyStream>* stream) { 285 DCHECK(session_.get()); 286 DCHECK(!stream_.get()); 287 DCHECK(!callback_.is_null()); 288 CompletionCallback callback = callback_; 289 Reset(); 290 DCHECK(*stream); 291 stream_ = *stream; 292 callback.Run(OK); 293} 294 295void SpdyStreamRequest::OnRequestCompleteFailure(int rv) { 296 DCHECK(session_.get()); 297 DCHECK(!stream_.get()); 298 DCHECK(!callback_.is_null()); 299 CompletionCallback callback = callback_; 300 Reset(); 301 DCHECK_NE(rv, OK); 302 callback.Run(rv); 303} 
// Returns every member to its idle default: bidirectional stream type, no
// session, no stream, empty URL, MINIMUM_PRIORITY, an unbound net log and
// a null callback.
void SpdyStreamRequest::Reset() {
  type_ = SPDY_BIDIRECTIONAL_STREAM;
  session_.reset();
  stream_.reset();
  url_ = GURL();
  priority_ = MINIMUM_PRIORITY;
  net_log_ = BoundNetLog();
  callback_.Reset();
}

// Default entry: no stream, not waiting for a SYN_REPLY.
SpdySession::ActiveStreamInfo::ActiveStreamInfo()
    : stream(NULL),
      waiting_for_syn_reply(false) {}

// Entry for |stream|. |waiting_for_syn_reply| starts out true for every
// stream type except SPDY_PUSH_STREAM.
SpdySession::ActiveStreamInfo::ActiveStreamInfo(SpdyStream* stream)
    : stream(stream),
      waiting_for_syn_reply(stream->type() != SPDY_PUSH_STREAM) {}

SpdySession::ActiveStreamInfo::~ActiveStreamInfo() {}

// Default entry: stream id 0 and a default-constructed creation time.
SpdySession::PushedStreamInfo::PushedStreamInfo() : stream_id(0) {}

// Entry recording a pushed stream's id and the time it was created.
SpdySession::PushedStreamInfo::PushedStreamInfo(
    SpdyStreamId stream_id,
    base::TimeTicks creation_time)
    : stream_id(stream_id),
      creation_time(creation_time) {}

SpdySession::PushedStreamInfo::~PushedStreamInfo() {}

// Records the session's configuration and puts it in its initial state
// (STATE_AVAILABLE, READ_STATE_DO_READ, WRITE_STATE_IDLE, no flow control).
//
// A value of 0 for |initial_max_concurrent_streams|,
// |max_concurrent_streams_limit| or |stream_initial_recv_window_size|
// selects the corresponding built-in default. The body logs the beginning
// of the TYPE_SPDY_SESSION event and schedules the first sweep of
// unclaimed pushed streams kMinPushedStreamLifetimeSeconds from now, as
// measured by |time_func|.
SpdySession::SpdySession(
    const SpdySessionKey& spdy_session_key,
    const base::WeakPtr<HttpServerProperties>& http_server_properties,
    bool verify_domain_authentication,
    bool enable_sending_initial_settings,
    bool enable_credential_frames,
    bool enable_compression,
    bool enable_ping_based_connection_checking,
    NextProto default_protocol,
    size_t stream_initial_recv_window_size,
    size_t initial_max_concurrent_streams,
    size_t max_concurrent_streams_limit,
    TimeFunc time_func,
    const HostPortPair& trusted_spdy_proxy,
    NetLog* net_log)
    : weak_factory_(this),
      in_io_loop_(false),
      spdy_session_key_(spdy_session_key),
      pool_(NULL),
      http_server_properties_(http_server_properties),
      read_buffer_(new IOBuffer(kReadBufferSize)),
      stream_hi_water_mark_(kFirstStreamId),
      in_flight_write_frame_type_(DATA),
      in_flight_write_frame_size_(0),
      is_secure_(false),
      certificate_error_code_(OK),
      availability_state_(STATE_AVAILABLE),
      read_state_(READ_STATE_DO_READ),
      write_state_(WRITE_STATE_IDLE),
      error_on_close_(OK),
      max_concurrent_streams_(initial_max_concurrent_streams == 0 ?
                              kInitialMaxConcurrentStreams :
                              initial_max_concurrent_streams),
      max_concurrent_streams_limit_(max_concurrent_streams_limit == 0 ?
                                    kMaxConcurrentStreamLimit :
                                    max_concurrent_streams_limit),
      streams_initiated_count_(0),
      streams_pushed_count_(0),
      streams_pushed_and_claimed_count_(0),
      streams_abandoned_count_(0),
      total_bytes_received_(0),
      sent_settings_(false),
      received_settings_(false),
      stalled_streams_(0),
      pings_in_flight_(0),
      next_ping_id_(1),
      last_activity_time_(time_func()),
      check_ping_status_pending_(false),
      flow_control_state_(FLOW_CONTROL_NONE),
      stream_initial_send_window_size_(kSpdyStreamInitialWindowSize),
      stream_initial_recv_window_size_(stream_initial_recv_window_size == 0 ?
                                       kDefaultInitialRecvWindowSize :
                                       stream_initial_recv_window_size),
      session_send_window_size_(0),
      session_recv_window_size_(0),
      session_unacked_recv_window_bytes_(0),
      net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)),
      verify_domain_authentication_(verify_domain_authentication),
      enable_sending_initial_settings_(enable_sending_initial_settings),
      enable_credential_frames_(enable_credential_frames),
      enable_compression_(enable_compression),
      enable_ping_based_connection_checking_(
          enable_ping_based_connection_checking),
      default_protocol_(default_protocol),
      credential_state_(SpdyCredentialState::kDefaultNumSlots),
      connection_at_risk_of_loss_time_(
          base::TimeDelta::FromSeconds(kDefaultConnectionAtRiskOfLossSeconds)),
      hung_interval_(
          base::TimeDelta::FromSeconds(kHungIntervalSeconds)),
      trusted_spdy_proxy_(trusted_spdy_proxy),
      time_func_(time_func) {
  DCHECK(HttpStreamFactory::spdy_enabled());
  net_log_.BeginEvent(
      NetLog::TYPE_SPDY_SESSION,
      base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair()));
  next_unclaimed_push_stream_sweep_time_ = time_func_() +
      base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
  // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
}

// Must run outside the I/O loop, after the session has been detached from
// its pool. Disconnects the socket, records histograms and ends the
// TYPE_SPDY_SESSION net-log event.
SpdySession::~SpdySession() {
  CHECK(!in_io_loop_);
  DCHECK(!pool_);
  DcheckClosed();

  // TODO(akalin): Check connection->is_initialized() instead. This
  // requires re-working CreateFakeSpdySession(), though.
  DCHECK(connection_->socket());
  // With SPDY we can't recycle sockets.
  connection_->socket()->Disconnect();

  RecordHistograms();

  net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION);
}

// Attaches the session to |connection| and starts reading from it.
//
// The protocol is the one negotiated on the socket, or |default_protocol_|
// when the socket reports kProtoUnknown. Flow control is selected from the
// protocol: stream and session level from kProtoSPDY31 up, stream level
// from kProtoSPDY3 up, none below that. The framer is then created and the
// read loop is run once.
//
// Returns OK when the read loop reports OK or ERR_IO_PENDING; in that case
// the session registers itself as a layered pool on the connection, sends
// its initial settings and stores |pool|. Any other read-loop result is
// returned as the error, and |pool_| is left unset.
Error SpdySession::InitializeWithSocket(
    scoped_ptr<ClientSocketHandle> connection,
    SpdySessionPool* pool,
    bool is_secure,
    int certificate_error_code) {
  CHECK(!in_io_loop_);
  DCHECK_EQ(availability_state_, STATE_AVAILABLE);
  DCHECK_EQ(read_state_, READ_STATE_DO_READ);
  DCHECK_EQ(write_state_, WRITE_STATE_IDLE);
  DCHECK(!connection_);

  DCHECK(certificate_error_code == OK ||
         certificate_error_code < ERR_IO_PENDING);
  // TODO(akalin): Check connection->is_initialized() instead. This
  // requires re-working CreateFakeSpdySession(), though.
  DCHECK(connection->socket());

  base::StatsCounter spdy_sessions("spdy.sessions");
  spdy_sessions.Increment();

  connection_ = connection.Pass();
  is_secure_ = is_secure;
  certificate_error_code_ = certificate_error_code;

  NextProto protocol = default_protocol_;
  NextProto protocol_negotiated =
      connection_->socket()->GetNegotiatedProtocol();
  if (protocol_negotiated != kProtoUnknown) {
    protocol = protocol_negotiated;
  }

  SSLClientSocket* ssl_socket = GetSSLClientSocket();
  if (ssl_socket && ssl_socket->WasChannelIDSent()) {
    // According to the SPDY spec, the credential associated with the TLS
    // connection is stored in slot[1].
    credential_state_.SetHasCredential(GURL("https://" +
                                            host_port_pair().ToString()));
  }

  // TODO(akalin): Change this to kProtoSPDYMinimumVersion once we
  // stop supporting SPDY/1.
  DCHECK_GE(protocol, kProtoSPDY2);
  DCHECK_LE(protocol, kProtoSPDYMaximumVersion);
  if (protocol >= kProtoSPDY31) {
    flow_control_state_ = FLOW_CONTROL_STREAM_AND_SESSION;
    session_send_window_size_ = kSpdySessionInitialWindowSize;
    session_recv_window_size_ = kSpdySessionInitialWindowSize;
  } else if (protocol >= kProtoSPDY3) {
    flow_control_state_ = FLOW_CONTROL_STREAM;
  } else {
    flow_control_state_ = FLOW_CONTROL_NONE;
  }

  buffered_spdy_framer_.reset(
      new BufferedSpdyFramer(NPNToSpdyVersion(protocol), enable_compression_));
  buffered_spdy_framer_->set_visitor(this);
  buffered_spdy_framer_->set_debug_visitor(this);
  UMA_HISTOGRAM_ENUMERATION("Net.SpdyVersion", protocol, kProtoMaximumVersion);
#if defined(SPDY_PROXY_AUTH_ORIGIN)
  UMA_HISTOGRAM_BOOLEAN("Net.SpdySessions_DataReductionProxy",
                        host_port_pair().Equals(HostPortPair::FromURL(
                            GURL(SPDY_PROXY_AUTH_ORIGIN))));
#endif

  net_log_.AddEvent(
      NetLog::TYPE_SPDY_SESSION_INITIALIZED,
      connection_->socket()->NetLog().source().ToEventParametersCallback());

  int error = DoReadLoop(READ_STATE_DO_READ, OK);
  // A pending read is the normal outcome and counts as success here.
  if (error == ERR_IO_PENDING)
    error = OK;
  if (error == OK) {
    DCHECK_NE(availability_state_, STATE_CLOSED);
    connection_->AddLayeredPool(this);
    SendInitialSettings();
    pool_ = pool;
  } else {
    DcheckClosed();
  }
  return static_cast<Error>(error);
}

// Decides whether this session may be used for requests to |domain|.
//
// Returns true when verification is disabled or when GetSSLInfo() fails
// (the session is not secure). Returns false when the session is closed.
// Otherwise all of the following must hold: no client certificate was
// sent; credential frames are enabled, or no channel ID was sent, or
// |domain| and this session's host map to the same server-bound-cert
// domain; and the server certificate matches |domain|.
bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
  if (!verify_domain_authentication_)
    return true;

  if (availability_state_ == STATE_CLOSED)
    return false;

  SSLInfo ssl_info;
  bool was_npn_negotiated;
  NextProto protocol_negotiated = kProtoUnknown;
  if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated))
    return true;   // This is not a secure session, so all domains are okay.

  return !ssl_info.client_cert_sent &&
      (enable_credential_frames_ || !ssl_info.channel_id_sent ||
       ServerBoundCertService::GetDomainForHost(domain) ==
       ServerBoundCertService::GetDomainForHost(host_port_pair().host())) &&
       ssl_info.cert->VerifyNameMatch(domain);
}

// Looks up an active pushed stream for |url|.
//
// |*stream| is always reset first. Returns ERR_CONNECTION_CLOSED when the
// session is closed, or the error from TryAccessStream(). Otherwise
// returns OK with |*stream| set to the result of GetActivePushStream(),
// which may be null; when a stream is found the claimed-push counter is
// incremented.
int SpdySession::GetPushStream(
    const GURL& url,
    base::WeakPtr<SpdyStream>* stream,
    const BoundNetLog& stream_net_log) {
  CHECK(!in_io_loop_);

  stream->reset();

  // TODO(akalin): Add unit test exercising this code path.
  if (availability_state_ == STATE_CLOSED)
    return ERR_CONNECTION_CLOSED;

  Error err = TryAccessStream(url);
  if (err != OK)
    return err;

  *stream = GetActivePushStream(url);
  if (*stream) {
    DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_);
    streams_pushed_and_claimed_count_++;
  }
  return OK;
}

// Refuses https/wss URLs on a secure session whose certificate had an
// error: records a protocol-error histogram, closes the session with the
// certificate error and returns ERR_SPDY_PROTOCOL_ERROR. Returns OK in
// every other case. Must not be called on a closed session.
Error SpdySession::TryAccessStream(const GURL& url) {
  CHECK(!in_io_loop_);
  DCHECK_NE(availability_state_, STATE_CLOSED);

  if (is_secure_ && certificate_error_code_ != OK &&
      (url.SchemeIs("https") || url.SchemeIs("wss"))) {
    RecordProtocolErrorHistogram(
        PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION);
    CloseSessionResult result = DoCloseSession(
        static_cast<Error>(certificate_error_code_),
        "Tried to get SPDY stream for secure content over an unauthenticated "
        "session.");
    DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED);
    return ERR_SPDY_PROTOCOL_ERROR;
  }
  return OK;
}

// Creates a stream for |request| right away when the session is below its
// concurrent-stream limit (or the limit is 0); otherwise queues |request|
// by priority and returns ERR_IO_PENDING.
//
// Returns ERR_FAILED when the session is going away, ERR_CONNECTION_CLOSED
// when it is closed, or the error from TryAccessStream().
int SpdySession::TryCreateStream(SpdyStreamRequest* request,
                                 base::WeakPtr<SpdyStream>* stream) {
  CHECK(request);
  CHECK(!in_io_loop_);

  if (availability_state_ == STATE_GOING_AWAY)
    return ERR_FAILED;

  // TODO(akalin): Add unit test exercising this code path.
  if (availability_state_ == STATE_CLOSED)
    return ERR_CONNECTION_CLOSED;

  Error err = TryAccessStream(request->url());
  if (err != OK)
    return err;

  if (!max_concurrent_streams_ ||
      (active_streams_.size() + created_streams_.size() <
       max_concurrent_streams_)) {
    return CreateStream(*request, stream);
  }

  stalled_streams_++;
  net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS);
  pending_create_stream_queues_[request->priority()].push_back(request);
  return ERR_IO_PENDING;
}

// Builds a new SpdyStream from |request|, inserts it into the set of
// created streams and returns a weak pointer to it through |stream|.
//
// Returns ERR_FAILED when the session is going away, and
// ERR_CONNECTION_CLOSED when the session is closed or the socket is no
// longer connected (in the latter case the session is closed first).
int SpdySession::CreateStream(const SpdyStreamRequest& request,
                              base::WeakPtr<SpdyStream>* stream) {
  CHECK(!in_io_loop_);
  DCHECK_GE(request.priority(), MINIMUM_PRIORITY);
  DCHECK_LT(request.priority(), NUM_PRIORITIES);

  if (availability_state_ == STATE_GOING_AWAY)
    return ERR_FAILED;

  // TODO(akalin): Add unit test exercising this code path.
  if (availability_state_ == STATE_CLOSED)
    return ERR_CONNECTION_CLOSED;

  Error err = TryAccessStream(request.url());
  if (err != OK) {
    // This should have been caught in TryCreateStream().
    NOTREACHED();
    return err;
  }

  DCHECK(connection_->socket());
  DCHECK(connection_->socket()->IsConnected());
  // The DCHECKs above compile away in release builds, so the same
  // conditions are also handled at run time below.
  if (connection_->socket()) {
    UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected",
                          connection_->socket()->IsConnected());
    if (!connection_->socket()->IsConnected()) {
      CloseSessionResult result = DoCloseSession(
          ERR_CONNECTION_CLOSED,
          "Tried to create SPDY stream for a closed socket connection.");
      DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED);
      return ERR_CONNECTION_CLOSED;
    }
  }

  scoped_ptr<SpdyStream> new_stream(
      new SpdyStream(request.type(), GetWeakPtr(), request.url(),
                     request.priority(),
                     stream_initial_send_window_size_,
                     stream_initial_recv_window_size_,
                     request.net_log()));
  *stream = new_stream->GetWeakPtr();
  InsertCreatedStream(new_stream.Pass());

  UMA_HISTOGRAM_CUSTOM_COUNTS(
      "Net.SpdyPriorityCount",
      static_cast<int>(request.priority()), 0, 10, 11);

  return OK;
}

// Forgets |request|: removes it from the pending-creation queue for its
// priority, or, when it is not queued there, from the set of requests
// whose completion has already been posted.
void SpdySession::CancelStreamRequest(SpdyStreamRequest* request) {
  CHECK(request);

  if (DCHECK_IS_ON()) {
    // |request| should not be in a queue not matching its priority.
    for (int i = 0; i < NUM_PRIORITIES; ++i) {
      if (request->priority() == i)
        continue;
      PendingStreamRequestQueue* queue = &pending_create_stream_queues_[i];
      DCHECK(std::find(queue->begin(), queue->end(), request) == queue->end());
    }
  }

  PendingStreamRequestQueue* queue =
      &pending_create_stream_queues_[request->priority()];
  // Remove |request| from |queue| while preserving the order of the
  // other elements.
  PendingStreamRequestQueue::iterator it =
      std::find(queue->begin(), queue->end(), request);
  if (it != queue->end()) {
    it = queue->erase(it);
    // |request| should be in the queue at most once, and if it is
    // present, should not be pending completion.
    DCHECK(std::find(it, queue->end(), request) == queue->end());
    DCHECK(!ContainsKey(pending_stream_request_completions_,
                        request));
    return;
  }

  pending_stream_request_completions_.erase(request);
}

// Dequeues pending stream requests for as long as the loop condition
// (limit of 0, or active + created streams below the limit) holds.
//
// Each pass scans the queues from index NUM_PRIORITIES - 1 down to
// MINIMUM_PRIORITY, pops the front request of the first non-empty queue,
// records it as pending completion and posts CompleteStreamRequest() for
// it. Returns once every queue is empty.
void SpdySession::ProcessPendingStreamRequests() {
  while (!max_concurrent_streams_ ||
         (active_streams_.size() + created_streams_.size() <
          max_concurrent_streams_)) {
    bool no_pending_create_streams = true;
    for (int i = NUM_PRIORITIES - 1; i >= MINIMUM_PRIORITY; --i) {
      if (!pending_create_stream_queues_[i].empty()) {
        SpdyStreamRequest* pending_request =
            pending_create_stream_queues_[i].front();
        CHECK(pending_request);
        pending_create_stream_queues_[i].pop_front();
        no_pending_create_streams = false;
        DCHECK(!ContainsKey(pending_stream_request_completions_,
                            pending_request));
        pending_stream_request_completions_.insert(pending_request);
        base::MessageLoop::current()->PostTask(
            FROM_HERE,
            base::Bind(&SpdySession::CompleteStreamRequest,
                       weak_factory_.GetWeakPtr(), pending_request));
        break;
      }
    }
    if (no_pending_create_streams)
      return;  // there were no streams in any queue
  }
}

// Returns false for a non-secure session or when the protocol negotiated
// on the SSL socket is below kProtoSPDY3; otherwise returns whether a
// channel ID was sent on that socket.
// NOTE(review): |ssl_socket| is dereferenced without a null check; this
// presumably relies on GetSSLClientSocket() being non-null whenever
// |is_secure_| is set -- confirm.
bool SpdySession::NeedsCredentials() const {
  if (!is_secure_)
    return false;
  SSLClientSocket* ssl_socket = GetSSLClientSocket();
  if (ssl_socket->GetNegotiatedProtocol() < kProtoSPDY3)
    return false;
  return ssl_socket->WasChannelIDSent();
}

// Records |alias_key| in the set of pooled aliases for this session.
void SpdySession::AddPooledAlias(const SpdySessionKey& alias_key) {
  pooled_aliases_.insert(alias_key);
}

// Returns the protocol version reported by the framer, which must already
// have been created.
int SpdySession::GetProtocolVersion() const {
  DCHECK(buffered_spdy_framer_.get());
  return buffered_spdy_framer_->protocol_version();
}

// Returns a weak pointer to this session.
base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() {
  return weak_factory_.GetWeakPtr();
}

// Closes this session if it has no active streams. Returns false without
// closing when any stream is active; otherwise closes with
// ERR_CONNECTION_CLOSED and returns true when the close result is
// SESSION_CLOSED_AND_REMOVED.
bool SpdySession::CloseOneIdleConnection() {
  CHECK(!in_io_loop_);
  DCHECK_NE(availability_state_, STATE_CLOSED);
  DCHECK(pool_);
  if (!active_streams_.empty())
    return false;
  CloseSessionResult result =
      DoCloseSession(ERR_CONNECTION_CLOSED, "Closing one idle connection.");
  if (result != SESSION_CLOSED_AND_REMOVED) {
    NOTREACHED();
    return false;
  }
  return true;
}

// Queues a frame produced by |producer| on behalf of |stream|, at the
// stream's priority. |frame_type| must be HEADERS, DATA, CREDENTIAL or
// SYN_STREAM.
void SpdySession::EnqueueStreamWrite(
    const base::WeakPtr<SpdyStream>& stream,
    SpdyFrameType frame_type,
    scoped_ptr<SpdyBufferProducer> producer) {
  DCHECK(frame_type == HEADERS ||
         frame_type == DATA ||
         frame_type == CREDENTIAL ||
         frame_type == SYN_STREAM);
  EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream);
}

// Builds the SYN_STREAM frame for the active stream |stream_id|.
//
// CHECK-fails when |stream_id| is not an active stream. Sends a preface
// ping if none is in flight, creates the frame with an associated stream
// id of 0, bumps the request counters and, when all events are being
// logged, adds a TYPE_SPDY_SESSION_SYN_STREAM entry.
scoped_ptr<SpdyFrame> SpdySession::CreateSynStream(
    SpdyStreamId stream_id,
    RequestPriority priority,
    uint8 credential_slot,
    SpdyControlFlags flags,
    const SpdyHeaderBlock& headers) {
  ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
  CHECK(it != active_streams_.end());
  CHECK_EQ(it->second.stream->stream_id(), stream_id);

  SendPrefacePingIfNoneInFlight();

  DCHECK(buffered_spdy_framer_.get());
  scoped_ptr<SpdyFrame> syn_frame(
      buffered_spdy_framer_->CreateSynStream(
          stream_id, 0,
          ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion()),
          credential_slot, flags, enable_compression_, &headers));

  base::StatsCounter spdy_requests("spdy.requests");
  spdy_requests.Increment();
  streams_initiated_count_++;

  if (net_log().IsLoggingAllEvents()) {
    net_log().AddEvent(
        NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
        base::Bind(&NetLogSpdySynCallback, &headers,
                   (flags & CONTROL_FLAG_FIN) != 0,
                   (flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0,
                   stream_id, 0));
  }

  return syn_frame.Pass();
}

// Builds a CREDENTIAL frame for |origin| and stores it in
// |*credential_frame|.
//
// A credential slot is assigned to |origin| before the credential is
// built. Returns the error from SpdyCredentialBuilder::Build() when it
// fails (|*credential_frame| is then left untouched), or OK once the frame
// has been created.
// NOTE(review): |priority| is not used in this function.
int SpdySession::CreateCredentialFrame(
    const std::string& origin,
    SSLClientCertType type,
    const std::string& key,
    const std::string& cert,
    RequestPriority priority,
    scoped_ptr<SpdyFrame>* credential_frame) {
  DCHECK(is_secure_);
  SSLClientSocket* ssl_socket = GetSSLClientSocket();
  DCHECK(ssl_socket);
  DCHECK(ssl_socket->WasChannelIDSent());

  SpdyCredential credential;
  std::string tls_unique;
  ssl_socket->GetTLSUniqueChannelBinding(&tls_unique);
  size_t slot = credential_state_.SetHasCredential(GURL(origin));
  int rv = SpdyCredentialBuilder::Build(tls_unique, type, key, cert, slot,
                                        &credential);
  DCHECK_NE(rv, ERR_IO_PENDING);
  if (rv != OK)
    return rv;

  DCHECK(buffered_spdy_framer_.get());
  credential_frame->reset(
      buffered_spdy_framer_->CreateCredentialFrame(credential));

  if (net_log().IsLoggingAllEvents()) {
    net_log().AddEvent(
        NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL,
        base::Bind(&NetLogSpdyCredentialCallback, credential.slot, &origin));
  }
  return OK;
}

// Builds a DATA frame for the active stream |stream_id| from the first
// bytes of |data|.
//
// The payload length is |len| capped by kMaxSpdyFrameChunkSize and, when
// the corresponding flow control is enabled, by the stream's and the
// session's send windows. DATA_FLAG_FIN is cleared when fewer than |len|
// bytes are sent.
//
// Returns a null scoped_ptr when the session is closed, when |len| is
// negative, or when sending is stalled by stream or session flow control;
// in the stalled cases the stream is marked as stalled and queued for
// resumption. With session flow control, the session send window is
// decreased by the payload size and a consume callback is attached to the
// returned buffer.
scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
                                                     IOBuffer* data,
                                                     int len,
                                                     SpdyDataFlags flags) {
  if (availability_state_ == STATE_CLOSED) {
    NOTREACHED();
    return scoped_ptr<SpdyBuffer>();
  }

  ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
  CHECK(it != active_streams_.end());
  SpdyStream* stream = it->second.stream;
  CHECK_EQ(stream->stream_id(), stream_id);

  if (len < 0) {
    NOTREACHED();
    return scoped_ptr<SpdyBuffer>();
  }

  int effective_len = std::min(len, kMaxSpdyFrameChunkSize);

  bool send_stalled_by_stream =
      (flow_control_state_ >= FLOW_CONTROL_STREAM) &&
      (stream->send_window_size() <= 0);
  bool send_stalled_by_session = IsSendStalled();

  // NOTE: There's an enum of the same name in histograms.xml.
  enum SpdyFrameFlowControlState {
    SEND_NOT_STALLED,
    SEND_STALLED_BY_STREAM,
    SEND_STALLED_BY_SESSION,
    SEND_STALLED_BY_STREAM_AND_SESSION,
  };

  SpdyFrameFlowControlState frame_flow_control_state = SEND_NOT_STALLED;
  if (send_stalled_by_stream) {
    if (send_stalled_by_session) {
      frame_flow_control_state = SEND_STALLED_BY_STREAM_AND_SESSION;
    } else {
      frame_flow_control_state = SEND_STALLED_BY_STREAM;
    }
  } else if (send_stalled_by_session) {
    frame_flow_control_state = SEND_STALLED_BY_SESSION;
  }

  // Each flow-control mode reports into its own histogram, with an upper
  // bound covering only the states that mode can produce.
  if (flow_control_state_ == FLOW_CONTROL_STREAM) {
    UMA_HISTOGRAM_ENUMERATION(
        "Net.SpdyFrameStreamFlowControlState",
        frame_flow_control_state,
        SEND_STALLED_BY_STREAM + 1);
  } else if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
    UMA_HISTOGRAM_ENUMERATION(
        "Net.SpdyFrameStreamAndSessionFlowControlState",
        frame_flow_control_state,
        SEND_STALLED_BY_STREAM_AND_SESSION + 1);
  }

  // Obey send window size of the stream if stream flow control is
  // enabled.
  if (flow_control_state_ >= FLOW_CONTROL_STREAM) {
    if (send_stalled_by_stream) {
      stream->set_send_stalled_by_flow_control(true);
      // Even though we're currently stalled only by the stream, we
      // might end up being stalled by the session also.
      QueueSendStalledStream(*stream);
      net_log().AddEvent(
          NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_STREAM_SEND_WINDOW,
          NetLog::IntegerCallback("stream_id", stream_id));
      return scoped_ptr<SpdyBuffer>();
    }

    effective_len = std::min(effective_len, stream->send_window_size());
  }

  // Obey send window size of the session if session flow control is
  // enabled.
  if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
    if (send_stalled_by_session) {
      stream->set_send_stalled_by_flow_control(true);
      QueueSendStalledStream(*stream);
      net_log().AddEvent(
          NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_SESSION_SEND_WINDOW,
          NetLog::IntegerCallback("stream_id", stream_id));
      return scoped_ptr<SpdyBuffer>();
    }

    effective_len = std::min(effective_len, session_send_window_size_);
  }

  DCHECK_GE(effective_len, 0);

  // Clear FIN flag if only some of the data will be in the data
  // frame.
  if (effective_len < len)
    flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);

  if (net_log().IsLoggingAllEvents()) {
    net_log().AddEvent(
        NetLog::TYPE_SPDY_SESSION_SEND_DATA,
        base::Bind(&NetLogSpdyDataCallback, stream_id, effective_len,
                   (flags & DATA_FLAG_FIN) != 0));
  }

  // Send PrefacePing for DATA_FRAMEs with nonzero payload size.
  if (effective_len > 0)
    SendPrefacePingIfNoneInFlight();

  // TODO(mbelshe): reduce memory copies here.
  DCHECK(buffered_spdy_framer_.get());
  scoped_ptr<SpdyFrame> frame(
      buffered_spdy_framer_->CreateDataFrame(
          stream_id, data->data(),
          static_cast<uint32>(effective_len), flags));

  scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass()));

  if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
    DecreaseSendWindowSize(static_cast<int32>(effective_len));
    data_buffer->AddConsumeCallback(
        base::Bind(&SpdySession::OnWriteBufferConsumed,
                   weak_factory_.GetWeakPtr(),
                   static_cast<size_t>(effective_len)));
  }

  return data_buffer.Pass();
}

// Closes the active stream |stream_id| with |status|. The stream must be
// active; otherwise this hits NOTREACHED() and does nothing.
void SpdySession::CloseActiveStream(SpdyStreamId stream_id, int status) {
  DCHECK_NE(stream_id, 0u);

  ActiveStreamMap::iterator it = active_streams_.find(stream_id);
  if (it == active_streams_.end()) {
    NOTREACHED();
    return;
  }

  CloseActiveStreamIterator(it, status);
}

// Closes |stream|, which must be in the created-streams set and still have
// stream id 0, with |status|. Hits NOTREACHED() and does nothing when the
// stream is not in that set.
void SpdySession::CloseCreatedStream(
    const base::WeakPtr<SpdyStream>& stream, int status) {
  DCHECK_EQ(stream->stream_id(), 0u);

  CreatedStreamSet::iterator it = created_streams_.find(stream.get());
  if (it == created_streams_.end()) {
    NOTREACHED();
    return;
  }

  CloseCreatedStreamIterator(it, status);
}

// Resets the active stream |stream_id|: closes it locally and queues a
// RST_STREAM frame carrying |status|. Hits NOTREACHED() and does nothing
// when the stream is not active.
void SpdySession::ResetStream(SpdyStreamId stream_id,
                              SpdyRstStreamStatus status,
                              const std::string& description) {
  DCHECK_NE(stream_id, 0u);

  ActiveStreamMap::iterator it = active_streams_.find(stream_id);
  if (it == active_streams_.end()) {
    NOTREACHED();
    return;
  }

  ResetStreamIterator(it, status, description);
}

// Returns whether |stream_id| is currently in the active-stream map.
bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const {
  return ContainsKey(active_streams_, stream_id);
}

LoadState SpdySession::GetLoadState() const {
  // Just report that we're idle since the session could be doing
  // many things concurrently.
  return LOAD_STATE_IDLE;
}

// Removes the stream at |it| from the active-stream map, takes ownership
// of it, drops any unclaimed-push entry for its URL when it is a push
// stream, and passes it to DeleteStream() with |status|.
void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it,
                                            int status) {
  // TODO(mbelshe): We should send a RST_STREAM control frame here
  //                so that the server can cancel a large send.

  scoped_ptr<SpdyStream> owned_stream(it->second.stream);
  active_streams_.erase(it);

  // TODO(akalin): When SpdyStream was ref-counted (and
  // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this
  // was only done when status was not OK. This meant that pushed
  // streams can still be claimed after they're closed. This is
  // probably something that we still want to support, although server
  // push is hardly used. Write tests for this and fix this. (See
  // http://crbug.com/261712 .)
  if (owned_stream->type() == SPDY_PUSH_STREAM)
    unclaimed_pushed_streams_.erase(owned_stream->url());

  DeleteStream(owned_stream.Pass(), status);
}

// Removes the stream at |it| from the created-streams set, takes ownership
// of it and passes it to DeleteStream() with |status|.
void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it,
                                             int status) {
  scoped_ptr<SpdyStream> owned_stream(*it);
  created_streams_.erase(it);
  DeleteStream(owned_stream.Pass(), status);
}

// Closes the stream at |it| with ERR_SPDY_PROTOCOL_ERROR and then queues a
// RST_STREAM frame for it. The stream id and priority are read first
// because |it| is erased by the close.
void SpdySession::ResetStreamIterator(ActiveStreamMap::iterator it,
                                      SpdyRstStreamStatus status,
                                      const std::string& description) {
  SpdyStreamId stream_id = it->first;
  RequestPriority priority = it->second.stream->priority();
  // Removes any pending writes for the stream except for possibly an
  // in-flight one.
  CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);

  SendResetStreamFrame(stream_id, priority, status, description);
}

// Logs and queues a RST_STREAM frame for |stream_id| as a session write at
// |priority|, then records a protocol-error histogram sample derived from
// |status|. |stream_id| must be non-zero and no longer in the
// active-stream map.
void SpdySession::SendResetStreamFrame(SpdyStreamId stream_id,
                                       RequestPriority priority,
                                       SpdyRstStreamStatus status,
                                       const std::string& description) {
  DCHECK_NE(stream_id, 0u);
  DCHECK(active_streams_.find(stream_id) == active_streams_.end());

  net_log().AddEvent(
      NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
      base::Bind(&NetLogSpdyRstCallback, stream_id, status, &description));

  DCHECK(buffered_spdy_framer_.get());
  scoped_ptr<SpdyFrame> rst_frame(
      buffered_spdy_framer_->CreateRstStream(stream_id, status));

  EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass());
  RecordProtocolErrorHistogram(
      static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID));
}

// Runs the read loop starting from |expected_read_state| with |result|.
// If the session ends up closed, it is removed from its pool; otherwise
// the loop result is expected to be OK or ERR_IO_PENDING.
void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) {
  CHECK(!in_io_loop_);
  DCHECK_NE(availability_state_, STATE_CLOSED);
  DCHECK_EQ(read_state_, expected_read_state);

  result = DoReadLoop(expected_read_state, result);

  if (availability_state_ == STATE_CLOSED) {
    DCHECK_EQ(result, error_on_close_);
    DCHECK_LT(error_on_close_, ERR_IO_PENDING);
    RemoveFromPool();
    return;
  }

  DCHECK(result == OK || result == ERR_IO_PENDING);
}

int SpdySession::DoReadLoop(ReadState expected_read_state, int result) {
  CHECK(!in_io_loop_);
  DCHECK_NE(availability_state_, STATE_CLOSED);
  DCHECK_EQ(read_state_, expected_read_state);

  in_io_loop_ = true;

  int bytes_read_without_yielding = 0;

  // Loop until the session is closed, the read becomes blocked, or
  // the read limit is exceeded.
1093 while (true) { 1094 switch (read_state_) { 1095 case READ_STATE_DO_READ: 1096 DCHECK_EQ(result, OK); 1097 result = DoRead(); 1098 break; 1099 case READ_STATE_DO_READ_COMPLETE: 1100 if (result > 0) 1101 bytes_read_without_yielding += result; 1102 result = DoReadComplete(result); 1103 break; 1104 default: 1105 NOTREACHED() << "read_state_: " << read_state_; 1106 break; 1107 } 1108 1109 if (availability_state_ == STATE_CLOSED) { 1110 DCHECK_EQ(result, error_on_close_); 1111 DCHECK_LT(result, ERR_IO_PENDING); 1112 break; 1113 } 1114 1115 if (result == ERR_IO_PENDING) 1116 break; 1117 1118 if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) { 1119 read_state_ = READ_STATE_DO_READ; 1120 base::MessageLoop::current()->PostTask( 1121 FROM_HERE, 1122 base::Bind(&SpdySession::PumpReadLoop, 1123 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK)); 1124 result = ERR_IO_PENDING; 1125 break; 1126 } 1127 } 1128 1129 CHECK(in_io_loop_); 1130 in_io_loop_ = false; 1131 1132 return result; 1133} 1134 1135int SpdySession::DoRead() { 1136 CHECK(in_io_loop_); 1137 DCHECK_NE(availability_state_, STATE_CLOSED); 1138 1139 CHECK(connection_); 1140 CHECK(connection_->socket()); 1141 read_state_ = READ_STATE_DO_READ_COMPLETE; 1142 return connection_->socket()->Read( 1143 read_buffer_.get(), 1144 kReadBufferSize, 1145 base::Bind(&SpdySession::PumpReadLoop, 1146 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE)); 1147} 1148 1149int SpdySession::DoReadComplete(int result) { 1150 CHECK(in_io_loop_); 1151 DCHECK_NE(availability_state_, STATE_CLOSED); 1152 1153 // Parse a frame. For now this code requires that the frame fit into our 1154 // buffer (kReadBufferSize). 1155 // TODO(mbelshe): support arbitrarily large frames! 
1156 1157 if (result == 0) { 1158 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF", 1159 total_bytes_received_, 1, 100000000, 50); 1160 CloseSessionResult close_session_result = 1161 DoCloseSession(ERR_CONNECTION_CLOSED, "Connection closed"); 1162 DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED); 1163 DCHECK_EQ(availability_state_, STATE_CLOSED); 1164 DCHECK_EQ(error_on_close_, ERR_CONNECTION_CLOSED); 1165 return ERR_CONNECTION_CLOSED; 1166 } 1167 1168 if (result < 0) { 1169 CloseSessionResult close_session_result = 1170 DoCloseSession(static_cast<Error>(result), "result is < 0."); 1171 DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED); 1172 DCHECK_EQ(availability_state_, STATE_CLOSED); 1173 DCHECK_EQ(error_on_close_, result); 1174 return result; 1175 } 1176 1177 total_bytes_received_ += result; 1178 1179 last_activity_time_ = time_func_(); 1180 1181 DCHECK(buffered_spdy_framer_.get()); 1182 char* data = read_buffer_->data(); 1183 while (result > 0) { 1184 uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result); 1185 result -= bytes_processed; 1186 data += bytes_processed; 1187 1188 if (availability_state_ == STATE_CLOSED) { 1189 DCHECK_LT(error_on_close_, ERR_IO_PENDING); 1190 return error_on_close_; 1191 } 1192 1193 DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR); 1194 } 1195 1196 read_state_ = READ_STATE_DO_READ; 1197 return OK; 1198} 1199 1200void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) { 1201 CHECK(!in_io_loop_); 1202 DCHECK_NE(availability_state_, STATE_CLOSED); 1203 DCHECK_EQ(write_state_, expected_write_state); 1204 1205 result = DoWriteLoop(expected_write_state, result); 1206 1207 if (availability_state_ == STATE_CLOSED) { 1208 DCHECK_EQ(result, error_on_close_); 1209 DCHECK_LT(error_on_close_, ERR_IO_PENDING); 1210 RemoveFromPool(); 1211 return; 1212 } 1213 1214 DCHECK(result == OK || result == ERR_IO_PENDING); 1215} 1216 1217int 
SpdySession::DoWriteLoop(WriteState expected_write_state, int result) { 1218 CHECK(!in_io_loop_); 1219 DCHECK_NE(availability_state_, STATE_CLOSED); 1220 DCHECK_NE(write_state_, WRITE_STATE_IDLE); 1221 DCHECK_EQ(write_state_, expected_write_state); 1222 1223 in_io_loop_ = true; 1224 1225 // Loop until the session is closed or the write becomes blocked. 1226 while (true) { 1227 switch (write_state_) { 1228 case WRITE_STATE_DO_WRITE: 1229 DCHECK_EQ(result, OK); 1230 result = DoWrite(); 1231 break; 1232 case WRITE_STATE_DO_WRITE_COMPLETE: 1233 result = DoWriteComplete(result); 1234 break; 1235 case WRITE_STATE_IDLE: 1236 default: 1237 NOTREACHED() << "write_state_: " << write_state_; 1238 break; 1239 } 1240 1241 if (availability_state_ == STATE_CLOSED) { 1242 DCHECK_EQ(result, error_on_close_); 1243 DCHECK_LT(result, ERR_IO_PENDING); 1244 break; 1245 } 1246 1247 if (write_state_ == WRITE_STATE_IDLE) { 1248 DCHECK_EQ(result, ERR_IO_PENDING); 1249 break; 1250 } 1251 1252 if (result == ERR_IO_PENDING) 1253 break; 1254 } 1255 1256 CHECK(in_io_loop_); 1257 in_io_loop_ = false; 1258 1259 return result; 1260} 1261 1262int SpdySession::DoWrite() { 1263 CHECK(in_io_loop_); 1264 DCHECK_NE(availability_state_, STATE_CLOSED); 1265 1266 DCHECK(buffered_spdy_framer_); 1267 if (in_flight_write_) { 1268 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); 1269 } else { 1270 // Grab the next frame to send. 1271 SpdyFrameType frame_type = DATA; 1272 scoped_ptr<SpdyBufferProducer> producer; 1273 base::WeakPtr<SpdyStream> stream; 1274 if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) { 1275 write_state_ = WRITE_STATE_IDLE; 1276 return ERR_IO_PENDING; 1277 } 1278 1279 if (stream.get()) 1280 DCHECK(!stream->IsClosed()); 1281 1282 // Activate the stream only when sending the SYN_STREAM frame to 1283 // guarantee monotonically-increasing stream IDs. 
1284 if (frame_type == SYN_STREAM) { 1285 if (stream.get() && stream->stream_id() == 0) { 1286 scoped_ptr<SpdyStream> owned_stream = 1287 ActivateCreatedStream(stream.get()); 1288 InsertActivatedStream(owned_stream.Pass()); 1289 } else { 1290 NOTREACHED(); 1291 return ERR_UNEXPECTED; 1292 } 1293 } 1294 1295 in_flight_write_ = producer->ProduceBuffer(); 1296 if (!in_flight_write_) { 1297 NOTREACHED(); 1298 return ERR_UNEXPECTED; 1299 } 1300 in_flight_write_frame_type_ = frame_type; 1301 in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize(); 1302 DCHECK_GE(in_flight_write_frame_size_, 1303 buffered_spdy_framer_->GetFrameMinimumSize()); 1304 in_flight_write_stream_ = stream; 1305 } 1306 1307 write_state_ = WRITE_STATE_DO_WRITE_COMPLETE; 1308 1309 // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems 1310 // with Socket implementations that don't store their IOBuffer 1311 // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345). 1312 scoped_refptr<IOBuffer> write_io_buffer = 1313 in_flight_write_->GetIOBufferForRemainingData(); 1314 return connection_->socket()->Write( 1315 write_io_buffer.get(), 1316 in_flight_write_->GetRemainingSize(), 1317 base::Bind(&SpdySession::PumpWriteLoop, 1318 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE)); 1319} 1320 1321int SpdySession::DoWriteComplete(int result) { 1322 CHECK(in_io_loop_); 1323 DCHECK_NE(availability_state_, STATE_CLOSED); 1324 DCHECK_NE(result, ERR_IO_PENDING); 1325 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); 1326 1327 last_activity_time_ = time_func_(); 1328 1329 if (result < 0) { 1330 DCHECK_NE(result, ERR_IO_PENDING); 1331 in_flight_write_.reset(); 1332 in_flight_write_frame_type_ = DATA; 1333 in_flight_write_frame_size_ = 0; 1334 in_flight_write_stream_.reset(); 1335 CloseSessionResult close_session_result = 1336 DoCloseSession(static_cast<Error>(result), "Write error"); 1337 DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED); 1338 
DCHECK_EQ(availability_state_, STATE_CLOSED); 1339 DCHECK_EQ(error_on_close_, result); 1340 return result; 1341 } 1342 1343 // It should not be possible to have written more bytes than our 1344 // in_flight_write_. 1345 DCHECK_LE(static_cast<size_t>(result), 1346 in_flight_write_->GetRemainingSize()); 1347 1348 if (result > 0) { 1349 in_flight_write_->Consume(static_cast<size_t>(result)); 1350 1351 // We only notify the stream when we've fully written the pending frame. 1352 if (in_flight_write_->GetRemainingSize() == 0) { 1353 // It is possible that the stream was cancelled while we were 1354 // writing to the socket. 1355 if (in_flight_write_stream_.get()) { 1356 DCHECK_GT(in_flight_write_frame_size_, 0u); 1357 in_flight_write_stream_->OnFrameWriteComplete( 1358 in_flight_write_frame_type_, 1359 in_flight_write_frame_size_); 1360 } 1361 1362 // Cleanup the write which just completed. 1363 in_flight_write_.reset(); 1364 in_flight_write_frame_type_ = DATA; 1365 in_flight_write_frame_size_ = 0; 1366 in_flight_write_stream_.reset(); 1367 } 1368 } 1369 1370 write_state_ = WRITE_STATE_DO_WRITE; 1371 return OK; 1372} 1373 1374void SpdySession::DcheckGoingAway() const { 1375 DCHECK_GE(availability_state_, STATE_GOING_AWAY); 1376 if (DCHECK_IS_ON()) { 1377 for (int i = 0; i < NUM_PRIORITIES; ++i) { 1378 DCHECK(pending_create_stream_queues_[i].empty()); 1379 } 1380 } 1381 DCHECK(pending_stream_request_completions_.empty()); 1382 DCHECK(created_streams_.empty()); 1383} 1384 1385void SpdySession::DcheckClosed() const { 1386 DcheckGoingAway(); 1387 DCHECK_EQ(availability_state_, STATE_CLOSED); 1388 DCHECK_LT(error_on_close_, ERR_IO_PENDING); 1389 DCHECK(active_streams_.empty()); 1390 DCHECK(unclaimed_pushed_streams_.empty()); 1391 DCHECK(write_queue_.IsEmpty()); 1392} 1393 1394void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id, 1395 Error status) { 1396 DCHECK_GE(availability_state_, STATE_GOING_AWAY); 1397 // The loops below are carefully written to avoid 
reentrancy problems. 1398 1399 for (int i = 0; i < NUM_PRIORITIES; ++i) { 1400 PendingStreamRequestQueue queue; 1401 queue.swap(pending_create_stream_queues_[i]); 1402 for (PendingStreamRequestQueue::const_iterator it = queue.begin(); 1403 it != queue.end(); ++it) { 1404 CHECK(*it); 1405 (*it)->OnRequestCompleteFailure(ERR_ABORTED); 1406 } 1407 } 1408 1409 PendingStreamRequestCompletionSet pending_completions; 1410 pending_completions.swap(pending_stream_request_completions_); 1411 for (PendingStreamRequestCompletionSet::const_iterator it = 1412 pending_completions.begin(); 1413 it != pending_completions.end(); ++it) { 1414 (*it)->OnRequestCompleteFailure(ERR_ABORTED); 1415 } 1416 1417 while (true) { 1418 ActiveStreamMap::iterator it = 1419 active_streams_.lower_bound(last_good_stream_id + 1); 1420 if (it == active_streams_.end()) 1421 break; 1422 LogAbandonedActiveStream(it, status); 1423 CloseActiveStreamIterator(it, status); 1424 } 1425 1426 while (!created_streams_.empty()) { 1427 CreatedStreamSet::iterator it = created_streams_.begin(); 1428 LogAbandonedStream(*it, status); 1429 CloseCreatedStreamIterator(it, status); 1430 } 1431 1432 write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id); 1433 1434 DcheckGoingAway(); 1435} 1436 1437void SpdySession::MaybeFinishGoingAway() { 1438 DcheckGoingAway(); 1439 if (active_streams_.empty() && availability_state_ != STATE_CLOSED) { 1440 CloseSessionResult result = 1441 DoCloseSession(ERR_CONNECTION_CLOSED, "Finished going away"); 1442 DCHECK_NE(result, SESSION_ALREADY_CLOSED); 1443 } 1444} 1445 1446SpdySession::CloseSessionResult SpdySession::DoCloseSession( 1447 Error err, 1448 const std::string& description) { 1449 DCHECK_LT(err, ERR_IO_PENDING); 1450 1451 if (availability_state_ == STATE_CLOSED) 1452 return SESSION_ALREADY_CLOSED; 1453 1454 net_log_.AddEvent( 1455 NetLog::TYPE_SPDY_SESSION_CLOSE, 1456 base::Bind(&NetLogSpdySessionCloseCallback, err, &description)); 1457 1458 
UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err); 1459 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors", 1460 total_bytes_received_, 1, 100000000, 50); 1461 1462 // |pool_| will be NULL when |InitializeWithSocket()| is in the 1463 // call stack. 1464 if (pool_ && availability_state_ != STATE_GOING_AWAY) 1465 pool_->MakeSessionUnavailable(GetWeakPtr()); 1466 1467 availability_state_ = STATE_CLOSED; 1468 error_on_close_ = err; 1469 1470 StartGoingAway(0, err); 1471 write_queue_.Clear(); 1472 1473 DcheckClosed(); 1474 1475 if (in_io_loop_) 1476 return SESSION_CLOSED_BUT_NOT_REMOVED; 1477 1478 RemoveFromPool(); 1479 return SESSION_CLOSED_AND_REMOVED; 1480} 1481 1482void SpdySession::RemoveFromPool() { 1483 DcheckClosed(); 1484 CHECK(pool_); 1485 1486 SpdySessionPool* pool = pool_; 1487 pool_ = NULL; 1488 pool->RemoveUnavailableSession(GetWeakPtr()); 1489} 1490 1491void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) { 1492 DCHECK(stream); 1493 std::string description = base::StringPrintf( 1494 "ABANDONED (stream_id=%d): ", stream->stream_id()) + 1495 stream->url().spec(); 1496 stream->LogStreamError(status, description); 1497 // We don't increment the streams abandoned counter here. If the 1498 // stream isn't active (i.e., it hasn't written anything to the wire 1499 // yet) then it's as if it never existed. If it is active, then 1500 // LogAbandonedActiveStream() will increment the counters. 
1501} 1502 1503void SpdySession::LogAbandonedActiveStream(ActiveStreamMap::const_iterator it, 1504 Error status) { 1505 DCHECK_GT(it->first, 0u); 1506 LogAbandonedStream(it->second.stream, status); 1507 ++streams_abandoned_count_; 1508 base::StatsCounter abandoned_streams("spdy.abandoned_streams"); 1509 abandoned_streams.Increment(); 1510 if (it->second.stream->type() == SPDY_PUSH_STREAM && 1511 unclaimed_pushed_streams_.find(it->second.stream->url()) != 1512 unclaimed_pushed_streams_.end()) { 1513 base::StatsCounter abandoned_push_streams("spdy.abandoned_push_streams"); 1514 abandoned_push_streams.Increment(); 1515 } 1516} 1517 1518int SpdySession::GetNewStreamId() { 1519 int id = stream_hi_water_mark_; 1520 stream_hi_water_mark_ += 2; 1521 if (stream_hi_water_mark_ > 0x7fff) 1522 stream_hi_water_mark_ = 1; 1523 return id; 1524} 1525 1526void SpdySession::CloseSessionOnError(Error err, 1527 const std::string& description) { 1528 // We may be called from anywhere, so we can't expect a particular 1529 // return value. 
1530 ignore_result(DoCloseSession(err, description)); 1531} 1532 1533base::Value* SpdySession::GetInfoAsValue() const { 1534 base::DictionaryValue* dict = new base::DictionaryValue(); 1535 1536 dict->SetInteger("source_id", net_log_.source().id); 1537 1538 dict->SetString("host_port_pair", host_port_pair().ToString()); 1539 if (!pooled_aliases_.empty()) { 1540 base::ListValue* alias_list = new base::ListValue(); 1541 for (std::set<SpdySessionKey>::const_iterator it = 1542 pooled_aliases_.begin(); 1543 it != pooled_aliases_.end(); it++) { 1544 alias_list->Append(new base::StringValue( 1545 it->host_port_pair().ToString())); 1546 } 1547 dict->Set("aliases", alias_list); 1548 } 1549 dict->SetString("proxy", host_port_proxy_pair().second.ToURI()); 1550 1551 dict->SetInteger("active_streams", active_streams_.size()); 1552 1553 dict->SetInteger("unclaimed_pushed_streams", 1554 unclaimed_pushed_streams_.size()); 1555 1556 dict->SetBoolean("is_secure", is_secure_); 1557 1558 dict->SetString("protocol_negotiated", 1559 SSLClientSocket::NextProtoToString( 1560 connection_->socket()->GetNegotiatedProtocol())); 1561 1562 dict->SetInteger("error", error_on_close_); 1563 dict->SetInteger("max_concurrent_streams", max_concurrent_streams_); 1564 1565 dict->SetInteger("streams_initiated_count", streams_initiated_count_); 1566 dict->SetInteger("streams_pushed_count", streams_pushed_count_); 1567 dict->SetInteger("streams_pushed_and_claimed_count", 1568 streams_pushed_and_claimed_count_); 1569 dict->SetInteger("streams_abandoned_count", streams_abandoned_count_); 1570 DCHECK(buffered_spdy_framer_.get()); 1571 dict->SetInteger("frames_received", buffered_spdy_framer_->frames_received()); 1572 1573 dict->SetBoolean("sent_settings", sent_settings_); 1574 dict->SetBoolean("received_settings", received_settings_); 1575 1576 dict->SetInteger("send_window_size", session_send_window_size_); 1577 dict->SetInteger("recv_window_size", session_recv_window_size_); 1578 
dict->SetInteger("unacked_recv_window_bytes", 1579 session_unacked_recv_window_bytes_); 1580 return dict; 1581} 1582 1583bool SpdySession::IsReused() const { 1584 return buffered_spdy_framer_->frames_received() > 0; 1585} 1586 1587bool SpdySession::GetLoadTimingInfo(SpdyStreamId stream_id, 1588 LoadTimingInfo* load_timing_info) const { 1589 return connection_->GetLoadTimingInfo(stream_id != kFirstStreamId, 1590 load_timing_info); 1591} 1592 1593int SpdySession::GetPeerAddress(IPEndPoint* address) const { 1594 int rv = ERR_SOCKET_NOT_CONNECTED; 1595 if (connection_->socket()) { 1596 rv = connection_->socket()->GetPeerAddress(address); 1597 } 1598 1599 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetPeerAddress", 1600 rv == ERR_SOCKET_NOT_CONNECTED); 1601 1602 return rv; 1603} 1604 1605int SpdySession::GetLocalAddress(IPEndPoint* address) const { 1606 int rv = ERR_SOCKET_NOT_CONNECTED; 1607 if (connection_->socket()) { 1608 rv = connection_->socket()->GetLocalAddress(address); 1609 } 1610 1611 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetLocalAddress", 1612 rv == ERR_SOCKET_NOT_CONNECTED); 1613 1614 return rv; 1615} 1616 1617void SpdySession::EnqueueSessionWrite(RequestPriority priority, 1618 SpdyFrameType frame_type, 1619 scoped_ptr<SpdyFrame> frame) { 1620 DCHECK(frame_type == RST_STREAM || 1621 frame_type == SETTINGS || 1622 frame_type == WINDOW_UPDATE || 1623 frame_type == PING); 1624 EnqueueWrite( 1625 priority, frame_type, 1626 scoped_ptr<SpdyBufferProducer>( 1627 new SimpleBufferProducer( 1628 scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))), 1629 base::WeakPtr<SpdyStream>()); 1630} 1631 1632void SpdySession::EnqueueWrite(RequestPriority priority, 1633 SpdyFrameType frame_type, 1634 scoped_ptr<SpdyBufferProducer> producer, 1635 const base::WeakPtr<SpdyStream>& stream) { 1636 if (availability_state_ == STATE_CLOSED) 1637 return; 1638 1639 bool was_idle = write_queue_.IsEmpty(); 1640 write_queue_.Enqueue(priority, frame_type, 
producer.Pass(), stream); 1641 if (write_state_ == WRITE_STATE_IDLE) { 1642 DCHECK(was_idle); 1643 DCHECK(!in_flight_write_); 1644 write_state_ = WRITE_STATE_DO_WRITE; 1645 base::MessageLoop::current()->PostTask( 1646 FROM_HERE, 1647 base::Bind(&SpdySession::PumpWriteLoop, 1648 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK)); 1649 } 1650} 1651 1652void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) { 1653 DCHECK_EQ(stream->stream_id(), 0u); 1654 DCHECK(created_streams_.find(stream.get()) == created_streams_.end()); 1655 created_streams_.insert(stream.release()); 1656} 1657 1658scoped_ptr<SpdyStream> SpdySession::ActivateCreatedStream(SpdyStream* stream) { 1659 DCHECK_EQ(stream->stream_id(), 0u); 1660 DCHECK(created_streams_.find(stream) != created_streams_.end()); 1661 stream->set_stream_id(GetNewStreamId()); 1662 scoped_ptr<SpdyStream> owned_stream(stream); 1663 created_streams_.erase(stream); 1664 return owned_stream.Pass(); 1665} 1666 1667void SpdySession::InsertActivatedStream(scoped_ptr<SpdyStream> stream) { 1668 SpdyStreamId stream_id = stream->stream_id(); 1669 DCHECK_NE(stream_id, 0u); 1670 std::pair<ActiveStreamMap::iterator, bool> result = 1671 active_streams_.insert( 1672 std::make_pair(stream_id, ActiveStreamInfo(stream.get()))); 1673 if (result.second) { 1674 ignore_result(stream.release()); 1675 } else { 1676 NOTREACHED(); 1677 } 1678} 1679 1680void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) { 1681 if (in_flight_write_stream_.get() == stream.get()) { 1682 // If we're deleting the stream for the in-flight write, we still 1683 // need to let the write complete, so we clear 1684 // |in_flight_write_stream_| and let the write finish on its own 1685 // without notifying |in_flight_write_stream_|. 
1686 in_flight_write_stream_.reset(); 1687 } 1688 1689 write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr()); 1690 1691 stream->OnClose(status); 1692 1693 switch (availability_state_) { 1694 case STATE_AVAILABLE: 1695 ProcessPendingStreamRequests(); 1696 break; 1697 case STATE_GOING_AWAY: 1698 DcheckGoingAway(); 1699 MaybeFinishGoingAway(); 1700 break; 1701 case STATE_CLOSED: 1702 // Do nothing. 1703 break; 1704 } 1705} 1706 1707base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) { 1708 base::StatsCounter used_push_streams("spdy.claimed_push_streams"); 1709 1710 PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url); 1711 if (unclaimed_it == unclaimed_pushed_streams_.end()) 1712 return base::WeakPtr<SpdyStream>(); 1713 1714 SpdyStreamId stream_id = unclaimed_it->second.stream_id; 1715 unclaimed_pushed_streams_.erase(unclaimed_it); 1716 1717 ActiveStreamMap::iterator active_it = active_streams_.find(stream_id); 1718 if (active_it == active_streams_.end()) { 1719 NOTREACHED(); 1720 return base::WeakPtr<SpdyStream>(); 1721 } 1722 1723 net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM); 1724 used_push_streams.Increment(); 1725 return active_it->second.stream->GetWeakPtr(); 1726} 1727 1728bool SpdySession::GetSSLInfo(SSLInfo* ssl_info, 1729 bool* was_npn_negotiated, 1730 NextProto* protocol_negotiated) { 1731 *was_npn_negotiated = connection_->socket()->WasNpnNegotiated(); 1732 *protocol_negotiated = connection_->socket()->GetNegotiatedProtocol(); 1733 return connection_->socket()->GetSSLInfo(ssl_info); 1734} 1735 1736bool SpdySession::GetSSLCertRequestInfo( 1737 SSLCertRequestInfo* cert_request_info) { 1738 if (!is_secure_) 1739 return false; 1740 GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info); 1741 return true; 1742} 1743 1744ServerBoundCertService* SpdySession::GetServerBoundCertService() const { 1745 if (!is_secure_) 1746 return NULL; 1747 return 
GetSSLClientSocket()->GetServerBoundCertService(); 1748} 1749 1750void SpdySession::OnError(SpdyFramer::SpdyError error_code) { 1751 CHECK(in_io_loop_); 1752 1753 if (availability_state_ == STATE_CLOSED) 1754 return; 1755 1756 RecordProtocolErrorHistogram( 1757 static_cast<SpdyProtocolErrorDetails>(error_code)); 1758 std::string description = base::StringPrintf( 1759 "SPDY_ERROR error_code: %d.", error_code); 1760 CloseSessionResult result = 1761 DoCloseSession(ERR_SPDY_PROTOCOL_ERROR, description); 1762 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED); 1763} 1764 1765void SpdySession::OnStreamError(SpdyStreamId stream_id, 1766 const std::string& description) { 1767 CHECK(in_io_loop_); 1768 1769 if (availability_state_ == STATE_CLOSED) 1770 return; 1771 1772 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 1773 if (it == active_streams_.end()) { 1774 // We still want to send a frame to reset the stream even if we 1775 // don't know anything about it. 1776 SendResetStreamFrame( 1777 stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description); 1778 return; 1779 } 1780 1781 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description); 1782} 1783 1784void SpdySession::OnStreamFrameData(SpdyStreamId stream_id, 1785 const char* data, 1786 size_t len, 1787 bool fin) { 1788 CHECK(in_io_loop_); 1789 1790 if (availability_state_ == STATE_CLOSED) 1791 return; 1792 1793 DCHECK_LT(len, 1u << 24); 1794 if (net_log().IsLoggingAllEvents()) { 1795 net_log().AddEvent( 1796 NetLog::TYPE_SPDY_SESSION_RECV_DATA, 1797 base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin)); 1798 } 1799 1800 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 1801 1802 // By the time data comes in, the stream may already be inactive. 
1803 if (it == active_streams_.end()) 1804 return; 1805 1806 SpdyStream* stream = it->second.stream; 1807 CHECK_EQ(stream->stream_id(), stream_id); 1808 1809 if (it->second.waiting_for_syn_reply) { 1810 const std::string& error = "Data received before SYN_REPLY."; 1811 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error); 1812 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error); 1813 return; 1814 } 1815 1816 scoped_ptr<SpdyBuffer> buffer; 1817 if (data) { 1818 DCHECK_GT(len, 0u); 1819 buffer.reset(new SpdyBuffer(data, len)); 1820 1821 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { 1822 DecreaseRecvWindowSize(static_cast<int32>(len)); 1823 buffer->AddConsumeCallback( 1824 base::Bind(&SpdySession::OnReadBufferConsumed, 1825 weak_factory_.GetWeakPtr())); 1826 } 1827 } else { 1828 DCHECK_EQ(len, 0u); 1829 } 1830 stream->OnDataReceived(buffer.Pass()); 1831} 1832 1833void SpdySession::OnSettings(bool clear_persisted) { 1834 CHECK(in_io_loop_); 1835 1836 if (availability_state_ == STATE_CLOSED) 1837 return; 1838 1839 if (clear_persisted) 1840 http_server_properties_->ClearSpdySettings(host_port_pair()); 1841 1842 if (net_log_.IsLoggingAllEvents()) { 1843 net_log_.AddEvent( 1844 NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS, 1845 base::Bind(&NetLogSpdySettingsCallback, host_port_pair(), 1846 clear_persisted)); 1847 } 1848} 1849 1850void SpdySession::OnSetting(SpdySettingsIds id, 1851 uint8 flags, 1852 uint32 value) { 1853 CHECK(in_io_loop_); 1854 1855 if (availability_state_ == STATE_CLOSED) 1856 return; 1857 1858 HandleSetting(id, value); 1859 http_server_properties_->SetSpdySetting( 1860 host_port_pair(), 1861 id, 1862 static_cast<SpdySettingsFlags>(flags), 1863 value); 1864 received_settings_ = true; 1865 1866 // Log the setting. 
1867 net_log_.AddEvent( 1868 NetLog::TYPE_SPDY_SESSION_RECV_SETTING, 1869 base::Bind(&NetLogSpdySettingCallback, 1870 id, static_cast<SpdySettingsFlags>(flags), value)); 1871} 1872 1873void SpdySession::OnSendCompressedFrame( 1874 SpdyStreamId stream_id, 1875 SpdyFrameType type, 1876 size_t payload_len, 1877 size_t frame_len) { 1878 if (type != SYN_STREAM) 1879 return; 1880 1881 DCHECK(buffered_spdy_framer_.get()); 1882 size_t compressed_len = 1883 frame_len - buffered_spdy_framer_->GetSynStreamMinimumSize(); 1884 1885 if (payload_len) { 1886 // Make sure we avoid early decimal truncation. 1887 int compression_pct = 100 - (100 * compressed_len) / payload_len; 1888 UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage", 1889 compression_pct); 1890 } 1891} 1892 1893int SpdySession::OnInitialResponseHeadersReceived( 1894 const SpdyHeaderBlock& response_headers, 1895 base::Time response_time, 1896 base::TimeTicks recv_first_byte_time, 1897 SpdyStream* stream) { 1898 CHECK(in_io_loop_); 1899 SpdyStreamId stream_id = stream->stream_id(); 1900 // May invalidate |stream|. 
1901 int rv = stream->OnInitialResponseHeadersReceived( 1902 response_headers, response_time, recv_first_byte_time); 1903 if (rv < 0) { 1904 DCHECK_NE(rv, ERR_IO_PENDING); 1905 DCHECK(active_streams_.find(stream_id) == active_streams_.end()); 1906 } 1907 return rv; 1908} 1909 1910void SpdySession::OnSynStream(SpdyStreamId stream_id, 1911 SpdyStreamId associated_stream_id, 1912 SpdyPriority priority, 1913 uint8 credential_slot, 1914 bool fin, 1915 bool unidirectional, 1916 const SpdyHeaderBlock& headers) { 1917 CHECK(in_io_loop_); 1918 1919 if (availability_state_ == STATE_CLOSED) 1920 return; 1921 1922 base::Time response_time = base::Time::Now(); 1923 base::TimeTicks recv_first_byte_time = time_func_(); 1924 1925 if (net_log_.IsLoggingAllEvents()) { 1926 net_log_.AddEvent( 1927 NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM, 1928 base::Bind(&NetLogSpdySynCallback, 1929 &headers, fin, unidirectional, 1930 stream_id, associated_stream_id)); 1931 } 1932 1933 // Server-initiated streams should have even sequence numbers. 1934 if ((stream_id & 0x1) != 0) { 1935 LOG(WARNING) << "Received invalid OnSyn stream id " << stream_id; 1936 return; 1937 } 1938 1939 if (IsStreamActive(stream_id)) { 1940 LOG(WARNING) << "Received OnSyn for active stream " << stream_id; 1941 return; 1942 } 1943 1944 RequestPriority request_priority = 1945 ConvertSpdyPriorityToRequestPriority(priority, GetProtocolVersion()); 1946 1947 if (availability_state_ == STATE_GOING_AWAY) { 1948 // TODO(akalin): This behavior isn't in the SPDY spec, although it 1949 // probably should be. 
1950 SendResetStreamFrame(stream_id, request_priority, 1951 RST_STREAM_REFUSED_STREAM, 1952 "OnSyn received when going away"); 1953 return; 1954 } 1955 1956 if (associated_stream_id == 0) { 1957 std::string description = base::StringPrintf( 1958 "Received invalid OnSyn associated stream id %d for stream %d", 1959 associated_stream_id, stream_id); 1960 SendResetStreamFrame(stream_id, request_priority, 1961 RST_STREAM_REFUSED_STREAM, description); 1962 return; 1963 } 1964 1965 streams_pushed_count_++; 1966 1967 // TODO(mbelshe): DCHECK that this is a GET method? 1968 1969 // Verify that the response had a URL for us. 1970 GURL gurl = GetUrlFromHeaderBlock(headers, GetProtocolVersion(), true); 1971 if (!gurl.is_valid()) { 1972 SendResetStreamFrame(stream_id, request_priority, RST_STREAM_PROTOCOL_ERROR, 1973 "Pushed stream url was invalid: " + gurl.spec()); 1974 return; 1975 } 1976 1977 // Verify we have a valid stream association. 1978 ActiveStreamMap::iterator associated_it = 1979 active_streams_.find(associated_stream_id); 1980 if (associated_it == active_streams_.end()) { 1981 SendResetStreamFrame( 1982 stream_id, request_priority, RST_STREAM_INVALID_STREAM, 1983 base::StringPrintf( 1984 "Received OnSyn with inactive associated stream %d", 1985 associated_stream_id)); 1986 return; 1987 } 1988 1989 // Check that the SYN advertises the same origin as its associated stream. 1990 // Bypass this check if and only if this session is with a SPDY proxy that 1991 // is trusted explicitly via the --trusted-spdy-proxy switch. 1992 if (trusted_spdy_proxy_.Equals(host_port_pair())) { 1993 // Disallow pushing of HTTPS content. 
1994 if (gurl.SchemeIs("https")) { 1995 SendResetStreamFrame( 1996 stream_id, request_priority, RST_STREAM_REFUSED_STREAM, 1997 base::StringPrintf( 1998 "Rejected push of Cross Origin HTTPS content %d", 1999 associated_stream_id)); 2000 } 2001 } else { 2002 GURL associated_url(associated_it->second.stream->GetUrlFromHeaders()); 2003 if (associated_url.GetOrigin() != gurl.GetOrigin()) { 2004 SendResetStreamFrame( 2005 stream_id, request_priority, RST_STREAM_REFUSED_STREAM, 2006 base::StringPrintf( 2007 "Rejected Cross Origin Push Stream %d", 2008 associated_stream_id)); 2009 return; 2010 } 2011 } 2012 2013 // There should not be an existing pushed stream with the same path. 2014 PushedStreamMap::iterator pushed_it = 2015 unclaimed_pushed_streams_.lower_bound(gurl); 2016 if (pushed_it != unclaimed_pushed_streams_.end() && 2017 pushed_it->first == gurl) { 2018 SendResetStreamFrame(stream_id, request_priority, RST_STREAM_PROTOCOL_ERROR, 2019 "Received duplicate pushed stream with url: " + 2020 gurl.spec()); 2021 return; 2022 } 2023 2024 scoped_ptr<SpdyStream> stream( 2025 new SpdyStream(SPDY_PUSH_STREAM, GetWeakPtr(), gurl, 2026 request_priority, 2027 stream_initial_send_window_size_, 2028 stream_initial_recv_window_size_, 2029 net_log_)); 2030 stream->set_stream_id(stream_id); 2031 2032 DeleteExpiredPushedStreams(); 2033 PushedStreamMap::iterator inserted_pushed_it = 2034 unclaimed_pushed_streams_.insert( 2035 pushed_it, 2036 std::make_pair(gurl, PushedStreamInfo(stream_id, time_func_()))); 2037 DCHECK(inserted_pushed_it != pushed_it); 2038 2039 InsertActivatedStream(stream.Pass()); 2040 2041 ActiveStreamMap::iterator active_it = active_streams_.find(stream_id); 2042 if (active_it == active_streams_.end()) { 2043 NOTREACHED(); 2044 return; 2045 } 2046 2047 // Parse the headers. 
2048 if (OnInitialResponseHeadersReceived( 2049 headers, response_time, 2050 recv_first_byte_time, active_it->second.stream) != OK) 2051 return; 2052 2053 base::StatsCounter push_requests("spdy.pushed_streams"); 2054 push_requests.Increment(); 2055} 2056 2057void SpdySession::DeleteExpiredPushedStreams() { 2058 if (unclaimed_pushed_streams_.empty()) 2059 return; 2060 2061 // Check that adequate time has elapsed since the last sweep. 2062 if (time_func_() < next_unclaimed_push_stream_sweep_time_) 2063 return; 2064 2065 // Gather old streams to delete. 2066 base::TimeTicks minimum_freshness = time_func_() - 2067 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds); 2068 std::vector<SpdyStreamId> streams_to_close; 2069 for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin(); 2070 it != unclaimed_pushed_streams_.end(); ++it) { 2071 if (minimum_freshness > it->second.creation_time) 2072 streams_to_close.push_back(it->second.stream_id); 2073 } 2074 2075 for (std::vector<SpdyStreamId>::const_iterator to_close_it = 2076 streams_to_close.begin(); 2077 to_close_it != streams_to_close.end(); ++to_close_it) { 2078 ActiveStreamMap::iterator active_it = active_streams_.find(*to_close_it); 2079 if (active_it == active_streams_.end()) 2080 continue; 2081 2082 LogAbandonedActiveStream(active_it, ERR_INVALID_SPDY_STREAM); 2083 // CloseActiveStreamIterator() will remove the stream from 2084 // |unclaimed_pushed_streams_|. 
2085 CloseActiveStreamIterator(active_it, ERR_INVALID_SPDY_STREAM); 2086 } 2087 2088 next_unclaimed_push_stream_sweep_time_ = time_func_() + 2089 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds); 2090} 2091 2092void SpdySession::OnSynReply(SpdyStreamId stream_id, 2093 bool fin, 2094 const SpdyHeaderBlock& headers) { 2095 CHECK(in_io_loop_); 2096 2097 if (availability_state_ == STATE_CLOSED) 2098 return; 2099 2100 base::Time response_time = base::Time::Now(); 2101 base::TimeTicks recv_first_byte_time = time_func_(); 2102 2103 if (net_log().IsLoggingAllEvents()) { 2104 net_log().AddEvent( 2105 NetLog::TYPE_SPDY_SESSION_SYN_REPLY, 2106 base::Bind(&NetLogSpdySynCallback, 2107 &headers, fin, false, // not unidirectional 2108 stream_id, 0)); 2109 } 2110 2111 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2112 if (it == active_streams_.end()) { 2113 // NOTE: it may just be that the stream was cancelled. 2114 return; 2115 } 2116 2117 SpdyStream* stream = it->second.stream; 2118 CHECK_EQ(stream->stream_id(), stream_id); 2119 2120 if (!it->second.waiting_for_syn_reply) { 2121 const std::string& error = 2122 "Received duplicate SYN_REPLY for stream."; 2123 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error); 2124 ResetStreamIterator(it, RST_STREAM_STREAM_IN_USE, error); 2125 return; 2126 } 2127 it->second.waiting_for_syn_reply = false; 2128 2129 ignore_result(OnInitialResponseHeadersReceived( 2130 headers, response_time, recv_first_byte_time, stream)); 2131} 2132 2133void SpdySession::OnHeaders(SpdyStreamId stream_id, 2134 bool fin, 2135 const SpdyHeaderBlock& headers) { 2136 CHECK(in_io_loop_); 2137 2138 if (availability_state_ == STATE_CLOSED) 2139 return; 2140 2141 if (net_log().IsLoggingAllEvents()) { 2142 net_log().AddEvent( 2143 NetLog::TYPE_SPDY_SESSION_RECV_HEADERS, 2144 base::Bind(&NetLogSpdySynCallback, 2145 &headers, fin, /*unidirectional=*/false, 2146 stream_id, 0)); 2147 } 2148 2149 ActiveStreamMap::iterator it = 
active_streams_.find(stream_id); 2150 if (it == active_streams_.end()) { 2151 // NOTE: it may just be that the stream was cancelled. 2152 LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id; 2153 return; 2154 } 2155 2156 SpdyStream* stream = it->second.stream; 2157 CHECK_EQ(stream->stream_id(), stream_id); 2158 2159 int rv = stream->OnAdditionalResponseHeadersReceived(headers); 2160 if (rv < 0) { 2161 DCHECK_NE(rv, ERR_IO_PENDING); 2162 DCHECK(active_streams_.find(stream_id) == active_streams_.end()); 2163 } 2164} 2165 2166void SpdySession::OnRstStream(SpdyStreamId stream_id, 2167 SpdyRstStreamStatus status) { 2168 CHECK(in_io_loop_); 2169 2170 if (availability_state_ == STATE_CLOSED) 2171 return; 2172 2173 std::string description; 2174 net_log().AddEvent( 2175 NetLog::TYPE_SPDY_SESSION_RST_STREAM, 2176 base::Bind(&NetLogSpdyRstCallback, 2177 stream_id, status, &description)); 2178 2179 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2180 if (it == active_streams_.end()) { 2181 // NOTE: it may just be that the stream was cancelled. 2182 LOG(WARNING) << "Received RST for invalid stream" << stream_id; 2183 return; 2184 } 2185 2186 CHECK_EQ(it->second.stream->stream_id(), stream_id); 2187 2188 if (status == 0) { 2189 it->second.stream->OnDataReceived(scoped_ptr<SpdyBuffer>()); 2190 } else if (status == RST_STREAM_REFUSED_STREAM) { 2191 CloseActiveStreamIterator(it, ERR_SPDY_SERVER_REFUSED_STREAM); 2192 } else { 2193 RecordProtocolErrorHistogram( 2194 PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM); 2195 it->second.stream->LogStreamError( 2196 ERR_SPDY_PROTOCOL_ERROR, 2197 base::StringPrintf("SPDY stream closed with status: %d", status)); 2198 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical. 2199 // For now, it doesn't matter much - it is a protocol error. 
2200 CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR); 2201 } 2202} 2203 2204void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id, 2205 SpdyGoAwayStatus status) { 2206 CHECK(in_io_loop_); 2207 2208 if (availability_state_ == STATE_CLOSED) 2209 return; 2210 2211 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY, 2212 base::Bind(&NetLogSpdyGoAwayCallback, 2213 last_accepted_stream_id, 2214 active_streams_.size(), 2215 unclaimed_pushed_streams_.size(), 2216 status)); 2217 if (availability_state_ < STATE_GOING_AWAY) { 2218 availability_state_ = STATE_GOING_AWAY; 2219 // |pool_| will be NULL when |InitializeWithSocket()| is in the 2220 // call stack. 2221 if (pool_) 2222 pool_->MakeSessionUnavailable(GetWeakPtr()); 2223 } 2224 StartGoingAway(last_accepted_stream_id, ERR_ABORTED); 2225 // This is to handle the case when we already don't have any active 2226 // streams (i.e., StartGoingAway() did nothing). Otherwise, we have 2227 // active streams and so the last one being closed will finish the 2228 // going away process (see DeleteStream()). 2229 MaybeFinishGoingAway(); 2230} 2231 2232void SpdySession::OnPing(uint32 unique_id) { 2233 CHECK(in_io_loop_); 2234 2235 if (availability_state_ == STATE_CLOSED) 2236 return; 2237 2238 net_log_.AddEvent( 2239 NetLog::TYPE_SPDY_SESSION_PING, 2240 base::Bind(&NetLogSpdyPingCallback, unique_id, "received")); 2241 2242 // Send response to a PING from server. 
2243 if (unique_id % 2 == 0) { 2244 WritePingFrame(unique_id); 2245 return; 2246 } 2247 2248 --pings_in_flight_; 2249 if (pings_in_flight_ < 0) { 2250 RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING); 2251 CloseSessionResult result = 2252 DoCloseSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0."); 2253 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED); 2254 pings_in_flight_ = 0; 2255 return; 2256 } 2257 2258 if (pings_in_flight_ > 0) 2259 return; 2260 2261 // We will record RTT in histogram when there are no more client sent 2262 // pings_in_flight_. 2263 RecordPingRTTHistogram(time_func_() - last_ping_sent_time_); 2264} 2265 2266void SpdySession::OnWindowUpdate(SpdyStreamId stream_id, 2267 uint32 delta_window_size) { 2268 CHECK(in_io_loop_); 2269 2270 if (availability_state_ == STATE_CLOSED) 2271 return; 2272 2273 DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max)); 2274 net_log_.AddEvent( 2275 NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME, 2276 base::Bind(&NetLogSpdyWindowUpdateFrameCallback, 2277 stream_id, delta_window_size)); 2278 2279 if (stream_id == kSessionFlowControlStreamId) { 2280 // WINDOW_UPDATE for the session. 2281 if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) { 2282 LOG(WARNING) << "Received WINDOW_UPDATE for session when " 2283 << "session flow control is not turned on"; 2284 // TODO(akalin): Record an error and close the session. 2285 return; 2286 } 2287 2288 if (delta_window_size < 1u) { 2289 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE); 2290 CloseSessionResult result = DoCloseSession( 2291 ERR_SPDY_PROTOCOL_ERROR, 2292 "Received WINDOW_UPDATE with an invalid delta_window_size " + 2293 base::UintToString(delta_window_size)); 2294 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED); 2295 return; 2296 } 2297 2298 IncreaseSendWindowSize(static_cast<int32>(delta_window_size)); 2299 } else { 2300 // WINDOW_UPDATE for a stream. 
2301 if (flow_control_state_ < FLOW_CONTROL_STREAM) { 2302 // TODO(akalin): Record an error and close the session. 2303 LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id 2304 << " when flow control is not turned on"; 2305 return; 2306 } 2307 2308 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2309 2310 if (it == active_streams_.end()) { 2311 // NOTE: it may just be that the stream was cancelled. 2312 LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id; 2313 return; 2314 } 2315 2316 SpdyStream* stream = it->second.stream; 2317 CHECK_EQ(stream->stream_id(), stream_id); 2318 2319 if (delta_window_size < 1u) { 2320 ResetStreamIterator(it, 2321 RST_STREAM_FLOW_CONTROL_ERROR, 2322 base::StringPrintf( 2323 "Received WINDOW_UPDATE with an invalid " 2324 "delta_window_size %ud", delta_window_size)); 2325 return; 2326 } 2327 2328 CHECK_EQ(it->second.stream->stream_id(), stream_id); 2329 it->second.stream->IncreaseSendWindowSize( 2330 static_cast<int32>(delta_window_size)); 2331 } 2332} 2333 2334void SpdySession::OnPushPromise(SpdyStreamId stream_id, 2335 SpdyStreamId promised_stream_id) { 2336 // TODO(akalin): Handle PUSH_PROMISE frames. 2337} 2338 2339void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id, 2340 uint32 delta_window_size) { 2341 CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM); 2342 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 2343 CHECK(it != active_streams_.end()); 2344 CHECK_EQ(it->second.stream->stream_id(), stream_id); 2345 SendWindowUpdateFrame( 2346 stream_id, delta_window_size, it->second.stream->priority()); 2347} 2348 2349void SpdySession::SendInitialSettings() { 2350 DCHECK_NE(availability_state_, STATE_CLOSED); 2351 2352 // First, notify the server about the settings they should use when 2353 // communicating with us. 
2354 if (GetProtocolVersion() >= 2 && enable_sending_initial_settings_) { 2355 SettingsMap settings_map; 2356 // Create a new settings frame notifying the sever of our 2357 // max_concurrent_streams_ and initial window size. 2358 settings_map[SETTINGS_MAX_CONCURRENT_STREAMS] = 2359 SettingsFlagsAndValue(SETTINGS_FLAG_NONE, kMaxConcurrentPushedStreams); 2360 if (GetProtocolVersion() > 2 && 2361 stream_initial_recv_window_size_ != kSpdyStreamInitialWindowSize) { 2362 settings_map[SETTINGS_INITIAL_WINDOW_SIZE] = 2363 SettingsFlagsAndValue(SETTINGS_FLAG_NONE, 2364 stream_initial_recv_window_size_); 2365 } 2366 SendSettings(settings_map); 2367 } 2368 2369 // Next, notify the server about our initial recv window size. 2370 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION && 2371 enable_sending_initial_settings_) { 2372 // Bump up the receive window size to the real initial value. This 2373 // has to go here since the WINDOW_UPDATE frame sent by 2374 // IncreaseRecvWindowSize() call uses |buffered_spdy_framer_|. 2375 DCHECK_GT(kDefaultInitialRecvWindowSize, session_recv_window_size_); 2376 // This condition implies that |kDefaultInitialRecvWindowSize| - 2377 // |session_recv_window_size_| doesn't overflow. 2378 DCHECK_GT(session_recv_window_size_, 0); 2379 IncreaseRecvWindowSize( 2380 kDefaultInitialRecvWindowSize - session_recv_window_size_); 2381 } 2382 2383 // Finally, notify the server about the settings they have previously 2384 // told us to use when communicating with them. 
2385 const SettingsMap& settings_map = 2386 http_server_properties_->GetSpdySettings(host_port_pair()); 2387 if (settings_map.empty()) 2388 return; 2389 2390 const SpdySettingsIds id = SETTINGS_CURRENT_CWND; 2391 SettingsMap::const_iterator it = settings_map.find(id); 2392 uint32 value = 0; 2393 if (it != settings_map.end()) 2394 value = it->second.second; 2395 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent", value, 1, 200, 100); 2396 2397 const SettingsMap& settings_map_new = 2398 http_server_properties_->GetSpdySettings(host_port_pair()); 2399 for (SettingsMap::const_iterator i = settings_map_new.begin(), 2400 end = settings_map_new.end(); i != end; ++i) { 2401 const SpdySettingsIds new_id = i->first; 2402 const uint32 new_val = i->second.second; 2403 HandleSetting(new_id, new_val); 2404 } 2405 2406 SendSettings(settings_map_new); 2407} 2408 2409 2410void SpdySession::SendSettings(const SettingsMap& settings) { 2411 DCHECK_NE(availability_state_, STATE_CLOSED); 2412 2413 net_log_.AddEvent( 2414 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS, 2415 base::Bind(&NetLogSpdySendSettingsCallback, &settings)); 2416 2417 // Create the SETTINGS frame and send it. 
2418 DCHECK(buffered_spdy_framer_.get()); 2419 scoped_ptr<SpdyFrame> settings_frame( 2420 buffered_spdy_framer_->CreateSettings(settings)); 2421 sent_settings_ = true; 2422 EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass()); 2423} 2424 2425void SpdySession::HandleSetting(uint32 id, uint32 value) { 2426 switch (id) { 2427 case SETTINGS_MAX_CONCURRENT_STREAMS: 2428 max_concurrent_streams_ = std::min(static_cast<size_t>(value), 2429 kMaxConcurrentStreamLimit); 2430 ProcessPendingStreamRequests(); 2431 break; 2432 case SETTINGS_INITIAL_WINDOW_SIZE: { 2433 if (flow_control_state_ < FLOW_CONTROL_STREAM) { 2434 net_log().AddEvent( 2435 NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_NO_FLOW_CONTROL); 2436 return; 2437 } 2438 2439 if (value > static_cast<uint32>(kint32max)) { 2440 net_log().AddEvent( 2441 NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_OUT_OF_RANGE, 2442 NetLog::IntegerCallback("initial_window_size", value)); 2443 return; 2444 } 2445 2446 // SETTINGS_INITIAL_WINDOW_SIZE updates initial_send_window_size_ only. 
2447 int32 delta_window_size = 2448 static_cast<int32>(value) - stream_initial_send_window_size_; 2449 stream_initial_send_window_size_ = static_cast<int32>(value); 2450 UpdateStreamsSendWindowSize(delta_window_size); 2451 net_log().AddEvent( 2452 NetLog::TYPE_SPDY_SESSION_UPDATE_STREAMS_SEND_WINDOW_SIZE, 2453 NetLog::IntegerCallback("delta_window_size", delta_window_size)); 2454 break; 2455 } 2456 } 2457} 2458 2459void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) { 2460 DCHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM); 2461 for (ActiveStreamMap::iterator it = active_streams_.begin(); 2462 it != active_streams_.end(); ++it) { 2463 it->second.stream->AdjustSendWindowSize(delta_window_size); 2464 } 2465 2466 for (CreatedStreamSet::const_iterator it = created_streams_.begin(); 2467 it != created_streams_.end(); it++) { 2468 (*it)->AdjustSendWindowSize(delta_window_size); 2469 } 2470} 2471 2472void SpdySession::SendPrefacePingIfNoneInFlight() { 2473 if (pings_in_flight_ || !enable_ping_based_connection_checking_) 2474 return; 2475 2476 base::TimeTicks now = time_func_(); 2477 // If there is no activity in the session, then send a preface-PING. 
2478 if ((now - last_activity_time_) > connection_at_risk_of_loss_time_) 2479 SendPrefacePing(); 2480} 2481 2482void SpdySession::SendPrefacePing() { 2483 WritePingFrame(next_ping_id_); 2484} 2485 2486void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id, 2487 uint32 delta_window_size, 2488 RequestPriority priority) { 2489 CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM); 2490 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 2491 if (it != active_streams_.end()) { 2492 CHECK_EQ(it->second.stream->stream_id(), stream_id); 2493 } else { 2494 CHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2495 CHECK_EQ(stream_id, kSessionFlowControlStreamId); 2496 } 2497 2498 net_log_.AddEvent( 2499 NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME, 2500 base::Bind(&NetLogSpdyWindowUpdateFrameCallback, 2501 stream_id, delta_window_size)); 2502 2503 DCHECK(buffered_spdy_framer_.get()); 2504 scoped_ptr<SpdyFrame> window_update_frame( 2505 buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size)); 2506 EnqueueSessionWrite(priority, WINDOW_UPDATE, window_update_frame.Pass()); 2507} 2508 2509void SpdySession::WritePingFrame(uint32 unique_id) { 2510 DCHECK(buffered_spdy_framer_.get()); 2511 scoped_ptr<SpdyFrame> ping_frame( 2512 buffered_spdy_framer_->CreatePingFrame(unique_id)); 2513 EnqueueSessionWrite(HIGHEST, PING, ping_frame.Pass()); 2514 2515 if (net_log().IsLoggingAllEvents()) { 2516 net_log().AddEvent( 2517 NetLog::TYPE_SPDY_SESSION_PING, 2518 base::Bind(&NetLogSpdyPingCallback, unique_id, "sent")); 2519 } 2520 if (unique_id % 2 != 0) { 2521 next_ping_id_ += 2; 2522 ++pings_in_flight_; 2523 PlanToCheckPingStatus(); 2524 last_ping_sent_time_ = time_func_(); 2525 } 2526} 2527 2528void SpdySession::PlanToCheckPingStatus() { 2529 if (check_ping_status_pending_) 2530 return; 2531 2532 check_ping_status_pending_ = true; 2533 base::MessageLoop::current()->PostDelayedTask( 2534 FROM_HERE, 2535 
base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(), 2536 time_func_()), hung_interval_); 2537} 2538 2539void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) { 2540 CHECK(!in_io_loop_); 2541 DCHECK_NE(availability_state_, STATE_CLOSED); 2542 2543 // Check if we got a response back for all PINGs we had sent. 2544 if (pings_in_flight_ == 0) { 2545 check_ping_status_pending_ = false; 2546 return; 2547 } 2548 2549 DCHECK(check_ping_status_pending_); 2550 2551 base::TimeTicks now = time_func_(); 2552 base::TimeDelta delay = hung_interval_ - (now - last_activity_time_); 2553 2554 if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) { 2555 // Track all failed PING messages in a separate bucket. 2556 const base::TimeDelta kFailedPing = 2557 base::TimeDelta::FromInternalValue(INT_MAX); 2558 RecordPingRTTHistogram(kFailedPing); 2559 CloseSessionResult result = 2560 DoCloseSession(ERR_SPDY_PING_FAILED, "Failed ping."); 2561 DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED); 2562 return; 2563 } 2564 2565 // Check the status of connection after a delay. 
2566 base::MessageLoop::current()->PostDelayedTask( 2567 FROM_HERE, 2568 base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(), 2569 now), 2570 delay); 2571} 2572 2573void SpdySession::RecordPingRTTHistogram(base::TimeDelta duration) { 2574 UMA_HISTOGRAM_TIMES("Net.SpdyPing.RTT", duration); 2575} 2576 2577void SpdySession::RecordProtocolErrorHistogram( 2578 SpdyProtocolErrorDetails details) { 2579 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails2", details, 2580 NUM_SPDY_PROTOCOL_ERROR_DETAILS); 2581 if (EndsWith(host_port_pair().host(), "google.com", false)) { 2582 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails_Google2", details, 2583 NUM_SPDY_PROTOCOL_ERROR_DETAILS); 2584 } 2585} 2586 2587void SpdySession::RecordHistograms() { 2588 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession", 2589 streams_initiated_count_, 2590 0, 300, 50); 2591 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession", 2592 streams_pushed_count_, 2593 0, 300, 50); 2594 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession", 2595 streams_pushed_and_claimed_count_, 2596 0, 300, 50); 2597 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession", 2598 streams_abandoned_count_, 2599 0, 300, 50); 2600 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent", 2601 sent_settings_ ? 1 : 0, 2); 2602 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived", 2603 received_settings_ ? 1 : 0, 2); 2604 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession", 2605 stalled_streams_, 2606 0, 300, 50); 2607 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls", 2608 stalled_streams_ > 0 ? 1 : 0, 2); 2609 2610 if (received_settings_) { 2611 // Enumerate the saved settings, and set histograms for it. 
2612 const SettingsMap& settings_map = 2613 http_server_properties_->GetSpdySettings(host_port_pair()); 2614 2615 SettingsMap::const_iterator it; 2616 for (it = settings_map.begin(); it != settings_map.end(); ++it) { 2617 const SpdySettingsIds id = it->first; 2618 const uint32 val = it->second.second; 2619 switch (id) { 2620 case SETTINGS_CURRENT_CWND: 2621 // Record several different histograms to see if cwnd converges 2622 // for larger volumes of data being sent. 2623 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd", 2624 val, 1, 200, 100); 2625 if (total_bytes_received_ > 10 * 1024) { 2626 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K", 2627 val, 1, 200, 100); 2628 if (total_bytes_received_ > 25 * 1024) { 2629 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K", 2630 val, 1, 200, 100); 2631 if (total_bytes_received_ > 50 * 1024) { 2632 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K", 2633 val, 1, 200, 100); 2634 if (total_bytes_received_ > 100 * 1024) { 2635 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K", 2636 val, 1, 200, 100); 2637 } 2638 } 2639 } 2640 } 2641 break; 2642 case SETTINGS_ROUND_TRIP_TIME: 2643 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT", 2644 val, 1, 1200, 100); 2645 break; 2646 case SETTINGS_DOWNLOAD_RETRANS_RATE: 2647 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate", 2648 val, 1, 100, 50); 2649 break; 2650 default: 2651 break; 2652 } 2653 } 2654 } 2655} 2656 2657void SpdySession::CompleteStreamRequest(SpdyStreamRequest* pending_request) { 2658 CHECK(pending_request); 2659 2660 PendingStreamRequestCompletionSet::iterator it = 2661 pending_stream_request_completions_.find(pending_request); 2662 2663 // Abort if the request has already been cancelled. 
2664 if (it == pending_stream_request_completions_.end()) 2665 return; 2666 2667 base::WeakPtr<SpdyStream> stream; 2668 int rv = CreateStream(*pending_request, &stream); 2669 pending_stream_request_completions_.erase(it); 2670 2671 if (rv == OK) { 2672 DCHECK(stream.get()); 2673 pending_request->OnRequestCompleteSuccess(&stream); 2674 } else { 2675 DCHECK(!stream.get()); 2676 pending_request->OnRequestCompleteFailure(rv); 2677 } 2678} 2679 2680SSLClientSocket* SpdySession::GetSSLClientSocket() const { 2681 if (!is_secure_) 2682 return NULL; 2683 SSLClientSocket* ssl_socket = 2684 reinterpret_cast<SSLClientSocket*>(connection_->socket()); 2685 DCHECK(ssl_socket); 2686 return ssl_socket; 2687} 2688 2689void SpdySession::OnWriteBufferConsumed( 2690 size_t frame_payload_size, 2691 size_t consume_size, 2692 SpdyBuffer::ConsumeSource consume_source) { 2693 // We can be called with |in_io_loop_| set if a write SpdyBuffer is 2694 // deleted (e.g., a stream is closed due to incoming data). 2695 2696 if (availability_state_ == STATE_CLOSED) 2697 return; 2698 2699 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2700 2701 if (consume_source == SpdyBuffer::DISCARD) { 2702 // If we're discarding a frame or part of it, increase the send 2703 // window by the number of discarded bytes. (Although if we're 2704 // discarding part of a frame, it's probably because of a write 2705 // error and we'll be tearing down the session soon.) 2706 size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size); 2707 DCHECK_GT(remaining_payload_bytes, 0u); 2708 IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes)); 2709 } 2710 // For consumed bytes, the send window is increased when we receive 2711 // a WINDOW_UPDATE frame. 2712} 2713 2714void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) { 2715 // We can be called with |in_io_loop_| set if a SpdyBuffer is 2716 // deleted (e.g., a stream is closed due to incoming data). 
2717 2718 DCHECK_NE(availability_state_, STATE_CLOSED); 2719 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2720 DCHECK_GE(delta_window_size, 1); 2721 2722 // Check for overflow. 2723 int32 max_delta_window_size = kint32max - session_send_window_size_; 2724 if (delta_window_size > max_delta_window_size) { 2725 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE); 2726 CloseSessionResult result = DoCloseSession( 2727 ERR_SPDY_PROTOCOL_ERROR, 2728 "Received WINDOW_UPDATE [delta: " + 2729 base::IntToString(delta_window_size) + 2730 "] for session overflows session_send_window_size_ [current: " + 2731 base::IntToString(session_send_window_size_) + "]"); 2732 DCHECK_NE(result, SESSION_ALREADY_CLOSED); 2733 return; 2734 } 2735 2736 session_send_window_size_ += delta_window_size; 2737 2738 net_log_.AddEvent( 2739 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW, 2740 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 2741 delta_window_size, session_send_window_size_)); 2742 2743 DCHECK(!IsSendStalled()); 2744 ResumeSendStalledStreams(); 2745} 2746 2747void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) { 2748 DCHECK_NE(availability_state_, STATE_CLOSED); 2749 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2750 2751 // We only call this method when sending a frame. Therefore, 2752 // |delta_window_size| should be within the valid frame size range. 2753 DCHECK_GE(delta_window_size, 1); 2754 DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize); 2755 2756 // |send_window_size_| should have been at least |delta_window_size| for 2757 // this call to happen. 
2758 DCHECK_GE(session_send_window_size_, delta_window_size); 2759 2760 session_send_window_size_ -= delta_window_size; 2761 2762 net_log_.AddEvent( 2763 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW, 2764 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 2765 -delta_window_size, session_send_window_size_)); 2766} 2767 2768void SpdySession::OnReadBufferConsumed( 2769 size_t consume_size, 2770 SpdyBuffer::ConsumeSource consume_source) { 2771 // We can be called with |in_io_loop_| set if a read SpdyBuffer is 2772 // deleted (e.g., discarded by a SpdyReadQueue). 2773 2774 if (availability_state_ == STATE_CLOSED) 2775 return; 2776 2777 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2778 DCHECK_GE(consume_size, 1u); 2779 DCHECK_LE(consume_size, static_cast<size_t>(kint32max)); 2780 2781 IncreaseRecvWindowSize(static_cast<int32>(consume_size)); 2782} 2783 2784void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) { 2785 DCHECK_NE(availability_state_, STATE_CLOSED); 2786 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2787 DCHECK_GE(session_unacked_recv_window_bytes_, 0); 2788 DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_); 2789 DCHECK_GE(delta_window_size, 1); 2790 // Check for overflow. 
2791 DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_); 2792 2793 session_recv_window_size_ += delta_window_size; 2794 net_log_.AddEvent( 2795 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, 2796 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 2797 delta_window_size, session_recv_window_size_)); 2798 2799 session_unacked_recv_window_bytes_ += delta_window_size; 2800 if (session_unacked_recv_window_bytes_ > kSpdySessionInitialWindowSize / 2) { 2801 SendWindowUpdateFrame(kSessionFlowControlStreamId, 2802 session_unacked_recv_window_bytes_, 2803 HIGHEST); 2804 session_unacked_recv_window_bytes_ = 0; 2805 } 2806} 2807 2808void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) { 2809 CHECK(in_io_loop_); 2810 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2811 DCHECK_GE(delta_window_size, 1); 2812 2813 // Since we never decrease the initial receive window size, 2814 // |delta_window_size| should never cause |recv_window_size_| to go 2815 // negative. If we do, the receive window isn't being respected. 
2816 if (delta_window_size > session_recv_window_size_) { 2817 RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION); 2818 CloseSessionResult result = DoCloseSession( 2819 ERR_SPDY_PROTOCOL_ERROR, 2820 "delta_window_size is " + base::IntToString(delta_window_size) + 2821 " in DecreaseRecvWindowSize, which is larger than the receive " + 2822 "window size of " + base::IntToString(session_recv_window_size_)); 2823 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED); 2824 return; 2825 } 2826 2827 session_recv_window_size_ -= delta_window_size; 2828 net_log_.AddEvent( 2829 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW, 2830 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 2831 -delta_window_size, session_recv_window_size_)); 2832} 2833 2834void SpdySession::QueueSendStalledStream(const SpdyStream& stream) { 2835 DCHECK(stream.send_stalled_by_flow_control()); 2836 stream_send_unstall_queue_[stream.priority()].push_back(stream.stream_id()); 2837} 2838 2839namespace { 2840 2841// Helper function to return the total size of an array of objects 2842// with .size() member functions. 2843template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) { 2844 size_t total_size = 0; 2845 for (size_t i = 0; i < N; ++i) { 2846 total_size += arr[i].size(); 2847 } 2848 return total_size; 2849} 2850 2851} // namespace 2852 2853void SpdySession::ResumeSendStalledStreams() { 2854 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2855 2856 // We don't have to worry about new streams being queued, since 2857 // doing so would cause IsSendStalled() to return true. But we do 2858 // have to worry about streams being closed, as well as ourselves 2859 // being closed. 
2860 2861 while (availability_state_ != STATE_CLOSED && !IsSendStalled()) { 2862 size_t old_size = 0; 2863 if (DCHECK_IS_ON()) 2864 old_size = GetTotalSize(stream_send_unstall_queue_); 2865 2866 SpdyStreamId stream_id = PopStreamToPossiblyResume(); 2867 if (stream_id == 0) 2868 break; 2869 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 2870 // The stream may actually still be send-stalled after this (due 2871 // to its own send window) but that's okay -- it'll then be 2872 // resumed once its send window increases. 2873 if (it != active_streams_.end()) 2874 it->second.stream->PossiblyResumeIfSendStalled(); 2875 2876 // The size should decrease unless we got send-stalled again. 2877 if (!IsSendStalled()) 2878 DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size); 2879 } 2880} 2881 2882SpdyStreamId SpdySession::PopStreamToPossiblyResume() { 2883 for (int i = NUM_PRIORITIES - 1; i >= 0; --i) { 2884 std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i]; 2885 if (!queue->empty()) { 2886 SpdyStreamId stream_id = queue->front(); 2887 queue->pop_front(); 2888 return stream_id; 2889 } 2890 } 2891 return 0; 2892} 2893 2894} // namespace net 2895