1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/spdy/spdy_session.h"
6
7#include <algorithm>
8#include <map>
9
10#include "base/basictypes.h"
11#include "base/bind.h"
12#include "base/compiler_specific.h"
13#include "base/logging.h"
14#include "base/message_loop/message_loop.h"
15#include "base/metrics/field_trial.h"
16#include "base/metrics/histogram.h"
17#include "base/metrics/sparse_histogram.h"
18#include "base/metrics/stats_counters.h"
19#include "base/stl_util.h"
20#include "base/strings/string_number_conversions.h"
21#include "base/strings/string_util.h"
22#include "base/strings/stringprintf.h"
23#include "base/strings/utf_string_conversions.h"
24#include "base/time/time.h"
25#include "base/values.h"
26#include "crypto/ec_private_key.h"
27#include "crypto/ec_signature_creator.h"
28#include "net/base/connection_type_histograms.h"
29#include "net/base/net_log.h"
30#include "net/base/net_util.h"
31#include "net/cert/asn1_util.h"
32#include "net/cert/cert_verify_result.h"
33#include "net/http/http_log_util.h"
34#include "net/http/http_network_session.h"
35#include "net/http/http_server_properties.h"
36#include "net/http/http_util.h"
37#include "net/http/transport_security_state.h"
38#include "net/socket/ssl_client_socket.h"
39#include "net/spdy/spdy_buffer_producer.h"
40#include "net/spdy/spdy_frame_builder.h"
41#include "net/spdy/spdy_http_utils.h"
42#include "net/spdy/spdy_protocol.h"
43#include "net/spdy/spdy_session_pool.h"
44#include "net/spdy/spdy_stream.h"
45#include "net/ssl/channel_id_service.h"
46#include "net/ssl/ssl_cipher_suite_names.h"
47#include "net/ssl/ssl_connection_status_flags.h"
48
49namespace net {
50
51namespace {
52
// Size passed to the IOBuffer that backs |read_buffer_| in the SpdySession
// constructor.
53const int kReadBufferSize = 8 * 1024;
// Number of seconds used to initialize |connection_at_risk_of_loss_time_|.
54const int kDefaultConnectionAtRiskOfLossSeconds = 10;
// Number of seconds used to initialize |hung_interval_|.
55const int kHungIntervalSeconds = 10;
56
57// Minimum seconds that unclaimed pushed streams will be kept in memory.
// The SpdySession constructor adds this to the current time to compute
// |next_unclaimed_push_stream_sweep_time_|.
58const int kMinPushedStreamLifetimeSeconds = 300;
59
60scoped_ptr<base::ListValue> SpdyHeaderBlockToListValue(
61    const SpdyHeaderBlock& headers,
62    net::NetLog::LogLevel log_level) {
63  scoped_ptr<base::ListValue> headers_list(new base::ListValue());
64  for (SpdyHeaderBlock::const_iterator it = headers.begin();
65       it != headers.end(); ++it) {
66    headers_list->AppendString(
67        it->first + ": " +
68        ElideHeaderValueForNetLog(log_level, it->first, it->second));
69  }
70  return headers_list.Pass();
71}
72
73base::Value* NetLogSpdySynStreamSentCallback(const SpdyHeaderBlock* headers,
74                                             bool fin,
75                                             bool unidirectional,
76                                             SpdyPriority spdy_priority,
77                                             SpdyStreamId stream_id,
78                                             NetLog::LogLevel log_level) {
79  base::DictionaryValue* dict = new base::DictionaryValue();
80  dict->Set("headers",
81            SpdyHeaderBlockToListValue(*headers, log_level).release());
82  dict->SetBoolean("fin", fin);
83  dict->SetBoolean("unidirectional", unidirectional);
84  dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
85  dict->SetInteger("stream_id", stream_id);
86  return dict;
87}
88
89base::Value* NetLogSpdySynStreamReceivedCallback(
90    const SpdyHeaderBlock* headers,
91    bool fin,
92    bool unidirectional,
93    SpdyPriority spdy_priority,
94    SpdyStreamId stream_id,
95    SpdyStreamId associated_stream,
96    NetLog::LogLevel log_level) {
97  base::DictionaryValue* dict = new base::DictionaryValue();
98  dict->Set("headers",
99            SpdyHeaderBlockToListValue(*headers, log_level).release());
100  dict->SetBoolean("fin", fin);
101  dict->SetBoolean("unidirectional", unidirectional);
102  dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
103  dict->SetInteger("stream_id", stream_id);
104  dict->SetInteger("associated_stream", associated_stream);
105  return dict;
106}
107
108base::Value* NetLogSpdySynReplyOrHeadersReceivedCallback(
109    const SpdyHeaderBlock* headers,
110    bool fin,
111    SpdyStreamId stream_id,
112    NetLog::LogLevel log_level) {
113  base::DictionaryValue* dict = new base::DictionaryValue();
114  dict->Set("headers",
115            SpdyHeaderBlockToListValue(*headers, log_level).release());
116  dict->SetBoolean("fin", fin);
117  dict->SetInteger("stream_id", stream_id);
118  return dict;
119}
120
121base::Value* NetLogSpdySessionCloseCallback(int net_error,
122                                            const std::string* description,
123                                            NetLog::LogLevel /* log_level */) {
124  base::DictionaryValue* dict = new base::DictionaryValue();
125  dict->SetInteger("net_error", net_error);
126  dict->SetString("description", *description);
127  return dict;
128}
129
130base::Value* NetLogSpdySessionCallback(const HostPortProxyPair* host_pair,
131                                       NetLog::LogLevel /* log_level */) {
132  base::DictionaryValue* dict = new base::DictionaryValue();
133  dict->SetString("host", host_pair->first.ToString());
134  dict->SetString("proxy", host_pair->second.ToPacString());
135  return dict;
136}
137
138base::Value* NetLogSpdyInitializedCallback(NetLog::Source source,
139                                           const NextProto protocol_version,
140                                           NetLog::LogLevel /* log_level */) {
141  base::DictionaryValue* dict = new base::DictionaryValue();
142  if (source.IsValid()) {
143    source.AddToEventParameters(dict);
144  }
145  dict->SetString("protocol",
146                  SSLClientSocket::NextProtoToString(protocol_version));
147  return dict;
148}
149
150base::Value* NetLogSpdySettingsCallback(const HostPortPair& host_port_pair,
151                                        bool clear_persisted,
152                                        NetLog::LogLevel /* log_level */) {
153  base::DictionaryValue* dict = new base::DictionaryValue();
154  dict->SetString("host", host_port_pair.ToString());
155  dict->SetBoolean("clear_persisted", clear_persisted);
156  return dict;
157}
158
159base::Value* NetLogSpdySettingCallback(SpdySettingsIds id,
160                                       const SpdyMajorVersion protocol_version,
161                                       SpdySettingsFlags flags,
162                                       uint32 value,
163                                       NetLog::LogLevel /* log_level */) {
164  base::DictionaryValue* dict = new base::DictionaryValue();
165  dict->SetInteger("id",
166                   SpdyConstants::SerializeSettingId(protocol_version, id));
167  dict->SetInteger("flags", flags);
168  dict->SetInteger("value", value);
169  return dict;
170}
171
172base::Value* NetLogSpdySendSettingsCallback(
173    const SettingsMap* settings,
174    const SpdyMajorVersion protocol_version,
175    NetLog::LogLevel /* log_level */) {
176  base::DictionaryValue* dict = new base::DictionaryValue();
177  base::ListValue* settings_list = new base::ListValue();
178  for (SettingsMap::const_iterator it = settings->begin();
179       it != settings->end(); ++it) {
180    const SpdySettingsIds id = it->first;
181    const SpdySettingsFlags flags = it->second.first;
182    const uint32 value = it->second.second;
183    settings_list->Append(new base::StringValue(base::StringPrintf(
184        "[id:%u flags:%u value:%u]",
185        SpdyConstants::SerializeSettingId(protocol_version, id),
186        flags,
187        value)));
188  }
189  dict->Set("settings", settings_list);
190  return dict;
191}
192
193base::Value* NetLogSpdyWindowUpdateFrameCallback(
194    SpdyStreamId stream_id,
195    uint32 delta,
196    NetLog::LogLevel /* log_level */) {
197  base::DictionaryValue* dict = new base::DictionaryValue();
198  dict->SetInteger("stream_id", static_cast<int>(stream_id));
199  dict->SetInteger("delta", delta);
200  return dict;
201}
202
203base::Value* NetLogSpdySessionWindowUpdateCallback(
204    int32 delta,
205    int32 window_size,
206    NetLog::LogLevel /* log_level */) {
207  base::DictionaryValue* dict = new base::DictionaryValue();
208  dict->SetInteger("delta", delta);
209  dict->SetInteger("window_size", window_size);
210  return dict;
211}
212
213base::Value* NetLogSpdyDataCallback(SpdyStreamId stream_id,
214                                    int size,
215                                    bool fin,
216                                    NetLog::LogLevel /* log_level */) {
217  base::DictionaryValue* dict = new base::DictionaryValue();
218  dict->SetInteger("stream_id", static_cast<int>(stream_id));
219  dict->SetInteger("size", size);
220  dict->SetBoolean("fin", fin);
221  return dict;
222}
223
224base::Value* NetLogSpdyRstCallback(SpdyStreamId stream_id,
225                                   int status,
226                                   const std::string* description,
227                                   NetLog::LogLevel /* log_level */) {
228  base::DictionaryValue* dict = new base::DictionaryValue();
229  dict->SetInteger("stream_id", static_cast<int>(stream_id));
230  dict->SetInteger("status", status);
231  dict->SetString("description", *description);
232  return dict;
233}
234
235base::Value* NetLogSpdyPingCallback(SpdyPingId unique_id,
236                                    bool is_ack,
237                                    const char* type,
238                                    NetLog::LogLevel /* log_level */) {
239  base::DictionaryValue* dict = new base::DictionaryValue();
240  dict->SetInteger("unique_id", unique_id);
241  dict->SetString("type", type);
242  dict->SetBoolean("is_ack", is_ack);
243  return dict;
244}
245
246base::Value* NetLogSpdyGoAwayCallback(SpdyStreamId last_stream_id,
247                                      int active_streams,
248                                      int unclaimed_streams,
249                                      SpdyGoAwayStatus status,
250                                      NetLog::LogLevel /* log_level */) {
251  base::DictionaryValue* dict = new base::DictionaryValue();
252  dict->SetInteger("last_accepted_stream_id",
253                   static_cast<int>(last_stream_id));
254  dict->SetInteger("active_streams", active_streams);
255  dict->SetInteger("unclaimed_streams", unclaimed_streams);
256  dict->SetInteger("status", static_cast<int>(status));
257  return dict;
258}
259
260base::Value* NetLogSpdyPushPromiseReceivedCallback(
261    const SpdyHeaderBlock* headers,
262    SpdyStreamId stream_id,
263    SpdyStreamId promised_stream_id,
264    NetLog::LogLevel log_level) {
265  base::DictionaryValue* dict = new base::DictionaryValue();
266  dict->Set("headers",
267            SpdyHeaderBlockToListValue(*headers, log_level).release());
268  dict->SetInteger("id", stream_id);
269  dict->SetInteger("promised_stream_id", promised_stream_id);
270  return dict;
271}
272
// Sums the .size() of every element of the fixed-size array |arr| and
// returns the total. The element count N is deduced from the array type.
template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) {
  size_t sum = 0;
  for (const T* element = arr; element != arr + N; ++element)
    sum += element->size();
  return sum;
}
282
283// Helper class for std:find_if on STL container containing
284// SpdyStreamRequest weak pointers.
285class RequestEquals {
286 public:
287  RequestEquals(const base::WeakPtr<SpdyStreamRequest>& request)
288      : request_(request) {}
289
290  bool operator()(const base::WeakPtr<SpdyStreamRequest>& request) const {
291    return request_.get() == request.get();
292  }
293
294 private:
295  const base::WeakPtr<SpdyStreamRequest> request_;
296};
297
298// The maximum number of concurrent streams we will ever create.  Even if
299// the server permits more, we will never exceed this limit.
// The SpdySession constructor uses this value for
// |max_concurrent_streams_limit_| when it is passed a limit of 0.
300const size_t kMaxConcurrentStreamLimit = 256;
301
302}  // namespace
303
304SpdyProtocolErrorDetails MapFramerErrorToProtocolError(
305    SpdyFramer::SpdyError err) {
306  switch(err) {
307    case SpdyFramer::SPDY_NO_ERROR:
308      return SPDY_ERROR_NO_ERROR;
309    case SpdyFramer::SPDY_INVALID_CONTROL_FRAME:
310      return SPDY_ERROR_INVALID_CONTROL_FRAME;
311    case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
312      return SPDY_ERROR_CONTROL_PAYLOAD_TOO_LARGE;
313    case SpdyFramer::SPDY_ZLIB_INIT_FAILURE:
314      return SPDY_ERROR_ZLIB_INIT_FAILURE;
315    case SpdyFramer::SPDY_UNSUPPORTED_VERSION:
316      return SPDY_ERROR_UNSUPPORTED_VERSION;
317    case SpdyFramer::SPDY_DECOMPRESS_FAILURE:
318      return SPDY_ERROR_DECOMPRESS_FAILURE;
319    case SpdyFramer::SPDY_COMPRESS_FAILURE:
320      return SPDY_ERROR_COMPRESS_FAILURE;
321    case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT:
322      return SPDY_ERROR_GOAWAY_FRAME_CORRUPT;
323    case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT:
324      return SPDY_ERROR_RST_STREAM_FRAME_CORRUPT;
325    case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS:
326      return SPDY_ERROR_INVALID_DATA_FRAME_FLAGS;
327    case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS:
328      return SPDY_ERROR_INVALID_CONTROL_FRAME_FLAGS;
329    case SpdyFramer::SPDY_UNEXPECTED_FRAME:
330      return SPDY_ERROR_UNEXPECTED_FRAME;
331    default:
332      NOTREACHED();
333      return static_cast<SpdyProtocolErrorDetails>(-1);
334  }
335}
336
337Error MapFramerErrorToNetError(SpdyFramer::SpdyError err) {
338  switch (err) {
339    case SpdyFramer::SPDY_NO_ERROR:
340      return OK;
341    case SpdyFramer::SPDY_INVALID_CONTROL_FRAME:
342      return ERR_SPDY_PROTOCOL_ERROR;
343    case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
344      return ERR_SPDY_FRAME_SIZE_ERROR;
345    case SpdyFramer::SPDY_ZLIB_INIT_FAILURE:
346      return ERR_SPDY_COMPRESSION_ERROR;
347    case SpdyFramer::SPDY_UNSUPPORTED_VERSION:
348      return ERR_SPDY_PROTOCOL_ERROR;
349    case SpdyFramer::SPDY_DECOMPRESS_FAILURE:
350      return ERR_SPDY_COMPRESSION_ERROR;
351    case SpdyFramer::SPDY_COMPRESS_FAILURE:
352      return ERR_SPDY_COMPRESSION_ERROR;
353    case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT:
354      return ERR_SPDY_PROTOCOL_ERROR;
355    case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT:
356      return ERR_SPDY_PROTOCOL_ERROR;
357    case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS:
358      return ERR_SPDY_PROTOCOL_ERROR;
359    case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS:
360      return ERR_SPDY_PROTOCOL_ERROR;
361    case SpdyFramer::SPDY_UNEXPECTED_FRAME:
362      return ERR_SPDY_PROTOCOL_ERROR;
363    default:
364      NOTREACHED();
365      return ERR_SPDY_PROTOCOL_ERROR;
366  }
367}
368
369SpdyProtocolErrorDetails MapRstStreamStatusToProtocolError(
370    SpdyRstStreamStatus status) {
371  switch(status) {
372    case RST_STREAM_PROTOCOL_ERROR:
373      return STATUS_CODE_PROTOCOL_ERROR;
374    case RST_STREAM_INVALID_STREAM:
375      return STATUS_CODE_INVALID_STREAM;
376    case RST_STREAM_REFUSED_STREAM:
377      return STATUS_CODE_REFUSED_STREAM;
378    case RST_STREAM_UNSUPPORTED_VERSION:
379      return STATUS_CODE_UNSUPPORTED_VERSION;
380    case RST_STREAM_CANCEL:
381      return STATUS_CODE_CANCEL;
382    case RST_STREAM_INTERNAL_ERROR:
383      return STATUS_CODE_INTERNAL_ERROR;
384    case RST_STREAM_FLOW_CONTROL_ERROR:
385      return STATUS_CODE_FLOW_CONTROL_ERROR;
386    case RST_STREAM_STREAM_IN_USE:
387      return STATUS_CODE_STREAM_IN_USE;
388    case RST_STREAM_STREAM_ALREADY_CLOSED:
389      return STATUS_CODE_STREAM_ALREADY_CLOSED;
390    case RST_STREAM_INVALID_CREDENTIALS:
391      return STATUS_CODE_INVALID_CREDENTIALS;
392    case RST_STREAM_FRAME_SIZE_ERROR:
393      return STATUS_CODE_FRAME_SIZE_ERROR;
394    case RST_STREAM_SETTINGS_TIMEOUT:
395      return STATUS_CODE_SETTINGS_TIMEOUT;
396    case RST_STREAM_CONNECT_ERROR:
397      return STATUS_CODE_CONNECT_ERROR;
398    case RST_STREAM_ENHANCE_YOUR_CALM:
399      return STATUS_CODE_ENHANCE_YOUR_CALM;
400    default:
401      NOTREACHED();
402      return static_cast<SpdyProtocolErrorDetails>(-1);
403  }
404}
405
406SpdyGoAwayStatus MapNetErrorToGoAwayStatus(Error err) {
407  switch (err) {
408    case OK:
409      return GOAWAY_NO_ERROR;
410    case ERR_SPDY_PROTOCOL_ERROR:
411      return GOAWAY_PROTOCOL_ERROR;
412    case ERR_SPDY_FLOW_CONTROL_ERROR:
413      return GOAWAY_FLOW_CONTROL_ERROR;
414    case ERR_SPDY_FRAME_SIZE_ERROR:
415      return GOAWAY_FRAME_SIZE_ERROR;
416    case ERR_SPDY_COMPRESSION_ERROR:
417      return GOAWAY_COMPRESSION_ERROR;
418    case ERR_SPDY_INADEQUATE_TRANSPORT_SECURITY:
419      return GOAWAY_INADEQUATE_SECURITY;
420    default:
421      return GOAWAY_PROTOCOL_ERROR;
422  }
423}
424
425void SplitPushedHeadersToRequestAndResponse(const SpdyHeaderBlock& headers,
426                                            SpdyMajorVersion protocol_version,
427                                            SpdyHeaderBlock* request_headers,
428                                            SpdyHeaderBlock* response_headers) {
429  DCHECK(response_headers);
430  DCHECK(request_headers);
431  for (SpdyHeaderBlock::const_iterator it = headers.begin();
432       it != headers.end();
433       ++it) {
434    SpdyHeaderBlock* to_insert = response_headers;
435    if (protocol_version == SPDY2) {
436      if (it->first == "url")
437        to_insert = request_headers;
438    } else {
439      const char* host = protocol_version >= SPDY4 ? ":authority" : ":host";
440      static const char* scheme = ":scheme";
441      static const char* path = ":path";
442      if (it->first == host || it->first == scheme || it->first == path)
443        to_insert = request_headers;
444    }
445    to_insert->insert(*it);
446  }
447}
448
// Binds |weak_ptr_factory_| to this object and puts every other field into
// its idle state through Reset().
449SpdyStreamRequest::SpdyStreamRequest() : weak_ptr_factory_(this) {
450  Reset();
451}
452
// Cancels the request on destruction. If a session is still attached,
// CancelRequest() notifies it.
453SpdyStreamRequest::~SpdyStreamRequest() {
454  CancelRequest();
455}
456
457int SpdyStreamRequest::StartRequest(
458    SpdyStreamType type,
459    const base::WeakPtr<SpdySession>& session,
460    const GURL& url,
461    RequestPriority priority,
462    const BoundNetLog& net_log,
463    const CompletionCallback& callback) {
464  DCHECK(session);
465  DCHECK(!session_);
466  DCHECK(!stream_);
467  DCHECK(callback_.is_null());
468
469  type_ = type;
470  session_ = session;
471  url_ = url;
472  priority_ = priority;
473  net_log_ = net_log;
474  callback_ = callback;
475
476  base::WeakPtr<SpdyStream> stream;
477  int rv = session->TryCreateStream(weak_ptr_factory_.GetWeakPtr(), &stream);
478  if (rv == OK) {
479    Reset();
480    stream_ = stream;
481  }
482  return rv;
483}
484
485void SpdyStreamRequest::CancelRequest() {
486  if (session_)
487    session_->CancelStreamRequest(weak_ptr_factory_.GetWeakPtr());
488  Reset();
489  // Do this to cancel any pending CompleteStreamRequest() tasks.
490  weak_ptr_factory_.InvalidateWeakPtrs();
491}
492
493base::WeakPtr<SpdyStream> SpdyStreamRequest::ReleaseStream() {
494  DCHECK(!session_);
495  base::WeakPtr<SpdyStream> stream = stream_;
496  DCHECK(stream);
497  Reset();
498  return stream;
499}
500
501void SpdyStreamRequest::OnRequestCompleteSuccess(
502    const base::WeakPtr<SpdyStream>& stream) {
503  DCHECK(session_);
504  DCHECK(!stream_);
505  DCHECK(!callback_.is_null());
506  CompletionCallback callback = callback_;
507  Reset();
508  DCHECK(stream);
509  stream_ = stream;
510  callback.Run(OK);
511}
512
513void SpdyStreamRequest::OnRequestCompleteFailure(int rv) {
514  DCHECK(session_);
515  DCHECK(!stream_);
516  DCHECK(!callback_.is_null());
517  CompletionCallback callback = callback_;
518  Reset();
519  DCHECK_NE(rv, OK);
520  callback.Run(rv);
521}
522
// Returns every per-request field to its idle value: bidirectional type, no
// session, no stream, empty URL, minimum priority, unbound net log and a
// null callback. |weak_ptr_factory_| is not touched here.
523void SpdyStreamRequest::Reset() {
524  type_ = SPDY_BIDIRECTIONAL_STREAM;
525  session_.reset();
526  stream_.reset();
527  url_ = GURL();
528  priority_ = MINIMUM_PRIORITY;
529  net_log_ = BoundNetLog();
530  callback_.Reset();
531}
532
// Creates an entry that refers to no stream and is not waiting for a
// SYN_REPLY.
533SpdySession::ActiveStreamInfo::ActiveStreamInfo()
534    : stream(NULL),
535      waiting_for_syn_reply(false) {}
536
// Creates an entry for |stream|. |stream| is dereferenced here, so it must
// not be NULL. |waiting_for_syn_reply| starts out true for every stream type
// except SPDY_PUSH_STREAM.
537SpdySession::ActiveStreamInfo::ActiveStreamInfo(SpdyStream* stream)
538    : stream(stream),
539      waiting_for_syn_reply(stream->type() != SPDY_PUSH_STREAM) {
540}
541
// Out-of-line destructor with an empty body.
542SpdySession::ActiveStreamInfo::~ActiveStreamInfo() {}
543
// Creates an entry with stream id 0. |creation_time| is not listed in the
// initializer list, so it is default-initialized.
544SpdySession::PushedStreamInfo::PushedStreamInfo() : stream_id(0) {}
545
// Creates an entry recording the id of a pushed stream and the time passed
// in as its creation time.
546SpdySession::PushedStreamInfo::PushedStreamInfo(
547    SpdyStreamId stream_id,
548    base::TimeTicks creation_time)
549    : stream_id(stream_id),
550      creation_time(creation_time) {}
551
// Out-of-line destructor with an empty body.
552SpdySession::PushedStreamInfo::~PushedStreamInfo() {}
553
554// static
555bool SpdySession::CanPool(TransportSecurityState* transport_security_state,
556                          const SSLInfo& ssl_info,
557                          const std::string& old_hostname,
558                          const std::string& new_hostname) {
559  // Pooling is prohibited if the server cert is not valid for the new domain,
560  // and for connections on which client certs were sent. It is also prohibited
561  // when channel ID was sent if the hosts are from different eTLDs+1.
562  if (IsCertStatusError(ssl_info.cert_status))
563    return false;
564
565  if (ssl_info.client_cert_sent)
566    return false;
567
568  if (ssl_info.channel_id_sent &&
569      ChannelIDService::GetDomainForHost(new_hostname) !=
570      ChannelIDService::GetDomainForHost(old_hostname)) {
571    return false;
572  }
573
574  bool unused = false;
575  if (!ssl_info.cert->VerifyNameMatch(new_hostname, &unused))
576    return false;
577
578  std::string pinning_failure_log;
579  if (!transport_security_state->CheckPublicKeyPins(
580          new_hostname,
581          ssl_info.is_issued_by_known_root,
582          ssl_info.public_key_hashes,
583          &pinning_failure_log)) {
584    return false;
585  }
586
587  return true;
588}
589
590SpdySession::SpdySession(
591    const SpdySessionKey& spdy_session_key,
592    const base::WeakPtr<HttpServerProperties>& http_server_properties,
593    TransportSecurityState* transport_security_state,
594    bool verify_domain_authentication,
595    bool enable_sending_initial_data,
596    bool enable_compression,
597    bool enable_ping_based_connection_checking,
598    NextProto default_protocol,
599    size_t stream_initial_recv_window_size,
600    size_t initial_max_concurrent_streams,
601    size_t max_concurrent_streams_limit,
602    TimeFunc time_func,
603    const HostPortPair& trusted_spdy_proxy,
604    NetLog* net_log)
605    : in_io_loop_(false),
606      spdy_session_key_(spdy_session_key),
607      pool_(NULL),
608      http_server_properties_(http_server_properties),
609      transport_security_state_(transport_security_state),
610      read_buffer_(new IOBuffer(kReadBufferSize)),
611      stream_hi_water_mark_(kFirstStreamId),
612      last_accepted_push_stream_id_(0),
613      num_pushed_streams_(0u),
614      num_active_pushed_streams_(0u),
615      in_flight_write_frame_type_(DATA),
616      in_flight_write_frame_size_(0),
617      is_secure_(false),
618      certificate_error_code_(OK),
619      availability_state_(STATE_AVAILABLE),
620      read_state_(READ_STATE_DO_READ),
621      write_state_(WRITE_STATE_IDLE),
622      error_on_close_(OK),
623      max_concurrent_streams_(initial_max_concurrent_streams == 0
624                                  ? kInitialMaxConcurrentStreams
625                                  : initial_max_concurrent_streams),
626      max_concurrent_streams_limit_(max_concurrent_streams_limit == 0
627                                        ? kMaxConcurrentStreamLimit
628                                        : max_concurrent_streams_limit),
629      max_concurrent_pushed_streams_(kMaxConcurrentPushedStreams),
630      streams_initiated_count_(0),
631      streams_pushed_count_(0),
632      streams_pushed_and_claimed_count_(0),
633      streams_abandoned_count_(0),
634      total_bytes_received_(0),
635      sent_settings_(false),
636      received_settings_(false),
637      stalled_streams_(0),
638      pings_in_flight_(0),
639      next_ping_id_(1),
640      last_activity_time_(time_func()),
641      last_compressed_frame_len_(0),
642      check_ping_status_pending_(false),
643      send_connection_header_prefix_(false),
644      flow_control_state_(FLOW_CONTROL_NONE),
645      stream_initial_send_window_size_(kSpdyStreamInitialWindowSize),
646      stream_initial_recv_window_size_(stream_initial_recv_window_size == 0
647                                           ? kDefaultInitialRecvWindowSize
648                                           : stream_initial_recv_window_size),
649      session_send_window_size_(0),
650      session_recv_window_size_(0),
651      session_unacked_recv_window_bytes_(0),
652      net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)),
653      verify_domain_authentication_(verify_domain_authentication),
654      enable_sending_initial_data_(enable_sending_initial_data),
655      enable_compression_(enable_compression),
656      enable_ping_based_connection_checking_(
657          enable_ping_based_connection_checking),
658      protocol_(default_protocol),
659      connection_at_risk_of_loss_time_(
660          base::TimeDelta::FromSeconds(kDefaultConnectionAtRiskOfLossSeconds)),
661      hung_interval_(base::TimeDelta::FromSeconds(kHungIntervalSeconds)),
662      trusted_spdy_proxy_(trusted_spdy_proxy),
663      time_func_(time_func),
664      weak_factory_(this) {
665  DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
666  DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
667  DCHECK(HttpStreamFactory::spdy_enabled());
668  net_log_.BeginEvent(
669      NetLog::TYPE_SPDY_SESSION,
670      base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair()));
671  next_unclaimed_push_stream_sweep_time_ = time_func_() +
672      base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
673  // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
674}
675
// Tears the session down. Must not run while the session is inside its IO
// loop (enforced with CHECK). Disconnects the socket, records histograms and
// ends the TYPE_SPDY_SESSION NetLog event begun in the constructor.
676SpdySession::~SpdySession() {
677  CHECK(!in_io_loop_);
678  DcheckDraining();
679
680  // TODO(akalin): Check connection->is_initialized() instead. This
681  // requires re-working CreateFakeSpdySession(), though.
682  DCHECK(connection_->socket());
683  // With SPDY we can't recycle sockets.
684  connection_->socket()->Disconnect();
685
686  RecordHistograms();
687
688  net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION);
689}
690
691void SpdySession::InitializeWithSocket(
692    scoped_ptr<ClientSocketHandle> connection,
693    SpdySessionPool* pool,
694    bool is_secure,
695    int certificate_error_code) {
696  CHECK(!in_io_loop_);
697  DCHECK_EQ(availability_state_, STATE_AVAILABLE);
698  DCHECK_EQ(read_state_, READ_STATE_DO_READ);
699  DCHECK_EQ(write_state_, WRITE_STATE_IDLE);
700  DCHECK(!connection_);
701
702  DCHECK(certificate_error_code == OK ||
703         certificate_error_code < ERR_IO_PENDING);
704  // TODO(akalin): Check connection->is_initialized() instead. This
705  // requires re-working CreateFakeSpdySession(), though.
706  DCHECK(connection->socket());
707
708  base::StatsCounter spdy_sessions("spdy.sessions");
709  spdy_sessions.Increment();
710
711  connection_ = connection.Pass();
712  is_secure_ = is_secure;
713  certificate_error_code_ = certificate_error_code;
714
715  NextProto protocol_negotiated =
716      connection_->socket()->GetNegotiatedProtocol();
717  if (protocol_negotiated != kProtoUnknown) {
718    protocol_ = protocol_negotiated;
719  }
720  DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
721  DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
722
723  if (protocol_ == kProtoSPDY4)
724    send_connection_header_prefix_ = true;
725
726  if (protocol_ >= kProtoSPDY31) {
727    flow_control_state_ = FLOW_CONTROL_STREAM_AND_SESSION;
728    session_send_window_size_ = kSpdySessionInitialWindowSize;
729    session_recv_window_size_ = kSpdySessionInitialWindowSize;
730  } else if (protocol_ >= kProtoSPDY3) {
731    flow_control_state_ = FLOW_CONTROL_STREAM;
732  } else {
733    flow_control_state_ = FLOW_CONTROL_NONE;
734  }
735
736  buffered_spdy_framer_.reset(
737      new BufferedSpdyFramer(NextProtoToSpdyMajorVersion(protocol_),
738                             enable_compression_));
739  buffered_spdy_framer_->set_visitor(this);
740  buffered_spdy_framer_->set_debug_visitor(this);
741  UMA_HISTOGRAM_ENUMERATION(
742      "Net.SpdyVersion2",
743      protocol_ - kProtoSPDYMinimumVersion,
744      kProtoSPDYMaximumVersion - kProtoSPDYMinimumVersion + 1);
745
746  net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_INITIALIZED,
747                    base::Bind(&NetLogSpdyInitializedCallback,
748                               connection_->socket()->NetLog().source(),
749                               protocol_));
750
751  DCHECK_EQ(availability_state_, STATE_AVAILABLE);
752  connection_->AddHigherLayeredPool(this);
753  if (enable_sending_initial_data_)
754    SendInitialData();
755  pool_ = pool;
756
757  // Bootstrap the read loop.
758  base::MessageLoop::current()->PostTask(
759      FROM_HERE,
760      base::Bind(&SpdySession::PumpReadLoop,
761                 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
762}
763
764bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
765  if (!verify_domain_authentication_)
766    return true;
767
768  if (availability_state_ == STATE_DRAINING)
769    return false;
770
771  SSLInfo ssl_info;
772  bool was_npn_negotiated;
773  NextProto protocol_negotiated = kProtoUnknown;
774  if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated))
775    return true;   // This is not a secure session, so all domains are okay.
776
777  return CanPool(transport_security_state_, ssl_info,
778                 host_port_pair().host(), domain);
779}
780
781int SpdySession::GetPushStream(
782    const GURL& url,
783    base::WeakPtr<SpdyStream>* stream,
784    const BoundNetLog& stream_net_log) {
785  CHECK(!in_io_loop_);
786
787  stream->reset();
788
789  if (availability_state_ == STATE_DRAINING)
790    return ERR_CONNECTION_CLOSED;
791
792  Error err = TryAccessStream(url);
793  if (err != OK)
794    return err;
795
796  *stream = GetActivePushStream(url);
797  if (*stream) {
798    DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_);
799    streams_pushed_and_claimed_count_++;
800  }
801  return OK;
802}
803
804// {,Try}CreateStream() and TryAccessStream() can be called with
805// |in_io_loop_| set if a stream is being created in response to
806// another being closed due to received data.
807
808Error SpdySession::TryAccessStream(const GURL& url) {
809  if (is_secure_ && certificate_error_code_ != OK &&
810      (url.SchemeIs("https") || url.SchemeIs("wss"))) {
811    RecordProtocolErrorHistogram(
812        PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION);
813    DoDrainSession(
814        static_cast<Error>(certificate_error_code_),
815        "Tried to get SPDY stream for secure content over an unauthenticated "
816        "session.");
817    return ERR_SPDY_PROTOCOL_ERROR;
818  }
819  return OK;
820}
821
822int SpdySession::TryCreateStream(
823    const base::WeakPtr<SpdyStreamRequest>& request,
824    base::WeakPtr<SpdyStream>* stream) {
825  DCHECK(request);
826
827  if (availability_state_ == STATE_GOING_AWAY)
828    return ERR_FAILED;
829
830  if (availability_state_ == STATE_DRAINING)
831    return ERR_CONNECTION_CLOSED;
832
833  Error err = TryAccessStream(request->url());
834  if (err != OK)
835    return err;
836
837  if (!max_concurrent_streams_ ||
838      (active_streams_.size() + created_streams_.size() - num_pushed_streams_ <
839       max_concurrent_streams_)) {
840    return CreateStream(*request, stream);
841  }
842
843  stalled_streams_++;
844  net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS);
845  RequestPriority priority = request->priority();
846  CHECK_GE(priority, MINIMUM_PRIORITY);
847  CHECK_LE(priority, MAXIMUM_PRIORITY);
848  pending_create_stream_queues_[priority].push_back(request);
849  return ERR_IO_PENDING;
850}
851
852int SpdySession::CreateStream(const SpdyStreamRequest& request,
853                              base::WeakPtr<SpdyStream>* stream) {
854  DCHECK_GE(request.priority(), MINIMUM_PRIORITY);
855  DCHECK_LE(request.priority(), MAXIMUM_PRIORITY);
856
857  if (availability_state_ == STATE_GOING_AWAY)
858    return ERR_FAILED;
859
860  if (availability_state_ == STATE_DRAINING)
861    return ERR_CONNECTION_CLOSED;
862
863  Error err = TryAccessStream(request.url());
864  if (err != OK) {
865    // This should have been caught in TryCreateStream().
866    NOTREACHED();
867    return err;
868  }
869
870  DCHECK(connection_->socket());
871  DCHECK(connection_->socket()->IsConnected());
872  if (connection_->socket()) {
873    UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected",
874                          connection_->socket()->IsConnected());
875    if (!connection_->socket()->IsConnected()) {
876      DoDrainSession(
877          ERR_CONNECTION_CLOSED,
878          "Tried to create SPDY stream for a closed socket connection.");
879      return ERR_CONNECTION_CLOSED;
880    }
881  }
882
883  scoped_ptr<SpdyStream> new_stream(
884      new SpdyStream(request.type(), GetWeakPtr(), request.url(),
885                     request.priority(),
886                     stream_initial_send_window_size_,
887                     stream_initial_recv_window_size_,
888                     request.net_log()));
889  *stream = new_stream->GetWeakPtr();
890  InsertCreatedStream(new_stream.Pass());
891
892  UMA_HISTOGRAM_CUSTOM_COUNTS(
893      "Net.SpdyPriorityCount",
894      static_cast<int>(request.priority()), 0, 10, 11);
895
896  return OK;
897}
898
899void SpdySession::CancelStreamRequest(
900    const base::WeakPtr<SpdyStreamRequest>& request) {
901  DCHECK(request);
902  RequestPriority priority = request->priority();
903  CHECK_GE(priority, MINIMUM_PRIORITY);
904  CHECK_LE(priority, MAXIMUM_PRIORITY);
905
906#if DCHECK_IS_ON
907  // |request| should not be in a queue not matching its priority.
908  for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
909    if (priority == i)
910      continue;
911    PendingStreamRequestQueue* queue = &pending_create_stream_queues_[i];
912    DCHECK(std::find_if(queue->begin(),
913                        queue->end(),
914                        RequestEquals(request)) == queue->end());
915  }
916#endif
917
918  PendingStreamRequestQueue* queue =
919      &pending_create_stream_queues_[priority];
920  // Remove |request| from |queue| while preserving the order of the
921  // other elements.
922  PendingStreamRequestQueue::iterator it =
923      std::find_if(queue->begin(), queue->end(), RequestEquals(request));
924  // The request may already be removed if there's a
925  // CompleteStreamRequest() in flight.
926  if (it != queue->end()) {
927    it = queue->erase(it);
928    // |request| should be in the queue at most once, and if it is
929    // present, should not be pending completion.
930    DCHECK(std::find_if(it, queue->end(), RequestEquals(request)) ==
931           queue->end());
932  }
933}
934
935base::WeakPtr<SpdyStreamRequest> SpdySession::GetNextPendingStreamRequest() {
936  for (int j = MAXIMUM_PRIORITY; j >= MINIMUM_PRIORITY; --j) {
937    if (pending_create_stream_queues_[j].empty())
938      continue;
939
940    base::WeakPtr<SpdyStreamRequest> pending_request =
941        pending_create_stream_queues_[j].front();
942    DCHECK(pending_request);
943    pending_create_stream_queues_[j].pop_front();
944    return pending_request;
945  }
946  return base::WeakPtr<SpdyStreamRequest>();
947}
948
949void SpdySession::ProcessPendingStreamRequests() {
950  // Like |max_concurrent_streams_|, 0 means infinite for
951  // |max_requests_to_process|.
952  size_t max_requests_to_process = 0;
953  if (max_concurrent_streams_ != 0) {
954    max_requests_to_process =
955        max_concurrent_streams_ -
956        (active_streams_.size() + created_streams_.size());
957  }
958  for (size_t i = 0;
959       max_requests_to_process == 0 || i < max_requests_to_process; ++i) {
960    base::WeakPtr<SpdyStreamRequest> pending_request =
961        GetNextPendingStreamRequest();
962    if (!pending_request)
963      break;
964
965    // Note that this post can race with other stream creations, and it's
966    // possible that the un-stalled stream will be stalled again if it loses.
967    // TODO(jgraettinger): Provide stronger ordering guarantees.
968    base::MessageLoop::current()->PostTask(
969        FROM_HERE,
970        base::Bind(&SpdySession::CompleteStreamRequest,
971                   weak_factory_.GetWeakPtr(),
972                   pending_request));
973  }
974}
975
// Records |alias_key| in |pooled_aliases_|.
void SpdySession::AddPooledAlias(const SpdySessionKey& alias_key) {
  pooled_aliases_.insert(alias_key);
}
979
980SpdyMajorVersion SpdySession::GetProtocolVersion() const {
981  DCHECK(buffered_spdy_framer_.get());
982  return buffered_spdy_framer_->protocol_version();
983}
984
985bool SpdySession::HasAcceptableTransportSecurity() const {
986  // If we're not even using TLS, we have no standards to meet.
987  if (!is_secure_) {
988    return true;
989  }
990
991  // We don't enforce transport security standards for older SPDY versions.
992  if (GetProtocolVersion() < SPDY4) {
993    return true;
994  }
995
996  SSLInfo ssl_info;
997  CHECK(connection_->socket()->GetSSLInfo(&ssl_info));
998
999  // HTTP/2 requires TLS 1.2+
1000  if (SSLConnectionStatusToVersion(ssl_info.connection_status) <
1001      SSL_CONNECTION_VERSION_TLS1_2) {
1002    return false;
1003  }
1004
1005  if (!IsSecureTLSCipherSuite(
1006          SSLConnectionStatusToCipherSuite(ssl_info.connection_status))) {
1007    return false;
1008  }
1009
1010  return true;
1011}
1012
// Returns a weak pointer to this session, vended by |weak_factory_|.
base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() {
  return weak_factory_.GetWeakPtr();
}
1016
1017bool SpdySession::CloseOneIdleConnection() {
1018  CHECK(!in_io_loop_);
1019  DCHECK(pool_);
1020  if (active_streams_.empty()) {
1021    DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
1022  }
1023  // Return false as the socket wasn't immediately closed.
1024  return false;
1025}
1026
1027void SpdySession::EnqueueStreamWrite(
1028    const base::WeakPtr<SpdyStream>& stream,
1029    SpdyFrameType frame_type,
1030    scoped_ptr<SpdyBufferProducer> producer) {
1031  DCHECK(frame_type == HEADERS ||
1032         frame_type == DATA ||
1033         frame_type == CREDENTIAL ||
1034         frame_type == SYN_STREAM);
1035  EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream);
1036}
1037
1038scoped_ptr<SpdyFrame> SpdySession::CreateSynStream(
1039    SpdyStreamId stream_id,
1040    RequestPriority priority,
1041    SpdyControlFlags flags,
1042    const SpdyHeaderBlock& block) {
1043  ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
1044  CHECK(it != active_streams_.end());
1045  CHECK_EQ(it->second.stream->stream_id(), stream_id);
1046
1047  SendPrefacePingIfNoneInFlight();
1048
1049  DCHECK(buffered_spdy_framer_.get());
1050  SpdyPriority spdy_priority =
1051      ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion());
1052
1053  scoped_ptr<SpdyFrame> syn_frame;
1054  // TODO(hkhalil): Avoid copy of |block|.
1055  if (GetProtocolVersion() <= SPDY3) {
1056    SpdySynStreamIR syn_stream(stream_id);
1057    syn_stream.set_associated_to_stream_id(0);
1058    syn_stream.set_priority(spdy_priority);
1059    syn_stream.set_fin((flags & CONTROL_FLAG_FIN) != 0);
1060    syn_stream.set_unidirectional((flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0);
1061    syn_stream.set_name_value_block(block);
1062    syn_frame.reset(buffered_spdy_framer_->SerializeFrame(syn_stream));
1063  } else {
1064    SpdyHeadersIR headers(stream_id);
1065    headers.set_priority(spdy_priority);
1066    headers.set_has_priority(true);
1067    headers.set_fin((flags & CONTROL_FLAG_FIN) != 0);
1068    headers.set_name_value_block(block);
1069    syn_frame.reset(buffered_spdy_framer_->SerializeFrame(headers));
1070  }
1071
1072  base::StatsCounter spdy_requests("spdy.requests");
1073  spdy_requests.Increment();
1074  streams_initiated_count_++;
1075
1076  if (net_log().IsLogging()) {
1077    net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
1078                       base::Bind(&NetLogSpdySynStreamSentCallback,
1079                                  &block,
1080                                  (flags & CONTROL_FLAG_FIN) != 0,
1081                                  (flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0,
1082                                  spdy_priority,
1083                                  stream_id));
1084  }
1085
1086  return syn_frame.Pass();
1087}
1088
1089scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
1090                                                     IOBuffer* data,
1091                                                     int len,
1092                                                     SpdyDataFlags flags) {
1093  if (availability_state_ == STATE_DRAINING) {
1094    return scoped_ptr<SpdyBuffer>();
1095  }
1096
1097  ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
1098  CHECK(it != active_streams_.end());
1099  SpdyStream* stream = it->second.stream;
1100  CHECK_EQ(stream->stream_id(), stream_id);
1101
1102  if (len < 0) {
1103    NOTREACHED();
1104    return scoped_ptr<SpdyBuffer>();
1105  }
1106
1107  int effective_len = std::min(len, kMaxSpdyFrameChunkSize);
1108
1109  bool send_stalled_by_stream =
1110      (flow_control_state_ >= FLOW_CONTROL_STREAM) &&
1111      (stream->send_window_size() <= 0);
1112  bool send_stalled_by_session = IsSendStalled();
1113
1114  // NOTE: There's an enum of the same name in histograms.xml.
1115  enum SpdyFrameFlowControlState {
1116    SEND_NOT_STALLED,
1117    SEND_STALLED_BY_STREAM,
1118    SEND_STALLED_BY_SESSION,
1119    SEND_STALLED_BY_STREAM_AND_SESSION,
1120  };
1121
1122  SpdyFrameFlowControlState frame_flow_control_state = SEND_NOT_STALLED;
1123  if (send_stalled_by_stream) {
1124    if (send_stalled_by_session) {
1125      frame_flow_control_state = SEND_STALLED_BY_STREAM_AND_SESSION;
1126    } else {
1127      frame_flow_control_state = SEND_STALLED_BY_STREAM;
1128    }
1129  } else if (send_stalled_by_session) {
1130    frame_flow_control_state = SEND_STALLED_BY_SESSION;
1131  }
1132
1133  if (flow_control_state_ == FLOW_CONTROL_STREAM) {
1134    UMA_HISTOGRAM_ENUMERATION(
1135        "Net.SpdyFrameStreamFlowControlState",
1136        frame_flow_control_state,
1137        SEND_STALLED_BY_STREAM + 1);
1138  } else if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
1139    UMA_HISTOGRAM_ENUMERATION(
1140        "Net.SpdyFrameStreamAndSessionFlowControlState",
1141        frame_flow_control_state,
1142        SEND_STALLED_BY_STREAM_AND_SESSION + 1);
1143  }
1144
1145  // Obey send window size of the stream if stream flow control is
1146  // enabled.
1147  if (flow_control_state_ >= FLOW_CONTROL_STREAM) {
1148    if (send_stalled_by_stream) {
1149      stream->set_send_stalled_by_flow_control(true);
1150      // Even though we're currently stalled only by the stream, we
1151      // might end up being stalled by the session also.
1152      QueueSendStalledStream(*stream);
1153      net_log().AddEvent(
1154          NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_STREAM_SEND_WINDOW,
1155          NetLog::IntegerCallback("stream_id", stream_id));
1156      return scoped_ptr<SpdyBuffer>();
1157    }
1158
1159    effective_len = std::min(effective_len, stream->send_window_size());
1160  }
1161
1162  // Obey send window size of the session if session flow control is
1163  // enabled.
1164  if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
1165    if (send_stalled_by_session) {
1166      stream->set_send_stalled_by_flow_control(true);
1167      QueueSendStalledStream(*stream);
1168      net_log().AddEvent(
1169          NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_SESSION_SEND_WINDOW,
1170          NetLog::IntegerCallback("stream_id", stream_id));
1171      return scoped_ptr<SpdyBuffer>();
1172    }
1173
1174    effective_len = std::min(effective_len, session_send_window_size_);
1175  }
1176
1177  DCHECK_GE(effective_len, 0);
1178
1179  // Clear FIN flag if only some of the data will be in the data
1180  // frame.
1181  if (effective_len < len)
1182    flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
1183
1184  if (net_log().IsLogging()) {
1185    net_log().AddEvent(
1186        NetLog::TYPE_SPDY_SESSION_SEND_DATA,
1187        base::Bind(&NetLogSpdyDataCallback, stream_id, effective_len,
1188                   (flags & DATA_FLAG_FIN) != 0));
1189  }
1190
1191  // Send PrefacePing for DATA_FRAMEs with nonzero payload size.
1192  if (effective_len > 0)
1193    SendPrefacePingIfNoneInFlight();
1194
1195  // TODO(mbelshe): reduce memory copies here.
1196  DCHECK(buffered_spdy_framer_.get());
1197  scoped_ptr<SpdyFrame> frame(
1198      buffered_spdy_framer_->CreateDataFrame(
1199          stream_id, data->data(),
1200          static_cast<uint32>(effective_len), flags));
1201
1202  scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass()));
1203
1204  // Send window size is based on payload size, so nothing to do if this is
1205  // just a FIN with no payload.
1206  if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION &&
1207      effective_len != 0) {
1208    DecreaseSendWindowSize(static_cast<int32>(effective_len));
1209    data_buffer->AddConsumeCallback(
1210        base::Bind(&SpdySession::OnWriteBufferConsumed,
1211                   weak_factory_.GetWeakPtr(),
1212                   static_cast<size_t>(effective_len)));
1213  }
1214
1215  return data_buffer.Pass();
1216}
1217
1218void SpdySession::CloseActiveStream(SpdyStreamId stream_id, int status) {
1219  DCHECK_NE(stream_id, 0u);
1220
1221  ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1222  if (it == active_streams_.end()) {
1223    NOTREACHED();
1224    return;
1225  }
1226
1227  CloseActiveStreamIterator(it, status);
1228}
1229
1230void SpdySession::CloseCreatedStream(
1231    const base::WeakPtr<SpdyStream>& stream, int status) {
1232  DCHECK_EQ(stream->stream_id(), 0u);
1233
1234  CreatedStreamSet::iterator it = created_streams_.find(stream.get());
1235  if (it == created_streams_.end()) {
1236    NOTREACHED();
1237    return;
1238  }
1239
1240  CloseCreatedStreamIterator(it, status);
1241}
1242
1243void SpdySession::ResetStream(SpdyStreamId stream_id,
1244                              SpdyRstStreamStatus status,
1245                              const std::string& description) {
1246  DCHECK_NE(stream_id, 0u);
1247
1248  ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1249  if (it == active_streams_.end()) {
1250    NOTREACHED();
1251    return;
1252  }
1253
1254  ResetStreamIterator(it, status, description);
1255}
1256
1257bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const {
1258  return ContainsKey(active_streams_, stream_id);
1259}
1260
// Returns the load state reported for this session, which is always
// LOAD_STATE_IDLE.
LoadState SpdySession::GetLoadState() const {
  // Just report that we're idle since the session could be doing
  // many things concurrently.
  return LOAD_STATE_IDLE;
}
1266
1267void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it,
1268                                            int status) {
1269  // TODO(mbelshe): We should send a RST_STREAM control frame here
1270  //                so that the server can cancel a large send.
1271
1272  scoped_ptr<SpdyStream> owned_stream(it->second.stream);
1273  active_streams_.erase(it);
1274
1275  // TODO(akalin): When SpdyStream was ref-counted (and
1276  // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this
1277  // was only done when status was not OK. This meant that pushed
1278  // streams can still be claimed after they're closed. This is
1279  // probably something that we still want to support, although server
1280  // push is hardly used. Write tests for this and fix this. (See
1281  // http://crbug.com/261712 .)
1282  if (owned_stream->type() == SPDY_PUSH_STREAM) {
1283    unclaimed_pushed_streams_.erase(owned_stream->url());
1284    num_pushed_streams_--;
1285    if (!owned_stream->IsReservedRemote())
1286      num_active_pushed_streams_--;
1287  }
1288
1289  DeleteStream(owned_stream.Pass(), status);
1290  MaybeFinishGoingAway();
1291
1292  // If there are no active streams and the socket pool is stalled, close the
1293  // session to free up a socket slot.
1294  if (active_streams_.empty() && connection_->IsPoolStalled()) {
1295    DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
1296  }
1297}
1298
1299void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it,
1300                                             int status) {
1301  scoped_ptr<SpdyStream> owned_stream(*it);
1302  created_streams_.erase(it);
1303  DeleteStream(owned_stream.Pass(), status);
1304}
1305
1306void SpdySession::ResetStreamIterator(ActiveStreamMap::iterator it,
1307                                      SpdyRstStreamStatus status,
1308                                      const std::string& description) {
1309  // Send the RST_STREAM frame first as CloseActiveStreamIterator()
1310  // may close us.
1311  SpdyStreamId stream_id = it->first;
1312  RequestPriority priority = it->second.stream->priority();
1313  EnqueueResetStreamFrame(stream_id, priority, status, description);
1314
1315  // Removes any pending writes for the stream except for possibly an
1316  // in-flight one.
1317  CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
1318}
1319
1320void SpdySession::EnqueueResetStreamFrame(SpdyStreamId stream_id,
1321                                          RequestPriority priority,
1322                                          SpdyRstStreamStatus status,
1323                                          const std::string& description) {
1324  DCHECK_NE(stream_id, 0u);
1325
1326  net_log().AddEvent(
1327      NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
1328      base::Bind(&NetLogSpdyRstCallback, stream_id, status, &description));
1329
1330  DCHECK(buffered_spdy_framer_.get());
1331  scoped_ptr<SpdyFrame> rst_frame(
1332      buffered_spdy_framer_->CreateRstStream(stream_id, status));
1333
1334  EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass());
1335  RecordProtocolErrorHistogram(MapRstStreamStatusToProtocolError(status));
1336}
1337
1338void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) {
1339  CHECK(!in_io_loop_);
1340  if (availability_state_ == STATE_DRAINING) {
1341    return;
1342  }
1343  ignore_result(DoReadLoop(expected_read_state, result));
1344}
1345
1346int SpdySession::DoReadLoop(ReadState expected_read_state, int result) {
1347  CHECK(!in_io_loop_);
1348  CHECK_EQ(read_state_, expected_read_state);
1349
1350  in_io_loop_ = true;
1351
1352  int bytes_read_without_yielding = 0;
1353
1354  // Loop until the session is draining, the read becomes blocked, or
1355  // the read limit is exceeded.
1356  while (true) {
1357    switch (read_state_) {
1358      case READ_STATE_DO_READ:
1359        CHECK_EQ(result, OK);
1360        result = DoRead();
1361        break;
1362      case READ_STATE_DO_READ_COMPLETE:
1363        if (result > 0)
1364          bytes_read_without_yielding += result;
1365        result = DoReadComplete(result);
1366        break;
1367      default:
1368        NOTREACHED() << "read_state_: " << read_state_;
1369        break;
1370    }
1371
1372    if (availability_state_ == STATE_DRAINING)
1373      break;
1374
1375    if (result == ERR_IO_PENDING)
1376      break;
1377
1378    if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) {
1379      read_state_ = READ_STATE_DO_READ;
1380      base::MessageLoop::current()->PostTask(
1381          FROM_HERE,
1382          base::Bind(&SpdySession::PumpReadLoop,
1383                     weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
1384      result = ERR_IO_PENDING;
1385      break;
1386    }
1387  }
1388
1389  CHECK(in_io_loop_);
1390  in_io_loop_ = false;
1391
1392  return result;
1393}
1394
1395int SpdySession::DoRead() {
1396  CHECK(in_io_loop_);
1397
1398  CHECK(connection_);
1399  CHECK(connection_->socket());
1400  read_state_ = READ_STATE_DO_READ_COMPLETE;
1401  return connection_->socket()->Read(
1402      read_buffer_.get(),
1403      kReadBufferSize,
1404      base::Bind(&SpdySession::PumpReadLoop,
1405                 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE));
1406}
1407
1408int SpdySession::DoReadComplete(int result) {
1409  CHECK(in_io_loop_);
1410
1411  // Parse a frame.  For now this code requires that the frame fit into our
1412  // buffer (kReadBufferSize).
1413  // TODO(mbelshe): support arbitrarily large frames!
1414
1415  if (result == 0) {
1416    UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF",
1417                                total_bytes_received_, 1, 100000000, 50);
1418    DoDrainSession(ERR_CONNECTION_CLOSED, "Connection closed");
1419
1420    return ERR_CONNECTION_CLOSED;
1421  }
1422
1423  if (result < 0) {
1424    DoDrainSession(static_cast<Error>(result), "result is < 0.");
1425    return result;
1426  }
1427  CHECK_LE(result, kReadBufferSize);
1428  total_bytes_received_ += result;
1429
1430  last_activity_time_ = time_func_();
1431
1432  DCHECK(buffered_spdy_framer_.get());
1433  char* data = read_buffer_->data();
1434  while (result > 0) {
1435    uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result);
1436    result -= bytes_processed;
1437    data += bytes_processed;
1438
1439    if (availability_state_ == STATE_DRAINING) {
1440      return ERR_CONNECTION_CLOSED;
1441    }
1442
1443    DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR);
1444  }
1445
1446  read_state_ = READ_STATE_DO_READ;
1447  return OK;
1448}
1449
1450void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) {
1451  CHECK(!in_io_loop_);
1452  DCHECK_EQ(write_state_, expected_write_state);
1453
1454  DoWriteLoop(expected_write_state, result);
1455
1456  if (availability_state_ == STATE_DRAINING && !in_flight_write_ &&
1457      write_queue_.IsEmpty()) {
1458    pool_->RemoveUnavailableSession(GetWeakPtr());  // Destroys |this|.
1459    return;
1460  }
1461}
1462
1463int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) {
1464  CHECK(!in_io_loop_);
1465  DCHECK_NE(write_state_, WRITE_STATE_IDLE);
1466  DCHECK_EQ(write_state_, expected_write_state);
1467
1468  in_io_loop_ = true;
1469
1470  // Loop until the session is closed or the write becomes blocked.
1471  while (true) {
1472    switch (write_state_) {
1473      case WRITE_STATE_DO_WRITE:
1474        DCHECK_EQ(result, OK);
1475        result = DoWrite();
1476        break;
1477      case WRITE_STATE_DO_WRITE_COMPLETE:
1478        result = DoWriteComplete(result);
1479        break;
1480      case WRITE_STATE_IDLE:
1481      default:
1482        NOTREACHED() << "write_state_: " << write_state_;
1483        break;
1484    }
1485
1486    if (write_state_ == WRITE_STATE_IDLE) {
1487      DCHECK_EQ(result, ERR_IO_PENDING);
1488      break;
1489    }
1490
1491    if (result == ERR_IO_PENDING)
1492      break;
1493  }
1494
1495  CHECK(in_io_loop_);
1496  in_io_loop_ = false;
1497
1498  return result;
1499}
1500
1501int SpdySession::DoWrite() {
1502  CHECK(in_io_loop_);
1503
1504  DCHECK(buffered_spdy_framer_);
1505  if (in_flight_write_) {
1506    DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1507  } else {
1508    // Grab the next frame to send.
1509    SpdyFrameType frame_type = DATA;
1510    scoped_ptr<SpdyBufferProducer> producer;
1511    base::WeakPtr<SpdyStream> stream;
1512    if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) {
1513      write_state_ = WRITE_STATE_IDLE;
1514      return ERR_IO_PENDING;
1515    }
1516
1517    if (stream.get())
1518      CHECK(!stream->IsClosed());
1519
1520    // Activate the stream only when sending the SYN_STREAM frame to
1521    // guarantee monotonically-increasing stream IDs.
1522    if (frame_type == SYN_STREAM) {
1523      CHECK(stream.get());
1524      CHECK_EQ(stream->stream_id(), 0u);
1525      scoped_ptr<SpdyStream> owned_stream =
1526          ActivateCreatedStream(stream.get());
1527      InsertActivatedStream(owned_stream.Pass());
1528
1529      if (stream_hi_water_mark_ > kLastStreamId) {
1530        CHECK_EQ(stream->stream_id(), kLastStreamId);
1531        // We've exhausted the stream ID space, and no new streams may be
1532        // created after this one.
1533        MakeUnavailable();
1534        StartGoingAway(kLastStreamId, ERR_ABORTED);
1535      }
1536    }
1537
1538    in_flight_write_ = producer->ProduceBuffer();
1539    if (!in_flight_write_) {
1540      NOTREACHED();
1541      return ERR_UNEXPECTED;
1542    }
1543    in_flight_write_frame_type_ = frame_type;
1544    in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
1545    DCHECK_GE(in_flight_write_frame_size_,
1546              buffered_spdy_framer_->GetFrameMinimumSize());
1547    in_flight_write_stream_ = stream;
1548  }
1549
1550  write_state_ = WRITE_STATE_DO_WRITE_COMPLETE;
1551
1552  // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems
1553  // with Socket implementations that don't store their IOBuffer
1554  // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345).
1555  scoped_refptr<IOBuffer> write_io_buffer =
1556      in_flight_write_->GetIOBufferForRemainingData();
1557  return connection_->socket()->Write(
1558      write_io_buffer.get(),
1559      in_flight_write_->GetRemainingSize(),
1560      base::Bind(&SpdySession::PumpWriteLoop,
1561                 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE));
1562}
1563
1564int SpdySession::DoWriteComplete(int result) {
1565  CHECK(in_io_loop_);
1566  DCHECK_NE(result, ERR_IO_PENDING);
1567  DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1568
1569  last_activity_time_ = time_func_();
1570
1571  if (result < 0) {
1572    DCHECK_NE(result, ERR_IO_PENDING);
1573    in_flight_write_.reset();
1574    in_flight_write_frame_type_ = DATA;
1575    in_flight_write_frame_size_ = 0;
1576    in_flight_write_stream_.reset();
1577    write_state_ = WRITE_STATE_DO_WRITE;
1578    DoDrainSession(static_cast<Error>(result), "Write error");
1579    return OK;
1580  }
1581
1582  // It should not be possible to have written more bytes than our
1583  // in_flight_write_.
1584  DCHECK_LE(static_cast<size_t>(result),
1585            in_flight_write_->GetRemainingSize());
1586
1587  if (result > 0) {
1588    in_flight_write_->Consume(static_cast<size_t>(result));
1589
1590    // We only notify the stream when we've fully written the pending frame.
1591    if (in_flight_write_->GetRemainingSize() == 0) {
1592      // It is possible that the stream was cancelled while we were
1593      // writing to the socket.
1594      if (in_flight_write_stream_.get()) {
1595        DCHECK_GT(in_flight_write_frame_size_, 0u);
1596        in_flight_write_stream_->OnFrameWriteComplete(
1597            in_flight_write_frame_type_,
1598            in_flight_write_frame_size_);
1599      }
1600
1601      // Cleanup the write which just completed.
1602      in_flight_write_.reset();
1603      in_flight_write_frame_type_ = DATA;
1604      in_flight_write_frame_size_ = 0;
1605      in_flight_write_stream_.reset();
1606    }
1607  }
1608
1609  write_state_ = WRITE_STATE_DO_WRITE;
1610  return OK;
1611}
1612
1613void SpdySession::DcheckGoingAway() const {
1614#if DCHECK_IS_ON
1615  DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1616  for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
1617    DCHECK(pending_create_stream_queues_[i].empty());
1618  }
1619  DCHECK(created_streams_.empty());
1620#endif
1621}
1622
// Debug consistency check for a draining session: everything
// DcheckGoingAway() verifies must hold, the state must be STATE_DRAINING,
// and no active or unclaimed pushed streams may remain.
void SpdySession::DcheckDraining() const {
  DcheckGoingAway();
  DCHECK_EQ(availability_state_, STATE_DRAINING);
  DCHECK(active_streams_.empty());
  DCHECK(unclaimed_pushed_streams_.empty());
}
1629
// Tears down everything that cannot complete on a session that is going
// away:
//  - every pending stream request is failed with ERR_ABORTED;
//  - every active stream with an id greater than |last_good_stream_id| is
//    logged as abandoned and closed with |status|;
//  - every created (not yet activated) stream is logged as abandoned and
//    closed with |status|;
//  - pending writes for streams after |last_good_stream_id| are removed.
// The session must already be in STATE_GOING_AWAY or later.
void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id,
                                 Error status) {
  DCHECK_GE(availability_state_, STATE_GOING_AWAY);

  // The loops below are carefully written to avoid reentrancy problems.
  // Each iteration looks up its next element afresh instead of holding an
  // iterator across a call that may run arbitrary callbacks.

  // Fail all stalled stream requests.
  while (true) {
    size_t old_size = GetTotalSize(pending_create_stream_queues_);
    base::WeakPtr<SpdyStreamRequest> pending_request =
        GetNextPendingStreamRequest();
    if (!pending_request)
      break;
    // No new stream requests should be added while the session is
    // going away.
    DCHECK_GT(old_size, GetTotalSize(pending_create_stream_queues_));
    pending_request->OnRequestCompleteFailure(ERR_ABORTED);
  }

  // Close active streams whose id is above |last_good_stream_id|.
  while (true) {
    size_t old_size = active_streams_.size();
    ActiveStreamMap::iterator it =
        active_streams_.lower_bound(last_good_stream_id + 1);
    if (it == active_streams_.end())
      break;
    LogAbandonedActiveStream(it, status);
    CloseActiveStreamIterator(it, status);
    // No new streams should be activated while the session is going
    // away.
    DCHECK_GT(old_size, active_streams_.size());
  }

  // Close all streams that were created but never activated.
  while (!created_streams_.empty()) {
    size_t old_size = created_streams_.size();
    CreatedStreamSet::iterator it = created_streams_.begin();
    LogAbandonedStream(*it, status);
    CloseCreatedStreamIterator(it, status);
    // No new streams should be created while the session is going
    // away.
    DCHECK_GT(old_size, created_streams_.size());
  }

  write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id);

  DcheckGoingAway();
}
1675
1676void SpdySession::MaybeFinishGoingAway() {
1677  if (active_streams_.empty() && availability_state_ == STATE_GOING_AWAY) {
1678    DoDrainSession(OK, "Finished going away");
1679  }
1680}
1681
1682void SpdySession::DoDrainSession(Error err, const std::string& description) {
1683  if (availability_state_ == STATE_DRAINING) {
1684    return;
1685  }
1686  MakeUnavailable();
1687
1688  // If |err| indicates an error occurred, inform the peer that we're closing
1689  // and why. Don't GOAWAY on a graceful or idle close, as that may
1690  // unnecessarily wake the radio. We could technically GOAWAY on network errors
1691  // (we'll probably fail to actually write it, but that's okay), however many
1692  // unit-tests would need to be updated.
1693  if (err != OK &&
1694      err != ERR_ABORTED &&  // Used by SpdySessionPool to close idle sessions.
1695      err != ERR_NETWORK_CHANGED &&  // Used to deprecate sessions on IP change.
1696      err != ERR_SOCKET_NOT_CONNECTED &&
1697      err != ERR_CONNECTION_CLOSED && err != ERR_CONNECTION_RESET) {
1698    // Enqueue a GOAWAY to inform the peer of why we're closing the connection.
1699    SpdyGoAwayIR goaway_ir(last_accepted_push_stream_id_,
1700                           MapNetErrorToGoAwayStatus(err),
1701                           description);
1702    EnqueueSessionWrite(HIGHEST,
1703                        GOAWAY,
1704                        scoped_ptr<SpdyFrame>(
1705                            buffered_spdy_framer_->SerializeFrame(goaway_ir)));
1706  }
1707
1708  availability_state_ = STATE_DRAINING;
1709  error_on_close_ = err;
1710
1711  net_log_.AddEvent(
1712      NetLog::TYPE_SPDY_SESSION_CLOSE,
1713      base::Bind(&NetLogSpdySessionCloseCallback, err, &description));
1714
1715  UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err);
1716  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors",
1717                              total_bytes_received_, 1, 100000000, 50);
1718
1719  if (err == OK) {
1720    // We ought to be going away already, as this is a graceful close.
1721    DcheckGoingAway();
1722  } else {
1723    StartGoingAway(0, err);
1724  }
1725  DcheckDraining();
1726  MaybePostWriteLoop();
1727}
1728
1729void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) {
1730  DCHECK(stream);
1731  std::string description = base::StringPrintf(
1732      "ABANDONED (stream_id=%d): ", stream->stream_id()) +
1733      stream->url().spec();
1734  stream->LogStreamError(status, description);
1735  // We don't increment the streams abandoned counter here. If the
1736  // stream isn't active (i.e., it hasn't written anything to the wire
1737  // yet) then it's as if it never existed. If it is active, then
1738  // LogAbandonedActiveStream() will increment the counters.
1739}
1740
1741void SpdySession::LogAbandonedActiveStream(ActiveStreamMap::const_iterator it,
1742                                           Error status) {
1743  DCHECK_GT(it->first, 0u);
1744  LogAbandonedStream(it->second.stream, status);
1745  ++streams_abandoned_count_;
1746  base::StatsCounter abandoned_streams("spdy.abandoned_streams");
1747  abandoned_streams.Increment();
1748  if (it->second.stream->type() == SPDY_PUSH_STREAM &&
1749      unclaimed_pushed_streams_.find(it->second.stream->url()) !=
1750      unclaimed_pushed_streams_.end()) {
1751    base::StatsCounter abandoned_push_streams("spdy.abandoned_push_streams");
1752    abandoned_push_streams.Increment();
1753  }
1754}
1755
1756SpdyStreamId SpdySession::GetNewStreamId() {
1757  CHECK_LE(stream_hi_water_mark_, kLastStreamId);
1758  SpdyStreamId id = stream_hi_water_mark_;
1759  stream_hi_water_mark_ += 2;
1760  return id;
1761}
1762
// Drains the session because of |err|, forwarding |description| to
// DoDrainSession(). In debug builds, |err| is required to compare less than
// ERR_IO_PENDING.
void SpdySession::CloseSessionOnError(Error err,
                                      const std::string& description) {
  DCHECK_LT(err, ERR_IO_PENDING);
  DoDrainSession(err, description);
}
1768
1769void SpdySession::MakeUnavailable() {
1770  if (availability_state_ == STATE_AVAILABLE) {
1771    availability_state_ = STATE_GOING_AWAY;
1772    pool_->MakeSessionUnavailable(GetWeakPtr());
1773  }
1774}
1775
1776base::Value* SpdySession::GetInfoAsValue() const {
1777  base::DictionaryValue* dict = new base::DictionaryValue();
1778
1779  dict->SetInteger("source_id", net_log_.source().id);
1780
1781  dict->SetString("host_port_pair", host_port_pair().ToString());
1782  if (!pooled_aliases_.empty()) {
1783    base::ListValue* alias_list = new base::ListValue();
1784    for (std::set<SpdySessionKey>::const_iterator it =
1785             pooled_aliases_.begin();
1786         it != pooled_aliases_.end(); it++) {
1787      alias_list->Append(new base::StringValue(
1788          it->host_port_pair().ToString()));
1789    }
1790    dict->Set("aliases", alias_list);
1791  }
1792  dict->SetString("proxy", host_port_proxy_pair().second.ToURI());
1793
1794  dict->SetInteger("active_streams", active_streams_.size());
1795
1796  dict->SetInteger("unclaimed_pushed_streams",
1797                   unclaimed_pushed_streams_.size());
1798
1799  dict->SetBoolean("is_secure", is_secure_);
1800
1801  dict->SetString("protocol_negotiated",
1802                  SSLClientSocket::NextProtoToString(
1803                      connection_->socket()->GetNegotiatedProtocol()));
1804
1805  dict->SetInteger("error", error_on_close_);
1806  dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
1807
1808  dict->SetInteger("streams_initiated_count", streams_initiated_count_);
1809  dict->SetInteger("streams_pushed_count", streams_pushed_count_);
1810  dict->SetInteger("streams_pushed_and_claimed_count",
1811      streams_pushed_and_claimed_count_);
1812  dict->SetInteger("streams_abandoned_count", streams_abandoned_count_);
1813  DCHECK(buffered_spdy_framer_.get());
1814  dict->SetInteger("frames_received", buffered_spdy_framer_->frames_received());
1815
1816  dict->SetBoolean("sent_settings", sent_settings_);
1817  dict->SetBoolean("received_settings", received_settings_);
1818
1819  dict->SetInteger("send_window_size", session_send_window_size_);
1820  dict->SetInteger("recv_window_size", session_recv_window_size_);
1821  dict->SetInteger("unacked_recv_window_bytes",
1822                   session_unacked_recv_window_bytes_);
1823  return dict;
1824}
1825
1826bool SpdySession::IsReused() const {
1827  return buffered_spdy_framer_->frames_received() > 0 ||
1828      connection_->reuse_type() == ClientSocketHandle::UNUSED_IDLE;
1829}
1830
1831bool SpdySession::GetLoadTimingInfo(SpdyStreamId stream_id,
1832                                    LoadTimingInfo* load_timing_info) const {
1833  return connection_->GetLoadTimingInfo(stream_id != kFirstStreamId,
1834                                        load_timing_info);
1835}
1836
1837int SpdySession::GetPeerAddress(IPEndPoint* address) const {
1838  int rv = ERR_SOCKET_NOT_CONNECTED;
1839  if (connection_->socket()) {
1840    rv = connection_->socket()->GetPeerAddress(address);
1841  }
1842
1843  UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetPeerAddress",
1844                        rv == ERR_SOCKET_NOT_CONNECTED);
1845
1846  return rv;
1847}
1848
1849int SpdySession::GetLocalAddress(IPEndPoint* address) const {
1850  int rv = ERR_SOCKET_NOT_CONNECTED;
1851  if (connection_->socket()) {
1852    rv = connection_->socket()->GetLocalAddress(address);
1853  }
1854
1855  UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetLocalAddress",
1856                        rv == ERR_SOCKET_NOT_CONNECTED);
1857
1858  return rv;
1859}
1860
1861void SpdySession::EnqueueSessionWrite(RequestPriority priority,
1862                                      SpdyFrameType frame_type,
1863                                      scoped_ptr<SpdyFrame> frame) {
1864  DCHECK(frame_type == RST_STREAM || frame_type == SETTINGS ||
1865         frame_type == WINDOW_UPDATE || frame_type == PING ||
1866         frame_type == GOAWAY);
1867  EnqueueWrite(
1868      priority, frame_type,
1869      scoped_ptr<SpdyBufferProducer>(
1870          new SimpleBufferProducer(
1871              scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
1872      base::WeakPtr<SpdyStream>());
1873}
1874
1875void SpdySession::EnqueueWrite(RequestPriority priority,
1876                               SpdyFrameType frame_type,
1877                               scoped_ptr<SpdyBufferProducer> producer,
1878                               const base::WeakPtr<SpdyStream>& stream) {
1879  if (availability_state_ == STATE_DRAINING)
1880    return;
1881
1882  write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
1883  MaybePostWriteLoop();
1884}
1885
1886void SpdySession::MaybePostWriteLoop() {
1887  if (write_state_ == WRITE_STATE_IDLE) {
1888    CHECK(!in_flight_write_);
1889    write_state_ = WRITE_STATE_DO_WRITE;
1890    base::MessageLoop::current()->PostTask(
1891        FROM_HERE,
1892        base::Bind(&SpdySession::PumpWriteLoop,
1893                   weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK));
1894  }
1895}
1896
1897void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) {
1898  CHECK_EQ(stream->stream_id(), 0u);
1899  CHECK(created_streams_.find(stream.get()) == created_streams_.end());
1900  created_streams_.insert(stream.release());
1901}
1902
1903scoped_ptr<SpdyStream> SpdySession::ActivateCreatedStream(SpdyStream* stream) {
1904  CHECK_EQ(stream->stream_id(), 0u);
1905  CHECK(created_streams_.find(stream) != created_streams_.end());
1906  stream->set_stream_id(GetNewStreamId());
1907  scoped_ptr<SpdyStream> owned_stream(stream);
1908  created_streams_.erase(stream);
1909  return owned_stream.Pass();
1910}
1911
1912void SpdySession::InsertActivatedStream(scoped_ptr<SpdyStream> stream) {
1913  SpdyStreamId stream_id = stream->stream_id();
1914  CHECK_NE(stream_id, 0u);
1915  std::pair<ActiveStreamMap::iterator, bool> result =
1916      active_streams_.insert(
1917          std::make_pair(stream_id, ActiveStreamInfo(stream.get())));
1918  CHECK(result.second);
1919  ignore_result(stream.release());
1920}
1921
1922void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) {
1923  if (in_flight_write_stream_.get() == stream.get()) {
1924    // If we're deleting the stream for the in-flight write, we still
1925    // need to let the write complete, so we clear
1926    // |in_flight_write_stream_| and let the write finish on its own
1927    // without notifying |in_flight_write_stream_|.
1928    in_flight_write_stream_.reset();
1929  }
1930
1931  write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr());
1932  stream->OnClose(status);
1933
1934  if (availability_state_ == STATE_AVAILABLE) {
1935    ProcessPendingStreamRequests();
1936  }
1937}
1938
1939base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) {
1940  base::StatsCounter used_push_streams("spdy.claimed_push_streams");
1941
1942  PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url);
1943  if (unclaimed_it == unclaimed_pushed_streams_.end())
1944    return base::WeakPtr<SpdyStream>();
1945
1946  SpdyStreamId stream_id = unclaimed_it->second.stream_id;
1947  unclaimed_pushed_streams_.erase(unclaimed_it);
1948
1949  ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
1950  if (active_it == active_streams_.end()) {
1951    NOTREACHED();
1952    return base::WeakPtr<SpdyStream>();
1953  }
1954
1955  net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM);
1956  used_push_streams.Increment();
1957  return active_it->second.stream->GetWeakPtr();
1958}
1959
// Reports SSL and protocol-negotiation details for the session's socket.
// |was_npn_negotiated| and |protocol_negotiated| are always written;
// |ssl_info| is filled by the socket, whose GetSSLInfo() result is returned.
// The socket is dereferenced without a null check.
bool SpdySession::GetSSLInfo(SSLInfo* ssl_info,
                             bool* was_npn_negotiated,
                             NextProto* protocol_negotiated) {
  *was_npn_negotiated = connection_->socket()->WasNpnNegotiated();
  *protocol_negotiated = connection_->socket()->GetNegotiatedProtocol();
  return connection_->socket()->GetSSLInfo(ssl_info);
}
1967
1968bool SpdySession::GetSSLCertRequestInfo(
1969    SSLCertRequestInfo* cert_request_info) {
1970  if (!is_secure_)
1971    return false;
1972  GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info);
1973  return true;
1974}
1975
1976void SpdySession::OnError(SpdyFramer::SpdyError error_code) {
1977  CHECK(in_io_loop_);
1978
1979  RecordProtocolErrorHistogram(MapFramerErrorToProtocolError(error_code));
1980  std::string description =
1981      base::StringPrintf("Framer error: %d (%s).",
1982                         error_code,
1983                         SpdyFramer::ErrorCodeToString(error_code));
1984  DoDrainSession(MapFramerErrorToNetError(error_code), description);
1985}
1986
1987void SpdySession::OnStreamError(SpdyStreamId stream_id,
1988                                const std::string& description) {
1989  CHECK(in_io_loop_);
1990
1991  ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1992  if (it == active_streams_.end()) {
1993    // We still want to send a frame to reset the stream even if we
1994    // don't know anything about it.
1995    EnqueueResetStreamFrame(
1996        stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description);
1997    return;
1998  }
1999
2000  ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description);
2001}
2002
2003void SpdySession::OnDataFrameHeader(SpdyStreamId stream_id,
2004                                    size_t length,
2005                                    bool fin) {
2006  CHECK(in_io_loop_);
2007
2008  ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2009
2010  // By the time data comes in, the stream may already be inactive.
2011  if (it == active_streams_.end())
2012    return;
2013
2014  SpdyStream* stream = it->second.stream;
2015  CHECK_EQ(stream->stream_id(), stream_id);
2016
2017  DCHECK(buffered_spdy_framer_);
2018  size_t header_len = buffered_spdy_framer_->GetDataFrameMinimumSize();
2019  stream->IncrementRawReceivedBytes(header_len);
2020}
2021
2022void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
2023                                    const char* data,
2024                                    size_t len,
2025                                    bool fin) {
2026  CHECK(in_io_loop_);
2027
2028  if (data == NULL && len != 0) {
2029    // This is notification of consumed data padding.
2030    // TODO(jgraettinger): Properly flow padding into WINDOW_UPDATE frames.
2031    // See crbug.com/353012.
2032    return;
2033  }
2034
2035  DCHECK_LT(len, 1u << 24);
2036  if (net_log().IsLogging()) {
2037    net_log().AddEvent(
2038        NetLog::TYPE_SPDY_SESSION_RECV_DATA,
2039        base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin));
2040  }
2041
2042  // Build the buffer as early as possible so that we go through the
2043  // session flow control checks and update
2044  // |unacked_recv_window_bytes_| properly even when the stream is
2045  // inactive (since the other side has still reduced its session send
2046  // window).
2047  scoped_ptr<SpdyBuffer> buffer;
2048  if (data) {
2049    DCHECK_GT(len, 0u);
2050    CHECK_LE(len, static_cast<size_t>(kReadBufferSize));
2051    buffer.reset(new SpdyBuffer(data, len));
2052
2053    if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
2054      DecreaseRecvWindowSize(static_cast<int32>(len));
2055      buffer->AddConsumeCallback(
2056          base::Bind(&SpdySession::OnReadBufferConsumed,
2057                     weak_factory_.GetWeakPtr()));
2058    }
2059  } else {
2060    DCHECK_EQ(len, 0u);
2061  }
2062
2063  ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2064
2065  // By the time data comes in, the stream may already be inactive.
2066  if (it == active_streams_.end())
2067    return;
2068
2069  SpdyStream* stream = it->second.stream;
2070  CHECK_EQ(stream->stream_id(), stream_id);
2071
2072  stream->IncrementRawReceivedBytes(len);
2073
2074  if (it->second.waiting_for_syn_reply) {
2075    const std::string& error = "Data received before SYN_REPLY.";
2076    stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2077    ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2078    return;
2079  }
2080
2081  stream->OnDataReceived(buffer.Pass());
2082}
2083
2084void SpdySession::OnSettings(bool clear_persisted) {
2085  CHECK(in_io_loop_);
2086
2087  if (clear_persisted)
2088    http_server_properties_->ClearSpdySettings(host_port_pair());
2089
2090  if (net_log_.IsLogging()) {
2091    net_log_.AddEvent(
2092        NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
2093        base::Bind(&NetLogSpdySettingsCallback, host_port_pair(),
2094                   clear_persisted));
2095  }
2096
2097  if (GetProtocolVersion() >= SPDY4) {
2098    // Send an acknowledgment of the setting.
2099    SpdySettingsIR settings_ir;
2100    settings_ir.set_is_ack(true);
2101    EnqueueSessionWrite(
2102        HIGHEST,
2103        SETTINGS,
2104        scoped_ptr<SpdyFrame>(
2105            buffered_spdy_framer_->SerializeFrame(settings_ir)));
2106  }
2107}
2108
2109void SpdySession::OnSetting(SpdySettingsIds id,
2110                            uint8 flags,
2111                            uint32 value) {
2112  CHECK(in_io_loop_);
2113
2114  HandleSetting(id, value);
2115  http_server_properties_->SetSpdySetting(
2116      host_port_pair(),
2117      id,
2118      static_cast<SpdySettingsFlags>(flags),
2119      value);
2120  received_settings_ = true;
2121
2122  // Log the setting.
2123  const SpdyMajorVersion protocol_version = GetProtocolVersion();
2124  net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_RECV_SETTING,
2125                    base::Bind(&NetLogSpdySettingCallback,
2126                               id,
2127                               protocol_version,
2128                               static_cast<SpdySettingsFlags>(flags),
2129                               value));
2130}
2131
2132void SpdySession::OnSendCompressedFrame(
2133    SpdyStreamId stream_id,
2134    SpdyFrameType type,
2135    size_t payload_len,
2136    size_t frame_len) {
2137  if (type != SYN_STREAM && type != HEADERS)
2138    return;
2139
2140  DCHECK(buffered_spdy_framer_.get());
2141  size_t compressed_len =
2142      frame_len - buffered_spdy_framer_->GetSynStreamMinimumSize();
2143
2144  if (payload_len) {
2145    // Make sure we avoid early decimal truncation.
2146    int compression_pct = 100 - (100 * compressed_len) / payload_len;
2147    UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage",
2148                             compression_pct);
2149  }
2150}
2151
// Framer callback invoked when a compressed frame is received. Remembers
// |frame_len| in |last_compressed_frame_len_|; OnSynReply() and OnHeaders()
// later add that value to the stream's raw received byte count and reset it
// to zero. |stream_id| and |type| are not used.
void SpdySession::OnReceiveCompressedFrame(
    SpdyStreamId stream_id,
    SpdyFrameType type,
    size_t frame_len) {
  last_compressed_frame_len_ = frame_len;
}
2158
2159int SpdySession::OnInitialResponseHeadersReceived(
2160    const SpdyHeaderBlock& response_headers,
2161    base::Time response_time,
2162    base::TimeTicks recv_first_byte_time,
2163    SpdyStream* stream) {
2164  CHECK(in_io_loop_);
2165  SpdyStreamId stream_id = stream->stream_id();
2166
2167  if (stream->type() == SPDY_PUSH_STREAM) {
2168    DCHECK(stream->IsReservedRemote());
2169    if (max_concurrent_pushed_streams_ &&
2170        num_active_pushed_streams_ >= max_concurrent_pushed_streams_) {
2171      ResetStream(stream_id,
2172                  RST_STREAM_REFUSED_STREAM,
2173                  "Stream concurrency limit reached.");
2174      return STATUS_CODE_REFUSED_STREAM;
2175    }
2176  }
2177
2178  if (stream->type() == SPDY_PUSH_STREAM) {
2179    // Will be balanced in DeleteStream.
2180    num_active_pushed_streams_++;
2181  }
2182
2183  // May invalidate |stream|.
2184  int rv = stream->OnInitialResponseHeadersReceived(
2185      response_headers, response_time, recv_first_byte_time);
2186  if (rv < 0) {
2187    DCHECK_NE(rv, ERR_IO_PENDING);
2188    DCHECK(active_streams_.find(stream_id) == active_streams_.end());
2189  }
2190
2191  return rv;
2192}
2193
2194void SpdySession::OnSynStream(SpdyStreamId stream_id,
2195                              SpdyStreamId associated_stream_id,
2196                              SpdyPriority priority,
2197                              bool fin,
2198                              bool unidirectional,
2199                              const SpdyHeaderBlock& headers) {
2200  CHECK(in_io_loop_);
2201
2202  if (GetProtocolVersion() >= SPDY4) {
2203    DCHECK_EQ(0u, associated_stream_id);
2204    OnHeaders(stream_id, fin, headers);
2205    return;
2206  }
2207
2208  base::Time response_time = base::Time::Now();
2209  base::TimeTicks recv_first_byte_time = time_func_();
2210
2211  if (net_log_.IsLogging()) {
2212    net_log_.AddEvent(
2213        NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
2214        base::Bind(&NetLogSpdySynStreamReceivedCallback,
2215                   &headers, fin, unidirectional, priority,
2216                   stream_id, associated_stream_id));
2217  }
2218
2219  // Split headers to simulate push promise and response.
2220  SpdyHeaderBlock request_headers;
2221  SpdyHeaderBlock response_headers;
2222  SplitPushedHeadersToRequestAndResponse(
2223      headers, GetProtocolVersion(), &request_headers, &response_headers);
2224
2225  if (!TryCreatePushStream(
2226          stream_id, associated_stream_id, priority, request_headers))
2227    return;
2228
2229  ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
2230  if (active_it == active_streams_.end()) {
2231    NOTREACHED();
2232    return;
2233  }
2234
2235  if (OnInitialResponseHeadersReceived(response_headers,
2236                                       response_time,
2237                                       recv_first_byte_time,
2238                                       active_it->second.stream) != OK)
2239    return;
2240
2241  base::StatsCounter push_requests("spdy.pushed_streams");
2242  push_requests.Increment();
2243}
2244
2245void SpdySession::DeleteExpiredPushedStreams() {
2246  if (unclaimed_pushed_streams_.empty())
2247    return;
2248
2249  // Check that adequate time has elapsed since the last sweep.
2250  if (time_func_() < next_unclaimed_push_stream_sweep_time_)
2251    return;
2252
2253  // Gather old streams to delete.
2254  base::TimeTicks minimum_freshness = time_func_() -
2255      base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
2256  std::vector<SpdyStreamId> streams_to_close;
2257  for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin();
2258       it != unclaimed_pushed_streams_.end(); ++it) {
2259    if (minimum_freshness > it->second.creation_time)
2260      streams_to_close.push_back(it->second.stream_id);
2261  }
2262
2263  for (std::vector<SpdyStreamId>::const_iterator to_close_it =
2264           streams_to_close.begin();
2265       to_close_it != streams_to_close.end(); ++to_close_it) {
2266    ActiveStreamMap::iterator active_it = active_streams_.find(*to_close_it);
2267    if (active_it == active_streams_.end())
2268      continue;
2269
2270    LogAbandonedActiveStream(active_it, ERR_INVALID_SPDY_STREAM);
2271    // CloseActiveStreamIterator() will remove the stream from
2272    // |unclaimed_pushed_streams_|.
2273    ResetStreamIterator(
2274        active_it, RST_STREAM_REFUSED_STREAM, "Stream not claimed.");
2275  }
2276
2277  next_unclaimed_push_stream_sweep_time_ = time_func_() +
2278      base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
2279}
2280
2281void SpdySession::OnSynReply(SpdyStreamId stream_id,
2282                             bool fin,
2283                             const SpdyHeaderBlock& headers) {
2284  CHECK(in_io_loop_);
2285
2286  base::Time response_time = base::Time::Now();
2287  base::TimeTicks recv_first_byte_time = time_func_();
2288
2289  if (net_log().IsLogging()) {
2290    net_log().AddEvent(
2291        NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
2292        base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2293                   &headers, fin, stream_id));
2294  }
2295
2296  ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2297  if (it == active_streams_.end()) {
2298    // NOTE:  it may just be that the stream was cancelled.
2299    return;
2300  }
2301
2302  SpdyStream* stream = it->second.stream;
2303  CHECK_EQ(stream->stream_id(), stream_id);
2304
2305  stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2306  last_compressed_frame_len_ = 0;
2307
2308  if (GetProtocolVersion() >= SPDY4) {
2309    const std::string& error =
2310        "SPDY4 wasn't expecting SYN_REPLY.";
2311    stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2312    ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2313    return;
2314  }
2315  if (!it->second.waiting_for_syn_reply) {
2316    const std::string& error =
2317        "Received duplicate SYN_REPLY for stream.";
2318    stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2319    ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2320    return;
2321  }
2322  it->second.waiting_for_syn_reply = false;
2323
2324  ignore_result(OnInitialResponseHeadersReceived(
2325      headers, response_time, recv_first_byte_time, stream));
2326}
2327
2328void SpdySession::OnHeaders(SpdyStreamId stream_id,
2329                            bool fin,
2330                            const SpdyHeaderBlock& headers) {
2331  CHECK(in_io_loop_);
2332
2333  if (net_log().IsLogging()) {
2334    net_log().AddEvent(
2335        NetLog::TYPE_SPDY_SESSION_RECV_HEADERS,
2336        base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2337                   &headers, fin, stream_id));
2338  }
2339
2340  ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2341  if (it == active_streams_.end()) {
2342    // NOTE:  it may just be that the stream was cancelled.
2343    LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id;
2344    return;
2345  }
2346
2347  SpdyStream* stream = it->second.stream;
2348  CHECK_EQ(stream->stream_id(), stream_id);
2349
2350  stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2351  last_compressed_frame_len_ = 0;
2352
2353  base::Time response_time = base::Time::Now();
2354  base::TimeTicks recv_first_byte_time = time_func_();
2355
2356  if (it->second.waiting_for_syn_reply) {
2357    if (GetProtocolVersion() < SPDY4) {
2358      const std::string& error =
2359          "Was expecting SYN_REPLY, not HEADERS.";
2360      stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2361      ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2362      return;
2363    }
2364
2365    it->second.waiting_for_syn_reply = false;
2366    ignore_result(OnInitialResponseHeadersReceived(
2367        headers, response_time, recv_first_byte_time, stream));
2368  } else if (it->second.stream->IsReservedRemote()) {
2369    ignore_result(OnInitialResponseHeadersReceived(
2370        headers, response_time, recv_first_byte_time, stream));
2371  } else {
2372    int rv = stream->OnAdditionalResponseHeadersReceived(headers);
2373    if (rv < 0) {
2374      DCHECK_NE(rv, ERR_IO_PENDING);
2375      DCHECK(active_streams_.find(stream_id) == active_streams_.end());
2376    }
2377  }
2378}
2379
2380bool SpdySession::OnUnknownFrame(SpdyStreamId stream_id, int frame_type) {
2381  // Validate stream id.
2382  // Was the frame sent on a stream id that has not been used in this session?
2383  if (stream_id % 2 == 1 && stream_id > stream_hi_water_mark_)
2384    return false;
2385
2386  if (stream_id % 2 == 0 && stream_id > last_accepted_push_stream_id_)
2387    return false;
2388
2389  return true;
2390}
2391
2392void SpdySession::OnRstStream(SpdyStreamId stream_id,
2393                              SpdyRstStreamStatus status) {
2394  CHECK(in_io_loop_);
2395
2396  std::string description;
2397  net_log().AddEvent(
2398      NetLog::TYPE_SPDY_SESSION_RST_STREAM,
2399      base::Bind(&NetLogSpdyRstCallback,
2400                 stream_id, status, &description));
2401
2402  ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2403  if (it == active_streams_.end()) {
2404    // NOTE:  it may just be that the stream was cancelled.
2405    LOG(WARNING) << "Received RST for invalid stream" << stream_id;
2406    return;
2407  }
2408
2409  CHECK_EQ(it->second.stream->stream_id(), stream_id);
2410
2411  if (status == 0) {
2412    it->second.stream->OnDataReceived(scoped_ptr<SpdyBuffer>());
2413  } else if (status == RST_STREAM_REFUSED_STREAM) {
2414    CloseActiveStreamIterator(it, ERR_SPDY_SERVER_REFUSED_STREAM);
2415  } else {
2416    RecordProtocolErrorHistogram(
2417        PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM);
2418    it->second.stream->LogStreamError(
2419        ERR_SPDY_PROTOCOL_ERROR,
2420        base::StringPrintf("SPDY stream closed with status: %d", status));
2421    // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
2422    //                For now, it doesn't matter much - it is a protocol error.
2423    CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
2424  }
2425}
2426
// Framer callback for a GOAWAY frame from the peer. Logs the event, makes the
// session unavailable, and starts going away using |last_accepted_stream_id|
// with ERR_ABORTED. If no active streams remain afterwards, the session
// finishes going away immediately. Must be called from within the IO loop.
void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id,
                           SpdyGoAwayStatus status) {
  CHECK(in_io_loop_);

  // TODO(jgraettinger): UMA histogram on |status|.

  net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY,
      base::Bind(&NetLogSpdyGoAwayCallback,
                 last_accepted_stream_id,
                 active_streams_.size(),
                 unclaimed_pushed_streams_.size(),
                 status));
  MakeUnavailable();
  StartGoingAway(last_accepted_stream_id, ERR_ABORTED);
  // This is to handle the case when we already don't have any active
  // streams (i.e., StartGoingAway() did nothing). Otherwise, we have
  // active streams and so the last one being closed will finish the
  // going away process (see DeleteStream()).
  // NOTE(review): the DeleteStream() visible in this file does not call
  // MaybeFinishGoingAway() itself -- confirm which caller does.
  MaybeFinishGoingAway();
}
2447
2448void SpdySession::OnPing(SpdyPingId unique_id, bool is_ack) {
2449  CHECK(in_io_loop_);
2450
2451  net_log_.AddEvent(
2452      NetLog::TYPE_SPDY_SESSION_PING,
2453      base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "received"));
2454
2455  // Send response to a PING from server.
2456  if ((protocol_ >= kProtoSPDY4 && !is_ack) ||
2457      (protocol_ < kProtoSPDY4 && unique_id % 2 == 0)) {
2458    WritePingFrame(unique_id, true);
2459    return;
2460  }
2461
2462  --pings_in_flight_;
2463  if (pings_in_flight_ < 0) {
2464    RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING);
2465    DoDrainSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0.");
2466    pings_in_flight_ = 0;
2467    return;
2468  }
2469
2470  if (pings_in_flight_ > 0)
2471    return;
2472
2473  // We will record RTT in histogram when there are no more client sent
2474  // pings_in_flight_.
2475  RecordPingRTTHistogram(time_func_() - last_ping_sent_time_);
2476}
2477
2478void SpdySession::OnWindowUpdate(SpdyStreamId stream_id,
2479                                 uint32 delta_window_size) {
2480  CHECK(in_io_loop_);
2481
2482  DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max));
2483  net_log_.AddEvent(
2484      NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME,
2485      base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2486                 stream_id, delta_window_size));
2487
2488  if (stream_id == kSessionFlowControlStreamId) {
2489    // WINDOW_UPDATE for the session.
2490    if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) {
2491      LOG(WARNING) << "Received WINDOW_UPDATE for session when "
2492                   << "session flow control is not turned on";
2493      // TODO(akalin): Record an error and close the session.
2494      return;
2495    }
2496
2497    if (delta_window_size < 1u) {
2498      RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
2499      DoDrainSession(
2500          ERR_SPDY_PROTOCOL_ERROR,
2501          "Received WINDOW_UPDATE with an invalid delta_window_size " +
2502              base::UintToString(delta_window_size));
2503      return;
2504    }
2505
2506    IncreaseSendWindowSize(static_cast<int32>(delta_window_size));
2507  } else {
2508    // WINDOW_UPDATE for a stream.
2509    if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2510      // TODO(akalin): Record an error and close the session.
2511      LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id
2512                   << " when flow control is not turned on";
2513      return;
2514    }
2515
2516    ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2517
2518    if (it == active_streams_.end()) {
2519      // NOTE:  it may just be that the stream was cancelled.
2520      LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id;
2521      return;
2522    }
2523
2524    SpdyStream* stream = it->second.stream;
2525    CHECK_EQ(stream->stream_id(), stream_id);
2526
2527    if (delta_window_size < 1u) {
2528      ResetStreamIterator(it,
2529                          RST_STREAM_FLOW_CONTROL_ERROR,
2530                          base::StringPrintf(
2531                              "Received WINDOW_UPDATE with an invalid "
2532                              "delta_window_size %ud", delta_window_size));
2533      return;
2534    }
2535
2536    CHECK_EQ(it->second.stream->stream_id(), stream_id);
2537    it->second.stream->IncreaseSendWindowSize(
2538        static_cast<int32>(delta_window_size));
2539  }
2540}
2541
2542bool SpdySession::TryCreatePushStream(SpdyStreamId stream_id,
2543                                      SpdyStreamId associated_stream_id,
2544                                      SpdyPriority priority,
2545                                      const SpdyHeaderBlock& headers) {
2546  // Server-initiated streams should have even sequence numbers.
2547  if ((stream_id & 0x1) != 0) {
2548    LOG(WARNING) << "Received invalid push stream id " << stream_id;
2549    if (GetProtocolVersion() > SPDY2)
2550      CloseSessionOnError(ERR_SPDY_PROTOCOL_ERROR, "Odd push stream id.");
2551    return false;
2552  }
2553
2554  if (GetProtocolVersion() > SPDY2) {
2555    if (stream_id <= last_accepted_push_stream_id_) {
2556      LOG(WARNING) << "Received push stream id lesser or equal to the last "
2557                   << "accepted before " << stream_id;
2558      CloseSessionOnError(
2559          ERR_SPDY_PROTOCOL_ERROR,
2560          "New push stream id must be greater than the last accepted.");
2561      return false;
2562    }
2563  }
2564
2565  if (IsStreamActive(stream_id)) {
2566    // For SPDY3 and higher we should not get here, we'll start going away
2567    // earlier on |last_seen_push_stream_id_| check.
2568    CHECK_GT(SPDY3, GetProtocolVersion());
2569    LOG(WARNING) << "Received push for active stream " << stream_id;
2570    return false;
2571  }
2572
2573  last_accepted_push_stream_id_ = stream_id;
2574
2575  RequestPriority request_priority =
2576      ConvertSpdyPriorityToRequestPriority(priority, GetProtocolVersion());
2577
2578  if (availability_state_ == STATE_GOING_AWAY) {
2579    // TODO(akalin): This behavior isn't in the SPDY spec, although it
2580    // probably should be.
2581    EnqueueResetStreamFrame(stream_id,
2582                            request_priority,
2583                            RST_STREAM_REFUSED_STREAM,
2584                            "push stream request received when going away");
2585    return false;
2586  }
2587
2588  if (associated_stream_id == 0) {
2589    // In SPDY4 0 stream id in PUSH_PROMISE frame leads to framer error and
2590    // session going away. We should never get here.
2591    CHECK_GT(SPDY4, GetProtocolVersion());
2592    std::string description = base::StringPrintf(
2593        "Received invalid associated stream id %d for pushed stream %d",
2594        associated_stream_id,
2595        stream_id);
2596    EnqueueResetStreamFrame(
2597        stream_id, request_priority, RST_STREAM_REFUSED_STREAM, description);
2598    return false;
2599  }
2600
2601  streams_pushed_count_++;
2602
2603  // TODO(mbelshe): DCHECK that this is a GET method?
2604
2605  // Verify that the response had a URL for us.
2606  GURL gurl = GetUrlFromHeaderBlock(headers, GetProtocolVersion(), true);
2607  if (!gurl.is_valid()) {
2608    EnqueueResetStreamFrame(stream_id,
2609                            request_priority,
2610                            RST_STREAM_PROTOCOL_ERROR,
2611                            "Pushed stream url was invalid: " + gurl.spec());
2612    return false;
2613  }
2614
2615  // Verify we have a valid stream association.
2616  ActiveStreamMap::iterator associated_it =
2617      active_streams_.find(associated_stream_id);
2618  if (associated_it == active_streams_.end()) {
2619    EnqueueResetStreamFrame(
2620        stream_id,
2621        request_priority,
2622        RST_STREAM_INVALID_STREAM,
2623        base::StringPrintf("Received push for inactive associated stream %d",
2624                           associated_stream_id));
2625    return false;
2626  }
2627
2628  // Check that the pushed stream advertises the same origin as its associated
2629  // stream. Bypass this check if and only if this session is with a SPDY proxy
2630  // that is trusted explicitly via the --trusted-spdy-proxy switch.
2631  if (trusted_spdy_proxy_.Equals(host_port_pair())) {
2632    // Disallow pushing of HTTPS content.
2633    if (gurl.SchemeIs("https")) {
2634      EnqueueResetStreamFrame(
2635          stream_id,
2636          request_priority,
2637          RST_STREAM_REFUSED_STREAM,
2638          base::StringPrintf("Rejected push of Cross Origin HTTPS content %d",
2639                             associated_stream_id));
2640    }
2641  } else {
2642    GURL associated_url(associated_it->second.stream->GetUrlFromHeaders());
2643    if (associated_url.GetOrigin() != gurl.GetOrigin()) {
2644      EnqueueResetStreamFrame(
2645          stream_id,
2646          request_priority,
2647          RST_STREAM_REFUSED_STREAM,
2648          base::StringPrintf("Rejected Cross Origin Push Stream %d",
2649                             associated_stream_id));
2650      return false;
2651    }
2652  }
2653
2654  // There should not be an existing pushed stream with the same path.
2655  PushedStreamMap::iterator pushed_it =
2656      unclaimed_pushed_streams_.lower_bound(gurl);
2657  if (pushed_it != unclaimed_pushed_streams_.end() &&
2658      pushed_it->first == gurl) {
2659    EnqueueResetStreamFrame(
2660        stream_id,
2661        request_priority,
2662        RST_STREAM_PROTOCOL_ERROR,
2663        "Received duplicate pushed stream with url: " + gurl.spec());
2664    return false;
2665  }
2666
2667  scoped_ptr<SpdyStream> stream(new SpdyStream(SPDY_PUSH_STREAM,
2668                                               GetWeakPtr(),
2669                                               gurl,
2670                                               request_priority,
2671                                               stream_initial_send_window_size_,
2672                                               stream_initial_recv_window_size_,
2673                                               net_log_));
2674  stream->set_stream_id(stream_id);
2675
2676  // In spdy4/http2 PUSH_PROMISE arrives on associated stream.
2677  if (associated_it != active_streams_.end() && GetProtocolVersion() >= SPDY4) {
2678    associated_it->second.stream->IncrementRawReceivedBytes(
2679        last_compressed_frame_len_);
2680  } else {
2681    stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2682  }
2683
2684  last_compressed_frame_len_ = 0;
2685
2686  DeleteExpiredPushedStreams();
2687  PushedStreamMap::iterator inserted_pushed_it =
2688      unclaimed_pushed_streams_.insert(
2689          pushed_it,
2690          std::make_pair(gurl, PushedStreamInfo(stream_id, time_func_())));
2691  DCHECK(inserted_pushed_it != pushed_it);
2692
2693  InsertActivatedStream(stream.Pass());
2694
2695  ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
2696  if (active_it == active_streams_.end()) {
2697    NOTREACHED();
2698    return false;
2699  }
2700
2701  active_it->second.stream->OnPushPromiseHeadersReceived(headers);
2702  DCHECK(active_it->second.stream->IsReservedRemote());
2703  num_pushed_streams_++;
2704  return true;
2705}
2706
2707void SpdySession::OnPushPromise(SpdyStreamId stream_id,
2708                                SpdyStreamId promised_stream_id,
2709                                const SpdyHeaderBlock& headers) {
2710  CHECK(in_io_loop_);
2711
2712  if (net_log_.IsLogging()) {
2713    net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_RECV_PUSH_PROMISE,
2714                      base::Bind(&NetLogSpdyPushPromiseReceivedCallback,
2715                                 &headers,
2716                                 stream_id,
2717                                 promised_stream_id));
2718  }
2719
2720  // Any priority will do.
2721  // TODO(baranovich): pass parent stream id priority?
2722  if (!TryCreatePushStream(promised_stream_id, stream_id, 0, headers))
2723    return;
2724
2725  base::StatsCounter push_requests("spdy.pushed_streams");
2726  push_requests.Increment();
2727}
2728
2729void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id,
2730                                         uint32 delta_window_size) {
2731  CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2732  ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2733  CHECK(it != active_streams_.end());
2734  CHECK_EQ(it->second.stream->stream_id(), stream_id);
2735  SendWindowUpdateFrame(
2736      stream_id, delta_window_size, it->second.stream->priority());
2737}
2738
2739void SpdySession::SendInitialData() {
2740  DCHECK(enable_sending_initial_data_);
2741
2742  if (send_connection_header_prefix_) {
2743    DCHECK_EQ(protocol_, kProtoSPDY4);
2744    scoped_ptr<SpdyFrame> connection_header_prefix_frame(
2745        new SpdyFrame(const_cast<char*>(kHttp2ConnectionHeaderPrefix),
2746                      kHttp2ConnectionHeaderPrefixSize,
2747                      false /* take_ownership */));
2748    // Count the prefix as part of the subsequent SETTINGS frame.
2749    EnqueueSessionWrite(HIGHEST, SETTINGS,
2750                        connection_header_prefix_frame.Pass());
2751  }
2752
2753  // First, notify the server about the settings they should use when
2754  // communicating with us.
2755  SettingsMap settings_map;
2756  // Create a new settings frame notifying the server of our
2757  // max concurrent streams and initial window size.
2758  settings_map[SETTINGS_MAX_CONCURRENT_STREAMS] =
2759      SettingsFlagsAndValue(SETTINGS_FLAG_NONE, kMaxConcurrentPushedStreams);
2760  if (flow_control_state_ >= FLOW_CONTROL_STREAM &&
2761      stream_initial_recv_window_size_ != kSpdyStreamInitialWindowSize) {
2762    settings_map[SETTINGS_INITIAL_WINDOW_SIZE] =
2763        SettingsFlagsAndValue(SETTINGS_FLAG_NONE,
2764                              stream_initial_recv_window_size_);
2765  }
2766  SendSettings(settings_map);
2767
2768  // Next, notify the server about our initial recv window size.
2769  if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
2770    // Bump up the receive window size to the real initial value. This
2771    // has to go here since the WINDOW_UPDATE frame sent by
2772    // IncreaseRecvWindowSize() call uses |buffered_spdy_framer_|.
2773    DCHECK_GT(kDefaultInitialRecvWindowSize, session_recv_window_size_);
2774    // This condition implies that |kDefaultInitialRecvWindowSize| -
2775    // |session_recv_window_size_| doesn't overflow.
2776    DCHECK_GT(session_recv_window_size_, 0);
2777    IncreaseRecvWindowSize(
2778        kDefaultInitialRecvWindowSize - session_recv_window_size_);
2779  }
2780
2781  if (protocol_ <= kProtoSPDY31) {
2782    // Finally, notify the server about the settings they have
2783    // previously told us to use when communicating with them (after
2784    // applying them).
2785    const SettingsMap& server_settings_map =
2786        http_server_properties_->GetSpdySettings(host_port_pair());
2787    if (server_settings_map.empty())
2788      return;
2789
2790    SettingsMap::const_iterator it =
2791        server_settings_map.find(SETTINGS_CURRENT_CWND);
2792    uint32 cwnd = (it != server_settings_map.end()) ? it->second.second : 0;
2793    UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent", cwnd, 1, 200, 100);
2794
2795    for (SettingsMap::const_iterator it = server_settings_map.begin();
2796         it != server_settings_map.end(); ++it) {
2797      const SpdySettingsIds new_id = it->first;
2798      const uint32 new_val = it->second.second;
2799      HandleSetting(new_id, new_val);
2800    }
2801
2802    SendSettings(server_settings_map);
2803  }
2804}
2805
2806
2807void SpdySession::SendSettings(const SettingsMap& settings) {
2808  const SpdyMajorVersion protocol_version = GetProtocolVersion();
2809  net_log_.AddEvent(
2810      NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
2811      base::Bind(&NetLogSpdySendSettingsCallback, &settings, protocol_version));
2812  // Create the SETTINGS frame and send it.
2813  DCHECK(buffered_spdy_framer_.get());
2814  scoped_ptr<SpdyFrame> settings_frame(
2815      buffered_spdy_framer_->CreateSettings(settings));
2816  sent_settings_ = true;
2817  EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass());
2818}
2819
2820void SpdySession::HandleSetting(uint32 id, uint32 value) {
2821  switch (id) {
2822    case SETTINGS_MAX_CONCURRENT_STREAMS:
2823      max_concurrent_streams_ = std::min(static_cast<size_t>(value),
2824                                         kMaxConcurrentStreamLimit);
2825      ProcessPendingStreamRequests();
2826      break;
2827    case SETTINGS_INITIAL_WINDOW_SIZE: {
2828      if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2829        net_log().AddEvent(
2830            NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_NO_FLOW_CONTROL);
2831        return;
2832      }
2833
2834      if (value > static_cast<uint32>(kint32max)) {
2835        net_log().AddEvent(
2836            NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_OUT_OF_RANGE,
2837            NetLog::IntegerCallback("initial_window_size", value));
2838        return;
2839      }
2840
2841      // SETTINGS_INITIAL_WINDOW_SIZE updates initial_send_window_size_ only.
2842      int32 delta_window_size =
2843          static_cast<int32>(value) - stream_initial_send_window_size_;
2844      stream_initial_send_window_size_ = static_cast<int32>(value);
2845      UpdateStreamsSendWindowSize(delta_window_size);
2846      net_log().AddEvent(
2847          NetLog::TYPE_SPDY_SESSION_UPDATE_STREAMS_SEND_WINDOW_SIZE,
2848          NetLog::IntegerCallback("delta_window_size", delta_window_size));
2849      break;
2850    }
2851  }
2852}
2853
2854void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) {
2855  DCHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2856  for (ActiveStreamMap::iterator it = active_streams_.begin();
2857       it != active_streams_.end(); ++it) {
2858    it->second.stream->AdjustSendWindowSize(delta_window_size);
2859  }
2860
2861  for (CreatedStreamSet::const_iterator it = created_streams_.begin();
2862       it != created_streams_.end(); it++) {
2863    (*it)->AdjustSendWindowSize(delta_window_size);
2864  }
2865}
2866
2867void SpdySession::SendPrefacePingIfNoneInFlight() {
2868  if (pings_in_flight_ || !enable_ping_based_connection_checking_)
2869    return;
2870
2871  base::TimeTicks now = time_func_();
2872  // If there is no activity in the session, then send a preface-PING.
2873  if ((now - last_activity_time_) > connection_at_risk_of_loss_time_)
2874    SendPrefacePing();
2875}
2876
2877void SpdySession::SendPrefacePing() {
2878  WritePingFrame(next_ping_id_, false);
2879}
2880
2881void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id,
2882                                        uint32 delta_window_size,
2883                                        RequestPriority priority) {
2884  CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2885  ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2886  if (it != active_streams_.end()) {
2887    CHECK_EQ(it->second.stream->stream_id(), stream_id);
2888  } else {
2889    CHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2890    CHECK_EQ(stream_id, kSessionFlowControlStreamId);
2891  }
2892
2893  net_log_.AddEvent(
2894      NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME,
2895      base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2896                 stream_id, delta_window_size));
2897
2898  DCHECK(buffered_spdy_framer_.get());
2899  scoped_ptr<SpdyFrame> window_update_frame(
2900      buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
2901  EnqueueSessionWrite(priority, WINDOW_UPDATE, window_update_frame.Pass());
2902}
2903
2904void SpdySession::WritePingFrame(uint32 unique_id, bool is_ack) {
2905  DCHECK(buffered_spdy_framer_.get());
2906  scoped_ptr<SpdyFrame> ping_frame(
2907      buffered_spdy_framer_->CreatePingFrame(unique_id, is_ack));
2908  EnqueueSessionWrite(HIGHEST, PING, ping_frame.Pass());
2909
2910  if (net_log().IsLogging()) {
2911    net_log().AddEvent(
2912        NetLog::TYPE_SPDY_SESSION_PING,
2913        base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "sent"));
2914  }
2915  if (!is_ack) {
2916    next_ping_id_ += 2;
2917    ++pings_in_flight_;
2918    PlanToCheckPingStatus();
2919    last_ping_sent_time_ = time_func_();
2920  }
2921}
2922
2923void SpdySession::PlanToCheckPingStatus() {
2924  if (check_ping_status_pending_)
2925    return;
2926
2927  check_ping_status_pending_ = true;
2928  base::MessageLoop::current()->PostDelayedTask(
2929      FROM_HERE,
2930      base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2931                 time_func_()), hung_interval_);
2932}
2933
2934void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) {
2935  CHECK(!in_io_loop_);
2936
2937  // Check if we got a response back for all PINGs we had sent.
2938  if (pings_in_flight_ == 0) {
2939    check_ping_status_pending_ = false;
2940    return;
2941  }
2942
2943  DCHECK(check_ping_status_pending_);
2944
2945  base::TimeTicks now = time_func_();
2946  base::TimeDelta delay = hung_interval_ - (now - last_activity_time_);
2947
2948  if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) {
2949    // Track all failed PING messages in a separate bucket.
2950    RecordPingRTTHistogram(base::TimeDelta::Max());
2951    DoDrainSession(ERR_SPDY_PING_FAILED, "Failed ping.");
2952    return;
2953  }
2954
2955  // Check the status of connection after a delay.
2956  base::MessageLoop::current()->PostDelayedTask(
2957      FROM_HERE,
2958      base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2959                 now),
2960      delay);
2961}
2962
2963void SpdySession::RecordPingRTTHistogram(base::TimeDelta duration) {
2964  UMA_HISTOGRAM_TIMES("Net.SpdyPing.RTT", duration);
2965}
2966
2967void SpdySession::RecordProtocolErrorHistogram(
2968    SpdyProtocolErrorDetails details) {
2969  UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails2", details,
2970                            NUM_SPDY_PROTOCOL_ERROR_DETAILS);
2971  if (EndsWith(host_port_pair().host(), "google.com", false)) {
2972    UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails_Google2", details,
2973                              NUM_SPDY_PROTOCOL_ERROR_DETAILS);
2974  }
2975}
2976
2977void SpdySession::RecordHistograms() {
2978  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
2979                              streams_initiated_count_,
2980                              0, 300, 50);
2981  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
2982                              streams_pushed_count_,
2983                              0, 300, 50);
2984  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
2985                              streams_pushed_and_claimed_count_,
2986                              0, 300, 50);
2987  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
2988                              streams_abandoned_count_,
2989                              0, 300, 50);
2990  UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent",
2991                            sent_settings_ ? 1 : 0, 2);
2992  UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived",
2993                            received_settings_ ? 1 : 0, 2);
2994  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession",
2995                              stalled_streams_,
2996                              0, 300, 50);
2997  UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls",
2998                            stalled_streams_ > 0 ? 1 : 0, 2);
2999
3000  if (received_settings_) {
3001    // Enumerate the saved settings, and set histograms for it.
3002    const SettingsMap& settings_map =
3003        http_server_properties_->GetSpdySettings(host_port_pair());
3004
3005    SettingsMap::const_iterator it;
3006    for (it = settings_map.begin(); it != settings_map.end(); ++it) {
3007      const SpdySettingsIds id = it->first;
3008      const uint32 val = it->second.second;
3009      switch (id) {
3010        case SETTINGS_CURRENT_CWND:
3011          // Record several different histograms to see if cwnd converges
3012          // for larger volumes of data being sent.
3013          UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd",
3014                                      val, 1, 200, 100);
3015          if (total_bytes_received_ > 10 * 1024) {
3016            UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K",
3017                                        val, 1, 200, 100);
3018            if (total_bytes_received_ > 25 * 1024) {
3019              UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K",
3020                                          val, 1, 200, 100);
3021              if (total_bytes_received_ > 50 * 1024) {
3022                UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K",
3023                                            val, 1, 200, 100);
3024                if (total_bytes_received_ > 100 * 1024) {
3025                  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K",
3026                                              val, 1, 200, 100);
3027                }
3028              }
3029            }
3030          }
3031          break;
3032        case SETTINGS_ROUND_TRIP_TIME:
3033          UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT",
3034                                      val, 1, 1200, 100);
3035          break;
3036        case SETTINGS_DOWNLOAD_RETRANS_RATE:
3037          UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate",
3038                                      val, 1, 100, 50);
3039          break;
3040        default:
3041          break;
3042      }
3043    }
3044  }
3045}
3046
3047void SpdySession::CompleteStreamRequest(
3048    const base::WeakPtr<SpdyStreamRequest>& pending_request) {
3049  // Abort if the request has already been cancelled.
3050  if (!pending_request)
3051    return;
3052
3053  base::WeakPtr<SpdyStream> stream;
3054  int rv = TryCreateStream(pending_request, &stream);
3055
3056  if (rv == OK) {
3057    DCHECK(stream);
3058    pending_request->OnRequestCompleteSuccess(stream);
3059    return;
3060  }
3061  DCHECK(!stream);
3062
3063  if (rv != ERR_IO_PENDING) {
3064    pending_request->OnRequestCompleteFailure(rv);
3065  }
3066}
3067
3068SSLClientSocket* SpdySession::GetSSLClientSocket() const {
3069  if (!is_secure_)
3070    return NULL;
3071  SSLClientSocket* ssl_socket =
3072      reinterpret_cast<SSLClientSocket*>(connection_->socket());
3073  DCHECK(ssl_socket);
3074  return ssl_socket;
3075}
3076
3077void SpdySession::OnWriteBufferConsumed(
3078    size_t frame_payload_size,
3079    size_t consume_size,
3080    SpdyBuffer::ConsumeSource consume_source) {
3081  // We can be called with |in_io_loop_| set if a write SpdyBuffer is
3082  // deleted (e.g., a stream is closed due to incoming data).
3083
3084  DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3085
3086  if (consume_source == SpdyBuffer::DISCARD) {
3087    // If we're discarding a frame or part of it, increase the send
3088    // window by the number of discarded bytes. (Although if we're
3089    // discarding part of a frame, it's probably because of a write
3090    // error and we'll be tearing down the session soon.)
3091    size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
3092    DCHECK_GT(remaining_payload_bytes, 0u);
3093    IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
3094  }
3095  // For consumed bytes, the send window is increased when we receive
3096  // a WINDOW_UPDATE frame.
3097}
3098
3099void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) {
3100  // We can be called with |in_io_loop_| set if a SpdyBuffer is
3101  // deleted (e.g., a stream is closed due to incoming data).
3102
3103  DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3104  DCHECK_GE(delta_window_size, 1);
3105
3106  // Check for overflow.
3107  int32 max_delta_window_size = kint32max - session_send_window_size_;
3108  if (delta_window_size > max_delta_window_size) {
3109    RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
3110    DoDrainSession(
3111        ERR_SPDY_PROTOCOL_ERROR,
3112        "Received WINDOW_UPDATE [delta: " +
3113            base::IntToString(delta_window_size) +
3114            "] for session overflows session_send_window_size_ [current: " +
3115            base::IntToString(session_send_window_size_) + "]");
3116    return;
3117  }
3118
3119  session_send_window_size_ += delta_window_size;
3120
3121  net_log_.AddEvent(
3122      NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
3123      base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3124                 delta_window_size, session_send_window_size_));
3125
3126  DCHECK(!IsSendStalled());
3127  ResumeSendStalledStreams();
3128}
3129
3130void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) {
3131  DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3132
3133  // We only call this method when sending a frame. Therefore,
3134  // |delta_window_size| should be within the valid frame size range.
3135  DCHECK_GE(delta_window_size, 1);
3136  DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
3137
3138  // |send_window_size_| should have been at least |delta_window_size| for
3139  // this call to happen.
3140  DCHECK_GE(session_send_window_size_, delta_window_size);
3141
3142  session_send_window_size_ -= delta_window_size;
3143
3144  net_log_.AddEvent(
3145      NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
3146      base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3147                 -delta_window_size, session_send_window_size_));
3148}
3149
3150void SpdySession::OnReadBufferConsumed(
3151    size_t consume_size,
3152    SpdyBuffer::ConsumeSource consume_source) {
3153  // We can be called with |in_io_loop_| set if a read SpdyBuffer is
3154  // deleted (e.g., discarded by a SpdyReadQueue).
3155
3156  DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3157  DCHECK_GE(consume_size, 1u);
3158  DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
3159
3160  IncreaseRecvWindowSize(static_cast<int32>(consume_size));
3161}
3162
3163void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) {
3164  DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3165  DCHECK_GE(session_unacked_recv_window_bytes_, 0);
3166  DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_);
3167  DCHECK_GE(delta_window_size, 1);
3168  // Check for overflow.
3169  DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_);
3170
3171  session_recv_window_size_ += delta_window_size;
3172  net_log_.AddEvent(
3173      NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
3174      base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3175                 delta_window_size, session_recv_window_size_));
3176
3177  session_unacked_recv_window_bytes_ += delta_window_size;
3178  if (session_unacked_recv_window_bytes_ > kSpdySessionInitialWindowSize / 2) {
3179    SendWindowUpdateFrame(kSessionFlowControlStreamId,
3180                          session_unacked_recv_window_bytes_,
3181                          HIGHEST);
3182    session_unacked_recv_window_bytes_ = 0;
3183  }
3184}
3185
3186void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) {
3187  CHECK(in_io_loop_);
3188  DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3189  DCHECK_GE(delta_window_size, 1);
3190
3191  // Since we never decrease the initial receive window size,
3192  // |delta_window_size| should never cause |recv_window_size_| to go
3193  // negative. If we do, the receive window isn't being respected.
3194  if (delta_window_size > session_recv_window_size_) {
3195    RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION);
3196    DoDrainSession(
3197        ERR_SPDY_FLOW_CONTROL_ERROR,
3198        "delta_window_size is " + base::IntToString(delta_window_size) +
3199            " in DecreaseRecvWindowSize, which is larger than the receive " +
3200            "window size of " + base::IntToString(session_recv_window_size_));
3201    return;
3202  }
3203
3204  session_recv_window_size_ -= delta_window_size;
3205  net_log_.AddEvent(
3206      NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW,
3207      base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3208                 -delta_window_size, session_recv_window_size_));
3209}
3210
3211void SpdySession::QueueSendStalledStream(const SpdyStream& stream) {
3212  DCHECK(stream.send_stalled_by_flow_control());
3213  RequestPriority priority = stream.priority();
3214  CHECK_GE(priority, MINIMUM_PRIORITY);
3215  CHECK_LE(priority, MAXIMUM_PRIORITY);
3216  stream_send_unstall_queue_[priority].push_back(stream.stream_id());
3217}
3218
3219void SpdySession::ResumeSendStalledStreams() {
3220  DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3221
3222  // We don't have to worry about new streams being queued, since
3223  // doing so would cause IsSendStalled() to return true. But we do
3224  // have to worry about streams being closed, as well as ourselves
3225  // being closed.
3226
3227  while (!IsSendStalled()) {
3228    size_t old_size = 0;
3229#if DCHECK_IS_ON
3230    old_size = GetTotalSize(stream_send_unstall_queue_);
3231#endif
3232
3233    SpdyStreamId stream_id = PopStreamToPossiblyResume();
3234    if (stream_id == 0)
3235      break;
3236    ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
3237    // The stream may actually still be send-stalled after this (due
3238    // to its own send window) but that's okay -- it'll then be
3239    // resumed once its send window increases.
3240    if (it != active_streams_.end())
3241      it->second.stream->PossiblyResumeIfSendStalled();
3242
3243    // The size should decrease unless we got send-stalled again.
3244    if (!IsSendStalled())
3245      DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size);
3246  }
3247}
3248
3249SpdyStreamId SpdySession::PopStreamToPossiblyResume() {
3250  for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) {
3251    std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i];
3252    if (!queue->empty()) {
3253      SpdyStreamId stream_id = queue->front();
3254      queue->pop_front();
3255      return stream_id;
3256    }
3257  }
3258  return 0;
3259}
3260
3261}  // namespace net
3262