1// Copyright (c) 2009 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
#include "net/flip/flip_session.h"

#include <vector>

#include "base/basictypes.h"
#include "base/logging.h"
#include "base/message_loop.h"
#include "base/rand_util.h"
#include "base/stats_counters.h"
#include "base/stl_util-inl.h"
#include "base/string_util.h"
#include "net/base/connection_type_histograms.h"
#include "net/base/load_flags.h"
#include "net/base/load_log.h"
#include "net/base/net_util.h"
#include "net/flip/flip_frame_builder.h"
#include "net/flip/flip_protocol.h"
#include "net/flip/flip_stream.h"
#include "net/http/http_network_session.h"
#include "net/http/http_request_info.h"
#include "net/http/http_response_headers.h"
#include "net/http/http_response_info.h"
#include "net/socket/client_socket.h"
#include "net/socket/client_socket_factory.h"
#include "net/socket/ssl_client_socket.h"
#include "net/tools/dump_cache/url_to_filename_encoder.h"
29
30namespace {
31
32// Diagnostics function to dump the headers of a request.
33// TODO(mbelshe): Remove this function.
34void DumpFlipHeaders(const flip::FlipHeaderBlock& headers) {
35  // Because this function gets called on every request,
36  // take extra care to optimize it away if logging is turned off.
37  if (logging::LOG_INFO < logging::GetMinLogLevel())
38    return;
39
40  flip::FlipHeaderBlock::const_iterator it = headers.begin();
41  while (it != headers.end()) {
42    std::string val = (*it).second;
43    std::string::size_type pos = 0;
44    while ((pos = val.find('\0', pos)) != val.npos)
45      val[pos] = '\n';
46    LOG(INFO) << (*it).first << "==" << val;
47    ++it;
48  }
49}
50
51}  // namespace
52
53namespace net {
54
55namespace {
56
// Size, in bytes, of the buffer handed to each socket Read() call.
#ifdef WIN32
// We use an artificially small buffer size on Windows because the async IO
// system will artificially delay IO completions when we use large buffers.
const int kReadBufferSize = 2 * 1024;
#else
const int kReadBufferSize = 8 * 1024;
#endif
64
65// Convert a FlipHeaderBlock into an HttpResponseInfo.
66// |headers| input parameter with the FlipHeaderBlock.
67// |info| output parameter for the HttpResponseInfo.
68// Returns true if successfully converted.  False if there was a failure
69// or if the FlipHeaderBlock was invalid.
70bool FlipHeadersToHttpResponse(const flip::FlipHeaderBlock& headers,
71                               HttpResponseInfo* response) {
72  std::string version;
73  std::string status;
74
75  // The "status" and "version" headers are required.
76  flip::FlipHeaderBlock::const_iterator it;
77  it = headers.find("status");
78  if (it == headers.end()) {
79    LOG(ERROR) << "FlipHeaderBlock without status header.";
80    return false;
81  }
82  status = it->second;
83
84  // Grab the version.  If not provided by the server,
85  it = headers.find("version");
86  if (it == headers.end()) {
87    LOG(ERROR) << "FlipHeaderBlock without version header.";
88    return false;
89  }
90  version = it->second;
91
92  std::string raw_headers(version);
93  raw_headers.push_back(' ');
94  raw_headers.append(status);
95  raw_headers.push_back('\0');
96  for (it = headers.begin(); it != headers.end(); ++it) {
97    // For each value, if the server sends a NUL-separated
98    // list of values, we separate that back out into
99    // individual headers for each value in the list.
100    // e.g.
101    //    Set-Cookie "foo\0bar"
102    // becomes
103    //    Set-Cookie: foo\0
104    //    Set-Cookie: bar\0
105    std::string value = it->second;
106    size_t start = 0;
107    size_t end = 0;
108    do {
109      end = value.find('\0', start);
110      std::string tval;
111      if (end != value.npos)
112        tval = value.substr(start, (end - start));
113      else
114        tval = value.substr(start);
115      raw_headers.append(it->first);
116      raw_headers.push_back(':');
117      raw_headers.append(tval);
118      raw_headers.push_back('\0');
119      start = end + 1;
120    } while (end != value.npos);
121  }
122
123  response->headers = new HttpResponseHeaders(raw_headers);
124  response->was_fetched_via_spdy = true;
125  return true;
126}
127
128// Create a FlipHeaderBlock for a Flip SYN_STREAM Frame from
129// a HttpRequestInfo block.
130void CreateFlipHeadersFromHttpRequest(
131    const HttpRequestInfo& info, flip::FlipHeaderBlock* headers) {
132  static const char kHttpProtocolVersion[] = "HTTP/1.1";
133
134  HttpUtil::HeadersIterator it(info.extra_headers.begin(),
135                               info.extra_headers.end(),
136                               "\r\n");
137  while (it.GetNext()) {
138    std::string name = StringToLowerASCII(it.name());
139    if (headers->find(name) == headers->end()) {
140      (*headers)[name] = it.values();
141    } else {
142      std::string new_value = (*headers)[name];
143      new_value += "\0";
144      new_value += it.values();
145      (*headers)[name] = new_value;
146    }
147  }
148
149  // TODO(mbelshe): Add Proxy headers here. (See http_network_transaction.cc)
150  // TODO(mbelshe): Add authentication headers here.
151
152  (*headers)["method"] = info.method;
153  (*headers)["url"] = info.url.spec();
154  (*headers)["version"] = kHttpProtocolVersion;
155  if (info.user_agent.length())
156    (*headers)["user-agent"] = info.user_agent;
157  if (!info.referrer.is_empty())
158    (*headers)["referer"] = info.referrer.spec();
159
160  // Honor load flags that impact proxy caches.
161  if (info.load_flags & LOAD_BYPASS_CACHE) {
162    (*headers)["pragma"] = "no-cache";
163    (*headers)["cache-control"] = "no-cache";
164  } else if (info.load_flags & LOAD_VALIDATE_CACHE) {
165    (*headers)["cache-control"] = "max-age=0";
166  }
167}
168
169void AdjustSocketBufferSizes(ClientSocket* socket) {
170  // Adjust socket buffer sizes.
171  // FLIP uses one socket, and we want a really big buffer.
172  // This greatly helps on links with packet loss - we can even
173  // outperform Vista's dynamic window sizing algorithm.
174  // TODO(mbelshe): more study.
175  const int kSocketBufferSize = 512 * 1024;
176  socket->SetReceiveBufferSize(kSocketBufferSize);
177  socket->SetSendBufferSize(kSocketBufferSize);
178}
179
180}  // namespace
181
// static
// When true, OnTCPConnect() layers an SSL client socket on top of the freshly
// connected transport socket before the session is marked CONNECTED.
bool FlipSession::use_ssl_ = true;
184
// Creates an idle session for |host|.  No socket is opened here; the session
// starts in the IDLE state with an uninitialized connection handle and only
// prepares its callbacks, read buffer, framer visitor and SSL configuration.
// |session| supplies the SSL config service used below.
FlipSession::FlipSession(const std::string& host, HttpNetworkSession* session)
    : ALLOW_THIS_IN_INITIALIZER_LIST(
          connect_callback_(this, &FlipSession::OnTCPConnect)),
      ALLOW_THIS_IN_INITIALIZER_LIST(
          ssl_connect_callback_(this, &FlipSession::OnSSLConnect)),
      ALLOW_THIS_IN_INITIALIZER_LIST(
          read_callback_(this, &FlipSession::OnReadComplete)),
      ALLOW_THIS_IN_INITIALIZER_LIST(
          write_callback_(this, &FlipSession::OnWriteComplete)),
      domain_(host),
      session_(session),
      connection_(new ClientSocketHandle),
      read_buffer_(new IOBuffer(kReadBufferSize)),
      read_pending_(false),
      stream_hi_water_mark_(1),  // Always start at 1 for the first stream id.
      write_pending_(false),
      delayed_write_pending_(false),
      is_secure_(false),
      error_(OK),
      state_(IDLE),
      streams_initiated_count_(0),
      streams_pushed_count_(0),
      streams_pushed_and_claimed_count_(0),
      streams_abandoned_count_(0) {
  // TODO(mbelshe): consider randomization of the stream_hi_water_mark.

  // The framer reports parsed frames and errors back to this session.
  flip_framer_.set_visitor(this);

  session_->ssl_config_service()->GetSSLConfig(&ssl_config_);

  // TODO(agl): This is a temporary hack for testing reasons. In the medium
  // term we'll want to use NPN for all HTTPS connections and use the protocol
  // suggested.
  //
  // In the event that the server supports Next Protocol Negotiation, but
  // doesn't support either of these protocols, we'll request the first
  // protocol in the list. Because of that, HTTP is listed first because it's
  // what we'll actually fallback to in the case that the server doesn't
  // support SPDY.
  //
  // Each entry is a one-byte length (written as an octal escape) followed by
  // the protocol name: 7 bytes of "http1.1", then 4 bytes of "spdy".
  ssl_config_.next_protos = "\007http1.1\004spdy";
}
226
// Tears the session down: aborts every remaining stream with ERR_ABORTED,
// disconnects the socket if the connection handle was initialized, and
// records the per-session stream counters as histograms.
FlipSession::~FlipSession() {
  // Cleanup all the streams.
  CloseAllStreams(net::ERR_ABORTED);

  if (connection_->is_initialized()) {
    // With Flip we can't recycle sockets.
    connection_->socket()->Disconnect();
  }

  // The session pool must no longer list this session by the time it is
  // destroyed.
  // TODO(willchan): Don't hardcode port 80 here.
  DCHECK(!session_->flip_session_pool()->HasSession(
      HostResolver::RequestInfo(domain_, 80)));

  // Record per-session histograms here.
  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
      streams_initiated_count_,
      0, 300, 50);
  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
      streams_pushed_count_,
      0, 300, 50);
  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
      streams_pushed_and_claimed_count_,
      0, 300, 50);
  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
      streams_abandoned_count_,
      0, 300, 50);
}
254
255void FlipSession::InitializeWithSocket(ClientSocketHandle* connection) {
256  static StatsCounter flip_sessions("flip.sessions");
257  flip_sessions.Increment();
258
259  AdjustSocketBufferSizes(connection->socket());
260
261  state_ = CONNECTED;
262  connection_.reset(connection);
263
264  // This is a newly initialized session that no client should have a handle to
265  // yet, so there's no need to start writing data as in OnTCPConnect(), but we
266  // should start reading data.
267  ReadSocket();
268}
269
270net::Error FlipSession::Connect(const std::string& group_name,
271                                const HostResolver::RequestInfo& host,
272                                RequestPriority priority,
273                                LoadLog* load_log) {
274  DCHECK(priority >= FLIP_PRIORITY_HIGHEST && priority <= FLIP_PRIORITY_LOWEST);
275
276  // If the connect process is started, let the caller continue.
277  if (state_ > IDLE)
278    return net::OK;
279
280  state_ = CONNECTING;
281
282  static StatsCounter flip_sessions("flip.sessions");
283  flip_sessions.Increment();
284
285  int rv = connection_->Init(group_name, host, priority, &connect_callback_,
286                            session_->tcp_socket_pool(), load_log);
287  DCHECK(rv <= 0);
288
289  // If the connect is pending, we still return ok.  The APIs enqueue
290  // work until after the connect completes asynchronously later.
291  if (rv == net::ERR_IO_PENDING)
292    return net::OK;
293  return static_cast<net::Error>(rv);
294}
295
296scoped_refptr<FlipStream> FlipSession::GetOrCreateStream(
297    const HttpRequestInfo& request,
298    const UploadDataStream* upload_data,
299    LoadLog* log) {
300  const GURL& url = request.url;
301  const std::string& path = url.PathForRequest();
302
303  scoped_refptr<FlipStream> stream;
304
305  // Check if we have a push stream for this path.
306  if (request.method == "GET") {
307    stream = GetPushStream(path);
308    if (stream) {
309      DCHECK(streams_pushed_and_claimed_count_ < streams_pushed_count_);
310      streams_pushed_and_claimed_count_++;
311      return stream;
312    }
313  }
314
315  // Check if we have a pending push stream for this url.
316  PendingStreamMap::iterator it;
317  it = pending_streams_.find(path);
318  if (it != pending_streams_.end()) {
319    DCHECK(!it->second);
320    // Server will assign a stream id when the push stream arrives.  Use 0 for
321    // now.
322    LoadLog::AddEvent(log, LoadLog::TYPE_FLIP_STREAM_ADOPTED_PUSH_STREAM);
323    FlipStream* stream = new FlipStream(this, 0, true, log);
324    stream->set_path(path);
325    it->second = stream;
326    return it->second;
327  }
328
329  const flip::FlipStreamId stream_id = GetNewStreamId();
330
331  // If we still don't have a stream, activate one now.
332  stream = new FlipStream(this, stream_id, false, log);
333  stream->set_priority(request.priority);
334  stream->set_path(path);
335  ActivateStream(stream);
336
337  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount",
338      static_cast<int>(request.priority), 0, 10, 11);
339
340  LOG(INFO) << "FlipStream: Creating stream " << stream_id << " for " << url;
341
342  // TODO(mbelshe): Optimize memory allocations
343  DCHECK(request.priority >= FLIP_PRIORITY_HIGHEST &&
344         request.priority <= FLIP_PRIORITY_LOWEST);
345
346  // Convert from HttpRequestHeaders to Flip Headers.
347  flip::FlipHeaderBlock headers;
348  CreateFlipHeadersFromHttpRequest(request, &headers);
349
350  flip::FlipControlFlags flags = flip::CONTROL_FLAG_NONE;
351  if (!request.upload_data || !upload_data->size())
352    flags = flip::CONTROL_FLAG_FIN;
353
354  // Create a SYN_STREAM packet and add to the output queue.
355  scoped_ptr<flip::FlipSynStreamControlFrame> syn_frame(
356      flip_framer_.CreateSynStream(stream_id, request.priority, flags, false,
357                                   &headers));
358  int length = flip::FlipFrame::size() + syn_frame->length();
359  IOBuffer* buffer = new IOBuffer(length);
360  memcpy(buffer->data(), syn_frame->data(), length);
361  queue_.push(FlipIOBuffer(buffer, length, request.priority, stream));
362
363  static StatsCounter flip_requests("flip.requests");
364  flip_requests.Increment();
365
366  LOG(INFO) << "FETCHING: " << request.url.spec();
367  streams_initiated_count_++;
368
369  LOG(INFO) << "FLIP SYN_STREAM HEADERS ----------------------------------";
370  DumpFlipHeaders(headers);
371
372  // Schedule to write to the socket after we've made it back
373  // to the message loop so that we can aggregate multiple
374  // requests.
375  // TODO(mbelshe): Should we do the "first" request immediately?
376  //                maybe we should only 'do later' for subsequent
377  //                requests.
378  WriteSocketLater();
379
380  return stream;
381}
382
383int FlipSession::WriteStreamData(flip::FlipStreamId stream_id,
384                                 net::IOBuffer* data, int len) {
385  LOG(INFO) << "Writing Stream Data for stream " << stream_id << " (" << len
386            << " bytes)";
387  const int kMss = 1430;  // This is somewhat arbitrary and not really fixed,
388                          // but it will always work reasonably with ethernet.
389  // Chop the world into 2-packet chunks.  This is somewhat arbitrary, but
390  // is reasonably small and ensures that we elicit ACKs quickly from TCP
391  // (because TCP tries to only ACK every other packet).
392  const int kMaxFlipFrameChunkSize = (2 * kMss) - flip::FlipFrame::size();
393
394  // Find our stream
395  DCHECK(IsStreamActive(stream_id));
396  scoped_refptr<FlipStream> stream = active_streams_[stream_id];
397  CHECK(stream->stream_id() == stream_id);
398  if (!stream)
399    return ERR_INVALID_FLIP_STREAM;
400
401  // TODO(mbelshe):  Setting of the FIN is assuming that the caller will pass
402  //                 all data to write in a single chunk.  Is this always true?
403
404  // Set the flags on the upload.
405  flip::FlipDataFlags flags = flip::DATA_FLAG_FIN;
406  if (len > kMaxFlipFrameChunkSize) {
407    len = kMaxFlipFrameChunkSize;
408    flags = flip::DATA_FLAG_NONE;
409  }
410
411  // TODO(mbelshe): reduce memory copies here.
412  scoped_ptr<flip::FlipDataFrame> frame(
413      flip_framer_.CreateDataFrame(stream_id, data->data(), len, flags));
414  int length = flip::FlipFrame::size() + frame->length();
415  IOBufferWithSize* buffer = new IOBufferWithSize(length);
416  memcpy(buffer->data(), frame->data(), length);
417  queue_.push(FlipIOBuffer(buffer, length, stream->priority(), stream));
418
419  // Whenever we queue onto the socket we need to ensure that we will write to
420  // it later.
421  WriteSocketLater();
422
423  return ERR_IO_PENDING;
424}
425
426bool FlipSession::CancelStream(flip::FlipStreamId stream_id) {
427  LOG(INFO) << "Cancelling stream " << stream_id;
428  if (!IsStreamActive(stream_id))
429    return false;
430
431  // TODO(mbelshe): We should send a FIN_STREAM control frame here
432  //                so that the server can cancel a large send.
433
434  // TODO(mbelshe): Write a method for tearing down a stream
435  //                that cleans it out of the active list, the pending list,
436  //                etc.
437  scoped_refptr<FlipStream> stream = active_streams_[stream_id];
438  DeactivateStream(stream_id);
439  return true;
440}
441
442bool FlipSession::IsStreamActive(flip::FlipStreamId stream_id) const {
443  return ContainsKey(active_streams_, stream_id);
444}
445
446LoadState FlipSession::GetLoadState() const {
447  // NOTE: The application only queries the LoadState via the
448  //       FlipNetworkTransaction, and details are only needed when
449  //       we're in the process of connecting.
450
451  // If we're connecting, defer to the connection to give us the actual
452  // LoadState.
453  if (state_ == CONNECTING)
454    return connection_->GetLoadState();
455
456  // Just report that we're idle since the session could be doing
457  // many things concurrently.
458  return LOAD_STATE_IDLE;
459}
460
461void FlipSession::OnTCPConnect(int result) {
462  LOG(INFO) << "Flip socket connected (result=" << result << ")";
463
464  // We shouldn't be coming through this path if we didn't just open a fresh
465  // socket (or have an error trying to do so).
466  DCHECK(!connection_->socket() || !connection_->is_reused());
467
468  UpdateConnectionTypeHistograms(CONNECTION_SPDY, result >= 0);
469
470  if (result != net::OK) {
471    DCHECK_LT(result, 0);
472    CloseSessionOnError(static_cast<net::Error>(result));
473    return;
474  }
475
476  AdjustSocketBufferSizes(connection_->socket());
477
478  if (use_ssl_) {
479    // Add a SSL socket on top of our existing transport socket.
480    ClientSocket* socket = connection_->release_socket();
481    // TODO(mbelshe): Fix the hostname.  This is BROKEN without having
482    //                a real hostname.
483    socket = session_->socket_factory()->CreateSSLClientSocket(
484        socket, "" /* request_->url.HostNoBrackets() */ , ssl_config_);
485    connection_->set_socket(socket);
486    is_secure_ = true;
487    // TODO(willchan): Plumb LoadLog into FLIP code.
488    int status = connection_->socket()->Connect(&ssl_connect_callback_, NULL);
489    if (status != ERR_IO_PENDING)
490      OnSSLConnect(status);
491  } else {
492    DCHECK_EQ(state_, CONNECTING);
493    state_ = CONNECTED;
494
495    // Make sure we get any pending data sent.
496    WriteSocketLater();
497    // Start reading
498    ReadSocket();
499  }
500}
501
502void FlipSession::OnSSLConnect(int result) {
503  // TODO(mbelshe): We need to replicate the functionality of
504  //   HttpNetworkTransaction::DoSSLConnectComplete here, where it calls
505  //   HandleCertificateError() and such.
506  if (IsCertificateError(result))
507    result = OK;   // TODO(mbelshe): pretend we're happy anyway.
508
509  if (result == OK) {
510    DCHECK_EQ(state_, CONNECTING);
511    state_ = CONNECTED;
512
513    // After we've connected, send any data to the server, and then issue
514    // our read.
515    WriteSocketLater();
516    ReadSocket();
517  } else {
518    DCHECK_LT(result, 0);  // It should be an error, not a byte count.
519    CloseSessionOnError(static_cast<net::Error>(result));
520  }
521}
522
// Completion handler for a socket read.
// |bytes_read| is the socket's result: a positive byte count, 0, or a
// negative net error.  Results <= 0 close the session (0 is mapped to
// ERR_CONNECTION_CLOSED).  Otherwise the bytes in |read_buffer_| are fed to
// the framer and, unless the session was closed while parsing, another read
// is issued.
void FlipSession::OnReadComplete(int bytes_read) {
  // Parse a frame.  For now this code requires that the frame fit into our
  // read buffer (kReadBufferSize bytes).
  // TODO(mbelshe): support arbitrarily large frames!

  LOG(INFO) << "Flip socket read: " << bytes_read << " bytes";

  read_pending_ = false;

  if (bytes_read <= 0) {
    // Session is tearing down.
    net::Error error = static_cast<net::Error>(bytes_read);
    if (error == OK)
      error = ERR_CONNECTION_CLOSED;
    CloseSessionOnError(error);
    return;
  }

  // The FlipFramer will use callbacks onto |this| as it parses frames.
  // When errors occur, those callbacks can lead to teardown of all references
  // to |this|, so maintain a reference to self during this call for safe
  // cleanup.
  scoped_refptr<FlipSession> self(this);

  // Feed the framer until the buffer is drained or it reports an error.
  // NOTE(review): this loop relies on ProcessInput() either consuming bytes
  // or setting an error code on every call - confirm against FlipFramer.
  char *data = read_buffer_->data();
  while (bytes_read &&
         flip_framer_.error_code() == flip::FlipFramer::FLIP_NO_ERROR) {
    uint32 bytes_processed = flip_framer_.ProcessInput(data, bytes_read);
    bytes_read -= bytes_processed;
    data += bytes_processed;
    // A completed frame leaves the framer in FLIP_DONE; reset it so the next
    // frame in the buffer can be parsed.
    if (flip_framer_.state() == flip::FlipFramer::FLIP_DONE)
      flip_framer_.Reset();
  }

  // The framer callbacks may have closed the session; only keep reading if
  // they did not.
  if (state_ != CLOSED)
    ReadSocket();
}
560
// Completion handler for a socket write of |in_flight_write_|.
// |result| is the number of bytes written (> 0) or a negative net error.
// On success the in-flight buffer is advanced; once the whole frame has been
// written its stream is notified (unless cancelled) and the in-flight write
// is released.  Another write pass is then scheduled.  On error the in-flight
// write is dropped and the session is closed.
void FlipSession::OnWriteComplete(int result) {
  DCHECK(write_pending_);
  DCHECK(in_flight_write_.size());
  DCHECK(result != 0);  // This shouldn't happen for write.

  write_pending_ = false;

  LOG(INFO) << "Flip write complete (result=" << result << ") for stream: "
            << in_flight_write_.stream()->stream_id();

  if (result >= 0) {
    // It should not be possible to have written more bytes than our
    // in_flight_write_.
    DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());

    in_flight_write_.buffer()->DidConsume(result);

    // We only notify the stream when we've fully written the pending frame.
    if (!in_flight_write_.buffer()->BytesRemaining()) {
      scoped_refptr<FlipStream> stream = in_flight_write_.stream();
      DCHECK(stream.get());

      // Report the number of bytes written to the caller, but exclude the
      // frame size overhead.  NOTE:  if this frame was compressed the reported
      // bytes written is the compressed size, not the original size.
      if (result > 0) {
        result = in_flight_write_.buffer()->size();
        DCHECK_GT(result, static_cast<int>(flip::FlipFrame::size()));
        result -= static_cast<int>(flip::FlipFrame::size());
      }

      // It is possible that the stream was cancelled while we were writing
      // to the socket.
      if (!stream->cancelled())
        stream->OnWriteComplete(result);

      // Cleanup the write which just completed.
      in_flight_write_.release();
    }

    // Write more data.  This schedules another WriteSocket() pass through
    // the message loop (WriteSocketLater() posts a task rather than writing
    // inline).
    WriteSocketLater();
  } else {
    in_flight_write_.release();

    // The write failed, so the whole session is closed with this error.
    CloseSessionOnError(static_cast<net::Error>(result));
  }
}
612
613void FlipSession::ReadSocket() {
614  if (read_pending_)
615    return;
616
617  if (state_ == CLOSED) {
618    NOTREACHED();
619    return;
620  }
621
622  CHECK(connection_.get());
623  CHECK(connection_->socket());
624  int bytes_read = connection_->socket()->Read(read_buffer_.get(),
625                                               kReadBufferSize,
626                                               &read_callback_);
627  switch (bytes_read) {
628    case 0:
629      // Socket is closed!
630      // TODO(mbelshe): Need to abort any active streams here.
631      DCHECK(!active_streams_.size());
632      return;
633    case net::ERR_IO_PENDING:
634      // Waiting for data.  Nothing to do now.
635      read_pending_ = true;
636      return;
637    default:
638      // Data was read, process it.
639      // Schedule the work through the message loop to avoid recursive
640      // callbacks.
641      read_pending_ = true;
642      MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod(
643          this, &FlipSession::OnReadComplete, bytes_read));
644      break;
645  }
646}
647
648void FlipSession::WriteSocketLater() {
649  if (delayed_write_pending_)
650    return;
651
652  delayed_write_pending_ = true;
653  MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod(
654      this, &FlipSession::WriteSocket));
655}
656
// Drains the outgoing frame queue onto the socket.
// Runs as the task posted by WriteSocketLater().  Does nothing if the session
// is not connected yet, is closed, or already has a write in progress.
// Otherwise it keeps writing the in-flight frame (pulling the next frame from
// |queue_| when there is none) until everything has been sent, a write
// returns ERR_IO_PENDING, or a write fails.
void FlipSession::WriteSocket() {
  // This function should only be called via WriteSocketLater.
  DCHECK(delayed_write_pending_);
  delayed_write_pending_ = false;

  // If the socket isn't connected yet, just wait; we'll get called
  // again when the socket connection completes.  If the socket is
  // closed, just return.
  if (state_ < CONNECTED || state_ == CLOSED)
    return;

  if (write_pending_)   // Another write is in progress still.
    return;

  // Loop sending frames until we've sent everything or until the write
  // returns error (or ERR_IO_PENDING).
  while (in_flight_write_.buffer() || queue_.size()) {
    if (!in_flight_write_.buffer()) {
      // Grab the next FlipFrame to send.
      FlipIOBuffer next_buffer = queue_.top();
      queue_.pop();

      // We've deferred compression until just before we write it to the socket,
      // which is now.  At this time, we don't compress our data frames.
      flip::FlipFrame uncompressed_frame(next_buffer.buffer()->data(), false);
      size_t size;
      if (uncompressed_frame.is_control_frame()) {
        // Control frames are compressed and copied into a fresh buffer.
        scoped_ptr<flip::FlipFrame> compressed_frame(
            flip_framer_.CompressFrame(&uncompressed_frame));
        size = compressed_frame->length() + flip::FlipFrame::size();

        DCHECK(size > 0);

        // TODO(mbelshe): We have too much copying of data here.
        IOBufferWithSize* buffer = new IOBufferWithSize(size);
        memcpy(buffer->data(), compressed_frame->data(), size);

        // Attempt to send the frame.  The replacement buffer is created with
        // priority 0 and keeps the original buffer's stream.
        in_flight_write_ = FlipIOBuffer(buffer, size, 0, next_buffer.stream());
      } else {
        // Data frames are sent as queued.  |size| is computed here but not
        // read afterwards.
        size = uncompressed_frame.length() + flip::FlipFrame::size();
        in_flight_write_ = next_buffer;
      }
    } else {
      // Resuming a partially written frame.
      DCHECK(in_flight_write_.buffer()->BytesRemaining());
    }

    write_pending_ = true;
    int rv = connection_->socket()->Write(in_flight_write_.buffer(),
        in_flight_write_.buffer()->BytesRemaining(), &write_callback_);
    // The write will finish asynchronously through |write_callback_|.
    if (rv == net::ERR_IO_PENDING)
      break;

    // The write finished synchronously (successfully or not); run the same
    // completion handling the callback would have run.
    OnWriteComplete(rv);

    // TODO(mbelshe):  Test this error case.  Maybe we should mark the socket
    //                 as in an error state.
    if (rv < 0)
      break;
  }
}
719
720void FlipSession::CloseAllStreams(net::Error code) {
721  LOG(INFO) << "Closing all FLIP Streams";
722
723  static StatsCounter abandoned_streams("flip.abandoned_streams");
724  static StatsCounter abandoned_push_streams("flip.abandoned_push_streams");
725
726  if (active_streams_.size()) {
727    abandoned_streams.Add(active_streams_.size());
728
729    // Create a copy of the list, since aborting streams can invalidate
730    // our list.
731    FlipStream** list = new FlipStream*[active_streams_.size()];
732    ActiveStreamMap::const_iterator it;
733    int index = 0;
734    for (it = active_streams_.begin(); it != active_streams_.end(); ++it)
735      list[index++] = it->second;
736
737    // Issue the aborts.
738    for (--index; index >= 0; index--) {
739      LOG(ERROR) << "ABANDONED (stream_id=" << list[index]->stream_id()
740                 << "): " << list[index]->path();
741      list[index]->OnClose(code);
742    }
743
744    // Clear out anything pending.
745    active_streams_.clear();
746
747    delete[] list;
748  }
749
750  if (pushed_streams_.size()) {
751    streams_abandoned_count_ += pushed_streams_.size();
752    abandoned_push_streams.Add(pushed_streams_.size());
753    pushed_streams_.clear();
754  }
755}
756
757int FlipSession::GetNewStreamId() {
758  int id = stream_hi_water_mark_;
759  stream_hi_water_mark_ += 2;
760  if (stream_hi_water_mark_ > 0x7fff)
761    stream_hi_water_mark_ = 1;
762  return id;
763}
764
765void FlipSession::CloseSessionOnError(net::Error err) {
766  DCHECK_LT(err, OK);
767  LOG(INFO) << "Flip::CloseSessionOnError(" << err << ")";
768
769  // Don't close twice.  This can occur because we can have both
770  // a read and a write outstanding, and each can complete with
771  // an error.
772  if (state_ != CLOSED) {
773    state_ = CLOSED;
774    error_ = err;
775    CloseAllStreams(err);
776    session_->flip_session_pool()->Remove(this);
777  }
778}
779
780void FlipSession::ActivateStream(FlipStream* stream) {
781  const flip::FlipStreamId id = stream->stream_id();
782  DCHECK(!IsStreamActive(id));
783
784  active_streams_[id] = stream;
785}
786
787void FlipSession::DeactivateStream(flip::FlipStreamId id) {
788  DCHECK(IsStreamActive(id));
789
790  // Verify it is not on the pushed_streams_ list.
791  ActiveStreamList::iterator it;
792  for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) {
793    scoped_refptr<FlipStream> curr = *it;
794    if (id == curr->stream_id()) {
795      pushed_streams_.erase(it);
796      break;
797    }
798  }
799
800  active_streams_.erase(id);
801}
802
803scoped_refptr<FlipStream> FlipSession::GetPushStream(const std::string& path) {
804  static StatsCounter used_push_streams("flip.claimed_push_streams");
805
806  LOG(INFO) << "Looking for push stream: " << path;
807
808  scoped_refptr<FlipStream> stream;
809
810  // We just walk a linear list here.
811  ActiveStreamList::iterator it;
812  for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) {
813    stream = *it;
814    if (path == stream->path()) {
815      CHECK(stream->pushed());
816      pushed_streams_.erase(it);
817      used_push_streams.Increment();
818      LOG(INFO) << "Push Stream Claim for: " << path;
819      break;
820    }
821  }
822
823  return stream;
824}
825
826void FlipSession::GetSSLInfo(SSLInfo* ssl_info) {
827  if (is_secure_) {
828    SSLClientSocket* ssl_socket =
829        reinterpret_cast<SSLClientSocket*>(connection_->socket());
830    ssl_socket->GetSSLInfo(ssl_info);
831  }
832}
833
834void FlipSession::OnError(flip::FlipFramer* framer) {
835  LOG(ERROR) << "FlipSession error: " << framer->error_code();
836  CloseSessionOnError(net::ERR_FLIP_PROTOCOL_ERROR);
837}
838
839void FlipSession::OnStreamFrameData(flip::FlipStreamId stream_id,
840                                    const char* data,
841                                    size_t len) {
842  LOG(INFO) << "Flip data for stream " << stream_id << ", " << len << " bytes";
843  bool valid_stream = IsStreamActive(stream_id);
844  if (!valid_stream) {
845    // NOTE:  it may just be that the stream was cancelled.
846    LOG(WARNING) << "Received data frame for invalid stream " << stream_id;
847    return;
848  }
849
850  scoped_refptr<FlipStream> stream = active_streams_[stream_id];
851  bool success = stream->OnDataReceived(data, len);
852  // |len| == 0 implies a closed stream.
853  if (!success || !len)
854    DeactivateStream(stream_id);
855}
856
857void FlipSession::OnSyn(const flip::FlipSynStreamControlFrame* frame,
858                        const flip::FlipHeaderBlock* headers) {
859  flip::FlipStreamId stream_id = frame->stream_id();
860
861  // Server-initiated streams should have even sequence numbers.
862  if ((stream_id & 0x1) != 0) {
863    LOG(ERROR) << "Received invalid OnSyn stream id " << stream_id;
864    return;
865  }
866
867  if (IsStreamActive(stream_id)) {
868    LOG(ERROR) << "Received OnSyn for active stream " << stream_id;
869    return;
870  }
871
872  streams_pushed_count_++;
873
874  LOG(INFO) << "FlipSession: Syn received for stream: " << stream_id;
875
876  LOG(INFO) << "FLIP SYN RESPONSE HEADERS -----------------------";
877  DumpFlipHeaders(*headers);
878
879  // TODO(mbelshe): DCHECK that this is a GET method?
880
881  const std::string& path = ContainsKey(*headers, "path") ?
882      headers->find("path")->second : "";
883
884  // Verify that the response had a URL for us.
885  DCHECK(!path.empty());
886  if (path.empty()) {
887    LOG(WARNING) << "Pushed stream did not contain a path.";
888    return;
889  }
890
891  scoped_refptr<FlipStream> stream;
892
893  // Check if we already have a delegate awaiting this stream.
894  PendingStreamMap::iterator it;
895  it = pending_streams_.find(path);
896  if (it != pending_streams_.end()) {
897    stream = it->second;
898    pending_streams_.erase(it);
899    if (stream)
900      pushed_streams_.push_back(stream);
901  } else {
902    pushed_streams_.push_back(stream);
903  }
904
905  if (stream) {
906    CHECK(stream->pushed());
907    CHECK(stream->stream_id() == 0);
908    stream->set_stream_id(stream_id);
909  } else {
910    // TODO(mbelshe): can we figure out how to use a LoadLog here?
911    stream = new FlipStream(this, stream_id, true, NULL);
912  }
913
914  // Activate a stream and parse the headers.
915  ActivateStream(stream);
916
917  stream->set_path(path);
918
919  // TODO(mbelshe): For now we convert from our nice hash map back
920  // to a string of headers; this is because the HttpResponseInfo
921  // is a bit rigid for its http (non-flip) design.
922  HttpResponseInfo response;
923  if (FlipHeadersToHttpResponse(*headers, &response)) {
924    GetSSLInfo(&response.ssl_info);
925    stream->OnResponseReceived(response);
926  } else {
927    stream->OnClose(ERR_INVALID_RESPONSE);
928    DeactivateStream(stream_id);
929    return;
930  }
931
932  LOG(INFO) << "Got pushed stream for " << stream->path();
933
934  static StatsCounter push_requests("flip.pushed_streams");
935  push_requests.Increment();
936}
937
938void FlipSession::OnSynReply(const flip::FlipSynReplyControlFrame* frame,
939                             const flip::FlipHeaderBlock* headers) {
940  DCHECK(headers);
941  flip::FlipStreamId stream_id = frame->stream_id();
942  bool valid_stream = IsStreamActive(stream_id);
943  if (!valid_stream) {
944    // NOTE:  it may just be that the stream was cancelled.
945    LOG(WARNING) << "Received SYN_REPLY for invalid stream " << stream_id;
946    return;
947  }
948
949  LOG(INFO) << "FLIP SYN_REPLY RESPONSE HEADERS for stream: " << stream_id;
950  DumpFlipHeaders(*headers);
951
952  // We record content declared as being pushed so that we don't
953  // request a duplicate stream which is already scheduled to be
954  // sent to us.
955  flip::FlipHeaderBlock::const_iterator it;
956  it = headers->find("X-Associated-Content");
957  if (it != headers->end()) {
958    const std::string& content = it->second;
959    std::string::size_type start = 0;
960    std::string::size_type end = 0;
961    do {
962      end = content.find("||", start);
963      if (end == std::string::npos)
964        end = content.length();
965      std::string url = content.substr(start, end - start);
966      std::string::size_type pos = url.find("??");
967      if (pos == std::string::npos)
968        break;
969      url = url.substr(pos + 2);
970      GURL gurl(url);
971      std::string path = gurl.PathForRequest();
972      if (path.length())
973        pending_streams_[path] = NULL;
974      else
975        LOG(INFO) << "Invalid X-Associated-Content path: " << url;
976      start = end + 2;
977    } while (start < content.length());
978  }
979
980  scoped_refptr<FlipStream> stream = active_streams_[stream_id];
981  CHECK(stream->stream_id() == stream_id);
982  CHECK(!stream->cancelled());
983  HttpResponseInfo response;
984  if (FlipHeadersToHttpResponse(*headers, &response)) {
985    GetSSLInfo(&response.ssl_info);
986    stream->OnResponseReceived(response);
987  } else {
988    stream->OnClose(ERR_INVALID_RESPONSE);
989    DeactivateStream(stream_id);
990  }
991}
992
993void FlipSession::OnControl(const flip::FlipControlFrame* frame) {
994  flip::FlipHeaderBlock headers;
995  uint32 type = frame->type();
996  if (type == flip::SYN_STREAM || type == flip::SYN_REPLY) {
997    if (!flip_framer_.ParseHeaderBlock(frame, &headers)) {
998      LOG(WARNING) << "Could not parse Flip Control Frame Header";
999      // TODO(mbelshe):  Error the session?
1000      return;
1001    }
1002  }
1003
1004  switch (type) {
1005    case flip::SYN_STREAM:
1006      LOG(INFO) << "Flip SynStream for stream " << frame->stream_id();
1007      OnSyn(reinterpret_cast<const flip::FlipSynStreamControlFrame*>(frame),
1008            &headers);
1009      break;
1010    case flip::SYN_REPLY:
1011      LOG(INFO) << "Flip SynReply for stream " << frame->stream_id();
1012      OnSynReply(
1013          reinterpret_cast<const flip::FlipSynReplyControlFrame*>(frame),
1014          &headers);
1015      break;
1016    case flip::FIN_STREAM:
1017      LOG(INFO) << "Flip Fin for stream " << frame->stream_id();
1018      OnFin(reinterpret_cast<const flip::FlipFinStreamControlFrame*>(frame));
1019      break;
1020    default:
1021      DCHECK(false);  // Error!
1022  }
1023}
1024
1025void FlipSession::OnFin(const flip::FlipFinStreamControlFrame* frame) {
1026  flip::FlipStreamId stream_id = frame->stream_id();
1027  bool valid_stream = IsStreamActive(stream_id);
1028  if (!valid_stream) {
1029    // NOTE:  it may just be that the stream was cancelled.
1030    LOG(WARNING) << "Received FIN for invalid stream" << stream_id;
1031    return;
1032  }
1033  scoped_refptr<FlipStream> stream = active_streams_[stream_id];
1034  CHECK(stream->stream_id() == stream_id);
1035  CHECK(!stream->cancelled());
1036  if (frame->status() == 0) {
1037    stream->OnDataReceived(NULL, 0);
1038  } else {
1039    LOG(ERROR) << "Flip stream closed: " << frame->status();
1040    // TODO(mbelshe): Map from Flip-protocol errors to something sensical.
1041    //                For now, it doesn't matter much - it is a protocol error.
1042    stream->OnClose(ERR_FAILED);
1043  }
1044
1045  DeactivateStream(stream_id);
1046}
1047
1048}  // namespace net
1049