spdy_session.cc revision ca12bfac764ba476d6cd062bf1dde12cc64c3f40
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/spdy/spdy_session.h"
6
7#include <algorithm>
8#include <map>
9
10#include "base/basictypes.h"
11#include "base/bind.h"
12#include "base/compiler_specific.h"
13#include "base/logging.h"
14#include "base/message_loop/message_loop.h"
15#include "base/metrics/field_trial.h"
16#include "base/metrics/histogram.h"
17#include "base/metrics/sparse_histogram.h"
18#include "base/metrics/stats_counters.h"
19#include "base/stl_util.h"
20#include "base/strings/string_number_conversions.h"
21#include "base/strings/string_util.h"
22#include "base/strings/stringprintf.h"
23#include "base/strings/utf_string_conversions.h"
24#include "base/time/time.h"
25#include "base/values.h"
26#include "crypto/ec_private_key.h"
27#include "crypto/ec_signature_creator.h"
28#include "net/base/connection_type_histograms.h"
29#include "net/base/net_log.h"
30#include "net/base/net_util.h"
31#include "net/cert/asn1_util.h"
32#include "net/http/http_network_session.h"
33#include "net/http/http_server_properties.h"
34#include "net/spdy/spdy_buffer_producer.h"
35#include "net/spdy/spdy_credential_builder.h"
36#include "net/spdy/spdy_frame_builder.h"
37#include "net/spdy/spdy_http_utils.h"
38#include "net/spdy/spdy_protocol.h"
39#include "net/spdy/spdy_session_pool.h"
40#include "net/spdy/spdy_stream.h"
41#include "net/ssl/server_bound_cert_service.h"
42
43namespace net {
44
45namespace {
46
// Size, in bytes, of the IOBuffer allocated for |read_buffer_|.
const int kReadBufferSize = 8 * 1024;
// Seconds used to initialize |connection_at_risk_of_loss_time_|.
const int kDefaultConnectionAtRiskOfLossSeconds = 10;
// Seconds used to initialize |hung_interval_|.
const int kHungIntervalSeconds = 10;

// Always start at 1 for the first stream id.
const SpdyStreamId kFirstStreamId = 1;

// Minimum seconds that unclaimed pushed streams will be kept in memory.
// Also used to schedule the first unclaimed-push-stream sweep.
const int kMinPushedStreamLifetimeSeconds = 300;
56
57SpdyMajorVersion NPNToSpdyVersion(NextProto next_proto) {
58  switch (next_proto) {
59    case kProtoSPDY2:
60    case kProtoSPDY21:
61      return SPDY2;
62    case kProtoSPDY3:
63    case kProtoSPDY31:
64      return SPDY3;
65    case kProtoSPDY4a2:
66      return SPDY4;
67    default:
68      NOTREACHED();
69  }
70  return SPDY2;
71}
72
73base::Value* NetLogSpdySynCallback(const SpdyHeaderBlock* headers,
74                                   bool fin,
75                                   bool unidirectional,
76                                   SpdyStreamId stream_id,
77                                   SpdyStreamId associated_stream,
78                                   NetLog::LogLevel /* log_level */) {
79  base::DictionaryValue* dict = new base::DictionaryValue();
80  base::ListValue* headers_list = new base::ListValue();
81  for (SpdyHeaderBlock::const_iterator it = headers->begin();
82       it != headers->end(); ++it) {
83    headers_list->Append(new base::StringValue(base::StringPrintf(
84        "%s: %s", it->first.c_str(),
85        (ShouldShowHttpHeaderValue(
86            it->first) ? it->second : "[elided]").c_str())));
87  }
88  dict->SetBoolean("fin", fin);
89  dict->SetBoolean("unidirectional", unidirectional);
90  dict->Set("headers", headers_list);
91  dict->SetInteger("stream_id", stream_id);
92  if (associated_stream)
93    dict->SetInteger("associated_stream", associated_stream);
94  return dict;
95}
96
97base::Value* NetLogSpdyCredentialCallback(size_t slot,
98                                          const std::string* origin,
99                                          NetLog::LogLevel /* log_level */) {
100  base::DictionaryValue* dict = new base::DictionaryValue();
101  dict->SetInteger("slot", slot);
102  dict->SetString("origin", *origin);
103  return dict;
104}
105
106base::Value* NetLogSpdySessionCloseCallback(int net_error,
107                                            const std::string* description,
108                                            NetLog::LogLevel /* log_level */) {
109  base::DictionaryValue* dict = new base::DictionaryValue();
110  dict->SetInteger("net_error", net_error);
111  dict->SetString("description", *description);
112  return dict;
113}
114
115base::Value* NetLogSpdySessionCallback(const HostPortProxyPair* host_pair,
116                                       NetLog::LogLevel /* log_level */) {
117  base::DictionaryValue* dict = new base::DictionaryValue();
118  dict->SetString("host", host_pair->first.ToString());
119  dict->SetString("proxy", host_pair->second.ToPacString());
120  return dict;
121}
122
123base::Value* NetLogSpdySettingsCallback(const HostPortPair& host_port_pair,
124                                        bool clear_persisted,
125                                        NetLog::LogLevel /* log_level */) {
126  base::DictionaryValue* dict = new base::DictionaryValue();
127  dict->SetString("host", host_port_pair.ToString());
128  dict->SetBoolean("clear_persisted", clear_persisted);
129  return dict;
130}
131
132base::Value* NetLogSpdySettingCallback(SpdySettingsIds id,
133                                       SpdySettingsFlags flags,
134                                       uint32 value,
135                                       NetLog::LogLevel /* log_level */) {
136  base::DictionaryValue* dict = new base::DictionaryValue();
137  dict->SetInteger("id", id);
138  dict->SetInteger("flags", flags);
139  dict->SetInteger("value", value);
140  return dict;
141}
142
143base::Value* NetLogSpdySendSettingsCallback(const SettingsMap* settings,
144                                            NetLog::LogLevel /* log_level */) {
145  base::DictionaryValue* dict = new base::DictionaryValue();
146  base::ListValue* settings_list = new base::ListValue();
147  for (SettingsMap::const_iterator it = settings->begin();
148       it != settings->end(); ++it) {
149    const SpdySettingsIds id = it->first;
150    const SpdySettingsFlags flags = it->second.first;
151    const uint32 value = it->second.second;
152    settings_list->Append(new base::StringValue(
153        base::StringPrintf("[id:%u flags:%u value:%u]", id, flags, value)));
154  }
155  dict->Set("settings", settings_list);
156  return dict;
157}
158
159base::Value* NetLogSpdyWindowUpdateFrameCallback(
160    SpdyStreamId stream_id,
161    uint32 delta,
162    NetLog::LogLevel /* log_level */) {
163  base::DictionaryValue* dict = new base::DictionaryValue();
164  dict->SetInteger("stream_id", static_cast<int>(stream_id));
165  dict->SetInteger("delta", delta);
166  return dict;
167}
168
169base::Value* NetLogSpdySessionWindowUpdateCallback(
170    int32 delta,
171    int32 window_size,
172    NetLog::LogLevel /* log_level */) {
173  base::DictionaryValue* dict = new base::DictionaryValue();
174  dict->SetInteger("delta", delta);
175  dict->SetInteger("window_size", window_size);
176  return dict;
177}
178
179base::Value* NetLogSpdyDataCallback(SpdyStreamId stream_id,
180                                    int size,
181                                    bool fin,
182                                    NetLog::LogLevel /* log_level */) {
183  base::DictionaryValue* dict = new base::DictionaryValue();
184  dict->SetInteger("stream_id", static_cast<int>(stream_id));
185  dict->SetInteger("size", size);
186  dict->SetBoolean("fin", fin);
187  return dict;
188}
189
190base::Value* NetLogSpdyRstCallback(SpdyStreamId stream_id,
191                                   int status,
192                                   const std::string* description,
193                                   NetLog::LogLevel /* log_level */) {
194  base::DictionaryValue* dict = new base::DictionaryValue();
195  dict->SetInteger("stream_id", static_cast<int>(stream_id));
196  dict->SetInteger("status", status);
197  dict->SetString("description", *description);
198  return dict;
199}
200
201base::Value* NetLogSpdyPingCallback(uint32 unique_id,
202                                    const char* type,
203                                    NetLog::LogLevel /* log_level */) {
204  base::DictionaryValue* dict = new base::DictionaryValue();
205  dict->SetInteger("unique_id", unique_id);
206  dict->SetString("type", type);
207  return dict;
208}
209
210base::Value* NetLogSpdyGoAwayCallback(SpdyStreamId last_stream_id,
211                                      int active_streams,
212                                      int unclaimed_streams,
213                                      SpdyGoAwayStatus status,
214                                      NetLog::LogLevel /* log_level */) {
215  base::DictionaryValue* dict = new base::DictionaryValue();
216  dict->SetInteger("last_accepted_stream_id",
217                   static_cast<int>(last_stream_id));
218  dict->SetInteger("active_streams", active_streams);
219  dict->SetInteger("unclaimed_streams", unclaimed_streams);
220  dict->SetInteger("status", static_cast<int>(status));
221  return dict;
222}
223
// Maximum number of concurrent streams we will create, unless the server
// sends a SETTINGS frame with a different value. The SpdySession constructor
// uses this when its |initial_max_concurrent_streams| argument is 0.
const size_t kInitialMaxConcurrentStreams = 100;
// The maximum number of concurrent streams we will ever create.  Even if
// the server permits more, we will never exceed this limit. The SpdySession
// constructor uses this when its |max_concurrent_streams_limit| argument
// is 0.
const size_t kMaxConcurrentStreamLimit = 256;
230
231}  // namespace
232
233SpdyStreamRequest::SpdyStreamRequest() {
234  Reset();
235}
236
237SpdyStreamRequest::~SpdyStreamRequest() {
238  CancelRequest();
239}
240
241int SpdyStreamRequest::StartRequest(
242    SpdyStreamType type,
243    const base::WeakPtr<SpdySession>& session,
244    const GURL& url,
245    RequestPriority priority,
246    const BoundNetLog& net_log,
247    const CompletionCallback& callback) {
248  DCHECK(session.get());
249  DCHECK(!session_.get());
250  DCHECK(!stream_.get());
251  DCHECK(callback_.is_null());
252
253  type_ = type;
254  session_ = session;
255  url_ = url;
256  priority_ = priority;
257  net_log_ = net_log;
258  callback_ = callback;
259
260  base::WeakPtr<SpdyStream> stream;
261  int rv = session->TryCreateStream(this, &stream);
262  if (rv == OK) {
263    Reset();
264    stream_ = stream;
265  }
266  return rv;
267}
268
269void SpdyStreamRequest::CancelRequest() {
270  if (session_.get())
271    session_->CancelStreamRequest(this);
272  Reset();
273}
274
275base::WeakPtr<SpdyStream> SpdyStreamRequest::ReleaseStream() {
276  DCHECK(!session_.get());
277  base::WeakPtr<SpdyStream> stream = stream_;
278  DCHECK(stream.get());
279  Reset();
280  return stream;
281}
282
283void SpdyStreamRequest::OnRequestCompleteSuccess(
284    base::WeakPtr<SpdyStream>* stream) {
285  DCHECK(session_.get());
286  DCHECK(!stream_.get());
287  DCHECK(!callback_.is_null());
288  CompletionCallback callback = callback_;
289  Reset();
290  DCHECK(*stream);
291  stream_ = *stream;
292  callback.Run(OK);
293}
294
295void SpdyStreamRequest::OnRequestCompleteFailure(int rv) {
296  DCHECK(session_.get());
297  DCHECK(!stream_.get());
298  DCHECK(!callback_.is_null());
299  CompletionCallback callback = callback_;
300  Reset();
301  DCHECK_NE(rv, OK);
302  callback.Run(rv);
303}
304
305void SpdyStreamRequest::Reset() {
306  type_ = SPDY_BIDIRECTIONAL_STREAM;
307  session_.reset();
308  stream_.reset();
309  url_ = GURL();
310  priority_ = MINIMUM_PRIORITY;
311  net_log_ = BoundNetLog();
312  callback_.Reset();
313}
314
315SpdySession::ActiveStreamInfo::ActiveStreamInfo()
316    : stream(NULL),
317      waiting_for_syn_reply(false) {}
318
319SpdySession::ActiveStreamInfo::ActiveStreamInfo(SpdyStream* stream)
320    : stream(stream),
321      waiting_for_syn_reply(stream->type() != SPDY_PUSH_STREAM) {}
322
323SpdySession::ActiveStreamInfo::~ActiveStreamInfo() {}
324
325SpdySession::PushedStreamInfo::PushedStreamInfo() : stream_id(0) {}
326
327SpdySession::PushedStreamInfo::PushedStreamInfo(
328    SpdyStreamId stream_id,
329    base::TimeTicks creation_time)
330    : stream_id(stream_id),
331      creation_time(creation_time) {}
332
333SpdySession::PushedStreamInfo::~PushedStreamInfo() {}
334
335SpdySession::SpdySession(
336    const SpdySessionKey& spdy_session_key,
337    const base::WeakPtr<HttpServerProperties>& http_server_properties,
338    bool verify_domain_authentication,
339    bool enable_sending_initial_settings,
340    bool enable_credential_frames,
341    bool enable_compression,
342    bool enable_ping_based_connection_checking,
343    NextProto default_protocol,
344    size_t stream_initial_recv_window_size,
345    size_t initial_max_concurrent_streams,
346    size_t max_concurrent_streams_limit,
347    TimeFunc time_func,
348    const HostPortPair& trusted_spdy_proxy,
349    NetLog* net_log)
350    : weak_factory_(this),
351      in_io_loop_(false),
352      spdy_session_key_(spdy_session_key),
353      pool_(NULL),
354      http_server_properties_(http_server_properties),
355      read_buffer_(new IOBuffer(kReadBufferSize)),
356      stream_hi_water_mark_(kFirstStreamId),
357      in_flight_write_frame_type_(DATA),
358      in_flight_write_frame_size_(0),
359      is_secure_(false),
360      certificate_error_code_(OK),
361      availability_state_(STATE_AVAILABLE),
362      read_state_(READ_STATE_DO_READ),
363      write_state_(WRITE_STATE_IDLE),
364      error_on_close_(OK),
365      max_concurrent_streams_(initial_max_concurrent_streams == 0 ?
366                              kInitialMaxConcurrentStreams :
367                              initial_max_concurrent_streams),
368      max_concurrent_streams_limit_(max_concurrent_streams_limit == 0 ?
369                                    kMaxConcurrentStreamLimit :
370                                    max_concurrent_streams_limit),
371      streams_initiated_count_(0),
372      streams_pushed_count_(0),
373      streams_pushed_and_claimed_count_(0),
374      streams_abandoned_count_(0),
375      total_bytes_received_(0),
376      sent_settings_(false),
377      received_settings_(false),
378      stalled_streams_(0),
379      pings_in_flight_(0),
380      next_ping_id_(1),
381      last_activity_time_(time_func()),
382      check_ping_status_pending_(false),
383      flow_control_state_(FLOW_CONTROL_NONE),
384      stream_initial_send_window_size_(kSpdyStreamInitialWindowSize),
385      stream_initial_recv_window_size_(stream_initial_recv_window_size == 0 ?
386                                       kDefaultInitialRecvWindowSize :
387                                       stream_initial_recv_window_size),
388      session_send_window_size_(0),
389      session_recv_window_size_(0),
390      session_unacked_recv_window_bytes_(0),
391      net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)),
392      verify_domain_authentication_(verify_domain_authentication),
393      enable_sending_initial_settings_(enable_sending_initial_settings),
394      enable_credential_frames_(enable_credential_frames),
395      enable_compression_(enable_compression),
396      enable_ping_based_connection_checking_(
397          enable_ping_based_connection_checking),
398      default_protocol_(default_protocol),
399      credential_state_(SpdyCredentialState::kDefaultNumSlots),
400      connection_at_risk_of_loss_time_(
401          base::TimeDelta::FromSeconds(kDefaultConnectionAtRiskOfLossSeconds)),
402      hung_interval_(
403          base::TimeDelta::FromSeconds(kHungIntervalSeconds)),
404      trusted_spdy_proxy_(trusted_spdy_proxy),
405      time_func_(time_func) {
406  DCHECK(HttpStreamFactory::spdy_enabled());
407  net_log_.BeginEvent(
408      NetLog::TYPE_SPDY_SESSION,
409      base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair()));
410  next_unclaimed_push_stream_sweep_time_ = time_func_() +
411      base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
412  // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
413}
414
415SpdySession::~SpdySession() {
416  CHECK(!in_io_loop_);
417  DCHECK(!pool_);
418  DcheckClosed();
419
420  // TODO(akalin): Check connection->is_initialized() instead. This
421  // requires re-working CreateFakeSpdySession(), though.
422  DCHECK(connection_->socket());
423  // With SPDY we can't recycle sockets.
424  connection_->socket()->Disconnect();
425
426  RecordHistograms();
427
428  net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION);
429}
430
431Error SpdySession::InitializeWithSocket(
432    scoped_ptr<ClientSocketHandle> connection,
433    SpdySessionPool* pool,
434    bool is_secure,
435    int certificate_error_code) {
436  CHECK(!in_io_loop_);
437  DCHECK_EQ(availability_state_, STATE_AVAILABLE);
438  DCHECK_EQ(read_state_, READ_STATE_DO_READ);
439  DCHECK_EQ(write_state_, WRITE_STATE_IDLE);
440  DCHECK(!connection_);
441
442  DCHECK(certificate_error_code == OK ||
443         certificate_error_code < ERR_IO_PENDING);
444  // TODO(akalin): Check connection->is_initialized() instead. This
445  // requires re-working CreateFakeSpdySession(), though.
446  DCHECK(connection->socket());
447
448  base::StatsCounter spdy_sessions("spdy.sessions");
449  spdy_sessions.Increment();
450
451  connection_ = connection.Pass();
452  is_secure_ = is_secure;
453  certificate_error_code_ = certificate_error_code;
454
455  NextProto protocol = default_protocol_;
456  NextProto protocol_negotiated =
457      connection_->socket()->GetNegotiatedProtocol();
458  if (protocol_negotiated != kProtoUnknown) {
459    protocol = protocol_negotiated;
460  }
461
462  SSLClientSocket* ssl_socket = GetSSLClientSocket();
463  if (ssl_socket && ssl_socket->WasChannelIDSent()) {
464    // According to the SPDY spec, the credential associated with the TLS
465    // connection is stored in slot[1].
466    credential_state_.SetHasCredential(GURL("https://" +
467                                            host_port_pair().ToString()));
468  }
469
470  // TODO(akalin): Change this to kProtoSPDYMinimumVersion once we
471  // stop supporting SPDY/1.
472  DCHECK_GE(protocol, kProtoSPDY2);
473  DCHECK_LE(protocol, kProtoSPDYMaximumVersion);
474  if (protocol >= kProtoSPDY31) {
475    flow_control_state_ = FLOW_CONTROL_STREAM_AND_SESSION;
476    session_send_window_size_ = kSpdySessionInitialWindowSize;
477    session_recv_window_size_ = kSpdySessionInitialWindowSize;
478  } else if (protocol >= kProtoSPDY3) {
479    flow_control_state_ = FLOW_CONTROL_STREAM;
480  } else {
481    flow_control_state_ = FLOW_CONTROL_NONE;
482  }
483
484  buffered_spdy_framer_.reset(
485      new BufferedSpdyFramer(NPNToSpdyVersion(protocol), enable_compression_));
486  buffered_spdy_framer_->set_visitor(this);
487  buffered_spdy_framer_->set_debug_visitor(this);
488  UMA_HISTOGRAM_ENUMERATION("Net.SpdyVersion", protocol, kProtoMaximumVersion);
489#if defined(SPDY_PROXY_AUTH_ORIGIN)
490  UMA_HISTOGRAM_BOOLEAN("Net.SpdySessions_DataReductionProxy",
491                        host_port_pair().Equals(HostPortPair::FromURL(
492                            GURL(SPDY_PROXY_AUTH_ORIGIN))));
493#endif
494
495  net_log_.AddEvent(
496      NetLog::TYPE_SPDY_SESSION_INITIALIZED,
497      connection_->socket()->NetLog().source().ToEventParametersCallback());
498
499  int error = DoReadLoop(READ_STATE_DO_READ, OK);
500  if (error == ERR_IO_PENDING)
501    error = OK;
502  if (error == OK) {
503    DCHECK_NE(availability_state_, STATE_CLOSED);
504    connection_->AddLayeredPool(this);
505    SendInitialSettings();
506    pool_ = pool;
507  } else {
508    DcheckClosed();
509  }
510  return static_cast<Error>(error);
511}
512
513bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
514  if (!verify_domain_authentication_)
515    return true;
516
517  if (availability_state_ == STATE_CLOSED)
518    return false;
519
520  SSLInfo ssl_info;
521  bool was_npn_negotiated;
522  NextProto protocol_negotiated = kProtoUnknown;
523  if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated))
524    return true;   // This is not a secure session, so all domains are okay.
525
526  return !ssl_info.client_cert_sent &&
527      (enable_credential_frames_ || !ssl_info.channel_id_sent ||
528       ServerBoundCertService::GetDomainForHost(domain) ==
529       ServerBoundCertService::GetDomainForHost(host_port_pair().host())) &&
530       ssl_info.cert->VerifyNameMatch(domain);
531}
532
533int SpdySession::GetPushStream(
534    const GURL& url,
535    base::WeakPtr<SpdyStream>* stream,
536    const BoundNetLog& stream_net_log) {
537  CHECK(!in_io_loop_);
538
539  stream->reset();
540
541  // TODO(akalin): Add unit test exercising this code path.
542  if (availability_state_ == STATE_CLOSED)
543    return ERR_CONNECTION_CLOSED;
544
545  Error err = TryAccessStream(url);
546  if (err != OK)
547    return err;
548
549  *stream = GetActivePushStream(url);
550  if (*stream) {
551    DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_);
552    streams_pushed_and_claimed_count_++;
553  }
554  return OK;
555}
556
557Error SpdySession::TryAccessStream(const GURL& url) {
558  CHECK(!in_io_loop_);
559  DCHECK_NE(availability_state_, STATE_CLOSED);
560
561  if (is_secure_ && certificate_error_code_ != OK &&
562      (url.SchemeIs("https") || url.SchemeIs("wss"))) {
563    RecordProtocolErrorHistogram(
564        PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION);
565    CloseSessionResult result = DoCloseSession(
566        static_cast<Error>(certificate_error_code_),
567        "Tried to get SPDY stream for secure content over an unauthenticated "
568        "session.");
569    DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED);
570    return ERR_SPDY_PROTOCOL_ERROR;
571  }
572  return OK;
573}
574
575int SpdySession::TryCreateStream(SpdyStreamRequest* request,
576                                 base::WeakPtr<SpdyStream>* stream) {
577  CHECK(request);
578  CHECK(!in_io_loop_);
579
580  if (availability_state_ == STATE_GOING_AWAY)
581    return ERR_FAILED;
582
583  // TODO(akalin): Add unit test exercising this code path.
584  if (availability_state_ == STATE_CLOSED)
585    return ERR_CONNECTION_CLOSED;
586
587  Error err = TryAccessStream(request->url());
588  if (err != OK)
589    return err;
590
591  if (!max_concurrent_streams_ ||
592      (active_streams_.size() + created_streams_.size() <
593       max_concurrent_streams_)) {
594    return CreateStream(*request, stream);
595  }
596
597  stalled_streams_++;
598  net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS);
599  pending_create_stream_queues_[request->priority()].push_back(request);
600  return ERR_IO_PENDING;
601}
602
603int SpdySession::CreateStream(const SpdyStreamRequest& request,
604                              base::WeakPtr<SpdyStream>* stream) {
605  CHECK(!in_io_loop_);
606  DCHECK_GE(request.priority(), MINIMUM_PRIORITY);
607  DCHECK_LT(request.priority(), NUM_PRIORITIES);
608
609  if (availability_state_ == STATE_GOING_AWAY)
610    return ERR_FAILED;
611
612  // TODO(akalin): Add unit test exercising this code path.
613  if (availability_state_ == STATE_CLOSED)
614    return ERR_CONNECTION_CLOSED;
615
616  Error err = TryAccessStream(request.url());
617  if (err != OK) {
618    // This should have been caught in TryCreateStream().
619    NOTREACHED();
620    return err;
621  }
622
623  DCHECK(connection_->socket());
624  DCHECK(connection_->socket()->IsConnected());
625  if (connection_->socket()) {
626    UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected",
627                          connection_->socket()->IsConnected());
628    if (!connection_->socket()->IsConnected()) {
629      CloseSessionResult result = DoCloseSession(
630          ERR_CONNECTION_CLOSED,
631          "Tried to create SPDY stream for a closed socket connection.");
632      DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED);
633      return ERR_CONNECTION_CLOSED;
634    }
635  }
636
637  scoped_ptr<SpdyStream> new_stream(
638      new SpdyStream(request.type(), GetWeakPtr(), request.url(),
639                     request.priority(),
640                     stream_initial_send_window_size_,
641                     stream_initial_recv_window_size_,
642                     request.net_log()));
643  *stream = new_stream->GetWeakPtr();
644  InsertCreatedStream(new_stream.Pass());
645
646  UMA_HISTOGRAM_CUSTOM_COUNTS(
647      "Net.SpdyPriorityCount",
648      static_cast<int>(request.priority()), 0, 10, 11);
649
650  return OK;
651}
652
653void SpdySession::CancelStreamRequest(SpdyStreamRequest* request) {
654  CHECK(request);
655
656  if (DCHECK_IS_ON()) {
657    // |request| should not be in a queue not matching its priority.
658    for (int i = 0; i < NUM_PRIORITIES; ++i) {
659      if (request->priority() == i)
660        continue;
661      PendingStreamRequestQueue* queue = &pending_create_stream_queues_[i];
662      DCHECK(std::find(queue->begin(), queue->end(), request) == queue->end());
663    }
664  }
665
666  PendingStreamRequestQueue* queue =
667      &pending_create_stream_queues_[request->priority()];
668  // Remove |request| from |queue| while preserving the order of the
669  // other elements.
670  PendingStreamRequestQueue::iterator it =
671      std::find(queue->begin(), queue->end(), request);
672  if (it != queue->end()) {
673    it = queue->erase(it);
674    // |request| should be in the queue at most once, and if it is
675    // present, should not be pending completion.
676    DCHECK(std::find(it, queue->end(), request) == queue->end());
677    DCHECK(!ContainsKey(pending_stream_request_completions_,
678                        request));
679    return;
680  }
681
682  pending_stream_request_completions_.erase(request);
683}
684
685void SpdySession::ProcessPendingStreamRequests() {
686  while (!max_concurrent_streams_ ||
687         (active_streams_.size() + created_streams_.size() <
688          max_concurrent_streams_)) {
689    bool no_pending_create_streams = true;
690    for (int i = NUM_PRIORITIES - 1; i >= MINIMUM_PRIORITY; --i) {
691      if (!pending_create_stream_queues_[i].empty()) {
692        SpdyStreamRequest* pending_request =
693            pending_create_stream_queues_[i].front();
694        CHECK(pending_request);
695        pending_create_stream_queues_[i].pop_front();
696        no_pending_create_streams = false;
697        DCHECK(!ContainsKey(pending_stream_request_completions_,
698                            pending_request));
699        pending_stream_request_completions_.insert(pending_request);
700        base::MessageLoop::current()->PostTask(
701            FROM_HERE,
702            base::Bind(&SpdySession::CompleteStreamRequest,
703                       weak_factory_.GetWeakPtr(), pending_request));
704        break;
705      }
706    }
707    if (no_pending_create_streams)
708      return;  // there were no streams in any queue
709  }
710}
711
712bool SpdySession::NeedsCredentials() const {
713  if (!is_secure_)
714    return false;
715  SSLClientSocket* ssl_socket = GetSSLClientSocket();
716  if (ssl_socket->GetNegotiatedProtocol() < kProtoSPDY3)
717    return false;
718  return ssl_socket->WasChannelIDSent();
719}
720
721void SpdySession::AddPooledAlias(const SpdySessionKey& alias_key) {
722  pooled_aliases_.insert(alias_key);
723}
724
725int SpdySession::GetProtocolVersion() const {
726  DCHECK(buffered_spdy_framer_.get());
727  return buffered_spdy_framer_->protocol_version();
728}
729
730base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() {
731  return weak_factory_.GetWeakPtr();
732}
733
734bool SpdySession::CloseOneIdleConnection() {
735  CHECK(!in_io_loop_);
736  DCHECK_NE(availability_state_, STATE_CLOSED);
737  DCHECK(pool_);
738  if (!active_streams_.empty())
739    return false;
740  CloseSessionResult result =
741      DoCloseSession(ERR_CONNECTION_CLOSED, "Closing one idle connection.");
742  if (result != SESSION_CLOSED_AND_REMOVED) {
743    NOTREACHED();
744    return false;
745  }
746  return true;
747}
748
749void SpdySession::EnqueueStreamWrite(
750    const base::WeakPtr<SpdyStream>& stream,
751    SpdyFrameType frame_type,
752    scoped_ptr<SpdyBufferProducer> producer) {
753  DCHECK(frame_type == HEADERS ||
754         frame_type == DATA ||
755         frame_type == CREDENTIAL ||
756         frame_type == SYN_STREAM);
757  EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream);
758}
759
760scoped_ptr<SpdyFrame> SpdySession::CreateSynStream(
761    SpdyStreamId stream_id,
762    RequestPriority priority,
763    uint8 credential_slot,
764    SpdyControlFlags flags,
765    const SpdyHeaderBlock& headers) {
766  ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
767  CHECK(it != active_streams_.end());
768  CHECK_EQ(it->second.stream->stream_id(), stream_id);
769
770  SendPrefacePingIfNoneInFlight();
771
772  DCHECK(buffered_spdy_framer_.get());
773  scoped_ptr<SpdyFrame> syn_frame(
774      buffered_spdy_framer_->CreateSynStream(
775          stream_id, 0,
776          ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion()),
777          credential_slot, flags, enable_compression_, &headers));
778
779  base::StatsCounter spdy_requests("spdy.requests");
780  spdy_requests.Increment();
781  streams_initiated_count_++;
782
783  if (net_log().IsLoggingAllEvents()) {
784    net_log().AddEvent(
785        NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
786        base::Bind(&NetLogSpdySynCallback, &headers,
787                   (flags & CONTROL_FLAG_FIN) != 0,
788                   (flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0,
789                   stream_id, 0));
790  }
791
792  return syn_frame.Pass();
793}
794
795int SpdySession::CreateCredentialFrame(
796    const std::string& origin,
797    SSLClientCertType type,
798    const std::string& key,
799    const std::string& cert,
800    RequestPriority priority,
801    scoped_ptr<SpdyFrame>* credential_frame) {
802  DCHECK(is_secure_);
803  SSLClientSocket* ssl_socket = GetSSLClientSocket();
804  DCHECK(ssl_socket);
805  DCHECK(ssl_socket->WasChannelIDSent());
806
807  SpdyCredential credential;
808  std::string tls_unique;
809  ssl_socket->GetTLSUniqueChannelBinding(&tls_unique);
810  size_t slot = credential_state_.SetHasCredential(GURL(origin));
811  int rv = SpdyCredentialBuilder::Build(tls_unique, type, key, cert, slot,
812                                        &credential);
813  DCHECK_NE(rv, ERR_IO_PENDING);
814  if (rv != OK)
815    return rv;
816
817  DCHECK(buffered_spdy_framer_.get());
818  credential_frame->reset(
819      buffered_spdy_framer_->CreateCredentialFrame(credential));
820
821  if (net_log().IsLoggingAllEvents()) {
822    net_log().AddEvent(
823        NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL,
824        base::Bind(&NetLogSpdyCredentialCallback, credential.slot, &origin));
825  }
826  return OK;
827}
828
829scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
830                                                     IOBuffer* data,
831                                                     int len,
832                                                     SpdyDataFlags flags) {
833  if (availability_state_ == STATE_CLOSED) {
834    NOTREACHED();
835    return scoped_ptr<SpdyBuffer>();
836  }
837
838  ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
839  CHECK(it != active_streams_.end());
840  SpdyStream* stream = it->second.stream;
841  CHECK_EQ(stream->stream_id(), stream_id);
842
843  if (len < 0) {
844    NOTREACHED();
845    return scoped_ptr<SpdyBuffer>();
846  }
847
848  int effective_len = std::min(len, kMaxSpdyFrameChunkSize);
849
850  bool send_stalled_by_stream =
851      (flow_control_state_ >= FLOW_CONTROL_STREAM) &&
852      (stream->send_window_size() <= 0);
853  bool send_stalled_by_session = IsSendStalled();
854
855  // NOTE: There's an enum of the same name in histograms.xml.
856  enum SpdyFrameFlowControlState {
857    SEND_NOT_STALLED,
858    SEND_STALLED_BY_STREAM,
859    SEND_STALLED_BY_SESSION,
860    SEND_STALLED_BY_STREAM_AND_SESSION,
861  };
862
863  SpdyFrameFlowControlState frame_flow_control_state = SEND_NOT_STALLED;
864  if (send_stalled_by_stream) {
865    if (send_stalled_by_session) {
866      frame_flow_control_state = SEND_STALLED_BY_STREAM_AND_SESSION;
867    } else {
868      frame_flow_control_state = SEND_STALLED_BY_STREAM;
869    }
870  } else if (send_stalled_by_session) {
871    frame_flow_control_state = SEND_STALLED_BY_SESSION;
872  }
873
874  if (flow_control_state_ == FLOW_CONTROL_STREAM) {
875    UMA_HISTOGRAM_ENUMERATION(
876        "Net.SpdyFrameStreamFlowControlState",
877        frame_flow_control_state,
878        SEND_STALLED_BY_STREAM + 1);
879  } else if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
880    UMA_HISTOGRAM_ENUMERATION(
881        "Net.SpdyFrameStreamAndSessionFlowControlState",
882        frame_flow_control_state,
883        SEND_STALLED_BY_STREAM_AND_SESSION + 1);
884  }
885
886  // Obey send window size of the stream if stream flow control is
887  // enabled.
888  if (flow_control_state_ >= FLOW_CONTROL_STREAM) {
889    if (send_stalled_by_stream) {
890      stream->set_send_stalled_by_flow_control(true);
891      // Even though we're currently stalled only by the stream, we
892      // might end up being stalled by the session also.
893      QueueSendStalledStream(*stream);
894      net_log().AddEvent(
895          NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_STREAM_SEND_WINDOW,
896          NetLog::IntegerCallback("stream_id", stream_id));
897      return scoped_ptr<SpdyBuffer>();
898    }
899
900    effective_len = std::min(effective_len, stream->send_window_size());
901  }
902
903  // Obey send window size of the session if session flow control is
904  // enabled.
905  if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
906    if (send_stalled_by_session) {
907      stream->set_send_stalled_by_flow_control(true);
908      QueueSendStalledStream(*stream);
909      net_log().AddEvent(
910          NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_SESSION_SEND_WINDOW,
911          NetLog::IntegerCallback("stream_id", stream_id));
912      return scoped_ptr<SpdyBuffer>();
913    }
914
915    effective_len = std::min(effective_len, session_send_window_size_);
916  }
917
918  DCHECK_GE(effective_len, 0);
919
920  // Clear FIN flag if only some of the data will be in the data
921  // frame.
922  if (effective_len < len)
923    flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
924
925  if (net_log().IsLoggingAllEvents()) {
926    net_log().AddEvent(
927        NetLog::TYPE_SPDY_SESSION_SEND_DATA,
928        base::Bind(&NetLogSpdyDataCallback, stream_id, effective_len,
929                   (flags & DATA_FLAG_FIN) != 0));
930  }
931
932  // Send PrefacePing for DATA_FRAMEs with nonzero payload size.
933  if (effective_len > 0)
934    SendPrefacePingIfNoneInFlight();
935
936  // TODO(mbelshe): reduce memory copies here.
937  DCHECK(buffered_spdy_framer_.get());
938  scoped_ptr<SpdyFrame> frame(
939      buffered_spdy_framer_->CreateDataFrame(
940          stream_id, data->data(),
941          static_cast<uint32>(effective_len), flags));
942
943  scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass()));
944
945  if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
946    DecreaseSendWindowSize(static_cast<int32>(effective_len));
947    data_buffer->AddConsumeCallback(
948        base::Bind(&SpdySession::OnWriteBufferConsumed,
949                   weak_factory_.GetWeakPtr(),
950                   static_cast<size_t>(effective_len)));
951  }
952
953  return data_buffer.Pass();
954}
955
956void SpdySession::CloseActiveStream(SpdyStreamId stream_id, int status) {
957  DCHECK_NE(stream_id, 0u);
958
959  ActiveStreamMap::iterator it = active_streams_.find(stream_id);
960  if (it == active_streams_.end()) {
961    NOTREACHED();
962    return;
963  }
964
965  CloseActiveStreamIterator(it, status);
966}
967
968void SpdySession::CloseCreatedStream(
969    const base::WeakPtr<SpdyStream>& stream, int status) {
970  DCHECK_EQ(stream->stream_id(), 0u);
971
972  CreatedStreamSet::iterator it = created_streams_.find(stream.get());
973  if (it == created_streams_.end()) {
974    NOTREACHED();
975    return;
976  }
977
978  CloseCreatedStreamIterator(it, status);
979}
980
981void SpdySession::ResetStream(SpdyStreamId stream_id,
982                              SpdyRstStreamStatus status,
983                              const std::string& description) {
984  DCHECK_NE(stream_id, 0u);
985
986  ActiveStreamMap::iterator it = active_streams_.find(stream_id);
987  if (it == active_streams_.end()) {
988    NOTREACHED();
989    return;
990  }
991
992  ResetStreamIterator(it, status, description);
993}
994
995bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const {
996  return ContainsKey(active_streams_, stream_id);
997}
998
999LoadState SpdySession::GetLoadState() const {
1000  // Just report that we're idle since the session could be doing
1001  // many things concurrently.
1002  return LOAD_STATE_IDLE;
1003}
1004
1005void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it,
1006                                            int status) {
1007  // TODO(mbelshe): We should send a RST_STREAM control frame here
1008  //                so that the server can cancel a large send.
1009
1010  scoped_ptr<SpdyStream> owned_stream(it->second.stream);
1011  active_streams_.erase(it);
1012
1013  // TODO(akalin): When SpdyStream was ref-counted (and
1014  // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this
1015  // was only done when status was not OK. This meant that pushed
1016  // streams can still be claimed after they're closed. This is
1017  // probably something that we still want to support, although server
1018  // push is hardly used. Write tests for this and fix this. (See
1019  // http://crbug.com/261712 .)
1020  if (owned_stream->type() == SPDY_PUSH_STREAM)
1021      unclaimed_pushed_streams_.erase(owned_stream->url());
1022
1023  DeleteStream(owned_stream.Pass(), status);
1024}
1025
1026void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it,
1027                                             int status) {
1028  scoped_ptr<SpdyStream> owned_stream(*it);
1029  created_streams_.erase(it);
1030  DeleteStream(owned_stream.Pass(), status);
1031}
1032
1033void SpdySession::ResetStreamIterator(ActiveStreamMap::iterator it,
1034                                      SpdyRstStreamStatus status,
1035                                      const std::string& description) {
1036  SpdyStreamId stream_id = it->first;
1037  RequestPriority priority = it->second.stream->priority();
1038  // Removes any pending writes for the stream except for possibly an
1039  // in-flight one.
1040  CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
1041
1042  SendResetStreamFrame(stream_id, priority, status, description);
1043}
1044
1045void SpdySession::SendResetStreamFrame(SpdyStreamId stream_id,
1046                                       RequestPriority priority,
1047                                       SpdyRstStreamStatus status,
1048                                       const std::string& description) {
1049  DCHECK_NE(stream_id, 0u);
1050  DCHECK(active_streams_.find(stream_id) == active_streams_.end());
1051
1052  net_log().AddEvent(
1053      NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
1054      base::Bind(&NetLogSpdyRstCallback, stream_id, status, &description));
1055
1056  DCHECK(buffered_spdy_framer_.get());
1057  scoped_ptr<SpdyFrame> rst_frame(
1058      buffered_spdy_framer_->CreateRstStream(stream_id, status));
1059
1060  EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass());
1061  RecordProtocolErrorHistogram(
1062      static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID));
1063}
1064
1065void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) {
1066  CHECK(!in_io_loop_);
1067  DCHECK_NE(availability_state_, STATE_CLOSED);
1068  DCHECK_EQ(read_state_, expected_read_state);
1069
1070  result = DoReadLoop(expected_read_state, result);
1071
1072  if (availability_state_ == STATE_CLOSED) {
1073    DCHECK_EQ(result, error_on_close_);
1074    DCHECK_LT(error_on_close_, ERR_IO_PENDING);
1075    RemoveFromPool();
1076    return;
1077  }
1078
1079  DCHECK(result == OK || result == ERR_IO_PENDING);
1080}
1081
1082int SpdySession::DoReadLoop(ReadState expected_read_state, int result) {
1083  CHECK(!in_io_loop_);
1084  DCHECK_NE(availability_state_, STATE_CLOSED);
1085  DCHECK_EQ(read_state_, expected_read_state);
1086
1087  in_io_loop_ = true;
1088
1089  int bytes_read_without_yielding = 0;
1090
1091  // Loop until the session is closed, the read becomes blocked, or
1092  // the read limit is exceeded.
1093  while (true) {
1094    switch (read_state_) {
1095      case READ_STATE_DO_READ:
1096        DCHECK_EQ(result, OK);
1097        result = DoRead();
1098        break;
1099      case READ_STATE_DO_READ_COMPLETE:
1100        if (result > 0)
1101          bytes_read_without_yielding += result;
1102        result = DoReadComplete(result);
1103        break;
1104      default:
1105        NOTREACHED() << "read_state_: " << read_state_;
1106        break;
1107    }
1108
1109    if (availability_state_ == STATE_CLOSED) {
1110      DCHECK_EQ(result, error_on_close_);
1111      DCHECK_LT(result, ERR_IO_PENDING);
1112      break;
1113    }
1114
1115    if (result == ERR_IO_PENDING)
1116      break;
1117
1118    if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) {
1119      read_state_ = READ_STATE_DO_READ;
1120      base::MessageLoop::current()->PostTask(
1121          FROM_HERE,
1122          base::Bind(&SpdySession::PumpReadLoop,
1123                     weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
1124      result = ERR_IO_PENDING;
1125      break;
1126    }
1127  }
1128
1129  CHECK(in_io_loop_);
1130  in_io_loop_ = false;
1131
1132  return result;
1133}
1134
1135int SpdySession::DoRead() {
1136  CHECK(in_io_loop_);
1137  DCHECK_NE(availability_state_, STATE_CLOSED);
1138
1139  CHECK(connection_);
1140  CHECK(connection_->socket());
1141  read_state_ = READ_STATE_DO_READ_COMPLETE;
1142  return connection_->socket()->Read(
1143      read_buffer_.get(),
1144      kReadBufferSize,
1145      base::Bind(&SpdySession::PumpReadLoop,
1146                 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE));
1147}
1148
1149int SpdySession::DoReadComplete(int result) {
1150  CHECK(in_io_loop_);
1151  DCHECK_NE(availability_state_, STATE_CLOSED);
1152
1153  // Parse a frame.  For now this code requires that the frame fit into our
1154  // buffer (kReadBufferSize).
1155  // TODO(mbelshe): support arbitrarily large frames!
1156
1157  if (result == 0) {
1158    UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF",
1159                                total_bytes_received_, 1, 100000000, 50);
1160    CloseSessionResult close_session_result =
1161        DoCloseSession(ERR_CONNECTION_CLOSED, "Connection closed");
1162    DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED);
1163    DCHECK_EQ(availability_state_, STATE_CLOSED);
1164    DCHECK_EQ(error_on_close_, ERR_CONNECTION_CLOSED);
1165    return ERR_CONNECTION_CLOSED;
1166  }
1167
1168  if (result < 0) {
1169    CloseSessionResult close_session_result =
1170        DoCloseSession(static_cast<Error>(result), "result is < 0.");
1171    DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED);
1172    DCHECK_EQ(availability_state_, STATE_CLOSED);
1173    DCHECK_EQ(error_on_close_, result);
1174    return result;
1175  }
1176
1177  total_bytes_received_ += result;
1178
1179  last_activity_time_ = time_func_();
1180
1181  DCHECK(buffered_spdy_framer_.get());
1182  char* data = read_buffer_->data();
1183  while (result > 0) {
1184    uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result);
1185    result -= bytes_processed;
1186    data += bytes_processed;
1187
1188    if (availability_state_ == STATE_CLOSED) {
1189      DCHECK_LT(error_on_close_, ERR_IO_PENDING);
1190      return error_on_close_;
1191    }
1192
1193    DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR);
1194  }
1195
1196  read_state_ = READ_STATE_DO_READ;
1197  return OK;
1198}
1199
1200void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) {
1201  CHECK(!in_io_loop_);
1202  DCHECK_NE(availability_state_, STATE_CLOSED);
1203  DCHECK_EQ(write_state_, expected_write_state);
1204
1205  result = DoWriteLoop(expected_write_state, result);
1206
1207  if (availability_state_ == STATE_CLOSED) {
1208    DCHECK_EQ(result, error_on_close_);
1209    DCHECK_LT(error_on_close_, ERR_IO_PENDING);
1210    RemoveFromPool();
1211    return;
1212  }
1213
1214  DCHECK(result == OK || result == ERR_IO_PENDING);
1215}
1216
1217int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) {
1218  CHECK(!in_io_loop_);
1219  DCHECK_NE(availability_state_, STATE_CLOSED);
1220  DCHECK_NE(write_state_, WRITE_STATE_IDLE);
1221  DCHECK_EQ(write_state_, expected_write_state);
1222
1223  in_io_loop_ = true;
1224
1225  // Loop until the session is closed or the write becomes blocked.
1226  while (true) {
1227    switch (write_state_) {
1228      case WRITE_STATE_DO_WRITE:
1229        DCHECK_EQ(result, OK);
1230        result = DoWrite();
1231        break;
1232      case WRITE_STATE_DO_WRITE_COMPLETE:
1233        result = DoWriteComplete(result);
1234        break;
1235      case WRITE_STATE_IDLE:
1236      default:
1237        NOTREACHED() << "write_state_: " << write_state_;
1238        break;
1239    }
1240
1241    if (availability_state_ == STATE_CLOSED) {
1242      DCHECK_EQ(result, error_on_close_);
1243      DCHECK_LT(result, ERR_IO_PENDING);
1244      break;
1245    }
1246
1247    if (write_state_ == WRITE_STATE_IDLE) {
1248      DCHECK_EQ(result, ERR_IO_PENDING);
1249      break;
1250    }
1251
1252    if (result == ERR_IO_PENDING)
1253      break;
1254  }
1255
1256  CHECK(in_io_loop_);
1257  in_io_loop_ = false;
1258
1259  return result;
1260}
1261
1262int SpdySession::DoWrite() {
1263  CHECK(in_io_loop_);
1264  DCHECK_NE(availability_state_, STATE_CLOSED);
1265
1266  DCHECK(buffered_spdy_framer_);
1267  if (in_flight_write_) {
1268    DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1269  } else {
1270    // Grab the next frame to send.
1271    SpdyFrameType frame_type = DATA;
1272    scoped_ptr<SpdyBufferProducer> producer;
1273    base::WeakPtr<SpdyStream> stream;
1274    if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) {
1275      write_state_ = WRITE_STATE_IDLE;
1276      return ERR_IO_PENDING;
1277    }
1278
1279    if (stream.get())
1280      DCHECK(!stream->IsClosed());
1281
1282    // Activate the stream only when sending the SYN_STREAM frame to
1283    // guarantee monotonically-increasing stream IDs.
1284    if (frame_type == SYN_STREAM) {
1285      if (stream.get() && stream->stream_id() == 0) {
1286        scoped_ptr<SpdyStream> owned_stream =
1287            ActivateCreatedStream(stream.get());
1288        InsertActivatedStream(owned_stream.Pass());
1289      } else {
1290        NOTREACHED();
1291        return ERR_UNEXPECTED;
1292      }
1293    }
1294
1295    in_flight_write_ = producer->ProduceBuffer();
1296    if (!in_flight_write_) {
1297      NOTREACHED();
1298      return ERR_UNEXPECTED;
1299    }
1300    in_flight_write_frame_type_ = frame_type;
1301    in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
1302    DCHECK_GE(in_flight_write_frame_size_,
1303              buffered_spdy_framer_->GetFrameMinimumSize());
1304    in_flight_write_stream_ = stream;
1305  }
1306
1307  write_state_ = WRITE_STATE_DO_WRITE_COMPLETE;
1308
1309  // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems
1310  // with Socket implementations that don't store their IOBuffer
1311  // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345).
1312  scoped_refptr<IOBuffer> write_io_buffer =
1313      in_flight_write_->GetIOBufferForRemainingData();
1314  return connection_->socket()->Write(
1315      write_io_buffer.get(),
1316      in_flight_write_->GetRemainingSize(),
1317      base::Bind(&SpdySession::PumpWriteLoop,
1318                 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE));
1319}
1320
1321int SpdySession::DoWriteComplete(int result) {
1322  CHECK(in_io_loop_);
1323  DCHECK_NE(availability_state_, STATE_CLOSED);
1324  DCHECK_NE(result, ERR_IO_PENDING);
1325  DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1326
1327  last_activity_time_ = time_func_();
1328
1329  if (result < 0) {
1330    DCHECK_NE(result, ERR_IO_PENDING);
1331    in_flight_write_.reset();
1332    in_flight_write_frame_type_ = DATA;
1333    in_flight_write_frame_size_ = 0;
1334    in_flight_write_stream_.reset();
1335    CloseSessionResult close_session_result =
1336        DoCloseSession(static_cast<Error>(result), "Write error");
1337    DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED);
1338    DCHECK_EQ(availability_state_, STATE_CLOSED);
1339    DCHECK_EQ(error_on_close_, result);
1340    return result;
1341  }
1342
1343  // It should not be possible to have written more bytes than our
1344  // in_flight_write_.
1345  DCHECK_LE(static_cast<size_t>(result),
1346            in_flight_write_->GetRemainingSize());
1347
1348  if (result > 0) {
1349    in_flight_write_->Consume(static_cast<size_t>(result));
1350
1351    // We only notify the stream when we've fully written the pending frame.
1352    if (in_flight_write_->GetRemainingSize() == 0) {
1353      // It is possible that the stream was cancelled while we were
1354      // writing to the socket.
1355      if (in_flight_write_stream_.get()) {
1356        DCHECK_GT(in_flight_write_frame_size_, 0u);
1357        in_flight_write_stream_->OnFrameWriteComplete(
1358            in_flight_write_frame_type_,
1359            in_flight_write_frame_size_);
1360      }
1361
1362      // Cleanup the write which just completed.
1363      in_flight_write_.reset();
1364      in_flight_write_frame_type_ = DATA;
1365      in_flight_write_frame_size_ = 0;
1366      in_flight_write_stream_.reset();
1367    }
1368  }
1369
1370  write_state_ = WRITE_STATE_DO_WRITE;
1371  return OK;
1372}
1373
1374void SpdySession::DcheckGoingAway() const {
1375  DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1376  if (DCHECK_IS_ON()) {
1377    for (int i = 0; i < NUM_PRIORITIES; ++i) {
1378      DCHECK(pending_create_stream_queues_[i].empty());
1379    }
1380  }
1381  DCHECK(pending_stream_request_completions_.empty());
1382  DCHECK(created_streams_.empty());
1383}
1384
1385void SpdySession::DcheckClosed() const {
1386  DcheckGoingAway();
1387  DCHECK_EQ(availability_state_, STATE_CLOSED);
1388  DCHECK_LT(error_on_close_, ERR_IO_PENDING);
1389  DCHECK(active_streams_.empty());
1390  DCHECK(unclaimed_pushed_streams_.empty());
1391  DCHECK(write_queue_.IsEmpty());
1392}
1393
1394void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id,
1395                                 Error status) {
1396  DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1397  // The loops below are carefully written to avoid reentrancy problems.
1398
1399  for (int i = 0; i < NUM_PRIORITIES; ++i) {
1400    PendingStreamRequestQueue queue;
1401    queue.swap(pending_create_stream_queues_[i]);
1402    for (PendingStreamRequestQueue::const_iterator it = queue.begin();
1403         it != queue.end(); ++it) {
1404      CHECK(*it);
1405      (*it)->OnRequestCompleteFailure(ERR_ABORTED);
1406    }
1407  }
1408
1409  PendingStreamRequestCompletionSet pending_completions;
1410  pending_completions.swap(pending_stream_request_completions_);
1411  for (PendingStreamRequestCompletionSet::const_iterator it =
1412           pending_completions.begin();
1413       it != pending_completions.end(); ++it) {
1414    (*it)->OnRequestCompleteFailure(ERR_ABORTED);
1415  }
1416
1417  while (true) {
1418    ActiveStreamMap::iterator it =
1419        active_streams_.lower_bound(last_good_stream_id + 1);
1420    if (it == active_streams_.end())
1421      break;
1422    LogAbandonedActiveStream(it, status);
1423    CloseActiveStreamIterator(it, status);
1424  }
1425
1426  while (!created_streams_.empty()) {
1427    CreatedStreamSet::iterator it = created_streams_.begin();
1428    LogAbandonedStream(*it, status);
1429    CloseCreatedStreamIterator(it, status);
1430  }
1431
1432  write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id);
1433
1434  DcheckGoingAway();
1435}
1436
1437void SpdySession::MaybeFinishGoingAway() {
1438  DcheckGoingAway();
1439  if (active_streams_.empty() && availability_state_ != STATE_CLOSED) {
1440    CloseSessionResult result =
1441        DoCloseSession(ERR_CONNECTION_CLOSED, "Finished going away");
1442    DCHECK_NE(result, SESSION_ALREADY_CLOSED);
1443  }
1444}
1445
1446SpdySession::CloseSessionResult SpdySession::DoCloseSession(
1447    Error err,
1448    const std::string& description) {
1449  DCHECK_LT(err, ERR_IO_PENDING);
1450
1451  if (availability_state_ == STATE_CLOSED)
1452    return SESSION_ALREADY_CLOSED;
1453
1454  net_log_.AddEvent(
1455      NetLog::TYPE_SPDY_SESSION_CLOSE,
1456      base::Bind(&NetLogSpdySessionCloseCallback, err, &description));
1457
1458  UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err);
1459  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors",
1460                              total_bytes_received_, 1, 100000000, 50);
1461
1462  // |pool_| will be NULL when |InitializeWithSocket()| is in the
1463  // call stack.
1464  if (pool_ && availability_state_ != STATE_GOING_AWAY)
1465    pool_->MakeSessionUnavailable(GetWeakPtr());
1466
1467  availability_state_ = STATE_CLOSED;
1468  error_on_close_ = err;
1469
1470  StartGoingAway(0, err);
1471  write_queue_.Clear();
1472
1473  DcheckClosed();
1474
1475  if (in_io_loop_)
1476    return SESSION_CLOSED_BUT_NOT_REMOVED;
1477
1478  RemoveFromPool();
1479  return SESSION_CLOSED_AND_REMOVED;
1480}
1481
1482void SpdySession::RemoveFromPool() {
1483  DcheckClosed();
1484  CHECK(pool_);
1485
1486  SpdySessionPool* pool = pool_;
1487  pool_ = NULL;
1488  pool->RemoveUnavailableSession(GetWeakPtr());
1489}
1490
1491void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) {
1492  DCHECK(stream);
1493  std::string description = base::StringPrintf(
1494      "ABANDONED (stream_id=%d): ", stream->stream_id()) +
1495      stream->url().spec();
1496  stream->LogStreamError(status, description);
1497  // We don't increment the streams abandoned counter here. If the
1498  // stream isn't active (i.e., it hasn't written anything to the wire
1499  // yet) then it's as if it never existed. If it is active, then
1500  // LogAbandonedActiveStream() will increment the counters.
1501}
1502
1503void SpdySession::LogAbandonedActiveStream(ActiveStreamMap::const_iterator it,
1504                                           Error status) {
1505  DCHECK_GT(it->first, 0u);
1506  LogAbandonedStream(it->second.stream, status);
1507  ++streams_abandoned_count_;
1508  base::StatsCounter abandoned_streams("spdy.abandoned_streams");
1509  abandoned_streams.Increment();
1510  if (it->second.stream->type() == SPDY_PUSH_STREAM &&
1511      unclaimed_pushed_streams_.find(it->second.stream->url()) !=
1512      unclaimed_pushed_streams_.end()) {
1513    base::StatsCounter abandoned_push_streams("spdy.abandoned_push_streams");
1514    abandoned_push_streams.Increment();
1515  }
1516}
1517
1518int SpdySession::GetNewStreamId() {
1519  int id = stream_hi_water_mark_;
1520  stream_hi_water_mark_ += 2;
1521  if (stream_hi_water_mark_ > 0x7fff)
1522    stream_hi_water_mark_ = 1;
1523  return id;
1524}
1525
1526void SpdySession::CloseSessionOnError(Error err,
1527                                      const std::string& description) {
1528  // We may be called from anywhere, so we can't expect a particular
1529  // return value.
1530  ignore_result(DoCloseSession(err, description));
1531}
1532
1533base::Value* SpdySession::GetInfoAsValue() const {
1534  base::DictionaryValue* dict = new base::DictionaryValue();
1535
1536  dict->SetInteger("source_id", net_log_.source().id);
1537
1538  dict->SetString("host_port_pair", host_port_pair().ToString());
1539  if (!pooled_aliases_.empty()) {
1540    base::ListValue* alias_list = new base::ListValue();
1541    for (std::set<SpdySessionKey>::const_iterator it =
1542             pooled_aliases_.begin();
1543         it != pooled_aliases_.end(); it++) {
1544      alias_list->Append(new base::StringValue(
1545          it->host_port_pair().ToString()));
1546    }
1547    dict->Set("aliases", alias_list);
1548  }
1549  dict->SetString("proxy", host_port_proxy_pair().second.ToURI());
1550
1551  dict->SetInteger("active_streams", active_streams_.size());
1552
1553  dict->SetInteger("unclaimed_pushed_streams",
1554                   unclaimed_pushed_streams_.size());
1555
1556  dict->SetBoolean("is_secure", is_secure_);
1557
1558  dict->SetString("protocol_negotiated",
1559                  SSLClientSocket::NextProtoToString(
1560                      connection_->socket()->GetNegotiatedProtocol()));
1561
1562  dict->SetInteger("error", error_on_close_);
1563  dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
1564
1565  dict->SetInteger("streams_initiated_count", streams_initiated_count_);
1566  dict->SetInteger("streams_pushed_count", streams_pushed_count_);
1567  dict->SetInteger("streams_pushed_and_claimed_count",
1568      streams_pushed_and_claimed_count_);
1569  dict->SetInteger("streams_abandoned_count", streams_abandoned_count_);
1570  DCHECK(buffered_spdy_framer_.get());
1571  dict->SetInteger("frames_received", buffered_spdy_framer_->frames_received());
1572
1573  dict->SetBoolean("sent_settings", sent_settings_);
1574  dict->SetBoolean("received_settings", received_settings_);
1575
1576  dict->SetInteger("send_window_size", session_send_window_size_);
1577  dict->SetInteger("recv_window_size", session_recv_window_size_);
1578  dict->SetInteger("unacked_recv_window_bytes",
1579                   session_unacked_recv_window_bytes_);
1580  return dict;
1581}
1582
1583bool SpdySession::IsReused() const {
1584  return buffered_spdy_framer_->frames_received() > 0;
1585}
1586
1587bool SpdySession::GetLoadTimingInfo(SpdyStreamId stream_id,
1588                                    LoadTimingInfo* load_timing_info) const {
1589  return connection_->GetLoadTimingInfo(stream_id != kFirstStreamId,
1590                                        load_timing_info);
1591}
1592
1593int SpdySession::GetPeerAddress(IPEndPoint* address) const {
1594  int rv = ERR_SOCKET_NOT_CONNECTED;
1595  if (connection_->socket()) {
1596    rv = connection_->socket()->GetPeerAddress(address);
1597  }
1598
1599  UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetPeerAddress",
1600                        rv == ERR_SOCKET_NOT_CONNECTED);
1601
1602  return rv;
1603}
1604
1605int SpdySession::GetLocalAddress(IPEndPoint* address) const {
1606  int rv = ERR_SOCKET_NOT_CONNECTED;
1607  if (connection_->socket()) {
1608    rv = connection_->socket()->GetLocalAddress(address);
1609  }
1610
1611  UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetLocalAddress",
1612                        rv == ERR_SOCKET_NOT_CONNECTED);
1613
1614  return rv;
1615}
1616
1617void SpdySession::EnqueueSessionWrite(RequestPriority priority,
1618                                      SpdyFrameType frame_type,
1619                                      scoped_ptr<SpdyFrame> frame) {
1620  DCHECK(frame_type == RST_STREAM ||
1621         frame_type == SETTINGS ||
1622         frame_type == WINDOW_UPDATE ||
1623         frame_type == PING);
1624  EnqueueWrite(
1625      priority, frame_type,
1626      scoped_ptr<SpdyBufferProducer>(
1627          new SimpleBufferProducer(
1628              scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
1629      base::WeakPtr<SpdyStream>());
1630}
1631
1632void SpdySession::EnqueueWrite(RequestPriority priority,
1633                               SpdyFrameType frame_type,
1634                               scoped_ptr<SpdyBufferProducer> producer,
1635                               const base::WeakPtr<SpdyStream>& stream) {
1636  if (availability_state_ == STATE_CLOSED)
1637    return;
1638
1639  bool was_idle = write_queue_.IsEmpty();
1640  write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
1641  if (write_state_ == WRITE_STATE_IDLE) {
1642    DCHECK(was_idle);
1643    DCHECK(!in_flight_write_);
1644    write_state_ = WRITE_STATE_DO_WRITE;
1645    base::MessageLoop::current()->PostTask(
1646        FROM_HERE,
1647        base::Bind(&SpdySession::PumpWriteLoop,
1648                   weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK));
1649  }
1650}
1651
1652void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) {
1653  DCHECK_EQ(stream->stream_id(), 0u);
1654  DCHECK(created_streams_.find(stream.get()) == created_streams_.end());
1655  created_streams_.insert(stream.release());
1656}
1657
1658scoped_ptr<SpdyStream> SpdySession::ActivateCreatedStream(SpdyStream* stream) {
1659  DCHECK_EQ(stream->stream_id(), 0u);
1660  DCHECK(created_streams_.find(stream) != created_streams_.end());
1661  stream->set_stream_id(GetNewStreamId());
1662  scoped_ptr<SpdyStream> owned_stream(stream);
1663  created_streams_.erase(stream);
1664  return owned_stream.Pass();
1665}
1666
1667void SpdySession::InsertActivatedStream(scoped_ptr<SpdyStream> stream) {
1668  SpdyStreamId stream_id = stream->stream_id();
1669  DCHECK_NE(stream_id, 0u);
1670  std::pair<ActiveStreamMap::iterator, bool> result =
1671      active_streams_.insert(
1672          std::make_pair(stream_id, ActiveStreamInfo(stream.get())));
1673  if (result.second) {
1674    ignore_result(stream.release());
1675  } else {
1676    NOTREACHED();
1677  }
1678}
1679
1680void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) {
1681  if (in_flight_write_stream_.get() == stream.get()) {
1682    // If we're deleting the stream for the in-flight write, we still
1683    // need to let the write complete, so we clear
1684    // |in_flight_write_stream_| and let the write finish on its own
1685    // without notifying |in_flight_write_stream_|.
1686    in_flight_write_stream_.reset();
1687  }
1688
1689  write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr());
1690
1691  stream->OnClose(status);
1692
1693  switch (availability_state_) {
1694    case STATE_AVAILABLE:
1695      ProcessPendingStreamRequests();
1696      break;
1697    case STATE_GOING_AWAY:
1698      DcheckGoingAway();
1699      MaybeFinishGoingAway();
1700      break;
1701    case STATE_CLOSED:
1702      // Do nothing.
1703      break;
1704  }
1705}
1706
1707base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) {
1708  base::StatsCounter used_push_streams("spdy.claimed_push_streams");
1709
1710  PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url);
1711  if (unclaimed_it == unclaimed_pushed_streams_.end())
1712    return base::WeakPtr<SpdyStream>();
1713
1714  SpdyStreamId stream_id = unclaimed_it->second.stream_id;
1715  unclaimed_pushed_streams_.erase(unclaimed_it);
1716
1717  ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
1718  if (active_it == active_streams_.end()) {
1719    NOTREACHED();
1720    return base::WeakPtr<SpdyStream>();
1721  }
1722
1723  net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM);
1724  used_push_streams.Increment();
1725  return active_it->second.stream->GetWeakPtr();
1726}
1727
1728bool SpdySession::GetSSLInfo(SSLInfo* ssl_info,
1729                             bool* was_npn_negotiated,
1730                             NextProto* protocol_negotiated) {
1731  *was_npn_negotiated = connection_->socket()->WasNpnNegotiated();
1732  *protocol_negotiated = connection_->socket()->GetNegotiatedProtocol();
1733  return connection_->socket()->GetSSLInfo(ssl_info);
1734}
1735
1736bool SpdySession::GetSSLCertRequestInfo(
1737    SSLCertRequestInfo* cert_request_info) {
1738  if (!is_secure_)
1739    return false;
1740  GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info);
1741  return true;
1742}
1743
1744ServerBoundCertService* SpdySession::GetServerBoundCertService() const {
1745  if (!is_secure_)
1746    return NULL;
1747  return GetSSLClientSocket()->GetServerBoundCertService();
1748}
1749
1750void SpdySession::OnError(SpdyFramer::SpdyError error_code) {
1751  CHECK(in_io_loop_);
1752
1753  if (availability_state_ == STATE_CLOSED)
1754    return;
1755
1756  RecordProtocolErrorHistogram(
1757      static_cast<SpdyProtocolErrorDetails>(error_code));
1758  std::string description = base::StringPrintf(
1759      "SPDY_ERROR error_code: %d.", error_code);
1760  CloseSessionResult result =
1761      DoCloseSession(ERR_SPDY_PROTOCOL_ERROR, description);
1762  DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
1763}
1764
1765void SpdySession::OnStreamError(SpdyStreamId stream_id,
1766                                const std::string& description) {
1767  CHECK(in_io_loop_);
1768
1769  if (availability_state_ == STATE_CLOSED)
1770    return;
1771
1772  ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1773  if (it == active_streams_.end()) {
1774    // We still want to send a frame to reset the stream even if we
1775    // don't know anything about it.
1776    SendResetStreamFrame(
1777        stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description);
1778    return;
1779  }
1780
1781  ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description);
1782}
1783
1784void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
1785                                    const char* data,
1786                                    size_t len,
1787                                    bool fin) {
1788  CHECK(in_io_loop_);
1789
1790  if (availability_state_ == STATE_CLOSED)
1791    return;
1792
1793  DCHECK_LT(len, 1u << 24);
1794  if (net_log().IsLoggingAllEvents()) {
1795    net_log().AddEvent(
1796        NetLog::TYPE_SPDY_SESSION_RECV_DATA,
1797        base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin));
1798  }
1799
1800  ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1801
1802  // By the time data comes in, the stream may already be inactive.
1803  if (it == active_streams_.end())
1804    return;
1805
1806  SpdyStream* stream = it->second.stream;
1807  CHECK_EQ(stream->stream_id(), stream_id);
1808
1809  if (it->second.waiting_for_syn_reply) {
1810    const std::string& error = "Data received before SYN_REPLY.";
1811    stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
1812    ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
1813    return;
1814  }
1815
1816  scoped_ptr<SpdyBuffer> buffer;
1817  if (data) {
1818    DCHECK_GT(len, 0u);
1819    buffer.reset(new SpdyBuffer(data, len));
1820
1821    if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
1822      DecreaseRecvWindowSize(static_cast<int32>(len));
1823      buffer->AddConsumeCallback(
1824          base::Bind(&SpdySession::OnReadBufferConsumed,
1825                     weak_factory_.GetWeakPtr()));
1826    }
1827  } else {
1828    DCHECK_EQ(len, 0u);
1829  }
1830  stream->OnDataReceived(buffer.Pass());
1831}
1832
1833void SpdySession::OnSettings(bool clear_persisted) {
1834  CHECK(in_io_loop_);
1835
1836  if (availability_state_ == STATE_CLOSED)
1837    return;
1838
1839  if (clear_persisted)
1840    http_server_properties_->ClearSpdySettings(host_port_pair());
1841
1842  if (net_log_.IsLoggingAllEvents()) {
1843    net_log_.AddEvent(
1844        NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
1845        base::Bind(&NetLogSpdySettingsCallback, host_port_pair(),
1846                   clear_persisted));
1847  }
1848}
1849
1850void SpdySession::OnSetting(SpdySettingsIds id,
1851                            uint8 flags,
1852                            uint32 value) {
1853  CHECK(in_io_loop_);
1854
1855  if (availability_state_ == STATE_CLOSED)
1856    return;
1857
1858  HandleSetting(id, value);
1859  http_server_properties_->SetSpdySetting(
1860      host_port_pair(),
1861      id,
1862      static_cast<SpdySettingsFlags>(flags),
1863      value);
1864  received_settings_ = true;
1865
1866  // Log the setting.
1867  net_log_.AddEvent(
1868      NetLog::TYPE_SPDY_SESSION_RECV_SETTING,
1869      base::Bind(&NetLogSpdySettingCallback,
1870                 id, static_cast<SpdySettingsFlags>(flags), value));
1871}
1872
1873void SpdySession::OnSendCompressedFrame(
1874    SpdyStreamId stream_id,
1875    SpdyFrameType type,
1876    size_t payload_len,
1877    size_t frame_len) {
1878  if (type != SYN_STREAM)
1879    return;
1880
1881  DCHECK(buffered_spdy_framer_.get());
1882  size_t compressed_len =
1883      frame_len - buffered_spdy_framer_->GetSynStreamMinimumSize();
1884
1885  if (payload_len) {
1886    // Make sure we avoid early decimal truncation.
1887    int compression_pct = 100 - (100 * compressed_len) / payload_len;
1888    UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage",
1889                             compression_pct);
1890  }
1891}
1892
1893int SpdySession::OnInitialResponseHeadersReceived(
1894    const SpdyHeaderBlock& response_headers,
1895    base::Time response_time,
1896    base::TimeTicks recv_first_byte_time,
1897    SpdyStream* stream) {
1898  CHECK(in_io_loop_);
1899  SpdyStreamId stream_id = stream->stream_id();
1900  // May invalidate |stream|.
1901  int rv = stream->OnInitialResponseHeadersReceived(
1902      response_headers, response_time, recv_first_byte_time);
1903  if (rv < 0) {
1904    DCHECK_NE(rv, ERR_IO_PENDING);
1905    DCHECK(active_streams_.find(stream_id) == active_streams_.end());
1906  }
1907  return rv;
1908}
1909
1910void SpdySession::OnSynStream(SpdyStreamId stream_id,
1911                              SpdyStreamId associated_stream_id,
1912                              SpdyPriority priority,
1913                              uint8 credential_slot,
1914                              bool fin,
1915                              bool unidirectional,
1916                              const SpdyHeaderBlock& headers) {
1917  CHECK(in_io_loop_);
1918
1919  if (availability_state_ == STATE_CLOSED)
1920    return;
1921
1922  base::Time response_time = base::Time::Now();
1923  base::TimeTicks recv_first_byte_time = time_func_();
1924
1925  if (net_log_.IsLoggingAllEvents()) {
1926    net_log_.AddEvent(
1927        NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
1928        base::Bind(&NetLogSpdySynCallback,
1929                   &headers, fin, unidirectional,
1930                   stream_id, associated_stream_id));
1931  }
1932
1933  // Server-initiated streams should have even sequence numbers.
1934  if ((stream_id & 0x1) != 0) {
1935    LOG(WARNING) << "Received invalid OnSyn stream id " << stream_id;
1936    return;
1937  }
1938
1939  if (IsStreamActive(stream_id)) {
1940    LOG(WARNING) << "Received OnSyn for active stream " << stream_id;
1941    return;
1942  }
1943
1944  RequestPriority request_priority =
1945      ConvertSpdyPriorityToRequestPriority(priority, GetProtocolVersion());
1946
1947  if (availability_state_ == STATE_GOING_AWAY) {
1948    // TODO(akalin): This behavior isn't in the SPDY spec, although it
1949    // probably should be.
1950    SendResetStreamFrame(stream_id, request_priority,
1951                         RST_STREAM_REFUSED_STREAM,
1952                         "OnSyn received when going away");
1953    return;
1954  }
1955
1956  if (associated_stream_id == 0) {
1957    std::string description = base::StringPrintf(
1958        "Received invalid OnSyn associated stream id %d for stream %d",
1959        associated_stream_id, stream_id);
1960    SendResetStreamFrame(stream_id, request_priority,
1961                         RST_STREAM_REFUSED_STREAM, description);
1962    return;
1963  }
1964
1965  streams_pushed_count_++;
1966
1967  // TODO(mbelshe): DCHECK that this is a GET method?
1968
1969  // Verify that the response had a URL for us.
1970  GURL gurl = GetUrlFromHeaderBlock(headers, GetProtocolVersion(), true);
1971  if (!gurl.is_valid()) {
1972    SendResetStreamFrame(stream_id, request_priority, RST_STREAM_PROTOCOL_ERROR,
1973                         "Pushed stream url was invalid: " + gurl.spec());
1974    return;
1975  }
1976
1977  // Verify we have a valid stream association.
1978  ActiveStreamMap::iterator associated_it =
1979      active_streams_.find(associated_stream_id);
1980  if (associated_it == active_streams_.end()) {
1981    SendResetStreamFrame(
1982        stream_id, request_priority, RST_STREAM_INVALID_STREAM,
1983        base::StringPrintf(
1984            "Received OnSyn with inactive associated stream %d",
1985            associated_stream_id));
1986    return;
1987  }
1988
1989  // Check that the SYN advertises the same origin as its associated stream.
1990  // Bypass this check if and only if this session is with a SPDY proxy that
1991  // is trusted explicitly via the --trusted-spdy-proxy switch.
1992  if (trusted_spdy_proxy_.Equals(host_port_pair())) {
1993    // Disallow pushing of HTTPS content.
1994    if (gurl.SchemeIs("https")) {
1995      SendResetStreamFrame(
1996          stream_id, request_priority, RST_STREAM_REFUSED_STREAM,
1997          base::StringPrintf(
1998              "Rejected push of Cross Origin HTTPS content %d",
1999              associated_stream_id));
2000    }
2001  } else {
2002    GURL associated_url(associated_it->second.stream->GetUrlFromHeaders());
2003    if (associated_url.GetOrigin() != gurl.GetOrigin()) {
2004      SendResetStreamFrame(
2005          stream_id, request_priority, RST_STREAM_REFUSED_STREAM,
2006          base::StringPrintf(
2007              "Rejected Cross Origin Push Stream %d",
2008              associated_stream_id));
2009      return;
2010    }
2011  }
2012
2013  // There should not be an existing pushed stream with the same path.
2014  PushedStreamMap::iterator pushed_it =
2015      unclaimed_pushed_streams_.lower_bound(gurl);
2016  if (pushed_it != unclaimed_pushed_streams_.end() &&
2017      pushed_it->first == gurl) {
2018    SendResetStreamFrame(stream_id, request_priority, RST_STREAM_PROTOCOL_ERROR,
2019                         "Received duplicate pushed stream with url: " +
2020                         gurl.spec());
2021    return;
2022  }
2023
2024  scoped_ptr<SpdyStream> stream(
2025      new SpdyStream(SPDY_PUSH_STREAM, GetWeakPtr(), gurl,
2026                     request_priority,
2027                     stream_initial_send_window_size_,
2028                     stream_initial_recv_window_size_,
2029                     net_log_));
2030  stream->set_stream_id(stream_id);
2031
2032  DeleteExpiredPushedStreams();
2033  PushedStreamMap::iterator inserted_pushed_it =
2034      unclaimed_pushed_streams_.insert(
2035          pushed_it,
2036          std::make_pair(gurl, PushedStreamInfo(stream_id, time_func_())));
2037  DCHECK(inserted_pushed_it != pushed_it);
2038
2039  InsertActivatedStream(stream.Pass());
2040
2041  ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
2042  if (active_it == active_streams_.end()) {
2043    NOTREACHED();
2044    return;
2045  }
2046
2047  // Parse the headers.
2048  if (OnInitialResponseHeadersReceived(
2049          headers, response_time,
2050          recv_first_byte_time, active_it->second.stream) != OK)
2051    return;
2052
2053  base::StatsCounter push_requests("spdy.pushed_streams");
2054  push_requests.Increment();
2055}
2056
2057void SpdySession::DeleteExpiredPushedStreams() {
2058  if (unclaimed_pushed_streams_.empty())
2059    return;
2060
2061  // Check that adequate time has elapsed since the last sweep.
2062  if (time_func_() < next_unclaimed_push_stream_sweep_time_)
2063    return;
2064
2065  // Gather old streams to delete.
2066  base::TimeTicks minimum_freshness = time_func_() -
2067      base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
2068  std::vector<SpdyStreamId> streams_to_close;
2069  for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin();
2070       it != unclaimed_pushed_streams_.end(); ++it) {
2071    if (minimum_freshness > it->second.creation_time)
2072      streams_to_close.push_back(it->second.stream_id);
2073  }
2074
2075  for (std::vector<SpdyStreamId>::const_iterator to_close_it =
2076           streams_to_close.begin();
2077       to_close_it != streams_to_close.end(); ++to_close_it) {
2078    ActiveStreamMap::iterator active_it = active_streams_.find(*to_close_it);
2079    if (active_it == active_streams_.end())
2080      continue;
2081
2082    LogAbandonedActiveStream(active_it, ERR_INVALID_SPDY_STREAM);
2083    // CloseActiveStreamIterator() will remove the stream from
2084    // |unclaimed_pushed_streams_|.
2085    CloseActiveStreamIterator(active_it, ERR_INVALID_SPDY_STREAM);
2086  }
2087
2088  next_unclaimed_push_stream_sweep_time_ = time_func_() +
2089      base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
2090}
2091
2092void SpdySession::OnSynReply(SpdyStreamId stream_id,
2093                             bool fin,
2094                             const SpdyHeaderBlock& headers) {
2095  CHECK(in_io_loop_);
2096
2097  if (availability_state_ == STATE_CLOSED)
2098    return;
2099
2100  base::Time response_time = base::Time::Now();
2101  base::TimeTicks recv_first_byte_time = time_func_();
2102
2103  if (net_log().IsLoggingAllEvents()) {
2104    net_log().AddEvent(
2105        NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
2106        base::Bind(&NetLogSpdySynCallback,
2107                   &headers, fin, false,  // not unidirectional
2108                   stream_id, 0));
2109  }
2110
2111  ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2112  if (it == active_streams_.end()) {
2113    // NOTE:  it may just be that the stream was cancelled.
2114    return;
2115  }
2116
2117  SpdyStream* stream = it->second.stream;
2118  CHECK_EQ(stream->stream_id(), stream_id);
2119
2120  if (!it->second.waiting_for_syn_reply) {
2121    const std::string& error =
2122        "Received duplicate SYN_REPLY for stream.";
2123    stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2124    ResetStreamIterator(it, RST_STREAM_STREAM_IN_USE, error);
2125    return;
2126  }
2127  it->second.waiting_for_syn_reply = false;
2128
2129  ignore_result(OnInitialResponseHeadersReceived(
2130      headers, response_time, recv_first_byte_time, stream));
2131}
2132
2133void SpdySession::OnHeaders(SpdyStreamId stream_id,
2134                            bool fin,
2135                            const SpdyHeaderBlock& headers) {
2136  CHECK(in_io_loop_);
2137
2138  if (availability_state_ == STATE_CLOSED)
2139    return;
2140
2141  if (net_log().IsLoggingAllEvents()) {
2142    net_log().AddEvent(
2143        NetLog::TYPE_SPDY_SESSION_RECV_HEADERS,
2144        base::Bind(&NetLogSpdySynCallback,
2145                   &headers, fin, /*unidirectional=*/false,
2146                   stream_id, 0));
2147  }
2148
2149  ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2150  if (it == active_streams_.end()) {
2151    // NOTE:  it may just be that the stream was cancelled.
2152    LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id;
2153    return;
2154  }
2155
2156  SpdyStream* stream = it->second.stream;
2157  CHECK_EQ(stream->stream_id(), stream_id);
2158
2159  int rv = stream->OnAdditionalResponseHeadersReceived(headers);
2160  if (rv < 0) {
2161    DCHECK_NE(rv, ERR_IO_PENDING);
2162    DCHECK(active_streams_.find(stream_id) == active_streams_.end());
2163  }
2164}
2165
2166void SpdySession::OnRstStream(SpdyStreamId stream_id,
2167                              SpdyRstStreamStatus status) {
2168  CHECK(in_io_loop_);
2169
2170  if (availability_state_ == STATE_CLOSED)
2171    return;
2172
2173  std::string description;
2174  net_log().AddEvent(
2175      NetLog::TYPE_SPDY_SESSION_RST_STREAM,
2176      base::Bind(&NetLogSpdyRstCallback,
2177                 stream_id, status, &description));
2178
2179  ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2180  if (it == active_streams_.end()) {
2181    // NOTE:  it may just be that the stream was cancelled.
2182    LOG(WARNING) << "Received RST for invalid stream" << stream_id;
2183    return;
2184  }
2185
2186  CHECK_EQ(it->second.stream->stream_id(), stream_id);
2187
2188  if (status == 0) {
2189    it->second.stream->OnDataReceived(scoped_ptr<SpdyBuffer>());
2190  } else if (status == RST_STREAM_REFUSED_STREAM) {
2191    CloseActiveStreamIterator(it, ERR_SPDY_SERVER_REFUSED_STREAM);
2192  } else {
2193    RecordProtocolErrorHistogram(
2194        PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM);
2195    it->second.stream->LogStreamError(
2196        ERR_SPDY_PROTOCOL_ERROR,
2197        base::StringPrintf("SPDY stream closed with status: %d", status));
2198    // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
2199    //                For now, it doesn't matter much - it is a protocol error.
2200    CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
2201  }
2202}
2203
2204void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id,
2205                           SpdyGoAwayStatus status) {
2206  CHECK(in_io_loop_);
2207
2208  if (availability_state_ == STATE_CLOSED)
2209    return;
2210
2211  net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY,
2212      base::Bind(&NetLogSpdyGoAwayCallback,
2213                 last_accepted_stream_id,
2214                 active_streams_.size(),
2215                 unclaimed_pushed_streams_.size(),
2216                 status));
2217  if (availability_state_ < STATE_GOING_AWAY) {
2218    availability_state_ = STATE_GOING_AWAY;
2219    // |pool_| will be NULL when |InitializeWithSocket()| is in the
2220    // call stack.
2221    if (pool_)
2222      pool_->MakeSessionUnavailable(GetWeakPtr());
2223  }
2224  StartGoingAway(last_accepted_stream_id, ERR_ABORTED);
2225  // This is to handle the case when we already don't have any active
2226  // streams (i.e., StartGoingAway() did nothing). Otherwise, we have
2227  // active streams and so the last one being closed will finish the
2228  // going away process (see DeleteStream()).
2229  MaybeFinishGoingAway();
2230}
2231
2232void SpdySession::OnPing(uint32 unique_id) {
2233  CHECK(in_io_loop_);
2234
2235  if (availability_state_ == STATE_CLOSED)
2236    return;
2237
2238  net_log_.AddEvent(
2239      NetLog::TYPE_SPDY_SESSION_PING,
2240      base::Bind(&NetLogSpdyPingCallback, unique_id, "received"));
2241
2242  // Send response to a PING from server.
2243  if (unique_id % 2 == 0) {
2244    WritePingFrame(unique_id);
2245    return;
2246  }
2247
2248  --pings_in_flight_;
2249  if (pings_in_flight_ < 0) {
2250    RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING);
2251    CloseSessionResult result =
2252        DoCloseSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0.");
2253    DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
2254    pings_in_flight_ = 0;
2255    return;
2256  }
2257
2258  if (pings_in_flight_ > 0)
2259    return;
2260
2261  // We will record RTT in histogram when there are no more client sent
2262  // pings_in_flight_.
2263  RecordPingRTTHistogram(time_func_() - last_ping_sent_time_);
2264}
2265
2266void SpdySession::OnWindowUpdate(SpdyStreamId stream_id,
2267                                 uint32 delta_window_size) {
2268  CHECK(in_io_loop_);
2269
2270  if (availability_state_ == STATE_CLOSED)
2271    return;
2272
2273  DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max));
2274  net_log_.AddEvent(
2275      NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME,
2276      base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2277                 stream_id, delta_window_size));
2278
2279  if (stream_id == kSessionFlowControlStreamId) {
2280    // WINDOW_UPDATE for the session.
2281    if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) {
2282      LOG(WARNING) << "Received WINDOW_UPDATE for session when "
2283                   << "session flow control is not turned on";
2284      // TODO(akalin): Record an error and close the session.
2285      return;
2286    }
2287
2288    if (delta_window_size < 1u) {
2289      RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
2290      CloseSessionResult result = DoCloseSession(
2291          ERR_SPDY_PROTOCOL_ERROR,
2292          "Received WINDOW_UPDATE with an invalid delta_window_size " +
2293          base::UintToString(delta_window_size));
2294      DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
2295      return;
2296    }
2297
2298    IncreaseSendWindowSize(static_cast<int32>(delta_window_size));
2299  } else {
2300    // WINDOW_UPDATE for a stream.
2301    if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2302      // TODO(akalin): Record an error and close the session.
2303      LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id
2304                   << " when flow control is not turned on";
2305      return;
2306    }
2307
2308    ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2309
2310    if (it == active_streams_.end()) {
2311      // NOTE:  it may just be that the stream was cancelled.
2312      LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id;
2313      return;
2314    }
2315
2316    SpdyStream* stream = it->second.stream;
2317    CHECK_EQ(stream->stream_id(), stream_id);
2318
2319    if (delta_window_size < 1u) {
2320      ResetStreamIterator(it,
2321                          RST_STREAM_FLOW_CONTROL_ERROR,
2322                          base::StringPrintf(
2323                              "Received WINDOW_UPDATE with an invalid "
2324                              "delta_window_size %ud", delta_window_size));
2325      return;
2326    }
2327
2328    CHECK_EQ(it->second.stream->stream_id(), stream_id);
2329    it->second.stream->IncreaseSendWindowSize(
2330        static_cast<int32>(delta_window_size));
2331  }
2332}
2333
2334void SpdySession::OnPushPromise(SpdyStreamId stream_id,
2335                                SpdyStreamId promised_stream_id) {
2336  // TODO(akalin): Handle PUSH_PROMISE frames.
2337}
2338
2339void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id,
2340                                         uint32 delta_window_size) {
2341  CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2342  ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2343  CHECK(it != active_streams_.end());
2344  CHECK_EQ(it->second.stream->stream_id(), stream_id);
2345  SendWindowUpdateFrame(
2346      stream_id, delta_window_size, it->second.stream->priority());
2347}
2348
2349void SpdySession::SendInitialSettings() {
2350  DCHECK_NE(availability_state_, STATE_CLOSED);
2351
2352  // First, notify the server about the settings they should use when
2353  // communicating with us.
2354  if (GetProtocolVersion() >= 2 && enable_sending_initial_settings_) {
2355    SettingsMap settings_map;
2356    // Create a new settings frame notifying the sever of our
2357    // max_concurrent_streams_ and initial window size.
2358    settings_map[SETTINGS_MAX_CONCURRENT_STREAMS] =
2359        SettingsFlagsAndValue(SETTINGS_FLAG_NONE, kMaxConcurrentPushedStreams);
2360    if (GetProtocolVersion() > 2 &&
2361        stream_initial_recv_window_size_ != kSpdyStreamInitialWindowSize) {
2362      settings_map[SETTINGS_INITIAL_WINDOW_SIZE] =
2363          SettingsFlagsAndValue(SETTINGS_FLAG_NONE,
2364                                stream_initial_recv_window_size_);
2365    }
2366    SendSettings(settings_map);
2367  }
2368
2369  // Next, notify the server about our initial recv window size.
2370  if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION &&
2371      enable_sending_initial_settings_) {
2372    // Bump up the receive window size to the real initial value. This
2373    // has to go here since the WINDOW_UPDATE frame sent by
2374    // IncreaseRecvWindowSize() call uses |buffered_spdy_framer_|.
2375    DCHECK_GT(kDefaultInitialRecvWindowSize, session_recv_window_size_);
2376    // This condition implies that |kDefaultInitialRecvWindowSize| -
2377    // |session_recv_window_size_| doesn't overflow.
2378    DCHECK_GT(session_recv_window_size_, 0);
2379    IncreaseRecvWindowSize(
2380        kDefaultInitialRecvWindowSize - session_recv_window_size_);
2381  }
2382
2383  // Finally, notify the server about the settings they have previously
2384  // told us to use when communicating with them.
2385  const SettingsMap& settings_map =
2386      http_server_properties_->GetSpdySettings(host_port_pair());
2387  if (settings_map.empty())
2388    return;
2389
2390  const SpdySettingsIds id = SETTINGS_CURRENT_CWND;
2391  SettingsMap::const_iterator it = settings_map.find(id);
2392  uint32 value = 0;
2393  if (it != settings_map.end())
2394    value = it->second.second;
2395  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent", value, 1, 200, 100);
2396
2397  const SettingsMap& settings_map_new =
2398      http_server_properties_->GetSpdySettings(host_port_pair());
2399  for (SettingsMap::const_iterator i = settings_map_new.begin(),
2400           end = settings_map_new.end(); i != end; ++i) {
2401    const SpdySettingsIds new_id = i->first;
2402    const uint32 new_val = i->second.second;
2403    HandleSetting(new_id, new_val);
2404  }
2405
2406  SendSettings(settings_map_new);
2407}
2408
2409
2410void SpdySession::SendSettings(const SettingsMap& settings) {
2411  DCHECK_NE(availability_state_, STATE_CLOSED);
2412
2413  net_log_.AddEvent(
2414      NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
2415      base::Bind(&NetLogSpdySendSettingsCallback, &settings));
2416
2417  // Create the SETTINGS frame and send it.
2418  DCHECK(buffered_spdy_framer_.get());
2419  scoped_ptr<SpdyFrame> settings_frame(
2420      buffered_spdy_framer_->CreateSettings(settings));
2421  sent_settings_ = true;
2422  EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass());
2423}
2424
2425void SpdySession::HandleSetting(uint32 id, uint32 value) {
2426  switch (id) {
2427    case SETTINGS_MAX_CONCURRENT_STREAMS:
2428      max_concurrent_streams_ = std::min(static_cast<size_t>(value),
2429                                         kMaxConcurrentStreamLimit);
2430      ProcessPendingStreamRequests();
2431      break;
2432    case SETTINGS_INITIAL_WINDOW_SIZE: {
2433      if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2434        net_log().AddEvent(
2435            NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_NO_FLOW_CONTROL);
2436        return;
2437      }
2438
2439      if (value > static_cast<uint32>(kint32max)) {
2440        net_log().AddEvent(
2441            NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_OUT_OF_RANGE,
2442            NetLog::IntegerCallback("initial_window_size", value));
2443        return;
2444      }
2445
2446      // SETTINGS_INITIAL_WINDOW_SIZE updates initial_send_window_size_ only.
2447      int32 delta_window_size =
2448          static_cast<int32>(value) - stream_initial_send_window_size_;
2449      stream_initial_send_window_size_ = static_cast<int32>(value);
2450      UpdateStreamsSendWindowSize(delta_window_size);
2451      net_log().AddEvent(
2452          NetLog::TYPE_SPDY_SESSION_UPDATE_STREAMS_SEND_WINDOW_SIZE,
2453          NetLog::IntegerCallback("delta_window_size", delta_window_size));
2454      break;
2455    }
2456  }
2457}
2458
2459void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) {
2460  DCHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2461  for (ActiveStreamMap::iterator it = active_streams_.begin();
2462       it != active_streams_.end(); ++it) {
2463    it->second.stream->AdjustSendWindowSize(delta_window_size);
2464  }
2465
2466  for (CreatedStreamSet::const_iterator it = created_streams_.begin();
2467       it != created_streams_.end(); it++) {
2468    (*it)->AdjustSendWindowSize(delta_window_size);
2469  }
2470}
2471
2472void SpdySession::SendPrefacePingIfNoneInFlight() {
2473  if (pings_in_flight_ || !enable_ping_based_connection_checking_)
2474    return;
2475
2476  base::TimeTicks now = time_func_();
2477  // If there is no activity in the session, then send a preface-PING.
2478  if ((now - last_activity_time_) > connection_at_risk_of_loss_time_)
2479    SendPrefacePing();
2480}
2481
2482void SpdySession::SendPrefacePing() {
2483  WritePingFrame(next_ping_id_);
2484}
2485
2486void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id,
2487                                        uint32 delta_window_size,
2488                                        RequestPriority priority) {
2489  CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2490  ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2491  if (it != active_streams_.end()) {
2492    CHECK_EQ(it->second.stream->stream_id(), stream_id);
2493  } else {
2494    CHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2495    CHECK_EQ(stream_id, kSessionFlowControlStreamId);
2496  }
2497
2498  net_log_.AddEvent(
2499      NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME,
2500      base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2501                 stream_id, delta_window_size));
2502
2503  DCHECK(buffered_spdy_framer_.get());
2504  scoped_ptr<SpdyFrame> window_update_frame(
2505      buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
2506  EnqueueSessionWrite(priority, WINDOW_UPDATE, window_update_frame.Pass());
2507}
2508
2509void SpdySession::WritePingFrame(uint32 unique_id) {
2510  DCHECK(buffered_spdy_framer_.get());
2511  scoped_ptr<SpdyFrame> ping_frame(
2512      buffered_spdy_framer_->CreatePingFrame(unique_id));
2513  EnqueueSessionWrite(HIGHEST, PING, ping_frame.Pass());
2514
2515  if (net_log().IsLoggingAllEvents()) {
2516    net_log().AddEvent(
2517        NetLog::TYPE_SPDY_SESSION_PING,
2518        base::Bind(&NetLogSpdyPingCallback, unique_id, "sent"));
2519  }
2520  if (unique_id % 2 != 0) {
2521    next_ping_id_ += 2;
2522    ++pings_in_flight_;
2523    PlanToCheckPingStatus();
2524    last_ping_sent_time_ = time_func_();
2525  }
2526}
2527
2528void SpdySession::PlanToCheckPingStatus() {
2529  if (check_ping_status_pending_)
2530    return;
2531
2532  check_ping_status_pending_ = true;
2533  base::MessageLoop::current()->PostDelayedTask(
2534      FROM_HERE,
2535      base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2536                 time_func_()), hung_interval_);
2537}
2538
2539void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) {
2540  CHECK(!in_io_loop_);
2541  DCHECK_NE(availability_state_, STATE_CLOSED);
2542
2543  // Check if we got a response back for all PINGs we had sent.
2544  if (pings_in_flight_ == 0) {
2545    check_ping_status_pending_ = false;
2546    return;
2547  }
2548
2549  DCHECK(check_ping_status_pending_);
2550
2551  base::TimeTicks now = time_func_();
2552  base::TimeDelta delay = hung_interval_ - (now - last_activity_time_);
2553
2554  if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) {
2555    // Track all failed PING messages in a separate bucket.
2556    const base::TimeDelta kFailedPing =
2557        base::TimeDelta::FromInternalValue(INT_MAX);
2558    RecordPingRTTHistogram(kFailedPing);
2559    CloseSessionResult result =
2560        DoCloseSession(ERR_SPDY_PING_FAILED, "Failed ping.");
2561    DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED);
2562    return;
2563  }
2564
2565  // Check the status of connection after a delay.
2566  base::MessageLoop::current()->PostDelayedTask(
2567      FROM_HERE,
2568      base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2569                 now),
2570      delay);
2571}
2572
// Reports |duration| to the "Net.SpdyPing.RTT" histogram via
// UMA_HISTOGRAM_TIMES. CheckPingStatus() also calls this with a large
// sentinel value so that failed PINGs are recorded separately.
2573void SpdySession::RecordPingRTTHistogram(base::TimeDelta duration) {
2574  UMA_HISTOGRAM_TIMES("Net.SpdyPing.RTT", duration);
2575}
2576
2577void SpdySession::RecordProtocolErrorHistogram(
2578    SpdyProtocolErrorDetails details) {
2579  UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails2", details,
2580                            NUM_SPDY_PROTOCOL_ERROR_DETAILS);
2581  if (EndsWith(host_port_pair().host(), "google.com", false)) {
2582    UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails_Google2", details,
2583                              NUM_SPDY_PROTOCOL_ERROR_DETAILS);
2584  }
2585}
2586
2587void SpdySession::RecordHistograms() {
2588  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
2589                              streams_initiated_count_,
2590                              0, 300, 50);
2591  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
2592                              streams_pushed_count_,
2593                              0, 300, 50);
2594  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
2595                              streams_pushed_and_claimed_count_,
2596                              0, 300, 50);
2597  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
2598                              streams_abandoned_count_,
2599                              0, 300, 50);
2600  UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent",
2601                            sent_settings_ ? 1 : 0, 2);
2602  UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived",
2603                            received_settings_ ? 1 : 0, 2);
2604  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession",
2605                              stalled_streams_,
2606                              0, 300, 50);
2607  UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls",
2608                            stalled_streams_ > 0 ? 1 : 0, 2);
2609
2610  if (received_settings_) {
2611    // Enumerate the saved settings, and set histograms for it.
2612    const SettingsMap& settings_map =
2613        http_server_properties_->GetSpdySettings(host_port_pair());
2614
2615    SettingsMap::const_iterator it;
2616    for (it = settings_map.begin(); it != settings_map.end(); ++it) {
2617      const SpdySettingsIds id = it->first;
2618      const uint32 val = it->second.second;
2619      switch (id) {
2620        case SETTINGS_CURRENT_CWND:
2621          // Record several different histograms to see if cwnd converges
2622          // for larger volumes of data being sent.
2623          UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd",
2624                                      val, 1, 200, 100);
2625          if (total_bytes_received_ > 10 * 1024) {
2626            UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K",
2627                                        val, 1, 200, 100);
2628            if (total_bytes_received_ > 25 * 1024) {
2629              UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K",
2630                                          val, 1, 200, 100);
2631              if (total_bytes_received_ > 50 * 1024) {
2632                UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K",
2633                                            val, 1, 200, 100);
2634                if (total_bytes_received_ > 100 * 1024) {
2635                  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K",
2636                                              val, 1, 200, 100);
2637                }
2638              }
2639            }
2640          }
2641          break;
2642        case SETTINGS_ROUND_TRIP_TIME:
2643          UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT",
2644                                      val, 1, 1200, 100);
2645          break;
2646        case SETTINGS_DOWNLOAD_RETRANS_RATE:
2647          UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate",
2648                                      val, 1, 100, 50);
2649          break;
2650        default:
2651          break;
2652      }
2653    }
2654  }
2655}
2656
2657void SpdySession::CompleteStreamRequest(SpdyStreamRequest* pending_request) {
2658  CHECK(pending_request);
2659
2660  PendingStreamRequestCompletionSet::iterator it =
2661      pending_stream_request_completions_.find(pending_request);
2662
2663  // Abort if the request has already been cancelled.
2664  if (it == pending_stream_request_completions_.end())
2665    return;
2666
2667  base::WeakPtr<SpdyStream> stream;
2668  int rv = CreateStream(*pending_request, &stream);
2669  pending_stream_request_completions_.erase(it);
2670
2671  if (rv == OK) {
2672    DCHECK(stream.get());
2673    pending_request->OnRequestCompleteSuccess(&stream);
2674  } else {
2675    DCHECK(!stream.get());
2676    pending_request->OnRequestCompleteFailure(rv);
2677  }
2678}
2679
2680SSLClientSocket* SpdySession::GetSSLClientSocket() const {
2681  if (!is_secure_)
2682    return NULL;
2683  SSLClientSocket* ssl_socket =
2684      reinterpret_cast<SSLClientSocket*>(connection_->socket());
2685  DCHECK(ssl_socket);
2686  return ssl_socket;
2687}
2688
2689void SpdySession::OnWriteBufferConsumed(
2690    size_t frame_payload_size,
2691    size_t consume_size,
2692    SpdyBuffer::ConsumeSource consume_source) {
2693  // We can be called with |in_io_loop_| set if a write SpdyBuffer is
2694  // deleted (e.g., a stream is closed due to incoming data).
2695
2696  if (availability_state_ == STATE_CLOSED)
2697    return;
2698
2699  DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2700
2701  if (consume_source == SpdyBuffer::DISCARD) {
2702    // If we're discarding a frame or part of it, increase the send
2703    // window by the number of discarded bytes. (Although if we're
2704    // discarding part of a frame, it's probably because of a write
2705    // error and we'll be tearing down the session soon.)
2706    size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
2707    DCHECK_GT(remaining_payload_bytes, 0u);
2708    IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
2709  }
2710  // For consumed bytes, the send window is increased when we receive
2711  // a WINDOW_UPDATE frame.
2712}
2713
2714void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) {
2715  // We can be called with |in_io_loop_| set if a SpdyBuffer is
2716  // deleted (e.g., a stream is closed due to incoming data).
2717
2718  DCHECK_NE(availability_state_, STATE_CLOSED);
2719  DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2720  DCHECK_GE(delta_window_size, 1);
2721
2722  // Check for overflow.
2723  int32 max_delta_window_size = kint32max - session_send_window_size_;
2724  if (delta_window_size > max_delta_window_size) {
2725    RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
2726    CloseSessionResult result = DoCloseSession(
2727        ERR_SPDY_PROTOCOL_ERROR,
2728        "Received WINDOW_UPDATE [delta: " +
2729        base::IntToString(delta_window_size) +
2730        "] for session overflows session_send_window_size_ [current: " +
2731        base::IntToString(session_send_window_size_) + "]");
2732    DCHECK_NE(result, SESSION_ALREADY_CLOSED);
2733    return;
2734  }
2735
2736  session_send_window_size_ += delta_window_size;
2737
2738  net_log_.AddEvent(
2739      NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
2740      base::Bind(&NetLogSpdySessionWindowUpdateCallback,
2741                 delta_window_size, session_send_window_size_));
2742
2743  DCHECK(!IsSendStalled());
2744  ResumeSendStalledStreams();
2745}
2746
2747void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) {
2748  DCHECK_NE(availability_state_, STATE_CLOSED);
2749  DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2750
2751  // We only call this method when sending a frame. Therefore,
2752  // |delta_window_size| should be within the valid frame size range.
2753  DCHECK_GE(delta_window_size, 1);
2754  DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
2755
2756  // |send_window_size_| should have been at least |delta_window_size| for
2757  // this call to happen.
2758  DCHECK_GE(session_send_window_size_, delta_window_size);
2759
2760  session_send_window_size_ -= delta_window_size;
2761
2762  net_log_.AddEvent(
2763      NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
2764      base::Bind(&NetLogSpdySessionWindowUpdateCallback,
2765                 -delta_window_size, session_send_window_size_));
2766}
2767
2768void SpdySession::OnReadBufferConsumed(
2769    size_t consume_size,
2770    SpdyBuffer::ConsumeSource consume_source) {
2771  // We can be called with |in_io_loop_| set if a read SpdyBuffer is
2772  // deleted (e.g., discarded by a SpdyReadQueue).
2773
2774  if (availability_state_ == STATE_CLOSED)
2775    return;
2776
2777  DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2778  DCHECK_GE(consume_size, 1u);
2779  DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
2780
2781  IncreaseRecvWindowSize(static_cast<int32>(consume_size));
2782}
2783
2784void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) {
2785  DCHECK_NE(availability_state_, STATE_CLOSED);
2786  DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2787  DCHECK_GE(session_unacked_recv_window_bytes_, 0);
2788  DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_);
2789  DCHECK_GE(delta_window_size, 1);
2790  // Check for overflow.
2791  DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_);
2792
2793  session_recv_window_size_ += delta_window_size;
2794  net_log_.AddEvent(
2795      NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
2796      base::Bind(&NetLogSpdySessionWindowUpdateCallback,
2797                 delta_window_size, session_recv_window_size_));
2798
2799  session_unacked_recv_window_bytes_ += delta_window_size;
2800  if (session_unacked_recv_window_bytes_ > kSpdySessionInitialWindowSize / 2) {
2801    SendWindowUpdateFrame(kSessionFlowControlStreamId,
2802                          session_unacked_recv_window_bytes_,
2803                          HIGHEST);
2804    session_unacked_recv_window_bytes_ = 0;
2805  }
2806}
2807
2808void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) {
2809  CHECK(in_io_loop_);
2810  DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2811  DCHECK_GE(delta_window_size, 1);
2812
2813  // Since we never decrease the initial receive window size,
2814  // |delta_window_size| should never cause |recv_window_size_| to go
2815  // negative. If we do, the receive window isn't being respected.
2816  if (delta_window_size > session_recv_window_size_) {
2817    RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION);
2818    CloseSessionResult result = DoCloseSession(
2819        ERR_SPDY_PROTOCOL_ERROR,
2820        "delta_window_size is " + base::IntToString(delta_window_size) +
2821            " in DecreaseRecvWindowSize, which is larger than the receive " +
2822            "window size of " + base::IntToString(session_recv_window_size_));
2823    DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
2824    return;
2825  }
2826
2827  session_recv_window_size_ -= delta_window_size;
2828  net_log_.AddEvent(
2829      NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW,
2830      base::Bind(&NetLogSpdySessionWindowUpdateCallback,
2831                 -delta_window_size, session_recv_window_size_));
2832}
2833
2834void SpdySession::QueueSendStalledStream(const SpdyStream& stream) {
2835  DCHECK(stream.send_stalled_by_flow_control());
2836  stream_send_unstall_queue_[stream.priority()].push_back(stream.stream_id());
2837}
2838
namespace {

// Returns the sum of .size() over all |N| elements of the fixed-size array
// |arr|. |T| only needs to provide a size() member function.
template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) {
  size_t sum = 0;
  for (const T* element = arr; element != arr + N; ++element)
    sum += element->size();
  return sum;
}

}  // namespace
2852
2853void SpdySession::ResumeSendStalledStreams() {
2854  DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2855
2856  // We don't have to worry about new streams being queued, since
2857  // doing so would cause IsSendStalled() to return true. But we do
2858  // have to worry about streams being closed, as well as ourselves
2859  // being closed.
2860
2861  while (availability_state_ != STATE_CLOSED && !IsSendStalled()) {
2862    size_t old_size = 0;
2863    if (DCHECK_IS_ON())
2864      old_size = GetTotalSize(stream_send_unstall_queue_);
2865
2866    SpdyStreamId stream_id = PopStreamToPossiblyResume();
2867    if (stream_id == 0)
2868      break;
2869    ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2870    // The stream may actually still be send-stalled after this (due
2871    // to its own send window) but that's okay -- it'll then be
2872    // resumed once its send window increases.
2873    if (it != active_streams_.end())
2874      it->second.stream->PossiblyResumeIfSendStalled();
2875
2876    // The size should decrease unless we got send-stalled again.
2877    if (!IsSendStalled())
2878      DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size);
2879  }
2880}
2881
2882SpdyStreamId SpdySession::PopStreamToPossiblyResume() {
2883  for (int i = NUM_PRIORITIES - 1; i >= 0; --i) {
2884    std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i];
2885    if (!queue->empty()) {
2886      SpdyStreamId stream_id = queue->front();
2887      queue->pop_front();
2888      return stream_id;
2889    }
2890  }
2891  return 0;
2892}
2893
2894}  // namespace net
2895