quic_client_session.cc revision 1e9bf3e0803691d0a228da41fc608347b6db4340
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/quic/quic_client_session.h"
6
7#include "base/callback_helpers.h"
8#include "base/message_loop/message_loop.h"
9#include "base/metrics/histogram.h"
10#include "base/metrics/sparse_histogram.h"
11#include "base/stl_util.h"
12#include "base/strings/string_number_conversions.h"
13#include "base/values.h"
14#include "net/base/io_buffer.h"
15#include "net/base/net_errors.h"
16#include "net/quic/quic_connection_helper.h"
17#include "net/quic/quic_crypto_client_stream_factory.h"
18#include "net/quic/quic_default_packet_writer.h"
19#include "net/quic/quic_stream_factory.h"
20#include "net/ssl/ssl_info.h"
21#include "net/udp/datagram_client_socket.h"
22
23namespace net {
24
25namespace {
26
27// Note: these values must be kept in sync with the corresponding values in:
28// tools/metrics/histograms/histograms.xml
29enum HandshakeState {
30  STATE_STARTED = 0,
31  STATE_ENCRYPTION_ESTABLISHED = 1,
32  STATE_HANDSHAKE_CONFIRMED = 2,
33  STATE_FAILED = 3,
34  NUM_HANDSHAKE_STATES = 4
35};
36
37void RecordHandshakeState(HandshakeState state) {
38  UMA_HISTOGRAM_ENUMERATION("Net.QuicHandshakeState", state,
39                            NUM_HANDSHAKE_STATES);
40}
41
42}  // namespace
43
44QuicClientSession::StreamRequest::StreamRequest() : stream_(NULL) {}
45
46QuicClientSession::StreamRequest::~StreamRequest() {
47  CancelRequest();
48}
49
50int QuicClientSession::StreamRequest::StartRequest(
51    const base::WeakPtr<QuicClientSession>& session,
52    QuicReliableClientStream** stream,
53    const CompletionCallback& callback) {
54  session_ = session;
55  stream_ = stream;
56  int rv = session_->TryCreateStream(this, stream_);
57  if (rv == ERR_IO_PENDING) {
58    callback_ = callback;
59  }
60
61  return rv;
62}
63
64void QuicClientSession::StreamRequest::CancelRequest() {
65  if (session_)
66    session_->CancelRequest(this);
67  session_.reset();
68  callback_.Reset();
69}
70
71void QuicClientSession::StreamRequest::OnRequestCompleteSuccess(
72    QuicReliableClientStream* stream) {
73  session_.reset();
74  *stream_ = stream;
75  ResetAndReturn(&callback_).Run(OK);
76}
77
78void QuicClientSession::StreamRequest::OnRequestCompleteFailure(int rv) {
79  session_.reset();
80  ResetAndReturn(&callback_).Run(rv);
81}
82
83QuicClientSession::QuicClientSession(
84    QuicConnection* connection,
85    scoped_ptr<DatagramClientSocket> socket,
86    scoped_ptr<QuicDefaultPacketWriter> writer,
87    QuicStreamFactory* stream_factory,
88    QuicCryptoClientStreamFactory* crypto_client_stream_factory,
89    const string& server_hostname,
90    const QuicConfig& config,
91    QuicCryptoClientConfig* crypto_config,
92    NetLog* net_log)
93    : QuicSession(connection, config, false),
94      require_confirmation_(false),
95      stream_factory_(stream_factory),
96      socket_(socket.Pass()),
97      writer_(writer.Pass()),
98      read_buffer_(new IOBufferWithSize(kMaxPacketSize)),
99      read_pending_(false),
100      num_total_streams_(0),
101      net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_QUIC_SESSION)),
102      logger_(net_log_),
103      num_packets_read_(0),
104      weak_factory_(this) {
105  crypto_stream_.reset(
106      crypto_client_stream_factory ?
107          crypto_client_stream_factory->CreateQuicCryptoClientStream(
108              server_hostname, this, crypto_config) :
109          new QuicCryptoClientStream(server_hostname, this, crypto_config));
110
111  connection->set_debug_visitor(&logger_);
112  // TODO(rch): pass in full host port proxy pair
113  net_log_.BeginEvent(
114      NetLog::TYPE_QUIC_SESSION,
115      NetLog::StringCallback("host", &server_hostname));
116}
117
118QuicClientSession::~QuicClientSession() {
119  // The session must be closed before it is destroyed.
120  DCHECK(streams()->empty());
121  CloseAllStreams(ERR_UNEXPECTED);
122  DCHECK(observers_.empty());
123  CloseAllObservers(ERR_UNEXPECTED);
124
125  connection()->set_debug_visitor(NULL);
126  net_log_.EndEvent(NetLog::TYPE_QUIC_SESSION);
127
128  while (!stream_requests_.empty()) {
129    StreamRequest* request = stream_requests_.front();
130    stream_requests_.pop_front();
131    request->OnRequestCompleteFailure(ERR_ABORTED);
132  }
133
134  if (IsEncryptionEstablished())
135    RecordHandshakeState(STATE_ENCRYPTION_ESTABLISHED);
136  if (IsCryptoHandshakeConfirmed())
137    RecordHandshakeState(STATE_HANDSHAKE_CONFIRMED);
138  else
139    RecordHandshakeState(STATE_FAILED);
140
141  UMA_HISTOGRAM_COUNTS("Net.QuicNumSentClientHellos",
142                       crypto_stream_->num_sent_client_hellos());
143  if (IsCryptoHandshakeConfirmed()) {
144    UMA_HISTOGRAM_COUNTS("Net.QuicNumSentClientHellosCryptoHandshakeConfirmed",
145                         crypto_stream_->num_sent_client_hellos());
146  }
147
148  UMA_HISTOGRAM_COUNTS("Net.QuicSession.NumTotalStreams", num_total_streams_);
149}
150
151bool QuicClientSession::OnStreamFrames(
152    const std::vector<QuicStreamFrame>& frames) {
153  // Record total number of stream frames.
154  UMA_HISTOGRAM_COUNTS("Net.QuicNumStreamFramesInPacket", frames.size());
155
156  // Record number of frames per stream in packet.
157  typedef std::map<QuicStreamId, size_t> FrameCounter;
158  FrameCounter frames_per_stream;
159  for (size_t i = 0; i < frames.size(); ++i) {
160    frames_per_stream[frames[i].stream_id]++;
161  }
162  for (FrameCounter::const_iterator it = frames_per_stream.begin();
163       it != frames_per_stream.end(); ++it) {
164    UMA_HISTOGRAM_COUNTS("Net.QuicNumStreamFramesPerStreamInPacket",
165                         it->second);
166  }
167
168  return QuicSession::OnStreamFrames(frames);
169}
170
171void QuicClientSession::AddObserver(Observer* observer) {
172  DCHECK(!ContainsKey(observers_, observer));
173  observers_.insert(observer);
174}
175
176void QuicClientSession::RemoveObserver(Observer* observer) {
177  DCHECK(ContainsKey(observers_, observer));
178  observers_.erase(observer);
179}
180
181int QuicClientSession::TryCreateStream(StreamRequest* request,
182                                       QuicReliableClientStream** stream) {
183  if (!crypto_stream_->encryption_established()) {
184    DLOG(DFATAL) << "Encryption not established.";
185    return ERR_CONNECTION_CLOSED;
186  }
187
188  if (goaway_received()) {
189    DLOG(INFO) << "Going away.";
190    return ERR_CONNECTION_CLOSED;
191  }
192
193  if (!connection()->connected()) {
194    DLOG(INFO) << "Already closed.";
195    return ERR_CONNECTION_CLOSED;
196  }
197
198  if (GetNumOpenStreams() < get_max_open_streams()) {
199    *stream = CreateOutgoingReliableStreamImpl();
200    return OK;
201  }
202
203  stream_requests_.push_back(request);
204  return ERR_IO_PENDING;
205}
206
207void QuicClientSession::CancelRequest(StreamRequest* request) {
208  // Remove |request| from the queue while preserving the order of the
209  // other elements.
210  StreamRequestQueue::iterator it =
211      std::find(stream_requests_.begin(), stream_requests_.end(), request);
212  if (it != stream_requests_.end()) {
213    it = stream_requests_.erase(it);
214  }
215}
216
217QuicReliableClientStream* QuicClientSession::CreateOutgoingReliableStream() {
218  if (!crypto_stream_->encryption_established()) {
219    DLOG(INFO) << "Encryption not active so no outgoing stream created.";
220    return NULL;
221  }
222  if (GetNumOpenStreams() >= get_max_open_streams()) {
223    DLOG(INFO) << "Failed to create a new outgoing stream. "
224               << "Already " << GetNumOpenStreams() << " open.";
225    return NULL;
226  }
227  if (goaway_received()) {
228    DLOG(INFO) << "Failed to create a new outgoing stream. "
229               << "Already received goaway.";
230    return NULL;
231  }
232
233  return CreateOutgoingReliableStreamImpl();
234}
235
236QuicReliableClientStream*
237QuicClientSession::CreateOutgoingReliableStreamImpl() {
238  DCHECK(connection()->connected());
239  QuicReliableClientStream* stream =
240      new QuicReliableClientStream(GetNextStreamId(), this, net_log_);
241  ActivateStream(stream);
242  ++num_total_streams_;
243  UMA_HISTOGRAM_COUNTS("Net.QuicSession.NumOpenStreams", GetNumOpenStreams());
244  return stream;
245}
246
247QuicCryptoClientStream* QuicClientSession::GetCryptoStream() {
248  return crypto_stream_.get();
249};
250
251bool QuicClientSession::GetSSLInfo(SSLInfo* ssl_info) {
252  DCHECK(crypto_stream_.get());
253  return crypto_stream_->GetSSLInfo(ssl_info);
254}
255
256int QuicClientSession::CryptoConnect(bool require_confirmation,
257                                     const CompletionCallback& callback) {
258  require_confirmation_ = require_confirmation;
259  RecordHandshakeState(STATE_STARTED);
260  if (!crypto_stream_->CryptoConnect()) {
261    // TODO(wtc): change crypto_stream_.CryptoConnect() to return a
262    // QuicErrorCode and map it to a net error code.
263    return ERR_CONNECTION_FAILED;
264  }
265
266  bool can_notify = require_confirmation_ ?
267      IsCryptoHandshakeConfirmed() : IsEncryptionEstablished();
268  if (can_notify) {
269    return OK;
270  }
271
272  callback_ = callback;
273  return ERR_IO_PENDING;
274}
275
276int QuicClientSession::GetNumSentClientHellos() const {
277  return crypto_stream_->num_sent_client_hellos();
278}
279
280ReliableQuicStream* QuicClientSession::CreateIncomingReliableStream(
281    QuicStreamId id) {
282  DLOG(ERROR) << "Server push not supported";
283  return NULL;
284}
285
286void QuicClientSession::CloseStream(QuicStreamId stream_id) {
287  QuicSession::CloseStream(stream_id);
288  OnClosedStream();
289}
290
291void QuicClientSession::SendRstStream(QuicStreamId id,
292                                      QuicRstStreamErrorCode error) {
293  QuicSession::SendRstStream(id, error);
294  OnClosedStream();
295}
296
297void QuicClientSession::OnClosedStream() {
298  if (GetNumOpenStreams() < get_max_open_streams() &&
299      !stream_requests_.empty() &&
300      crypto_stream_->encryption_established() &&
301      !goaway_received() &&
302      connection()->connected()) {
303    StreamRequest* request = stream_requests_.front();
304    stream_requests_.pop_front();
305    request->OnRequestCompleteSuccess(CreateOutgoingReliableStreamImpl());
306  }
307
308  if (GetNumOpenStreams() == 0) {
309    stream_factory_->OnIdleSession(this);
310  }
311}
312
313void QuicClientSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) {
314  if (!callback_.is_null() &&
315      (!require_confirmation_ || event == HANDSHAKE_CONFIRMED)) {
316    // TODO(rtenneti): Currently for all CryptoHandshakeEvent events, callback_
317    // could be called because there are no error events in CryptoHandshakeEvent
318    // enum. If error events are added to CryptoHandshakeEvent, then the
319    // following code needs to changed.
320    base::ResetAndReturn(&callback_).Run(OK);
321  }
322  if (event == HANDSHAKE_CONFIRMED) {
323    ObserverSet::iterator it = observers_.begin();
324    while (it != observers_.end()) {
325      Observer* observer = *it;
326      ++it;
327      observer->OnCryptoHandshakeConfirmed();
328    }
329  }
330  QuicSession::OnCryptoHandshakeEvent(event);
331}
332
333void QuicClientSession::OnCryptoHandshakeMessageSent(
334    const CryptoHandshakeMessage& message) {
335  logger_.OnCryptoHandshakeMessageSent(message);
336}
337
338void QuicClientSession::OnCryptoHandshakeMessageReceived(
339    const CryptoHandshakeMessage& message) {
340  logger_.OnCryptoHandshakeMessageReceived(message);
341}
342
343void QuicClientSession::OnConnectionClosed(QuicErrorCode error,
344                                           bool from_peer) {
345  DCHECK(!connection()->connected());
346  logger_.OnConnectionClosed(error, from_peer);
347  if (from_peer) {
348    UMA_HISTOGRAM_SPARSE_SLOWLY(
349        "Net.QuicSession.ConnectionCloseErrorCodeServer", error);
350  } else {
351    UMA_HISTOGRAM_SPARSE_SLOWLY(
352        "Net.QuicSession.ConnectionCloseErrorCodeClient", error);
353  }
354
355  if (error == QUIC_CONNECTION_TIMED_OUT) {
356    UMA_HISTOGRAM_COUNTS(
357        "Net.QuicSession.ConnectionClose.NumOpenStreams.TimedOut",
358        GetNumOpenStreams());
359    if (!IsCryptoHandshakeConfirmed()) {
360      // If there have been any streams created, they were 0-RTT speculative
361      // requests that have not be serviced.
362      UMA_HISTOGRAM_COUNTS(
363          "Net.QuicSession.ConnectionClose.NumTotalStreams.HandshakeTimedOut",
364          num_total_streams_);
365    }
366  }
367
368  UMA_HISTOGRAM_SPARSE_SLOWLY("Net.QuicSession.QuicVersion",
369                              connection()->version());
370  NotifyFactoryOfSessionGoingAway();
371  if (!callback_.is_null()) {
372    base::ResetAndReturn(&callback_).Run(ERR_QUIC_PROTOCOL_ERROR);
373  }
374  socket_->Close();
375  QuicSession::OnConnectionClosed(error, from_peer);
376  NotifyFactoryOfSessionClosedLater();
377}
378
379void QuicClientSession::OnSuccessfulVersionNegotiation(
380    const QuicVersion& version) {
381  logger_.OnSuccessfulVersionNegotiation(version);
382  QuicSession::OnSuccessfulVersionNegotiation(version);
383}
384
385void QuicClientSession::StartReading() {
386  if (read_pending_) {
387    return;
388  }
389  read_pending_ = true;
390  int rv = socket_->Read(read_buffer_.get(),
391                         read_buffer_->size(),
392                         base::Bind(&QuicClientSession::OnReadComplete,
393                                    weak_factory_.GetWeakPtr()));
394  if (rv == ERR_IO_PENDING) {
395    num_packets_read_ = 0;
396    return;
397  }
398
399  if (++num_packets_read_ > 32) {
400    num_packets_read_ = 0;
401    // Data was read, process it.
402    // Schedule the work through the message loop to avoid recursive
403    // callbacks.
404    base::MessageLoop::current()->PostTask(
405        FROM_HERE,
406        base::Bind(&QuicClientSession::OnReadComplete,
407                   weak_factory_.GetWeakPtr(), rv));
408  } else {
409    OnReadComplete(rv);
410  }
411}
412
413void QuicClientSession::CloseSessionOnError(int error) {
414  UMA_HISTOGRAM_SPARSE_SLOWLY("Net.QuicSession.CloseSessionOnError", -error);
415  CloseSessionOnErrorInner(error, QUIC_INTERNAL_ERROR);
416  NotifyFactoryOfSessionClosed();
417}
418
419void QuicClientSession::CloseSessionOnErrorInner(int net_error,
420                                                 QuicErrorCode quic_error) {
421  if (!callback_.is_null()) {
422    base::ResetAndReturn(&callback_).Run(net_error);
423  }
424  CloseAllStreams(net_error);
425  CloseAllObservers(net_error);
426  net_log_.AddEvent(
427      NetLog::TYPE_QUIC_SESSION_CLOSE_ON_ERROR,
428      NetLog::IntegerCallback("net_error", net_error));
429
430  connection()->CloseConnection(quic_error, false);
431  DCHECK(!connection()->connected());
432}
433
434void QuicClientSession::CloseAllStreams(int net_error) {
435  while (!streams()->empty()) {
436    ReliableQuicStream* stream = streams()->begin()->second;
437    QuicStreamId id = stream->id();
438    static_cast<QuicReliableClientStream*>(stream)->OnError(net_error);
439    CloseStream(id);
440  }
441}
442
443void QuicClientSession::CloseAllObservers(int net_error) {
444  while (!observers_.empty()) {
445    Observer* observer = *observers_.begin();
446    observers_.erase(observer);
447    observer->OnSessionClosed(net_error);
448  }
449}
450
451base::Value* QuicClientSession::GetInfoAsValue(const HostPortPair& pair) const {
452  base::DictionaryValue* dict = new base::DictionaryValue();
453  dict->SetString("host_port_pair", pair.ToString());
454  dict->SetString("version", QuicVersionToString(connection()->version()));
455  dict->SetInteger("open_streams", GetNumOpenStreams());
456  dict->SetInteger("total_streams", num_total_streams_);
457  dict->SetString("peer_address", peer_address().ToString());
458  dict->SetString("guid", base::Uint64ToString(guid()));
459  dict->SetBoolean("connected", connection()->connected());
460  return dict;
461}
462
463base::WeakPtr<QuicClientSession> QuicClientSession::GetWeakPtr() {
464  return weak_factory_.GetWeakPtr();
465}
466
467void QuicClientSession::OnReadComplete(int result) {
468  read_pending_ = false;
469  if (result == 0)
470    result = ERR_CONNECTION_CLOSED;
471
472  if (result < 0) {
473    DLOG(INFO) << "Closing session on read error: " << result;
474    UMA_HISTOGRAM_SPARSE_SLOWLY("Net.QuicSession.ReadError", -result);
475    NotifyFactoryOfSessionGoingAway();
476    CloseSessionOnErrorInner(result, QUIC_PACKET_READ_ERROR);
477    NotifyFactoryOfSessionClosedLater();
478    return;
479  }
480
481  scoped_refptr<IOBufferWithSize> buffer(read_buffer_);
482  read_buffer_ = new IOBufferWithSize(kMaxPacketSize);
483  QuicEncryptedPacket packet(buffer->data(), result);
484  IPEndPoint local_address;
485  IPEndPoint peer_address;
486  socket_->GetLocalAddress(&local_address);
487  socket_->GetPeerAddress(&peer_address);
488  // ProcessUdpPacket might result in |this| being deleted, so we
489  // use a weak pointer to be safe.
490  connection()->ProcessUdpPacket(local_address, peer_address, packet);
491  if (!connection()->connected()) {
492    stream_factory_->OnSessionClosed(this);
493    return;
494  }
495  StartReading();
496}
497
498void QuicClientSession::NotifyFactoryOfSessionGoingAway() {
499  if (stream_factory_)
500    stream_factory_->OnSessionGoingAway(this);
501}
502
503void QuicClientSession::NotifyFactoryOfSessionClosedLater() {
504  DCHECK_EQ(0u, GetNumOpenStreams());
505  DCHECK(!connection()->connected());
506  base::MessageLoop::current()->PostTask(
507      FROM_HERE,
508      base::Bind(&QuicClientSession::NotifyFactoryOfSessionClosed,
509                 weak_factory_.GetWeakPtr()));
510}
511
512void QuicClientSession::NotifyFactoryOfSessionClosed() {
513  DCHECK_EQ(0u, GetNumOpenStreams());
514  // Will delete |this|.
515  if (stream_factory_)
516    stream_factory_->OnSessionClosed(this);
517}
518
519}  // namespace net
520