1// Copyright (c) 2011 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/spdy/spdy_session.h"
6
7#include "base/basictypes.h"
8#include "base/logging.h"
9#include "base/memory/linked_ptr.h"
10#include "base/message_loop.h"
11#include "base/metrics/field_trial.h"
12#include "base/metrics/stats_counters.h"
13#include "base/stl_util-inl.h"
14#include "base/string_number_conversions.h"
15#include "base/string_util.h"
16#include "base/stringprintf.h"
17#include "base/time.h"
18#include "base/utf_string_conversions.h"
19#include "base/values.h"
20#include "net/base/connection_type_histograms.h"
21#include "net/base/net_log.h"
22#include "net/base/net_util.h"
23#include "net/http/http_network_session.h"
24#include "net/socket/ssl_client_socket.h"
25#include "net/spdy/spdy_frame_builder.h"
26#include "net/spdy/spdy_http_utils.h"
27#include "net/spdy/spdy_protocol.h"
28#include "net/spdy/spdy_session_pool.h"
29#include "net/spdy/spdy_settings_storage.h"
30#include "net/spdy/spdy_stream.h"
31
32namespace net {
33
34NetLogSpdySynParameter::NetLogSpdySynParameter(
35    const linked_ptr<spdy::SpdyHeaderBlock>& headers,
36    spdy::SpdyControlFlags flags,
37    spdy::SpdyStreamId id,
38    spdy::SpdyStreamId associated_stream)
39    : headers_(headers),
40      flags_(flags),
41      id_(id),
42      associated_stream_(associated_stream) {
43}
44
45NetLogSpdySynParameter::~NetLogSpdySynParameter() {
46}
47
48Value* NetLogSpdySynParameter::ToValue() const {
49  DictionaryValue* dict = new DictionaryValue();
50  ListValue* headers_list = new ListValue();
51  for (spdy::SpdyHeaderBlock::const_iterator it = headers_->begin();
52      it != headers_->end(); ++it) {
53    headers_list->Append(new StringValue(base::StringPrintf(
54        "%s: %s", it->first.c_str(), it->second.c_str())));
55  }
56  dict->SetInteger("flags", flags_);
57  dict->Set("headers", headers_list);
58  dict->SetInteger("id", id_);
59  if (associated_stream_)
60    dict->SetInteger("associated_stream", associated_stream_);
61  return dict;
62}
63
64namespace {
65
66const int kReadBufferSize = 8 * 1024;
67
68class NetLogSpdySessionParameter : public NetLog::EventParameters {
69 public:
70  NetLogSpdySessionParameter(const HostPortProxyPair& host_pair)
71      : host_pair_(host_pair) {}
72  virtual Value* ToValue() const {
73    DictionaryValue* dict = new DictionaryValue();
74    dict->Set("host", new StringValue(host_pair_.first.ToString()));
75    dict->Set("proxy", new StringValue(host_pair_.second.ToPacString()));
76    return dict;
77  }
78 private:
79  const HostPortProxyPair host_pair_;
80  DISALLOW_COPY_AND_ASSIGN(NetLogSpdySessionParameter);
81};
82
83class NetLogSpdySettingsParameter : public NetLog::EventParameters {
84 public:
85  explicit NetLogSpdySettingsParameter(const spdy::SpdySettings& settings)
86      : settings_(settings) {}
87
88  virtual Value* ToValue() const {
89    DictionaryValue* dict = new DictionaryValue();
90    ListValue* settings = new ListValue();
91    for (spdy::SpdySettings::const_iterator it = settings_.begin();
92         it != settings_.end(); ++it) {
93      settings->Append(new StringValue(
94          base::StringPrintf("[%u:%u]", it->first.id(), it->second)));
95    }
96    dict->Set("settings", settings);
97    return dict;
98  }
99
100 private:
101  ~NetLogSpdySettingsParameter() {}
102  const spdy::SpdySettings settings_;
103
104  DISALLOW_COPY_AND_ASSIGN(NetLogSpdySettingsParameter);
105};
106
107class NetLogSpdyWindowUpdateParameter : public NetLog::EventParameters {
108 public:
109  NetLogSpdyWindowUpdateParameter(spdy::SpdyStreamId stream_id,
110                                  int delta,
111                                  int window_size)
112      : stream_id_(stream_id), delta_(delta), window_size_(window_size) {}
113
114  virtual Value* ToValue() const {
115    DictionaryValue* dict = new DictionaryValue();
116    dict->SetInteger("stream_id", static_cast<int>(stream_id_));
117    dict->SetInteger("delta", delta_);
118    dict->SetInteger("window_size", window_size_);
119    return dict;
120  }
121
122 private:
123  ~NetLogSpdyWindowUpdateParameter() {}
124  const spdy::SpdyStreamId stream_id_;
125  const int delta_;
126  const int window_size_;
127
128  DISALLOW_COPY_AND_ASSIGN(NetLogSpdyWindowUpdateParameter);
129};
130
131class NetLogSpdyDataParameter : public NetLog::EventParameters {
132 public:
133  NetLogSpdyDataParameter(spdy::SpdyStreamId stream_id,
134                          int size,
135                          spdy::SpdyDataFlags flags)
136      : stream_id_(stream_id), size_(size), flags_(flags) {}
137
138  virtual Value* ToValue() const {
139    DictionaryValue* dict = new DictionaryValue();
140    dict->SetInteger("stream_id", static_cast<int>(stream_id_));
141    dict->SetInteger("size", size_);
142    dict->SetInteger("flags", static_cast<int>(flags_));
143    return dict;
144  }
145
146 private:
147  ~NetLogSpdyDataParameter() {}
148  const spdy::SpdyStreamId stream_id_;
149  const int size_;
150  const spdy::SpdyDataFlags flags_;
151
152  DISALLOW_COPY_AND_ASSIGN(NetLogSpdyDataParameter);
153};
154
155class NetLogSpdyRstParameter : public NetLog::EventParameters {
156 public:
157  NetLogSpdyRstParameter(spdy::SpdyStreamId stream_id, int status)
158      : stream_id_(stream_id), status_(status) {}
159
160  virtual Value* ToValue() const {
161    DictionaryValue* dict = new DictionaryValue();
162    dict->SetInteger("stream_id", static_cast<int>(stream_id_));
163    dict->SetInteger("status", status_);
164    return dict;
165  }
166
167 private:
168  ~NetLogSpdyRstParameter() {}
169  const spdy::SpdyStreamId stream_id_;
170  const int status_;
171
172  DISALLOW_COPY_AND_ASSIGN(NetLogSpdyRstParameter);
173};
174
175class NetLogSpdyPingParameter : public NetLog::EventParameters {
176 public:
177  explicit NetLogSpdyPingParameter(uint32 unique_id) : unique_id_(unique_id) {}
178
179  virtual Value* ToValue() const {
180    DictionaryValue* dict = new DictionaryValue();
181    dict->SetInteger("unique_id", unique_id_);
182    return dict;
183  }
184
185 private:
186  ~NetLogSpdyPingParameter() {}
187  const uint32 unique_id_;
188
189  DISALLOW_COPY_AND_ASSIGN(NetLogSpdyPingParameter);
190};
191
192class NetLogSpdyGoAwayParameter : public NetLog::EventParameters {
193 public:
194  NetLogSpdyGoAwayParameter(spdy::SpdyStreamId last_stream_id,
195                            int active_streams,
196                            int unclaimed_streams)
197      : last_stream_id_(last_stream_id),
198        active_streams_(active_streams),
199        unclaimed_streams_(unclaimed_streams) {}
200
201  virtual Value* ToValue() const {
202    DictionaryValue* dict = new DictionaryValue();
203    dict->SetInteger("last_accepted_stream_id",
204                     static_cast<int>(last_stream_id_));
205    dict->SetInteger("active_streams", active_streams_);
206    dict->SetInteger("unclaimed_streams", unclaimed_streams_);
207    return dict;
208  }
209
210 private:
211  ~NetLogSpdyGoAwayParameter() {}
212  const spdy::SpdyStreamId last_stream_id_;
213  const int active_streams_;
214  const int unclaimed_streams_;
215
216  DISALLOW_COPY_AND_ASSIGN(NetLogSpdyGoAwayParameter);
217};
218
219}  // namespace
220
221// static
222bool SpdySession::use_ssl_ = true;
223
224// static
225bool SpdySession::use_flow_control_ = false;
226
227// static
228size_t SpdySession::max_concurrent_stream_limit_ = 256;
229
230// static
231bool SpdySession::enable_ping_based_connection_checking_ = true;
232
233// static
234int SpdySession::connection_at_risk_of_loss_ms_ = 0;
235
236// static
237int SpdySession::trailing_ping_delay_time_ms_ = 1000;
238
239// static
240int SpdySession::hung_interval_ms_ = 10000;
241
242SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair,
243                         SpdySessionPool* spdy_session_pool,
244                         SpdySettingsStorage* spdy_settings,
245                         NetLog* net_log)
246    : ALLOW_THIS_IN_INITIALIZER_LIST(
247          read_callback_(this, &SpdySession::OnReadComplete)),
248      ALLOW_THIS_IN_INITIALIZER_LIST(
249          write_callback_(this, &SpdySession::OnWriteComplete)),
250      ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)),
251      host_port_proxy_pair_(host_port_proxy_pair),
252      spdy_session_pool_(spdy_session_pool),
253      spdy_settings_(spdy_settings),
254      connection_(new ClientSocketHandle),
255      read_buffer_(new IOBuffer(kReadBufferSize)),
256      read_pending_(false),
257      stream_hi_water_mark_(1),  // Always start at 1 for the first stream id.
258      write_pending_(false),
259      delayed_write_pending_(false),
260      is_secure_(false),
261      certificate_error_code_(OK),
262      error_(OK),
263      state_(IDLE),
264      max_concurrent_streams_(kDefaultMaxConcurrentStreams),
265      streams_initiated_count_(0),
266      streams_pushed_count_(0),
267      streams_pushed_and_claimed_count_(0),
268      streams_abandoned_count_(0),
269      frames_received_(0),
270      bytes_received_(0),
271      sent_settings_(false),
272      received_settings_(false),
273      stalled_streams_(0),
274      pings_in_flight_(0),
275      next_ping_id_(1),
276      received_data_time_(base::TimeTicks::Now()),
277      trailing_ping_pending_(false),
278      check_ping_status_pending_(false),
279      need_to_send_ping_(false),
280      initial_send_window_size_(spdy::kSpdyStreamInitialWindowSize),
281      initial_recv_window_size_(spdy::kSpdyStreamInitialWindowSize),
282      net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)) {
283  DCHECK(HttpStreamFactory::spdy_enabled());
284  net_log_.BeginEvent(
285      NetLog::TYPE_SPDY_SESSION,
286      make_scoped_refptr(
287          new NetLogSpdySessionParameter(host_port_proxy_pair_)));
288
289  // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
290
291  spdy_framer_.set_visitor(this);
292
293  SendSettings();
294}
295
296SpdySession::~SpdySession() {
297  if (state_ != CLOSED) {
298    state_ = CLOSED;
299
300    // Cleanup all the streams.
301    CloseAllStreams(net::ERR_ABORTED);
302  }
303
304  if (connection_->is_initialized()) {
305    // With Spdy we can't recycle sockets.
306    connection_->socket()->Disconnect();
307  }
308
309  // Streams should all be gone now.
310  DCHECK_EQ(0u, num_active_streams());
311  DCHECK_EQ(0u, num_unclaimed_pushed_streams());
312
313  DCHECK(pending_callback_map_.empty());
314
315  RecordHistograms();
316
317  net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION, NULL);
318}
319
320net::Error SpdySession::InitializeWithSocket(
321    ClientSocketHandle* connection,
322    bool is_secure,
323    int certificate_error_code) {
324  base::StatsCounter spdy_sessions("spdy.sessions");
325  spdy_sessions.Increment();
326
327  state_ = CONNECTED;
328  connection_.reset(connection);
329  is_secure_ = is_secure;
330  certificate_error_code_ = certificate_error_code;
331
332  // Write out any data that we might have to send, such as the settings frame.
333  WriteSocketLater();
334  net::Error error = ReadSocket();
335  if (error == ERR_IO_PENDING)
336    return OK;
337  return error;
338}
339
340bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
341  if (state_ != CONNECTED)
342    return false;
343
344  SSLInfo ssl_info;
345  bool was_npn_negotiated;
346  if (!GetSSLInfo(&ssl_info, &was_npn_negotiated))
347    return true;   // This is not a secure session, so all domains are okay.
348
349  return ssl_info.cert->VerifyNameMatch(domain);
350}
351
352int SpdySession::GetPushStream(
353    const GURL& url,
354    scoped_refptr<SpdyStream>* stream,
355    const BoundNetLog& stream_net_log) {
356  CHECK_NE(state_, CLOSED);
357
358  *stream = NULL;
359
360  // Don't allow access to secure push streams over an unauthenticated, but
361  // encrypted SSL socket.
362  if (is_secure_ && certificate_error_code_ != OK &&
363      (url.SchemeIs("https") || url.SchemeIs("wss"))) {
364    LOG(ERROR) << "Tried to get pushed spdy stream for secure content over an "
365               << "unauthenticated session.";
366    CloseSessionOnError(static_cast<net::Error>(certificate_error_code_), true);
367    return ERR_SPDY_PROTOCOL_ERROR;
368  }
369
370  *stream = GetActivePushStream(url.spec());
371  if (stream->get()) {
372    DCHECK(streams_pushed_and_claimed_count_ < streams_pushed_count_);
373    streams_pushed_and_claimed_count_++;
374    return OK;
375  }
376  return 0;
377}
378
379int SpdySession::CreateStream(
380    const GURL& url,
381    RequestPriority priority,
382    scoped_refptr<SpdyStream>* spdy_stream,
383    const BoundNetLog& stream_net_log,
384    CompletionCallback* callback) {
385  if (!max_concurrent_streams_ ||
386      active_streams_.size() < max_concurrent_streams_) {
387    return CreateStreamImpl(url, priority, spdy_stream, stream_net_log);
388  }
389
390  stalled_streams_++;
391  net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS, NULL);
392  create_stream_queues_[priority].push(
393      PendingCreateStream(url, priority, spdy_stream,
394                          stream_net_log, callback));
395  return ERR_IO_PENDING;
396}
397
398void SpdySession::ProcessPendingCreateStreams() {
399  while (!max_concurrent_streams_ ||
400         active_streams_.size() < max_concurrent_streams_) {
401    bool no_pending_create_streams = true;
402    for (int i = 0;i < NUM_PRIORITIES;++i) {
403      if (!create_stream_queues_[i].empty()) {
404        PendingCreateStream pending_create = create_stream_queues_[i].front();
405        create_stream_queues_[i].pop();
406        no_pending_create_streams = false;
407        int error = CreateStreamImpl(*pending_create.url,
408                                     pending_create.priority,
409                                     pending_create.spdy_stream,
410                                     *pending_create.stream_net_log);
411        scoped_refptr<SpdyStream>* stream = pending_create.spdy_stream;
412        DCHECK(!ContainsKey(pending_callback_map_, stream));
413        pending_callback_map_[stream] =
414            CallbackResultPair(pending_create.callback, error);
415        MessageLoop::current()->PostTask(
416            FROM_HERE,
417            method_factory_.NewRunnableMethod(
418                &SpdySession::InvokeUserStreamCreationCallback, stream));
419        break;
420      }
421    }
422    if (no_pending_create_streams)
423      return;  // there were no streams in any queue
424  }
425}
426
427void SpdySession::CancelPendingCreateStreams(
428    const scoped_refptr<SpdyStream>* spdy_stream) {
429  PendingCallbackMap::iterator it = pending_callback_map_.find(spdy_stream);
430  if (it != pending_callback_map_.end()) {
431    pending_callback_map_.erase(it);
432    return;
433  }
434
435  for (int i = 0;i < NUM_PRIORITIES;++i) {
436    PendingCreateStreamQueue tmp;
437    // Make a copy removing this trans
438    while (!create_stream_queues_[i].empty()) {
439      PendingCreateStream pending_create = create_stream_queues_[i].front();
440      create_stream_queues_[i].pop();
441      if (pending_create.spdy_stream != spdy_stream)
442        tmp.push(pending_create);
443    }
444    // Now copy it back
445    while (!tmp.empty()) {
446      create_stream_queues_[i].push(tmp.front());
447      tmp.pop();
448    }
449  }
450}
451
452int SpdySession::CreateStreamImpl(
453    const GURL& url,
454    RequestPriority priority,
455    scoped_refptr<SpdyStream>* spdy_stream,
456    const BoundNetLog& stream_net_log) {
457  // Make sure that we don't try to send https/wss over an unauthenticated, but
458  // encrypted SSL socket.
459  if (is_secure_ && certificate_error_code_ != OK &&
460      (url.SchemeIs("https") || url.SchemeIs("wss"))) {
461    LOG(ERROR) << "Tried to create spdy stream for secure content over an "
462               << "unauthenticated session.";
463    CloseSessionOnError(static_cast<net::Error>(certificate_error_code_), true);
464    return ERR_SPDY_PROTOCOL_ERROR;
465  }
466
467  const std::string& path = url.PathForRequest();
468
469  const spdy::SpdyStreamId stream_id = GetNewStreamId();
470
471  *spdy_stream = new SpdyStream(this,
472                                stream_id,
473                                false,
474                                stream_net_log);
475  const scoped_refptr<SpdyStream>& stream = *spdy_stream;
476
477  stream->set_priority(priority);
478  stream->set_path(path);
479  stream->set_send_window_size(initial_send_window_size_);
480  stream->set_recv_window_size(initial_recv_window_size_);
481  ActivateStream(stream);
482
483  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount",
484      static_cast<int>(priority), 0, 10, 11);
485
486  // TODO(mbelshe): Optimize memory allocations
487  DCHECK(priority >= net::HIGHEST && priority < net::NUM_PRIORITIES);
488
489  DCHECK_EQ(active_streams_[stream_id].get(), stream.get());
490  return OK;
491}
492
493int SpdySession::WriteSynStream(
494    spdy::SpdyStreamId stream_id,
495    RequestPriority priority,
496    spdy::SpdyControlFlags flags,
497    const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
498  // Find our stream
499  if (!IsStreamActive(stream_id))
500    return ERR_INVALID_SPDY_STREAM;
501  const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id];
502  CHECK_EQ(stream->stream_id(), stream_id);
503
504  SendPrefacePingIfNoneInFlight();
505
506  scoped_ptr<spdy::SpdySynStreamControlFrame> syn_frame(
507      spdy_framer_.CreateSynStream(
508          stream_id, 0,
509          ConvertRequestPriorityToSpdyPriority(priority),
510          flags, false, headers.get()));
511  QueueFrame(syn_frame.get(), priority, stream);
512
513  base::StatsCounter spdy_requests("spdy.requests");
514  spdy_requests.Increment();
515  streams_initiated_count_++;
516
517  if (net_log().IsLoggingAllEvents()) {
518    net_log().AddEvent(
519        NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
520        make_scoped_refptr(
521            new NetLogSpdySynParameter(headers, flags, stream_id, 0)));
522  }
523
524  // Some servers don't like too many pings, so we limit our current sending to
525  // no more than one ping for any syn sent.  To do this, we avoid ever setting
526  // this to true unless we send a syn (which we have just done).  This approach
527  // may change over time as servers change their responses to pings.
528  need_to_send_ping_ = true;
529
530  return ERR_IO_PENDING;
531}
532
533int SpdySession::WriteStreamData(spdy::SpdyStreamId stream_id,
534                                 net::IOBuffer* data, int len,
535                                 spdy::SpdyDataFlags flags) {
536  // Find our stream
537  DCHECK(IsStreamActive(stream_id));
538  scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
539  CHECK_EQ(stream->stream_id(), stream_id);
540  if (!stream)
541    return ERR_INVALID_SPDY_STREAM;
542
543  SendPrefacePingIfNoneInFlight();
544
545  if (len > kMaxSpdyFrameChunkSize) {
546    len = kMaxSpdyFrameChunkSize;
547    flags = static_cast<spdy::SpdyDataFlags>(flags & ~spdy::DATA_FLAG_FIN);
548  }
549
550  // Obey send window size of the stream if flow control is enabled.
551  if (use_flow_control_) {
552    if (stream->send_window_size() <= 0) {
553      // Because we queue frames onto the session, it is possible that
554      // a stream was not flow controlled at the time it attempted the
555      // write, but when we go to fulfill the write, it is now flow
556      // controlled.  This is why we need the session to mark the stream
557      // as stalled - because only the session knows for sure when the
558      // stall occurs.
559      stream->set_stalled_by_flow_control(true);
560      net_log().AddEvent(
561          NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW,
562          make_scoped_refptr(
563              new NetLogIntegerParameter("stream_id", stream_id)));
564      return ERR_IO_PENDING;
565    }
566    int new_len = std::min(len, stream->send_window_size());
567    if (new_len < len) {
568      len = new_len;
569      flags = static_cast<spdy::SpdyDataFlags>(flags & ~spdy::DATA_FLAG_FIN);
570    }
571    stream->DecreaseSendWindowSize(len);
572  }
573
574  if (net_log().IsLoggingAllEvents()) {
575    net_log().AddEvent(
576        NetLog::TYPE_SPDY_SESSION_SEND_DATA,
577        make_scoped_refptr(new NetLogSpdyDataParameter(stream_id, len, flags)));
578  }
579
580  // TODO(mbelshe): reduce memory copies here.
581  scoped_ptr<spdy::SpdyDataFrame> frame(
582      spdy_framer_.CreateDataFrame(stream_id, data->data(), len, flags));
583  QueueFrame(frame.get(), stream->priority(), stream);
584  return ERR_IO_PENDING;
585}
586
587void SpdySession::CloseStream(spdy::SpdyStreamId stream_id, int status) {
588  // TODO(mbelshe): We should send a RST_STREAM control frame here
589  //                so that the server can cancel a large send.
590
591  DeleteStream(stream_id, status);
592}
593
594void SpdySession::ResetStream(
595    spdy::SpdyStreamId stream_id, spdy::SpdyStatusCodes status) {
596
597  net_log().AddEvent(
598      NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
599      make_scoped_refptr(new NetLogSpdyRstParameter(stream_id, status)));
600
601  scoped_ptr<spdy::SpdyRstStreamControlFrame> rst_frame(
602      spdy_framer_.CreateRstStream(stream_id, status));
603
604  // Default to lowest priority unless we know otherwise.
605  int priority = 3;
606  if(IsStreamActive(stream_id)) {
607    scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
608    priority = stream->priority();
609  }
610  QueueFrame(rst_frame.get(), priority, NULL);
611  DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
612}
613
614bool SpdySession::IsStreamActive(spdy::SpdyStreamId stream_id) const {
615  return ContainsKey(active_streams_, stream_id);
616}
617
618LoadState SpdySession::GetLoadState() const {
619  // NOTE: The application only queries the LoadState via the
620  //       SpdyNetworkTransaction, and details are only needed when
621  //       we're in the process of connecting.
622
623  // If we're connecting, defer to the connection to give us the actual
624  // LoadState.
625  if (state_ == CONNECTING)
626    return connection_->GetLoadState();
627
628  // Just report that we're idle since the session could be doing
629  // many things concurrently.
630  return LOAD_STATE_IDLE;
631}
632
633void SpdySession::OnReadComplete(int bytes_read) {
634  // Parse a frame.  For now this code requires that the frame fit into our
635  // buffer (32KB).
636  // TODO(mbelshe): support arbitrarily large frames!
637
638  read_pending_ = false;
639
640  if (bytes_read <= 0) {
641    // Session is tearing down.
642    net::Error error = static_cast<net::Error>(bytes_read);
643    if (bytes_read == 0)
644      error = ERR_CONNECTION_CLOSED;
645    CloseSessionOnError(error, true);
646    return;
647  }
648
649  bytes_received_ += bytes_read;
650
651  received_data_time_ = base::TimeTicks::Now();
652
653  // The SpdyFramer will use callbacks onto |this| as it parses frames.
654  // When errors occur, those callbacks can lead to teardown of all references
655  // to |this|, so maintain a reference to self during this call for safe
656  // cleanup.
657  scoped_refptr<SpdySession> self(this);
658
659  char *data = read_buffer_->data();
660  while (bytes_read &&
661         spdy_framer_.error_code() == spdy::SpdyFramer::SPDY_NO_ERROR) {
662    uint32 bytes_processed = spdy_framer_.ProcessInput(data, bytes_read);
663    bytes_read -= bytes_processed;
664    data += bytes_processed;
665    if (spdy_framer_.state() == spdy::SpdyFramer::SPDY_DONE)
666      spdy_framer_.Reset();
667  }
668
669  if (state_ != CLOSED)
670    ReadSocket();
671}
672
673void SpdySession::OnWriteComplete(int result) {
674  DCHECK(write_pending_);
675  DCHECK(in_flight_write_.size());
676
677  write_pending_ = false;
678
679  scoped_refptr<SpdyStream> stream = in_flight_write_.stream();
680
681  if (result >= 0) {
682    // It should not be possible to have written more bytes than our
683    // in_flight_write_.
684    DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
685
686    in_flight_write_.buffer()->DidConsume(result);
687
688    // We only notify the stream when we've fully written the pending frame.
689    if (!in_flight_write_.buffer()->BytesRemaining()) {
690      if (stream) {
691        // Report the number of bytes written to the caller, but exclude the
692        // frame size overhead.  NOTE: if this frame was compressed the
693        // reported bytes written is the compressed size, not the original
694        // size.
695        if (result > 0) {
696          result = in_flight_write_.buffer()->size();
697          DCHECK_GE(result, static_cast<int>(spdy::SpdyFrame::size()));
698          result -= static_cast<int>(spdy::SpdyFrame::size());
699        }
700
701        // It is possible that the stream was cancelled while we were writing
702        // to the socket.
703        if (!stream->cancelled())
704          stream->OnWriteComplete(result);
705      }
706
707      // Cleanup the write which just completed.
708      in_flight_write_.release();
709    }
710
711    // Write more data.  We're already in a continuation, so we can
712    // go ahead and write it immediately (without going back to the
713    // message loop).
714    WriteSocketLater();
715  } else {
716    in_flight_write_.release();
717
718    // The stream is now errored.  Close it down.
719    CloseSessionOnError(static_cast<net::Error>(result), true);
720  }
721}
722
723net::Error SpdySession::ReadSocket() {
724  if (read_pending_)
725    return OK;
726
727  if (state_ == CLOSED) {
728    NOTREACHED();
729    return ERR_UNEXPECTED;
730  }
731
732  CHECK(connection_.get());
733  CHECK(connection_->socket());
734  int bytes_read = connection_->socket()->Read(read_buffer_.get(),
735                                               kReadBufferSize,
736                                               &read_callback_);
737  switch (bytes_read) {
738    case 0:
739      // Socket is closed!
740      CloseSessionOnError(ERR_CONNECTION_CLOSED, true);
741      return ERR_CONNECTION_CLOSED;
742    case net::ERR_IO_PENDING:
743      // Waiting for data.  Nothing to do now.
744      read_pending_ = true;
745      return ERR_IO_PENDING;
746    default:
747      // Data was read, process it.
748      // Schedule the work through the message loop to avoid recursive
749      // callbacks.
750      read_pending_ = true;
751      MessageLoop::current()->PostTask(
752          FROM_HERE,
753          method_factory_.NewRunnableMethod(
754              &SpdySession::OnReadComplete, bytes_read));
755      break;
756  }
757  return OK;
758}
759
760void SpdySession::WriteSocketLater() {
761  if (delayed_write_pending_)
762    return;
763
764  if (state_ < CONNECTED)
765    return;
766
767  delayed_write_pending_ = true;
768  MessageLoop::current()->PostTask(
769      FROM_HERE,
770      method_factory_.NewRunnableMethod(&SpdySession::WriteSocket));
771}
772
773void SpdySession::WriteSocket() {
774  // This function should only be called via WriteSocketLater.
775  DCHECK(delayed_write_pending_);
776  delayed_write_pending_ = false;
777
778  // If the socket isn't connected yet, just wait; we'll get called
779  // again when the socket connection completes.  If the socket is
780  // closed, just return.
781  if (state_ < CONNECTED || state_ == CLOSED)
782    return;
783
784  if (write_pending_)   // Another write is in progress still.
785    return;
786
787  // Loop sending frames until we've sent everything or until the write
788  // returns error (or ERR_IO_PENDING).
789  while (in_flight_write_.buffer() || !queue_.empty()) {
790    if (!in_flight_write_.buffer()) {
791      // Grab the next SpdyFrame to send.
792      SpdyIOBuffer next_buffer = queue_.top();
793      queue_.pop();
794
795      // We've deferred compression until just before we write it to the socket,
796      // which is now.  At this time, we don't compress our data frames.
797      spdy::SpdyFrame uncompressed_frame(next_buffer.buffer()->data(), false);
798      size_t size;
799      if (spdy_framer_.IsCompressible(uncompressed_frame)) {
800        scoped_ptr<spdy::SpdyFrame> compressed_frame(
801            spdy_framer_.CompressFrame(uncompressed_frame));
802        if (!compressed_frame.get()) {
803          LOG(ERROR) << "SPDY Compression failure";
804          CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true);
805          return;
806        }
807
808        size = compressed_frame->length() + spdy::SpdyFrame::size();
809
810        DCHECK_GT(size, 0u);
811
812        // TODO(mbelshe): We have too much copying of data here.
813        IOBufferWithSize* buffer = new IOBufferWithSize(size);
814        memcpy(buffer->data(), compressed_frame->data(), size);
815
816        // Attempt to send the frame.
817        in_flight_write_ = SpdyIOBuffer(buffer, size, 0, next_buffer.stream());
818      } else {
819        size = uncompressed_frame.length() + spdy::SpdyFrame::size();
820        in_flight_write_ = next_buffer;
821      }
822    } else {
823      DCHECK(in_flight_write_.buffer()->BytesRemaining());
824    }
825
826    write_pending_ = true;
827    int rv = connection_->socket()->Write(in_flight_write_.buffer(),
828        in_flight_write_.buffer()->BytesRemaining(), &write_callback_);
829    if (rv == net::ERR_IO_PENDING)
830      break;
831
832    // We sent the frame successfully.
833    OnWriteComplete(rv);
834
835    // TODO(mbelshe):  Test this error case.  Maybe we should mark the socket
836    //                 as in an error state.
837    if (rv < 0)
838      break;
839  }
840}
841
842void SpdySession::CloseAllStreams(net::Error status) {
843  base::StatsCounter abandoned_streams("spdy.abandoned_streams");
844  base::StatsCounter abandoned_push_streams(
845      "spdy.abandoned_push_streams");
846
847  if (!active_streams_.empty())
848    abandoned_streams.Add(active_streams_.size());
849  if (!unclaimed_pushed_streams_.empty()) {
850    streams_abandoned_count_ += unclaimed_pushed_streams_.size();
851    abandoned_push_streams.Add(unclaimed_pushed_streams_.size());
852    unclaimed_pushed_streams_.clear();
853  }
854
855  for (int i = 0;i < NUM_PRIORITIES;++i) {
856    while (!create_stream_queues_[i].empty()) {
857      PendingCreateStream pending_create = create_stream_queues_[i].front();
858      create_stream_queues_[i].pop();
859      pending_create.callback->Run(ERR_ABORTED);
860    }
861  }
862
863  while (!active_streams_.empty()) {
864    ActiveStreamMap::iterator it = active_streams_.begin();
865    const scoped_refptr<SpdyStream>& stream = it->second;
866    DCHECK(stream);
867    LOG(WARNING) << "ABANDONED (stream_id=" << stream->stream_id()
868                 << "): " << stream->path();
869    DeleteStream(stream->stream_id(), status);
870  }
871
872  // We also need to drain the queue.
873  while (queue_.size())
874    queue_.pop();
875}
876
877int SpdySession::GetNewStreamId() {
878  int id = stream_hi_water_mark_;
879  stream_hi_water_mark_ += 2;
880  if (stream_hi_water_mark_ > 0x7fff)
881    stream_hi_water_mark_ = 1;
882  return id;
883}
884
885void SpdySession::QueueFrame(spdy::SpdyFrame* frame,
886                             spdy::SpdyPriority priority,
887                             SpdyStream* stream) {
888  int length = spdy::SpdyFrame::size() + frame->length();
889  IOBuffer* buffer = new IOBuffer(length);
890  memcpy(buffer->data(), frame->data(), length);
891  queue_.push(SpdyIOBuffer(buffer, length, priority, stream));
892
893  WriteSocketLater();
894}
895
896void SpdySession::CloseSessionOnError(net::Error err, bool remove_from_pool) {
897  // Closing all streams can have a side-effect of dropping the last reference
898  // to |this|.  Hold a reference through this function.
899  scoped_refptr<SpdySession> self(this);
900
901  DCHECK_LT(err, OK);
902  net_log_.AddEvent(
903      NetLog::TYPE_SPDY_SESSION_CLOSE,
904      make_scoped_refptr(new NetLogIntegerParameter("status", err)));
905
906  // Don't close twice.  This can occur because we can have both
907  // a read and a write outstanding, and each can complete with
908  // an error.
909  if (state_ != CLOSED) {
910    state_ = CLOSED;
911    error_ = err;
912    if (remove_from_pool)
913      RemoveFromPool();
914    CloseAllStreams(err);
915  }
916}
917
918Value* SpdySession::GetInfoAsValue() const {
919  DictionaryValue* dict = new DictionaryValue();
920
921  dict->SetInteger("source_id", net_log_.source().id);
922
923  dict->SetString("host_port_pair", host_port_proxy_pair_.first.ToString());
924  dict->SetString("proxy", host_port_proxy_pair_.second.ToURI());
925
926  dict->SetInteger("active_streams", active_streams_.size());
927
928  dict->SetInteger("unclaimed_pushed_streams",
929      unclaimed_pushed_streams_.size());
930
931  dict->SetBoolean("is_secure", is_secure_);
932
933  dict->SetInteger("error", error_);
934  dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
935
936  dict->SetInteger("streams_initiated_count", streams_initiated_count_);
937  dict->SetInteger("streams_pushed_count", streams_pushed_count_);
938  dict->SetInteger("streams_pushed_and_claimed_count",
939      streams_pushed_and_claimed_count_);
940  dict->SetInteger("streams_abandoned_count", streams_abandoned_count_);
941  dict->SetInteger("frames_received", frames_received_);
942
943  dict->SetBoolean("sent_settings", sent_settings_);
944  dict->SetBoolean("received_settings", received_settings_);
945  return dict;
946}
947
948int SpdySession::GetPeerAddress(AddressList* address) const {
949  if (!connection_->socket())
950    return ERR_SOCKET_NOT_CONNECTED;
951
952  return connection_->socket()->GetPeerAddress(address);
953}
954
955int SpdySession::GetLocalAddress(IPEndPoint* address) const {
956  if (!connection_->socket())
957    return ERR_SOCKET_NOT_CONNECTED;
958
959  return connection_->socket()->GetLocalAddress(address);
960}
961
962void SpdySession::ActivateStream(SpdyStream* stream) {
963  const spdy::SpdyStreamId id = stream->stream_id();
964  DCHECK(!IsStreamActive(id));
965
966  active_streams_[id] = stream;
967}
968
969void SpdySession::DeleteStream(spdy::SpdyStreamId id, int status) {
970  // For push streams, if they are being deleted normally, we leave
971  // the stream in the unclaimed_pushed_streams_ list.  However, if
972  // the stream is errored out, clean it up entirely.
973  if (status != OK) {
974    PushedStreamMap::iterator it;
975    for (it = unclaimed_pushed_streams_.begin();
976         it != unclaimed_pushed_streams_.end(); ++it) {
977      scoped_refptr<SpdyStream> curr = it->second;
978      if (id == curr->stream_id()) {
979        unclaimed_pushed_streams_.erase(it);
980        break;
981      }
982    }
983  }
984
985  // The stream might have been deleted.
986  ActiveStreamMap::iterator it2 = active_streams_.find(id);
987  if (it2 == active_streams_.end())
988    return;
989
990  // If this is an active stream, call the callback.
991  const scoped_refptr<SpdyStream> stream(it2->second);
992  active_streams_.erase(it2);
993  if (stream)
994    stream->OnClose(status);
995  ProcessPendingCreateStreams();
996}
997
998void SpdySession::RemoveFromPool() {
999  if (spdy_session_pool_) {
1000    spdy_session_pool_->Remove(make_scoped_refptr(this));
1001    spdy_session_pool_ = NULL;
1002  }
1003}
1004
1005scoped_refptr<SpdyStream> SpdySession::GetActivePushStream(
1006    const std::string& path) {
1007  base::StatsCounter used_push_streams("spdy.claimed_push_streams");
1008
1009  PushedStreamMap::iterator it = unclaimed_pushed_streams_.find(path);
1010  if (it != unclaimed_pushed_streams_.end()) {
1011    net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM, NULL);
1012    scoped_refptr<SpdyStream> stream = it->second;
1013    unclaimed_pushed_streams_.erase(it);
1014    used_push_streams.Increment();
1015    return stream;
1016  }
1017  return NULL;
1018}
1019
1020bool SpdySession::GetSSLInfo(SSLInfo* ssl_info, bool* was_npn_negotiated) {
1021  if (is_secure_) {
1022    SSLClientSocket* ssl_socket =
1023        reinterpret_cast<SSLClientSocket*>(connection_->socket());
1024    ssl_socket->GetSSLInfo(ssl_info);
1025    *was_npn_negotiated = ssl_socket->was_npn_negotiated();
1026    return true;
1027  }
1028  return false;
1029}
1030
1031bool SpdySession::GetSSLCertRequestInfo(
1032    SSLCertRequestInfo* cert_request_info) {
1033  if (is_secure_) {
1034    SSLClientSocket* ssl_socket =
1035        reinterpret_cast<SSLClientSocket*>(connection_->socket());
1036    ssl_socket->GetSSLCertRequestInfo(cert_request_info);
1037    return true;
1038  }
1039  return false;
1040}
1041
1042void SpdySession::OnError(spdy::SpdyFramer* framer) {
1043  CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true);
1044}
1045
1046void SpdySession::OnStreamFrameData(spdy::SpdyStreamId stream_id,
1047                                    const char* data,
1048                                    size_t len) {
1049  if (net_log().IsLoggingAllEvents()) {
1050    net_log().AddEvent(
1051        NetLog::TYPE_SPDY_SESSION_RECV_DATA,
1052        make_scoped_refptr(new NetLogSpdyDataParameter(
1053            stream_id, len, spdy::SpdyDataFlags())));
1054  }
1055
1056  if (!IsStreamActive(stream_id)) {
1057    // NOTE:  it may just be that the stream was cancelled.
1058    LOG(WARNING) << "Received data frame for invalid stream " << stream_id;
1059    return;
1060  }
1061
1062  scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1063  stream->OnDataReceived(data, len);
1064}
1065
1066bool SpdySession::Respond(const spdy::SpdyHeaderBlock& headers,
1067                          const scoped_refptr<SpdyStream> stream) {
1068  int rv = OK;
1069  rv = stream->OnResponseReceived(headers);
1070  if (rv < 0) {
1071    DCHECK_NE(rv, ERR_IO_PENDING);
1072    const spdy::SpdyStreamId stream_id = stream->stream_id();
1073    DeleteStream(stream_id, rv);
1074    return false;
1075  }
1076  return true;
1077}
1078
1079void SpdySession::OnSyn(const spdy::SpdySynStreamControlFrame& frame,
1080                        const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
1081  spdy::SpdyStreamId stream_id = frame.stream_id();
1082  spdy::SpdyStreamId associated_stream_id = frame.associated_stream_id();
1083
1084  if (net_log_.IsLoggingAllEvents()) {
1085    net_log_.AddEvent(
1086        NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
1087        make_scoped_refptr(new NetLogSpdySynParameter(
1088            headers, static_cast<spdy::SpdyControlFlags>(frame.flags()),
1089            stream_id, associated_stream_id)));
1090  }
1091
1092  // Server-initiated streams should have even sequence numbers.
1093  if ((stream_id & 0x1) != 0) {
1094    LOG(WARNING) << "Received invalid OnSyn stream id " << stream_id;
1095    return;
1096  }
1097
1098  if (IsStreamActive(stream_id)) {
1099    LOG(WARNING) << "Received OnSyn for active stream " << stream_id;
1100    return;
1101  }
1102
1103  if (associated_stream_id == 0) {
1104    LOG(WARNING) << "Received invalid OnSyn associated stream id "
1105                 << associated_stream_id
1106                 << " for stream " << stream_id;
1107    ResetStream(stream_id, spdy::INVALID_STREAM);
1108    return;
1109  }
1110
1111  streams_pushed_count_++;
1112
1113  // TODO(mbelshe): DCHECK that this is a GET method?
1114
1115  // Verify that the response had a URL for us.
1116  const std::string& url = ContainsKey(*headers, "url") ?
1117      headers->find("url")->second : "";
1118  if (url.empty()) {
1119    ResetStream(stream_id, spdy::PROTOCOL_ERROR);
1120    LOG(WARNING) << "Pushed stream did not contain a url.";
1121    return;
1122  }
1123
1124  GURL gurl(url);
1125  if (!gurl.is_valid()) {
1126    ResetStream(stream_id, spdy::PROTOCOL_ERROR);
1127    LOG(WARNING) << "Pushed stream url was invalid: " << url;
1128    return;
1129  }
1130
1131  // Verify we have a valid stream association.
1132  if (!IsStreamActive(associated_stream_id)) {
1133    LOG(WARNING) << "Received OnSyn with inactive associated stream "
1134               << associated_stream_id;
1135    ResetStream(stream_id, spdy::INVALID_ASSOCIATED_STREAM);
1136    return;
1137  }
1138
1139  scoped_refptr<SpdyStream> associated_stream =
1140      active_streams_[associated_stream_id];
1141  GURL associated_url(associated_stream->GetUrl());
1142  if (associated_url.GetOrigin() != gurl.GetOrigin()) {
1143    LOG(WARNING) << "Rejected Cross Origin Push Stream "
1144                 << associated_stream_id;
1145    ResetStream(stream_id, spdy::REFUSED_STREAM);
1146    return;
1147  }
1148
1149  // There should not be an existing pushed stream with the same path.
1150  PushedStreamMap::iterator it = unclaimed_pushed_streams_.find(url);
1151  if (it != unclaimed_pushed_streams_.end()) {
1152    LOG(WARNING) << "Received duplicate pushed stream with url: " << url;
1153    ResetStream(stream_id, spdy::PROTOCOL_ERROR);
1154    return;
1155  }
1156
1157  scoped_refptr<SpdyStream> stream(
1158      new SpdyStream(this, stream_id, true, net_log_));
1159
1160  stream->set_path(gurl.PathForRequest());
1161
1162  unclaimed_pushed_streams_[url] = stream;
1163
1164  ActivateStream(stream);
1165  stream->set_response_received();
1166
1167  // Parse the headers.
1168  if (!Respond(*headers, stream))
1169    return;
1170
1171  base::StatsCounter push_requests("spdy.pushed_streams");
1172  push_requests.Increment();
1173}
1174
1175void SpdySession::OnSynReply(const spdy::SpdySynReplyControlFrame& frame,
1176                             const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
1177  spdy::SpdyStreamId stream_id = frame.stream_id();
1178
1179  bool valid_stream = IsStreamActive(stream_id);
1180  if (!valid_stream) {
1181    // NOTE:  it may just be that the stream was cancelled.
1182    LOG(WARNING) << "Received SYN_REPLY for invalid stream " << stream_id;
1183    return;
1184  }
1185
1186  scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1187  CHECK_EQ(stream->stream_id(), stream_id);
1188  CHECK(!stream->cancelled());
1189
1190  if (stream->response_received()) {
1191    LOG(WARNING) << "Received duplicate SYN_REPLY for stream " << stream_id;
1192    CloseStream(stream->stream_id(), ERR_SPDY_PROTOCOL_ERROR);
1193    return;
1194  }
1195  stream->set_response_received();
1196
1197  if (net_log().IsLoggingAllEvents()) {
1198    net_log().AddEvent(
1199        NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
1200        make_scoped_refptr(new NetLogSpdySynParameter(
1201            headers, static_cast<spdy::SpdyControlFlags>(frame.flags()),
1202            stream_id, 0)));
1203  }
1204
1205  Respond(*headers, stream);
1206}
1207
1208void SpdySession::OnHeaders(const spdy::SpdyHeadersControlFrame& frame,
1209                            const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
1210  spdy::SpdyStreamId stream_id = frame.stream_id();
1211
1212  bool valid_stream = IsStreamActive(stream_id);
1213  if (!valid_stream) {
1214    // NOTE:  it may just be that the stream was cancelled.
1215    LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id;
1216    return;
1217  }
1218
1219  scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1220  CHECK_EQ(stream->stream_id(), stream_id);
1221  CHECK(!stream->cancelled());
1222
1223  if (net_log().IsLoggingAllEvents()) {
1224    net_log().AddEvent(
1225        NetLog::TYPE_SPDY_SESSION_HEADERS,
1226        make_scoped_refptr(new NetLogSpdySynParameter(
1227            headers, static_cast<spdy::SpdyControlFlags>(frame.flags()),
1228            stream_id, 0)));
1229  }
1230
1231  int rv = stream->OnHeaders(*headers);
1232  if (rv < 0) {
1233    DCHECK_NE(rv, ERR_IO_PENDING);
1234    const spdy::SpdyStreamId stream_id = stream->stream_id();
1235    DeleteStream(stream_id, rv);
1236  }
1237}
1238
1239void SpdySession::OnControl(const spdy::SpdyControlFrame* frame) {
1240  const linked_ptr<spdy::SpdyHeaderBlock> headers(new spdy::SpdyHeaderBlock);
1241  uint32 type = frame->type();
1242  if (type == spdy::SYN_STREAM ||
1243      type == spdy::SYN_REPLY ||
1244      type == spdy::HEADERS) {
1245    if (!spdy_framer_.ParseHeaderBlock(frame, headers.get())) {
1246      LOG(WARNING) << "Could not parse Spdy Control Frame Header.";
1247      int stream_id = 0;
1248      if (type == spdy::SYN_STREAM) {
1249        stream_id = (reinterpret_cast<const spdy::SpdySynStreamControlFrame*>
1250                     (frame))->stream_id();
1251      } else if (type == spdy::SYN_REPLY) {
1252        stream_id = (reinterpret_cast<const spdy::SpdySynReplyControlFrame*>
1253                     (frame))->stream_id();
1254      } else if (type == spdy::HEADERS) {
1255        stream_id = (reinterpret_cast<const spdy::SpdyHeadersControlFrame*>
1256                     (frame))->stream_id();
1257      }
1258      if(IsStreamActive(stream_id))
1259        ResetStream(stream_id, spdy::PROTOCOL_ERROR);
1260      return;
1261    }
1262  }
1263
1264  frames_received_++;
1265
1266  switch (type) {
1267    case spdy::GOAWAY:
1268      OnGoAway(*reinterpret_cast<const spdy::SpdyGoAwayControlFrame*>(frame));
1269      break;
1270    case spdy::PING:
1271      OnPing(*reinterpret_cast<const spdy::SpdyPingControlFrame*>(frame));
1272      break;
1273    case spdy::SETTINGS:
1274      OnSettings(
1275          *reinterpret_cast<const spdy::SpdySettingsControlFrame*>(frame));
1276      break;
1277    case spdy::RST_STREAM:
1278      OnRst(*reinterpret_cast<const spdy::SpdyRstStreamControlFrame*>(frame));
1279      break;
1280    case spdy::SYN_STREAM:
1281      OnSyn(*reinterpret_cast<const spdy::SpdySynStreamControlFrame*>(frame),
1282            headers);
1283      break;
1284    case spdy::HEADERS:
1285      OnHeaders(*reinterpret_cast<const spdy::SpdyHeadersControlFrame*>(frame),
1286                headers);
1287      break;
1288    case spdy::SYN_REPLY:
1289      OnSynReply(
1290          *reinterpret_cast<const spdy::SpdySynReplyControlFrame*>(frame),
1291          headers);
1292      break;
1293    case spdy::WINDOW_UPDATE:
1294      OnWindowUpdate(
1295          *reinterpret_cast<const spdy::SpdyWindowUpdateControlFrame*>(frame));
1296      break;
1297    default:
1298      DCHECK(false);  // Error!
1299  }
1300}
1301
1302bool SpdySession::OnControlFrameHeaderData(spdy::SpdyStreamId stream_id,
1303                                           const char* header_data,
1304                                           size_t len) {
1305  DCHECK(false);
1306  return false;
1307}
1308
1309void SpdySession::OnDataFrameHeader(const spdy::SpdyDataFrame* frame) {
1310  DCHECK(false);
1311}
1312
1313void SpdySession::OnRst(const spdy::SpdyRstStreamControlFrame& frame) {
1314  spdy::SpdyStreamId stream_id = frame.stream_id();
1315
1316  net_log().AddEvent(
1317      NetLog::TYPE_SPDY_SESSION_RST_STREAM,
1318      make_scoped_refptr(
1319          new NetLogSpdyRstParameter(stream_id, frame.status())));
1320
1321  bool valid_stream = IsStreamActive(stream_id);
1322  if (!valid_stream) {
1323    // NOTE:  it may just be that the stream was cancelled.
1324    LOG(WARNING) << "Received RST for invalid stream" << stream_id;
1325    return;
1326  }
1327  scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1328  CHECK_EQ(stream->stream_id(), stream_id);
1329  CHECK(!stream->cancelled());
1330
1331  if (frame.status() == 0) {
1332    stream->OnDataReceived(NULL, 0);
1333  } else {
1334    LOG(ERROR) << "Spdy stream closed: " << frame.status();
1335    // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
1336    //                For now, it doesn't matter much - it is a protocol error.
1337    DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
1338  }
1339}
1340
1341void SpdySession::OnGoAway(const spdy::SpdyGoAwayControlFrame& frame) {
1342  net_log_.AddEvent(
1343      NetLog::TYPE_SPDY_SESSION_GOAWAY,
1344      make_scoped_refptr(
1345          new NetLogSpdyGoAwayParameter(frame.last_accepted_stream_id(),
1346                                        active_streams_.size(),
1347                                        unclaimed_pushed_streams_.size())));
1348  RemoveFromPool();
1349  CloseAllStreams(net::ERR_ABORTED);
1350
1351  // TODO(willchan): Cancel any streams that are past the GoAway frame's
1352  // |last_accepted_stream_id|.
1353
1354  // Don't bother killing any streams that are still reading.  They'll either
1355  // complete successfully or get an ERR_CONNECTION_CLOSED when the socket is
1356  // closed.
1357}
1358
1359void SpdySession::OnPing(const spdy::SpdyPingControlFrame& frame) {
1360  net_log_.AddEvent(
1361      NetLog::TYPE_SPDY_SESSION_PING,
1362      make_scoped_refptr(new NetLogSpdyPingParameter(frame.unique_id())));
1363
1364  // Send response to a PING from server.
1365  if (frame.unique_id() % 2 == 0) {
1366    WritePingFrame(frame.unique_id());
1367    return;
1368  }
1369
1370  --pings_in_flight_;
1371  if (pings_in_flight_ < 0) {
1372    CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true);
1373    return;
1374  }
1375
1376  if (pings_in_flight_ > 0)
1377    return;
1378
1379  if (!need_to_send_ping_)
1380    return;
1381
1382  PlanToSendTrailingPing();
1383}
1384
1385void SpdySession::OnSettings(const spdy::SpdySettingsControlFrame& frame) {
1386  spdy::SpdySettings settings;
1387  if (spdy_framer_.ParseSettings(&frame, &settings)) {
1388    HandleSettings(settings);
1389    spdy_settings_->Set(host_port_pair(), settings);
1390  }
1391
1392  received_settings_ = true;
1393
1394  net_log_.AddEvent(
1395      NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
1396      make_scoped_refptr(new NetLogSpdySettingsParameter(settings)));
1397}
1398
1399void SpdySession::OnWindowUpdate(
1400    const spdy::SpdyWindowUpdateControlFrame& frame) {
1401  spdy::SpdyStreamId stream_id = frame.stream_id();
1402  if (!IsStreamActive(stream_id)) {
1403    LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id;
1404    return;
1405  }
1406
1407  int delta_window_size = static_cast<int>(frame.delta_window_size());
1408  if (delta_window_size < 1) {
1409    LOG(WARNING) << "Received WINDOW_UPDATE with an invalid delta_window_size "
1410                 << delta_window_size;
1411    ResetStream(stream_id, spdy::FLOW_CONTROL_ERROR);
1412    return;
1413  }
1414
1415  scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1416  CHECK_EQ(stream->stream_id(), stream_id);
1417  CHECK(!stream->cancelled());
1418
1419  if (use_flow_control_)
1420    stream->IncreaseSendWindowSize(delta_window_size);
1421
1422  net_log_.AddEvent(
1423      NetLog::TYPE_SPDY_SESSION_SEND_WINDOW_UPDATE,
1424      make_scoped_refptr(new NetLogSpdyWindowUpdateParameter(
1425          stream_id, delta_window_size, stream->send_window_size())));
1426}
1427
1428void SpdySession::SendWindowUpdate(spdy::SpdyStreamId stream_id,
1429                                   int delta_window_size) {
1430  DCHECK(IsStreamActive(stream_id));
1431  scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1432  CHECK_EQ(stream->stream_id(), stream_id);
1433
1434  net_log_.AddEvent(
1435      NetLog::TYPE_SPDY_SESSION_RECV_WINDOW_UPDATE,
1436      make_scoped_refptr(new NetLogSpdyWindowUpdateParameter(
1437          stream_id, delta_window_size, stream->recv_window_size())));
1438
1439  scoped_ptr<spdy::SpdyWindowUpdateControlFrame> window_update_frame(
1440      spdy_framer_.CreateWindowUpdate(stream_id, delta_window_size));
1441  QueueFrame(window_update_frame.get(), stream->priority(), stream);
1442}
1443
1444// Given a cwnd that we would have sent to the server, modify it based on the
1445// field trial policy.
1446uint32 ApplyCwndFieldTrialPolicy(int cwnd) {
1447  base::FieldTrial* trial = base::FieldTrialList::Find("SpdyCwnd");
1448  if (!trial) {
1449      LOG(WARNING) << "Could not find \"SpdyCwnd\" in FieldTrialList";
1450      return cwnd;
1451  }
1452  if (trial->group_name() == "cwnd10")
1453    return 10;
1454  else if (trial->group_name() == "cwnd16")
1455    return 16;
1456  else if (trial->group_name() == "cwndMin16")
1457    return std::max(cwnd, 16);
1458  else if (trial->group_name() == "cwndMin10")
1459    return std::max(cwnd, 10);
1460  else if (trial->group_name() == "cwndDynamic")
1461    return cwnd;
1462  NOTREACHED();
1463  return cwnd;
1464}
1465
1466void SpdySession::SendSettings() {
1467  // Note:  we're copying the settings here, so that we can potentially modify
1468  // the settings for the field trial.  When removing the field trial, make
1469  // this a reference to the const SpdySettings again.
1470  spdy::SpdySettings settings = spdy_settings_->Get(host_port_pair());
1471  if (settings.empty())
1472    return;
1473
1474  // Record Histogram Data and Apply the SpdyCwnd FieldTrial if applicable.
1475  for (spdy::SpdySettings::iterator i = settings.begin(),
1476           end = settings.end(); i != end; ++i) {
1477    const uint32 id = i->first.id();
1478    const uint32 val = i->second;
1479    switch (id) {
1480      case spdy::SETTINGS_CURRENT_CWND:
1481        uint32 cwnd = 0;
1482        cwnd = ApplyCwndFieldTrialPolicy(val);
1483        UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent",
1484                                    cwnd,
1485                                    1, 200, 100);
1486        if (cwnd != val) {
1487          i->second = cwnd;
1488          i->first.set_flags(spdy::SETTINGS_FLAG_PLEASE_PERSIST);
1489          spdy_settings_->Set(host_port_pair(), settings);
1490        }
1491        break;
1492    }
1493  }
1494
1495  HandleSettings(settings);
1496
1497  net_log_.AddEvent(
1498      NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
1499      make_scoped_refptr(new NetLogSpdySettingsParameter(settings)));
1500
1501  // Create the SETTINGS frame and send it.
1502  scoped_ptr<spdy::SpdySettingsControlFrame> settings_frame(
1503      spdy_framer_.CreateSettings(settings));
1504  sent_settings_ = true;
1505  QueueFrame(settings_frame.get(), 0, NULL);
1506}
1507
1508void SpdySession::HandleSettings(const spdy::SpdySettings& settings) {
1509  for (spdy::SpdySettings::const_iterator i = settings.begin(),
1510           end = settings.end(); i != end; ++i) {
1511    const uint32 id = i->first.id();
1512    const uint32 val = i->second;
1513    switch (id) {
1514      case spdy::SETTINGS_MAX_CONCURRENT_STREAMS:
1515        max_concurrent_streams_ = std::min(static_cast<size_t>(val),
1516                                           max_concurrent_stream_limit_);
1517        ProcessPendingCreateStreams();
1518        break;
1519    }
1520  }
1521}
1522
1523void SpdySession::SendPrefacePingIfNoneInFlight() {
1524  if (pings_in_flight_ || trailing_ping_pending_ ||
1525      !enable_ping_based_connection_checking_)
1526    return;
1527
1528  const base::TimeDelta kConnectionAtRiskOfLoss =
1529      base::TimeDelta::FromMilliseconds(connection_at_risk_of_loss_ms_);
1530
1531  base::TimeTicks now = base::TimeTicks::Now();
1532  // If we haven't heard from server, then send a preface-PING.
1533  if ((now - received_data_time_) > kConnectionAtRiskOfLoss)
1534    SendPrefacePing();
1535
1536  PlanToSendTrailingPing();
1537}
1538
1539void SpdySession::SendPrefacePing() {
1540  // TODO(rtenneti): Send preface pings when more servers support additional
1541  // pings.
1542  // WritePingFrame(next_ping_id_);
1543}
1544
1545void SpdySession::PlanToSendTrailingPing() {
1546  if (trailing_ping_pending_)
1547    return;
1548
1549  trailing_ping_pending_ = true;
1550  MessageLoop::current()->PostDelayedTask(
1551      FROM_HERE,
1552      method_factory_.NewRunnableMethod(&SpdySession::SendTrailingPing),
1553      trailing_ping_delay_time_ms_);
1554}
1555
1556void SpdySession::SendTrailingPing() {
1557  DCHECK(trailing_ping_pending_);
1558  trailing_ping_pending_ = false;
1559  WritePingFrame(next_ping_id_);
1560}
1561
1562void SpdySession::WritePingFrame(uint32 unique_id) {
1563  scoped_ptr<spdy::SpdyPingControlFrame> ping_frame(
1564      spdy_framer_.CreatePingFrame(next_ping_id_));
1565  QueueFrame(ping_frame.get(), SPDY_PRIORITY_HIGHEST, NULL);
1566
1567  if (net_log().IsLoggingAllEvents()) {
1568    net_log().AddEvent(
1569        NetLog::TYPE_SPDY_SESSION_PING,
1570        make_scoped_refptr(new NetLogSpdyPingParameter(next_ping_id_)));
1571  }
1572  if (unique_id % 2 != 0) {
1573    next_ping_id_ += 2;
1574    ++pings_in_flight_;
1575    need_to_send_ping_ = false;
1576    PlanToCheckPingStatus();
1577  }
1578}
1579
1580void SpdySession::PlanToCheckPingStatus() {
1581  if (check_ping_status_pending_)
1582    return;
1583
1584  check_ping_status_pending_ = true;
1585  MessageLoop::current()->PostDelayedTask(
1586      FROM_HERE,
1587      method_factory_.NewRunnableMethod(
1588          &SpdySession::CheckPingStatus, base::TimeTicks::Now()),
1589      hung_interval_ms_);
1590}
1591
1592void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) {
1593  // Check if we got a response back for all PINGs we had sent.
1594  if (pings_in_flight_ == 0) {
1595    check_ping_status_pending_ = false;
1596    return;
1597  }
1598
1599  DCHECK(check_ping_status_pending_);
1600
1601  const base::TimeDelta kHungInterval =
1602      base::TimeDelta::FromMilliseconds(hung_interval_ms_);
1603
1604  base::TimeTicks now = base::TimeTicks::Now();
1605  base::TimeDelta delay = kHungInterval - (now - received_data_time_);
1606
1607  if (delay.InMilliseconds() < 0 || received_data_time_ < last_check_time) {
1608    DCHECK(now - received_data_time_ > kHungInterval);
1609    CloseSessionOnError(net::ERR_SPDY_PING_FAILED, true);
1610    return;
1611  }
1612
1613  // Check the status of connection after a delay.
1614  MessageLoop::current()->PostDelayedTask(
1615      FROM_HERE,
1616      method_factory_.NewRunnableMethod(&SpdySession::CheckPingStatus, now),
1617      delay.InMilliseconds());
1618}
1619
1620void SpdySession::RecordHistograms() {
1621  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
1622                              streams_initiated_count_,
1623                              0, 300, 50);
1624  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
1625                              streams_pushed_count_,
1626                              0, 300, 50);
1627  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
1628                              streams_pushed_and_claimed_count_,
1629                              0, 300, 50);
1630  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
1631                              streams_abandoned_count_,
1632                              0, 300, 50);
1633  UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent",
1634                            sent_settings_ ? 1 : 0, 2);
1635  UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived",
1636                            received_settings_ ? 1 : 0, 2);
1637  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession",
1638                              stalled_streams_,
1639                              0, 300, 50);
1640  UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls",
1641                            stalled_streams_ > 0 ? 1 : 0, 2);
1642
1643  if (received_settings_) {
1644    // Enumerate the saved settings, and set histograms for it.
1645    const spdy::SpdySettings& settings = spdy_settings_->Get(host_port_pair());
1646
1647    spdy::SpdySettings::const_iterator it;
1648    for (it = settings.begin(); it != settings.end(); ++it) {
1649      const spdy::SpdySetting setting = *it;
1650      switch (setting.first.id()) {
1651        case spdy::SETTINGS_CURRENT_CWND:
1652          // Record several different histograms to see if cwnd converges
1653          // for larger volumes of data being sent.
1654          UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd",
1655                                      setting.second,
1656                                      1, 200, 100);
1657          if (bytes_received_ > 10 * 1024) {
1658            UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K",
1659                                        setting.second,
1660                                        1, 200, 100);
1661            if (bytes_received_ > 25 * 1024) {
1662              UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K",
1663                                          setting.second,
1664                                          1, 200, 100);
1665              if (bytes_received_ > 50 * 1024) {
1666                UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K",
1667                                            setting.second,
1668                                            1, 200, 100);
1669                if (bytes_received_ > 100 * 1024) {
1670                  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K",
1671                                              setting.second,
1672                                              1, 200, 100);
1673                }
1674              }
1675            }
1676          }
1677          break;
1678        case spdy::SETTINGS_ROUND_TRIP_TIME:
1679          UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT",
1680                                      setting.second,
1681                                      1, 1200, 100);
1682          break;
1683        case spdy::SETTINGS_DOWNLOAD_RETRANS_RATE:
1684          UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate",
1685                                      setting.second,
1686                                      1, 100, 50);
1687          break;
1688      }
1689    }
1690  }
1691}
1692
1693void SpdySession::InvokeUserStreamCreationCallback(
1694    scoped_refptr<SpdyStream>* stream) {
1695  PendingCallbackMap::iterator it = pending_callback_map_.find(stream);
1696
1697  // Exit if the request has already been cancelled.
1698  if (it == pending_callback_map_.end())
1699    return;
1700
1701  CompletionCallback* callback = it->second.callback;
1702  int result = it->second.result;
1703  pending_callback_map_.erase(it);
1704  callback->Run(result);
1705}
1706
1707}  // namespace net
1708